From aa22a92b18a91e7f3a360ac88b038991cc09d25e Mon Sep 17 00:00:00 2001 From: backkem Date: Mon, 13 Aug 2018 23:15:09 +0200 Subject: [PATCH] datachannel: handle data before final ICE pair selection, fix agent deadlock --- examples/data-channels-create/main.go | 20 +++++-- internal/network/manager.go | 26 +++++++-- pkg/ice/agent.go | 81 ++++++++++++++++++--------- rtcdatachannel.go | 15 ++++- 4 files changed, 101 insertions(+), 41 deletions(-) diff --git a/examples/data-channels-create/main.go b/examples/data-channels-create/main.go index fcb609ce..a6f954df 100644 --- a/examples/data-channels-create/main.go +++ b/examples/data-channels-create/main.go @@ -37,17 +37,25 @@ func main() { panic(err) } - // Set the handler for ICE connection state - // This will notify you when the peer has connected/disconnected - peerConnection.OnICEConnectionStateChange = func(connectionState ice.ConnectionState) { - fmt.Printf("Connection State has changed %s \n", connectionState.String()) - } - d, err := peerConnection.CreateDataChannel("data", nil) if err != nil { panic(err) } + // Set the handler for ICE connection state + // This will notify you when the peer has connected/disconnected + peerConnection.OnICEConnectionStateChange = func(connectionState ice.ConnectionState) { + // TODO: find the correct place of this + fmt.Printf("Connection State has changed %s \n", connectionState.String()) + if connectionState == ice.ConnectionStateConnected { + fmt.Println("sending openchannel") + err := d.SendOpenChannelMessage() + if err != nil { + fmt.Println("faild to send openchannel", err) + } + } + } + fmt.Printf("New DataChannel %s %d\n", d.Label, d.ID) d.Lock() diff --git a/internal/network/manager.go b/internal/network/manager.go index d81d0c88..515baf70 100644 --- a/internal/network/manager.go +++ b/internal/network/manager.go @@ -245,18 +245,32 @@ func (m *Manager) dataChannelInboundHandler(data []byte, streamIdentifier uint16 } func (m *Manager) dataChannelOutboundHandler(raw []byte) { - m.portsLock.Lock() - defer m.portsLock.Unlock() - local, remote := m.IceAgent.SelectedPair() - if local == nil || remote == nil { + if remote == nil || local == nil { + // Send data on any valid pair + fmt.Println("dataChannelOutboundHandler: no valid candidates, dropping packet") return } + + m.portsLock.Lock() + defer m.portsLock.Unlock() + p, err := m.GetPort(local) + if err != nil { + fmt.Println("dataChannelOutboundHandler: no valid port for candidate, dropping packet") + return + + } + p.sendSCTP(raw, remote) +} + +// GetPort looks up a local port by address +func (m *Manager) GetPort(local *stun.TransportAddr) (*port, error) { for _, p := range m.ports { if p.listeningAddr.Equal(local) { - p.sendSCTP(raw, remote) + return p, nil } } + return nil, errors.New("port not found") } func (m *Manager) iceOutboundHandler(raw []byte, local *stun.TransportAddr, remote *net.UDPAddr) { @@ -289,4 +303,4 @@ func (m *Manager) SendOpenChannelMessage(streamIdentifier uint16, label string) return fmt.Errorf("Error sending ChannelOpen %v", err) } return nil -} \ No newline at end of file +} diff --git a/pkg/ice/agent.go b/pkg/ice/agent.go index 882fb567..f9cc15e2 100644 --- a/pkg/ice/agent.go +++ b/pkg/ice/agent.go @@ -15,6 +15,31 @@ import ( // OutboundCallback is the user defined Callback that is called when ICE traffic needs to sent type OutboundCallback func(raw []byte, local *stun.TransportAddr, remote *net.UDPAddr) +func NewCandidatePair(local, remote Candidate) CandidatePair { + return CandidatePair{ + remote: remote, + local: local, + } +} + +// CandidatePair represents a combination of a local and remote candidate +type CandidatePair struct { + // lastUpdateTime ? + remote Candidate + local Candidate +} + +// GetAddrs returns network addresses for the candidate pair +func (c CandidatePair) GetAddrs() (local *stun.TransportAddr, remote *net.UDPAddr) { + return &stun.TransportAddr{ + IP: net.ParseIP(c.local.GetBase().Address), + Port: c.local.GetBase().Port, + }, &net.UDPAddr{ + IP: net.ParseIP(c.remote.GetBase().Address), + Port: c.remote.GetBase().Port, + } +} + // Agent represents the ICE agent type Agent struct { sync.RWMutex @@ -38,11 +63,8 @@ type Agent struct { remotePwd string remoteCandidates []Candidate - selectedPair struct { - // lastUpdateTime ? - remote Candidate - local Candidate - } + selectedPair CandidatePair + validPairs []CandidatePair } const ( @@ -113,13 +135,25 @@ func (a *Agent) pingCandidate(local, remote Candidate) { func (a *Agent) updateConnectionState(newState ConnectionState) { a.connectionState = newState - a.iceNotifier(a.connectionState) + // Call handler async since we may be holding the agent lock + // and the handler may also require it + go a.iceNotifier(a.connectionState) } -func (a *Agent) setSelectedPair(local, remote Candidate) { - a.selectedPair.remote = remote - a.selectedPair.local = local - a.updateConnectionState(ConnectionStateConnected) +func (a *Agent) setValidPair(local, remote Candidate, selected bool) { + p := NewCandidatePair(local, remote) + + if selected { + a.selectedPair = p + a.validPairs = nil + // TODO: only set state to connected on selecting final pair? + a.updateConnectionState(ConnectionStateConnected) + } else { + // keep track of pairs with succesfull bindings since any of them + // can be used for communication untill the final pair is selected: + // https://tools.ietf.org/html/draft-ietf-ice-rfc5245bis-20#section-12 + a.validPairs = append(a.validPairs, p) + } } func (a *Agent) agentTaskLoop() { @@ -244,9 +278,9 @@ func (a *Agent) handleInboundControlled(m *stun.Message, local *stun.TransportAd return } - if _, useCandidateFound := m.GetOneAttribute(stun.AttrUseCandidate); useCandidateFound { - a.setSelectedPair(localCandidate, remoteCandidate) - } + _, useCandidateFound := m.GetOneAttribute(stun.AttrUseCandidate) + a.setValidPair(localCandidate, remoteCandidate, useCandidateFound) + a.sendBindingSuccess(m, local, remote) } @@ -259,12 +293,10 @@ func (a *Agent) handleInboundControlling(m *stun.Message, local *stun.TransportA return } - if m.Class == stun.ClassSuccessResponse && m.Method == stun.MethodBinding { - //Binding success! - if a.selectedPair.remote == nil && a.selectedPair.local == nil { - a.setSelectedPair(localCandidate, remoteCandidate) - } - } else { + final := m.Class == stun.ClassSuccessResponse && m.Method == stun.MethodBinding + a.setValidPair(localCandidate, remoteCandidate, final) + + if !final { a.sendBindingSuccess(m, local, remote) } } @@ -309,14 +341,11 @@ func (a *Agent) SelectedPair() (local *stun.TransportAddr, remote *net.UDPAddr) defer a.RUnlock() if a.selectedPair.remote == nil || a.selectedPair.local == nil { + for _, p := range a.validPairs { + return p.GetAddrs() + } return nil, nil } - return &stun.TransportAddr{ - IP: net.ParseIP(a.selectedPair.local.GetBase().Address), - Port: a.selectedPair.local.GetBase().Port, - }, &net.UDPAddr{ - IP: net.ParseIP(a.selectedPair.remote.GetBase().Address), - Port: a.selectedPair.remote.GetBase().Port, - } + return a.selectedPair.GetAddrs() } diff --git a/rtcdatachannel.go b/rtcdatachannel.go index 40df1d5a..889745ba 100644 --- a/rtcdatachannel.go +++ b/rtcdatachannel.go @@ -115,7 +115,7 @@ func (r *RTCPeerConnection) CreateDataChannel(label string, options *RTCDataChan r.dataChannels[id] = res // Send opening message - r.networkManager.SendOpenChannelMessage(id, label) + // r.networkManager.SendOpenChannelMessage(id, label) return res, nil } @@ -135,9 +135,18 @@ func (r *RTCPeerConnection) generateDataChannelID(client bool) (uint16, error) { return 0, &OperationError{Err: ErrMaxDataChannels} } +// SendOpenChannelMessage is a test to send OpenChannel manually +func (d *RTCDataChannel) SendOpenChannelMessage() error { + if err := d.rtcPeerConnection.networkManager.SendOpenChannelMessage(d.ID, d.Label); err != nil { + return &UnknownError{Err: err} + } + return nil + +} + // Send sends the passed message to the DataChannel peer -func (r *RTCDataChannel) Send(p datachannel.Payload) error { - if err := r.rtcPeerConnection.networkManager.SendDataChannelMessage(p, r.ID); err != nil { +func (d *RTCDataChannel) Send(p datachannel.Payload) error { + if err := d.rtcPeerConnection.networkManager.SendDataChannelMessage(p, d.ID); err != nil { return &UnknownError{Err: err} } return nil