diff --git a/examples/data-channels-create/main.go b/examples/data-channels-create/main.go index 36d64e7b..88161a4c 100644 --- a/examples/data-channels-create/main.go +++ b/examples/data-channels-create/main.go @@ -4,6 +4,7 @@ import ( "bufio" "encoding/base64" "fmt" + "io" "math/rand" "os" "time" @@ -34,22 +35,25 @@ func main() { // Set the handler for ICE connection state // This will notify you when the peer has connected/disconnected peerConnection.OnICEConnectionStateChange = func(connectionState ice.ConnectionState) { - fmt.Printf("Connection State has changed %s \n", connectionState.String()) + fmt.Printf("ICE Connection State has changed: %s\n", connectionState.String()) + } - // TODO: find the correct place for this - if connectionState == ice.ConnectionStateConnected { - time.AfterFunc(3*time.Second, func() { - fmt.Println("sending openchannel") - err := dataChannel.SendOpenChannelMessage() - if err != nil { - fmt.Println("faild to send openchannel", err) - } - }) + dataChannel.Lock() + + // Register channel opening handling + dataChannel.OnOpen = func() { + fmt.Printf("Data channel '%s'-'%d' open. Random messages will now be sent to any connected DataChannels every 5 seconds\n", dataChannel.Label, dataChannel.ID) + for { + time.Sleep(5 * time.Second) + message := randSeq(15) + fmt.Printf("Sending %s \n", message) + + err := dataChannel.Send(datachannel.PayloadString{Data: []byte(message)}) + check(err) } } // Register the Onmessage to handle incoming messages - dataChannel.Lock() dataChannel.Onmessage = func(payload datachannel.Payload) { switch p := payload.(type) { case *datachannel.PayloadString: @@ -60,6 +64,7 @@ func main() { fmt.Printf("Message '%s' from DataChannel '%s' no payload \n", p.PayloadType().String(), dataChannel.Label) } } + dataChannel.Unlock() // Create an offer to send to the browser @@ -69,7 +74,7 @@ func main() { // Output the offer in base64 so we can paste it in browser fmt.Println(base64.StdEncoding.EncodeToString([]byte(offer.Sdp))) - // Wait for the offer to be pasted + // Wait for the answer to be pasted sd := mustReadStdin() // Set the remote SessionDescription @@ -82,16 +87,8 @@ func main() { err = peerConnection.SetRemoteDescription(answer) check(err) - // Send messages every 5 seconds - fmt.Println("Random messages will now be sent to any connected DataChannels every 5 seconds") - for { - time.Sleep(5 * time.Second) - message := randSeq(15) - fmt.Printf("Sending %s \n", message) - - err := dataChannel.Send(datachannel.PayloadString{Data: []byte(message)}) - check(err) - } + // Block forever + select {} } // randSeq is used to generate a random message @@ -109,12 +106,15 @@ func randSeq(n int) string { func mustReadStdin() string { reader := bufio.NewReader(os.Stdin) rawSd, err := reader.ReadString('\n') - check(err) + if err != io.EOF { + check(err) + } fmt.Println("") - sd, err := base64.StdEncoding.DecodeString(rawSd) + sd, err := base64.StdEncoding.DecodeString(rawSd) check(err) + return string(sd) } diff --git a/examples/data-channels/main.go b/examples/data-channels/main.go index e64dbc5c..375d9b62 100644 --- a/examples/data-channels/main.go +++ b/examples/data-channels/main.go @@ -7,7 +7,6 @@ import ( "io" "math/rand" "os" - "sync" "time" "github.com/pions/webrtc" @@ -15,61 +14,52 @@ import ( "github.com/pions/webrtc/pkg/ice" ) -func randSeq(n int) string { - r := rand.New(rand.NewSource(time.Now().UnixNano())) - letters := []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") - b := make([]rune, n) - for i := range b { - b[i] = letters[r.Intn(len(letters))] - } - return string(b) -} - func main() { - reader := bufio.NewReader(os.Stdin) - rawSd, err := reader.ReadString('\n') - if err != nil && err != io.EOF { - panic(err) - } - - fmt.Println("") - sd, err := base64.StdEncoding.DecodeString(rawSd) - if err != nil { - panic(err) - } + // Wait for the offer to be pasted + sd := mustReadStdin() /* Everything below is the pion-WebRTC API, thanks for using it! */ - // Create a new RTCPeerConnection - peerConnection, err := webrtc.New(webrtc.RTCConfiguration{ + // Prepare the configuration + config := webrtc.RTCConfiguration{ IceServers: []webrtc.RTCIceServer{ { URLs: []string{"stun:stun.l.google.com:19302"}, }, }, - }) - if err != nil { - panic(err) } + // Create a new RTCPeerConnection + peerConnection, err := webrtc.New(config) + check(err) + // Set the handler for ICE connection state // This will notify you when the peer has connected/disconnected peerConnection.OnICEConnectionStateChange = func(connectionState ice.ConnectionState) { - fmt.Printf("Connection State has changed %s \n", connectionState.String()) + fmt.Printf("ICE Connection State has changed: %s\n", connectionState.String()) } - datachannels := make([]*webrtc.RTCDataChannel, 0) - var dataChannelsLock sync.RWMutex - + // Register data channel creation handling peerConnection.OnDataChannel = func(d *webrtc.RTCDataChannel) { - dataChannelsLock.Lock() - datachannels = append(datachannels, d) - dataChannelsLock.Unlock() - fmt.Printf("New DataChannel %s %d\n", d.Label, d.ID) d.Lock() defer d.Unlock() + + // Register channel opening handling + d.OnOpen = func() { + fmt.Printf("Data channel '%s'-'%d' open. Random messages will now be sent to any connected DataChannels every 5 seconds\n", d.Label, d.ID) + for { + time.Sleep(5 * time.Second) + message := randSeq(15) + fmt.Printf("Sending %s \n", message) + + err := d.Send(datachannel.PayloadString{Data: []byte(message)}) + check(err) + } + } + + // Register message handling d.Onmessage = func(payload datachannel.Payload) { switch p := payload.(type) { case *datachannel.PayloadString: @@ -87,31 +77,50 @@ func main() { Type: webrtc.RTCSdpTypeOffer, Sdp: string(sd), } - if err := peerConnection.SetRemoteDescription(offer); err != nil { - panic(err) - } + + err = peerConnection.SetRemoteDescription(offer) + check(err) // Sets the LocalDescription, and starts our UDP listeners answer, err := peerConnection.CreateAnswer(nil) - if err != nil { - panic(err) - } + check(err) // Get the LocalDescription and take it to base64 so we can paste in browser fmt.Println(base64.StdEncoding.EncodeToString([]byte(answer.Sdp))) - fmt.Println("Random messages will now be sent to any connected DataChannels every 5 seconds") - for { - time.Sleep(5 * time.Second) - message := randSeq(15) - fmt.Printf("Sending %s \n", message) - dataChannelsLock.RLock() - for _, d := range datachannels { - err := d.Send(datachannel.PayloadString{Data: []byte(message)}) - if err != nil { - panic(err) - } - } - dataChannelsLock.RUnlock() + // Block forever + select {} +} + +func randSeq(n int) string { + r := rand.New(rand.NewSource(time.Now().UnixNano())) + letters := []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") + b := make([]rune, n) + for i := range b { + b[i] = letters[r.Intn(len(letters))] + } + return string(b) +} + +// mustReadStdin blocks untill input is received from stdin +func mustReadStdin() string { + reader := bufio.NewReader(os.Stdin) + rawSd, err := reader.ReadString('\n') + if err != io.EOF { + check(err) + } + + fmt.Println("") + + sd, err := base64.StdEncoding.DecodeString(rawSd) + check(err) + + return string(sd) +} + +// check is used to panic in an error occurs. +func check(err error) { + if err != nil { + panic(err) } } diff --git a/examples/pion-to-pion/answer/main.go b/examples/pion-to-pion/answer/main.go index e604b5e7..b1bb2a48 100644 --- a/examples/pion-to-pion/answer/main.go +++ b/examples/pion-to-pion/answer/main.go @@ -45,13 +45,6 @@ func buildPeerConnection() *webrtc.RTCPeerConnection { peerConnection.OnICEConnectionStateChange = func(connectionState ice.ConnectionState) { fmt.Printf("Connection State has changed %s \n", connectionState.String()) - if connectionState == ice.ConnectionStateConnected { - fmt.Println("sending openchannel") - err := d.SendOpenChannelMessage() - if err != nil { - fmt.Println("faild to send openchannel", err) - } - } } return peerConnection diff --git a/internal/network/manager.go b/internal/network/manager.go index deff7926..89fe89ca 100644 --- a/internal/network/manager.go +++ b/internal/network/manager.go @@ -59,7 +59,7 @@ func NewManager(btg BufferTransportGenerator, dcet DataChannelEventHandler, ntf return nil, err } - m.sctpAssociation = sctp.NewAssocation(m.dataChannelOutboundHandler, m.dataChannelInboundHandler, nil) + m.sctpAssociation = sctp.NewAssocation(m.dataChannelOutboundHandler, m.dataChannelInboundHandler, m.handleSCTPState) m.IceAgent = ice.NewAgent(m.iceOutboundHandler, m.iceNotifier) for _, i := range localInterfaces() { @@ -87,6 +87,13 @@ func (m *Manager) handleDTLSState(state dtls.ConnectionState) { } } +func (m *Manager) handleSCTPState(state sctp.AssociationState) { + if state == sctp.Established { + // Temporary way to signal sending OpenChannel messages + m.dataChannelEventHandler(&DataChannelOpen{}) + } +} + // AddURL takes an ICE Url, allocates any state and adds the candidate func (m *Manager) AddURL(url *ice.URL) error { diff --git a/internal/network/network.go b/internal/network/network.go index abddf15b..970d074a 100644 --- a/internal/network/network.go +++ b/internal/network/network.go @@ -51,3 +51,11 @@ type DataChannelMessage struct { func (d *DataChannelMessage) StreamIdentifier() uint16 { return d.streamIdentifier } + +// DataChannelOpen is emitted when all channels should be opened +type DataChannelOpen struct{} + +// StreamIdentifier returns the streamIdentifier +func (d *DataChannelOpen) StreamIdentifier() uint16 { + return 0 +} diff --git a/rtcdatachannel.go b/rtcdatachannel.go index a757c1d4..dc806883 100644 --- a/rtcdatachannel.go +++ b/rtcdatachannel.go @@ -99,6 +99,10 @@ type RTCDataChannel struct { // arrival over the sctp transport from a remote peer. OnMessage func(datachannel.Payload) + // OnOpen designates an event handler which is invoked when + // the underlying data transport has been established (or re-established). + OnOpen func() + // Deprecated: Will be removed when networkManager is deprecated. rtcPeerConnection *RTCPeerConnection } @@ -122,8 +126,7 @@ type RTCDataChannel struct { // return &rtcerr.OperationError{Err: ErrMaxDataChannelID} // } -// SendOpenChannelMessage is a test to send OpenChannel manually -func (d *RTCDataChannel) SendOpenChannelMessage() error { +func (d *RTCDataChannel) sendOpenChannelMessage() error { if err := d.rtcPeerConnection.networkManager.SendOpenChannelMessage(*d.ID, d.Label); err != nil { return &rtcerr.UnknownError{Err: err} } @@ -138,3 +141,12 @@ func (d *RTCDataChannel) Send(p datachannel.Payload) error { } return nil } + +func (d *RTCDataChannel) doOnOpen() { + d.RLock() + onOpen := d.OnOpen + d.RUnlock() + if onOpen != nil { + onOpen() + } +} diff --git a/rtcpeerconnection.go b/rtcpeerconnection.go index 897676a1..160041c6 100644 --- a/rtcpeerconnection.go +++ b/rtcpeerconnection.go @@ -809,10 +809,15 @@ func (pc *RTCPeerConnection) dataChannelEventHandler(e network.DataChannelEvent) switch event := e.(type) { case *network.DataChannelCreated: id := event.StreamIdentifier() - newDataChannel := &RTCDataChannel{ID: &id, Label: event.Label, rtcPeerConnection: pc} + newDataChannel := &RTCDataChannel{ID: &id, Label: event.Label, rtcPeerConnection: pc, ReadyState: RTCDataChannelStateOpen} pc.dataChannels[e.StreamIdentifier()] = newDataChannel if pc.OnDataChannel != nil { - go pc.OnDataChannel(newDataChannel) + go func() { + pc.OnDataChannel(newDataChannel) // This should actually be called when processing the SDP answer. + if newDataChannel.OnOpen != nil { + go newDataChannel.doOnOpen() + } + }() } else { fmt.Println("OnDataChannel is unset, discarding message") } @@ -830,6 +835,20 @@ func (pc *RTCPeerConnection) dataChannelEventHandler(e network.DataChannelEvent) fmt.Printf("No datachannel found for streamIdentifier %d \n", e.StreamIdentifier()) } + case *network.DataChannelOpen: + for _, dc := range pc.dataChannels { + dc.Lock() + err := dc.sendOpenChannelMessage() + if err != nil { + fmt.Println("failed to send openchannel", err) + dc.Unlock() + continue + } + dc.ReadyState = RTCDataChannelStateOpen + dc.Unlock() + + go dc.doOnOpen() // TODO: move to ChannelAck handling + } default: fmt.Printf("Unhandled DataChannelEvent %v \n", event) }