diff --git a/internal/network/manager.go b/internal/network/manager.go index 48b2e5b9..a25b54e6 100644 --- a/internal/network/manager.go +++ b/internal/network/manager.go @@ -65,9 +65,9 @@ func NewManager(bufferTransportGenerator BufferTransportGenerator, dataChannelEv m.IceAgent = ice.NewAgent(m.iceOutboundHandler) for _, i := range localInterfaces() { - p, err := newPort(i+":0", m) - if err != nil { - return nil, err + p, portErr := newPort(i+":0", m) + if portErr != nil { + return nil, portErr } m.ports = append(m.ports, p) diff --git a/internal/network/port-receive.go b/internal/network/port-receive.go index fa67d77d..e25513ad 100644 --- a/internal/network/port-receive.go +++ b/internal/network/port-receive.go @@ -135,33 +135,31 @@ func (p *port) networkLoop() { }() for { - select { - case in, socketOpen := <-incomingPackets: - if !socketOpen { - // incomingPackets channel has closed, this port is finished processing - dtls.RemoveListener(p.listeningAddr.String()) - return - } - - if len(in.buffer) == 0 { - fmt.Println("Inbound buffer is not long enough to demux") - continue - } - - // https://tools.ietf.org/html/rfc5764#page-14 - if 127 < in.buffer[0] && in.buffer[0] < 192 { - p.handleSRTP(in.buffer) - } else if 19 < in.buffer[0] && in.buffer[0] < 64 { - p.handleDTLS(in.buffer, in.srcAddr.String()) - } else if in.buffer[0] < 2 { - p.m.IceAgent.HandleInbound(in.buffer, p.listeningAddr, in.srcAddr) - } - - p.m.certPairLock.RLock() - if p.m.isOffer == false && p.m.certPair == nil { - p.m.dtlsState.DoHandshake(p.listeningAddr.String(), in.srcAddr.String()) - } - p.m.certPairLock.RUnlock() + in, socketOpen := <-incomingPackets + if !socketOpen { + // incomingPackets channel has closed, this port is finished processing + dtls.RemoveListener(p.listeningAddr.String()) + return } + + if len(in.buffer) == 0 { + fmt.Println("Inbound buffer is not long enough to demux") + continue + } + + // https://tools.ietf.org/html/rfc5764#page-14 + if 127 < in.buffer[0] && in.buffer[0] < 192 { + p.handleSRTP(in.buffer) + } else if 19 < in.buffer[0] && in.buffer[0] < 64 { + p.handleDTLS(in.buffer, in.srcAddr.String()) + } else if in.buffer[0] < 2 { + p.m.IceAgent.HandleInbound(in.buffer, p.listeningAddr, in.srcAddr) + } + + p.m.certPairLock.RLock() + if !p.m.isOffer && p.m.certPair == nil { + p.m.dtlsState.DoHandshake(p.listeningAddr.String(), in.srcAddr.String()) + } + p.m.certPairLock.RUnlock() } } diff --git a/internal/network/port-send.go b/internal/network/port-send.go index 75a59f71..5e5016f1 100644 --- a/internal/network/port-send.go +++ b/internal/network/port-send.go @@ -8,7 +8,7 @@ import ( "github.com/pions/webrtc/pkg/rtp" ) -func (p *port) sendRTP(packet *rtp.Packet, dst *net.UDPAddr) { +func (p *port) sendRTP(packet *rtp.Packet, dst net.Addr) { p.m.certPairLock.RLock() defer p.m.certPairLock.RUnlock() if p.m.certPair == nil { @@ -44,13 +44,13 @@ func (p *port) sendRTP(packet *rtp.Packet, dst *net.UDPAddr) { } } -func (p *port) sendICE(buf []byte, dst *net.UDPAddr) { +func (p *port) sendICE(buf []byte, dst net.Addr) { if _, err := p.conn.WriteTo(buf, nil, dst); err != nil { fmt.Printf("Failed to send packet: %s \n", err.Error()) } } -func (p *port) sendSCTP(buf []byte, dst *net.UDPAddr) { +func (p *port) sendSCTP(buf []byte, dst fmt.Stringer) { _, err := p.m.dtlsState.Send(buf, p.listeningAddr.String(), dst.String()) if err != nil { fmt.Println(err) diff --git a/internal/sdp/ice.go b/internal/sdp/ice.go index df6286d7..9611e1bc 100644 --- a/internal/sdp/ice.go +++ b/internal/sdp/ice.go @@ -8,8 +8,8 @@ import ( "github.com/pions/webrtc/pkg/ice" ) -// ICECandidateBuild takes a candidate strings and returns a ice.Candidate or nil if it fails to parse -func ICECandidateBuild(raw string) ice.Candidate { +// ICECandidateUnmarshal takes a candidate strings and returns a ice.Candidate or nil if it fails to parse +func ICECandidateUnmarshal(raw string) ice.Candidate { split := strings.Fields(raw) if len(split) < 8 { fmt.Printf("Attribute not long enough to be ICE candidate (%d) %s \n", len(split), raw) @@ -49,3 +49,29 @@ func ICECandidateBuild(raw string) ice.Candidate { return nil } } + +func iceSrflxCandidateString(c *ice.CandidateSrflx, component int) string { + return fmt.Sprintf("udpcandidate %d udp %d %s %d typ srflx raddr %s rport %d generation 0", + component, c.CandidateBase.Priority(ice.SrflxCandidatePreference, uint16(component)), c.CandidateBase.Address, c.CandidateBase.Port, c.RemoteAddress, c.RemotePort) +} + +func iceHostCandidateString(c *ice.CandidateHost, component int) string { + return fmt.Sprintf("udpcandidate %d udp %d %s %d typ host generation 0", + component, c.CandidateBase.Priority(ice.HostCandidatePreference, uint16(component)), c.CandidateBase.Address, c.CandidateBase.Port) +} + +// ICECandidateMarshal takes a candidate and returns a string representation +func ICECandidateMarshal(c ice.Candidate) []string { + out := make([]string, 0) + + switch c := c.(type) { + case *ice.CandidateSrflx: + out = append(out, iceSrflxCandidateString(c, 0)) + out = append(out, iceSrflxCandidateString(c, 1)) + case *ice.CandidateHost: + out = append(out, iceHostCandidateString(c, 0)) + out = append(out, iceHostCandidateString(c, 1)) + } + + return out +} diff --git a/pkg/ice/agent.go b/pkg/ice/agent.go index aeb027b3..577c13df 100644 --- a/pkg/ice/agent.go +++ b/pkg/ice/agent.go @@ -30,7 +30,7 @@ type Agent struct { LocalUfrag string LocalPwd string - localCandidates []Candidate + LocalCandidates []Candidate remoteUfrag string remotePwd string @@ -50,16 +50,15 @@ const ( // NewAgent creates a new Agent func NewAgent(outboundCallback OutboundCallback) *Agent { - a := &Agent{ + return &Agent{ tieBreaker: rand.Uint64(), outboundCallback: outboundCallback, + gatheringState: GatheringStateComplete, // TODO trickle-ice + connectionState: ConnectionStateNew, LocalUfrag: util.RandSeq(16), LocalPwd: util.RandSeq(32), } - if a.isControlling { - } - return a } // Start starts the agent @@ -89,7 +88,7 @@ func (a *Agent) pingCandidate(local, remote Candidate) { &stun.Username{Username: a.remoteUfrag + ":" + a.LocalUfrag}, &stun.UseCandidate{}, &stun.IceControlling{TieBreaker: a.tieBreaker}, - &stun.Priority{Priority: uint32(local.GetBase().priority(hostCandidatePreference, 1))}, + &stun.Priority{Priority: uint32(local.GetBase().Priority(HostCandidatePreference, 1))}, &stun.MessageIntegrity{ Key: []byte(a.remotePwd), }, @@ -111,19 +110,20 @@ func (a *Agent) pingCandidate(local, remote Candidate) { func (a *Agent) agentControllingTaskLoop() { // TODO this should be dynamic, and grow when the connection is stable t := time.NewTicker(agentTickerBaseInterval) + a.connectionState = ConnectionStateChecking for { select { case <-t.C: a.Lock() if a.selectedPair.remote == nil || a.selectedPair.local == nil { - for _, localCandidate := range a.localCandidates { + for _, localCandidate := range a.LocalCandidates { for _, remoteCandidate := range a.remoteCandidates { a.pingCandidate(localCandidate, remoteCandidate) } } } else { - if time.Now().Sub(a.selectedPair.remote.GetBase().LastSeen) > stunTimeout { + if time.Since(a.selectedPair.remote.GetBase().LastSeen) > stunTimeout { a.selectedPair.remote = nil a.selectedPair.local = nil } else { @@ -149,7 +149,7 @@ func (a *Agent) AddRemoteCandidate(c Candidate) { func (a *Agent) AddLocalCandidate(c Candidate) { a.Lock() defer a.Unlock() - a.localCandidates = append(a.localCandidates, c) + a.LocalCandidates = append(a.LocalCandidates, c) } // Close cleans up the Agent @@ -157,19 +157,6 @@ func (a *Agent) Close() { close(a.taskLoopChan) } -// LocalCandidates generates the string representation of the -// local candidates that can be used in the SDP -func (a *Agent) LocalCandidates() (rtrn []string) { - a.Lock() - defer a.Unlock() - - for _, c := range a.localCandidates { - rtrn = append(rtrn, c.String(1)) - rtrn = append(rtrn, c.String(2)) - } - return rtrn -} - func getTransportAddrCandidate(candidates []Candidate, addr *stun.TransportAddr) Candidate { for _, c := range candidates { if c.GetBase().Address == addr.IP.String() && c.GetBase().Port == addr.Port { @@ -209,7 +196,7 @@ func (a *Agent) sendBindingSuccess(m *stun.Message, local *stun.TransportAddr, r } func (a *Agent) handleInboundControlled(m *stun.Message, local *stun.TransportAddr, remote *net.UDPAddr, localCandidate, remoteCandidate Candidate) { - if _, isControlled := m.GetOneAttribute(stun.AttrIceControlled); isControlled && a.isControlling == false { + if _, isControlled := m.GetOneAttribute(stun.AttrIceControlled); isControlled && !a.isControlling { panic("inbound isControlled && a.isControlling == false") } @@ -221,9 +208,9 @@ func (a *Agent) handleInboundControlled(m *stun.Message, local *stun.TransportAd } func (a *Agent) handleInboundControlling(m *stun.Message, local *stun.TransportAddr, remote *net.UDPAddr, localCandidate, remoteCandidate Candidate) { - if _, isControlling := m.GetOneAttribute(stun.AttrIceControlling); isControlling && a.isControlling == true { + if _, isControlling := m.GetOneAttribute(stun.AttrIceControlling); isControlling && a.isControlling { panic("inbound isControlling && a.isControlling == true") - } else if _, useCandidate := m.GetOneAttribute(stun.AttrUseCandidate); useCandidate && a.isControlling == true { + } else if _, useCandidate := m.GetOneAttribute(stun.AttrUseCandidate); useCandidate && a.isControlling { panic("useCandidate && a.isControlling == true") } @@ -243,7 +230,7 @@ func (a *Agent) HandleInbound(buf []byte, local *stun.TransportAddr, remote *net a.Lock() defer a.Unlock() - localCandidate := getTransportAddrCandidate(a.localCandidates, local) + localCandidate := getTransportAddrCandidate(a.LocalCandidates, local) if localCandidate == nil { // TODO debug // fmt.Printf("Could not find local candidate for %s:%d ", local.IP.String(), local.Port) diff --git a/pkg/ice/candidate.go b/pkg/ice/candidate.go index be5b5788..84e68b92 100644 --- a/pkg/ice/candidate.go +++ b/pkg/ice/candidate.go @@ -1,20 +1,19 @@ package ice import ( - "fmt" "math/rand" "time" ) +// Preference enums when generate Priority const ( - hostCandidatePreference uint16 = 126 - srflxCandidatePreference uint16 = 100 + HostCandidatePreference uint16 = 126 + SrflxCandidatePreference uint16 = 100 ) // Candidate represents an ICE candidate type Candidate interface { GetBase() *CandidateBase - String(component int) string } // CandidateBase represents an ICE candidate, a base with enough attributes @@ -26,7 +25,8 @@ type CandidateBase struct { LastSeen time.Time } -func (c *CandidateBase) priority(typePreference uint16, component uint16) uint16 { +// Priority computes the priority for this ICE Candidate +func (c *CandidateBase) Priority(typePreference uint16, component uint16) uint16 { localPreference := uint16(rand.Uint32() / 2) return (2^24)*typePreference + (2^8)*localPreference + @@ -38,12 +38,6 @@ type CandidateHost struct { CandidateBase } -// String for CandidateHost -func (c *CandidateHost) String(component int) string { - return fmt.Sprintf("udpcandidate %d udp %d %s %d typ host generation 0", - component, c.CandidateBase.priority(hostCandidatePreference, uint16(component)), c.CandidateBase.Address, c.CandidateBase.Port) -} - // GetBase returns the CandidateBase, attributes shared between all Candidates func (c *CandidateHost) GetBase() *CandidateBase { return &c.CandidateBase @@ -66,12 +60,6 @@ type CandidateSrflx struct { RemotePort int } -// String for CandidateSrflx -func (c *CandidateSrflx) String(component int) string { - return fmt.Sprintf("udpcandidate %d udp %d %s %d typ srflx raddr %s rport %d generation 0", - component, c.CandidateBase.priority(srflxCandidatePreference, uint16(component)), c.CandidateBase.Address, c.CandidateBase.Port, c.RemoteAddress, c.RemotePort) -} - // GetBase returns the CandidateBase, attributes shared between all Candidates func (c *CandidateSrflx) GetBase() *CandidateBase { return &c.CandidateBase diff --git a/rtcconfiguration.go b/rtcconfiguration.go index 3a85bec6..decc964b 100644 --- a/rtcconfiguration.go +++ b/rtcconfiguration.go @@ -1,6 +1,7 @@ package webrtc import ( + "fmt" "time" "github.com/pions/webrtc/pkg/ice" @@ -243,7 +244,10 @@ func (r *RTCPeerConnection) setICEServers(config RTCConfiguration) error { if err != nil { return err } - r.networkManager.AddURL(&url) + err = r.networkManager.AddURL(&url) + if err != nil { + fmt.Println(err) + } } } return nil diff --git a/rtcpeerconnection.go b/rtcpeerconnection.go index 6caafd6e..e7dd6056 100644 --- a/rtcpeerconnection.go +++ b/rtcpeerconnection.go @@ -65,6 +65,7 @@ type RTCPeerConnection struct { // ICE OnICEConnectionStateChange func(iceConnectionState ice.ConnectionState) + IceConnectionState ice.ConnectionState config RTCConfiguration @@ -179,10 +180,10 @@ func (r *RTCPeerConnection) iceStateChange(newState ice.ConnectionState) { r.Lock() defer r.Unlock() - // if r.OnICEConnectionStateChange != nil && r.iceState != newState { - // r.OnICEConnectionStateChange(newState) - // } - // r.iceState = newState + if r.OnICEConnectionStateChange != nil { + r.OnICEConnectionStateChange(newState) + } + r.IceConnectionState = newState } func (r *RTCPeerConnection) dataChannelEventHandler(e network.DataChannelEvent) { diff --git a/signaling.go b/signaling.go index 4a371a7c..8cfe4c6b 100644 --- a/signaling.go +++ b/signaling.go @@ -123,7 +123,7 @@ func (r *RTCPeerConnection) SetRemoteDescription(desc RTCSessionDescription) err for _, m := range r.remoteDescription.MediaDescriptions { for _, a := range m.Attributes { if strings.HasPrefix(a, "candidate") { - if c := sdp.ICECandidateBuild(a); c != nil { + if c := sdp.ICECandidateUnmarshal(a); c != nil { r.networkManager.IceAgent.AddRemoteCandidate(c) } else { fmt.Printf("Tried to parse ICE candidate, but failed %s ", a) @@ -140,6 +140,17 @@ func (r *RTCPeerConnection) SetRemoteDescription(desc RTCSessionDescription) err return nil } +func (r *RTCPeerConnection) generateLocalCandidates() []string { + r.networkManager.IceAgent.RLock() + defer r.networkManager.IceAgent.RUnlock() + + candidates := make([]string, 0) + for _, c := range r.networkManager.IceAgent.LocalCandidates { + candidates = append(candidates, sdp.ICECandidateMarshal(c)...) + } + return candidates +} + // CreateOffer starts the RTCPeerConnection and generates the localDescription func (r *RTCPeerConnection) CreateOffer(options *RTCOfferOptions) (RTCSessionDescription, error) { useIdentity := r.idpLoginURL != nil @@ -151,8 +162,8 @@ func (r *RTCPeerConnection) CreateOffer(options *RTCOfferOptions) (RTCSessionDes return RTCSessionDescription{}, &InvalidStateError{Err: ErrConnectionClosed} } - candidates := r.networkManager.IceAgent.LocalCandidates() d := sdp.NewJSEPSessionDescription(r.networkManager.DTLSFingerprint(), useIdentity) + candidates := r.generateLocalCandidates() r.addRTPMediaSections(d, []RTCRtpCodecType{RTCRtpCodecTypeAudio, RTCRtpCodecTypeVideo}, candidates) r.addDataMediaSection(d, candidates) @@ -195,7 +206,7 @@ func (r *RTCPeerConnection) CreateAnswer(options *RTCAnswerOptions) (RTCSessionD } } - candidates := r.networkManager.IceAgent.LocalCandidates() + candidates := r.generateLocalCandidates() d := sdp.NewJSEPSessionDescription(r.networkManager.DTLSFingerprint(), useIdentity) r.addRTPMediaSections(d, mediaSectionsToAdd, candidates)