Add ICE Trickle support

Resolves pion/ice#51

Co-authored-by: Konstantin Itskov <konstantin.itskov@kovits.com>
This commit is contained in:
Sean DuBois
2019-05-29 01:00:34 -07:00
parent 5e6149d8af
commit 1d721199ef
7 changed files with 210 additions and 125 deletions

2
go.mod
View File

@@ -5,7 +5,7 @@ go 1.12
require (
github.com/pion/datachannel v1.4.3
github.com/pion/dtls v1.3.5
github.com/pion/ice v0.3.2
github.com/pion/ice v0.3.3
github.com/pion/logging v0.2.1
github.com/pion/quic v0.1.1
github.com/pion/rtcp v1.2.0

4
go.sum
View File

@@ -26,8 +26,8 @@ github.com/pion/datachannel v1.4.3 h1:tqS6YiqqAiFCxGGhvn1K7fHEzemK9Aov025dE/isGF
github.com/pion/datachannel v1.4.3/go.mod h1:SpMJbuu8v+qbA94m6lWQwSdCf8JKQvgmdSHDNtcbe+w=
github.com/pion/dtls v1.3.5 h1:mBioifvh6JSE9pD4FtJh5WoizygoqkOJNJyS5Ns+y1U=
github.com/pion/dtls v1.3.5/go.mod h1:CjlPLfQdsTg3G4AEXjJp8FY5bRweBlxHrgoFrN+fQsk=
github.com/pion/ice v0.3.2 h1:wBm0F9an2y+mpIlmn2sC4sHVjZnCl0K9zY23R3ijYmA=
github.com/pion/ice v0.3.2/go.mod h1:T57BaxW8oBC+CuV1+ZAAVm8/UsnpQB/S/hII+Y2Nyn0=
github.com/pion/ice v0.3.3 h1:ysSx7pDczIJx8XyYpFI2zoqtYhFD+B1cQdtY2ol5lT4=
github.com/pion/ice v0.3.3/go.mod h1:T57BaxW8oBC+CuV1+ZAAVm8/UsnpQB/S/hII+Y2Nyn0=
github.com/pion/logging v0.2.1 h1:LwASkBKZ+2ysGJ+jLv1E/9H1ge0k1nTfi1X+5zirkDk=
github.com/pion/logging v0.2.1/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
github.com/pion/quic v0.1.1 h1:D951FV+TOqI9A0rTF7tHx0Loooqz+nyzjEyj8o3PuMA=

View File

@@ -21,7 +21,8 @@ type ICEGatherer struct {
validatedServers []*ice.URL
agent *ice.Agent
agentIsTrickle bool
agent *ice.Agent
portMin uint16
portMax uint16
@@ -29,7 +30,11 @@ type ICEGatherer struct {
connectionTimeout *time.Duration
keepaliveInterval *time.Duration
loggerFactory logging.LoggerFactory
log logging.LeveledLogger
networkTypes []NetworkType
onLocalCandidateHdlr func(candidate *ICECandidate)
onStateChangeHdlr func(state ICEGathererState)
}
// NewICEGatherer creates a new NewICEGatherer.
@@ -66,24 +71,27 @@ func NewICEGatherer(
connectionTimeout: connectionTimeout,
keepaliveInterval: keepaliveInterval,
loggerFactory: loggerFactory,
log: loggerFactory.NewLogger("ice"),
networkTypes: networkTypes,
candidateTypes: candidateTypes,
}, nil
}
// State indicates the current state of the ICE gatherer.
func (g *ICEGatherer) State() ICEGathererState {
g.lock.RLock()
defer g.lock.RUnlock()
return g.state
}
// Gather ICE candidates.
func (g *ICEGatherer) Gather() error {
func (g *ICEGatherer) createAgent() error {
g.lock.Lock()
defer g.lock.Unlock()
agentIsTrickle := g.onLocalCandidateHdlr != nil || g.onStateChangeHdlr != nil
if g.agent != nil {
if !g.agentIsTrickle && agentIsTrickle {
return errors.New("ICEAgent created without OnCandidate or StateChange handler, but now has one set")
}
return nil
}
config := &ice.AgentConfig{
Trickle: agentIsTrickle,
Urls: g.validatedServers,
PortMin: g.portMin,
PortMax: g.portMax,
@@ -108,11 +116,49 @@ func (g *ICEGatherer) Gather() error {
}
g.agent = agent
g.state = ICEGathererStateComplete
g.agentIsTrickle = agentIsTrickle
if agentIsTrickle {
g.state = ICEGathererStateComplete
}
return nil
}
// Gather ICE candidates.
func (g *ICEGatherer) Gather() error {
if err := g.createAgent(); err != nil {
return err
}
g.lock.Lock()
onLocalCandidateHdlr := g.onLocalCandidateHdlr
isTrickle := g.agentIsTrickle
agent := g.agent
g.lock.Unlock()
if !isTrickle {
return nil
}
g.setState(ICEGathererStateGathering)
if err := agent.OnCandidate(func(candidate ice.Candidate) {
if candidate != nil {
c, err := newICECandidateFromICE(candidate)
if err != nil {
g.log.Warnf("Failed to convert ice.Candidate: %s", err)
return
}
onLocalCandidateHdlr(&c)
} else {
g.setState(ICEGathererStateComplete)
onLocalCandidateHdlr(nil)
}
}); err != nil {
return err
}
return agent.GatherCandidates()
}
// Close prunes all local candidates, and closes the ports.
func (g *ICEGatherer) Close() error {
g.lock.Lock()
@@ -133,14 +179,11 @@ func (g *ICEGatherer) Close() error {
// GetLocalParameters returns the ICE parameters of the ICEGatherer.
func (g *ICEGatherer) GetLocalParameters() (ICEParameters, error) {
g.lock.RLock()
defer g.lock.RUnlock()
if g.agent == nil {
return ICEParameters{}, errors.New("gatherer not started")
if err := g.createAgent(); err != nil {
return ICEParameters{}, err
}
frag, pwd := g.agent.GetLocalUserCredentials()
return ICEParameters{
UsernameFragment: frag,
Password: pwd,
@@ -150,13 +193,9 @@ func (g *ICEGatherer) GetLocalParameters() (ICEParameters, error) {
// GetLocalCandidates returns the sequence of valid local candidates associated with the ICEGatherer.
func (g *ICEGatherer) GetLocalCandidates() ([]ICECandidate, error) {
g.lock.RLock()
defer g.lock.RUnlock()
if g.agent == nil {
return nil, errors.New("gatherer not started")
if err := g.createAgent(); err != nil {
return nil, err
}
iceCandidates, err := g.agent.GetLocalCandidates()
if err != nil {
return nil, err
@@ -164,3 +203,41 @@ func (g *ICEGatherer) GetLocalCandidates() ([]ICECandidate, error) {
return newICECandidatesFromICE(iceCandidates)
}
// OnLocalCandidate sets an event handler which fires when a new local ICE candidate is available
func (g *ICEGatherer) OnLocalCandidate(f func(*ICECandidate)) {
g.lock.Lock()
defer g.lock.Unlock()
g.onLocalCandidateHdlr = f
}
// OnStateChange fires any time the ICEGatherer changes
func (g *ICEGatherer) OnStateChange(f func(ICEGathererState)) {
g.lock.Lock()
defer g.lock.Unlock()
g.onStateChangeHdlr = f
}
// State indicates the current state of the ICE gatherer.
func (g *ICEGatherer) State() ICEGathererState {
g.lock.RLock()
defer g.lock.RUnlock()
return g.state
}
func (g *ICEGatherer) setState(s ICEGathererState) {
g.lock.Lock()
g.state = s
hdlr := g.onStateChangeHdlr
g.lock.Unlock()
if hdlr != nil {
go hdlr(s)
}
}
func (g *ICEGatherer) getAgent() *ice.Agent {
g.lock.RLock()
defer g.lock.RUnlock()
return g.agent
}

View File

@@ -260,8 +260,7 @@ func (t *ICETransport) NewEndpoint(f mux.MatchFunc) *mux.Endpoint {
}
func (t *ICETransport) ensureGatherer() error {
if t.gatherer == nil ||
t.gatherer.agent == nil {
if t.gatherer == nil || t.gatherer.getAgent() == nil {
return errors.New("gatherer not started")
}

View File

@@ -43,7 +43,6 @@ type PeerConnection struct {
currentRemoteDescription *SessionDescription
pendingRemoteDescription *SessionDescription
signalingState SignalingState
iceGatheringState ICEGatheringState
iceConnectionState ICEConnectionState
connectionState PeerConnectionState
@@ -69,8 +68,6 @@ type PeerConnection struct {
onICEConnectionStateChangeHandler func(ICEConnectionState)
onTrackHandler func(*Track, *RTPReceiver)
onDataChannelHandler func(*DataChannel)
onICECandidateHandler func(*ICECandidate)
onICEGatheringStateChangeHandler func()
iceGatherer *ICEGatherer
iceTransport *ICETransport
@@ -111,7 +108,6 @@ func (api *API) NewPeerConnection(configuration Configuration) (*PeerConnection,
lastAnswer: "",
signalingState: SignalingStateStable,
iceConnectionState: ICEConnectionStateNew,
iceGatheringState: ICEGatheringStateNew,
connectionState: PeerConnectionStateNew,
dataChannels: make(map[uint16]*DataChannel),
@@ -124,19 +120,12 @@ func (api *API) NewPeerConnection(configuration Configuration) (*PeerConnection,
return nil, err
}
// For now we eagerly allocate and start the gatherer
gatherer, err := pc.createICEGatherer()
if err != nil {
return nil, err
}
pc.iceGatherer = gatherer
err = pc.gather()
if err != nil {
return nil, err
}
// Create the ice transport
iceTransport := pc.createICETransport()
pc.iceTransport = iceTransport
@@ -252,57 +241,14 @@ func (pc *PeerConnection) OnDataChannel(f func(*DataChannel)) {
// OnICECandidate sets an event handler which is invoked when a new ICE
// candidate is found.
// BUG: trickle ICE is not supported so this event is triggered immediately when
// SetLocalDescription is called. Typically, you only need to use this method
// if you want API compatibility with the JavaScript/Wasm bindings.
func (pc *PeerConnection) OnICECandidate(f func(*ICECandidate)) {
pc.mu.Lock()
defer pc.mu.Unlock()
pc.onICECandidateHandler = f
pc.iceGatherer.OnLocalCandidate(f)
}
// OnICEGatheringStateChange sets an event handler which is invoked when the
// ICE candidate gathering state has changed.
// BUG: trickle ICE is not supported so this event is triggered immediately when
// SetLocalDescription is called. Typically, you only need to use this method
// if you want API compatibility with the JavaScript/Wasm bindings.
func (pc *PeerConnection) OnICEGatheringStateChange(f func()) {
pc.mu.Lock()
defer pc.mu.Unlock()
pc.onICEGatheringStateChangeHandler = f
}
// signalICECandidateGatheringComplete should be called after ICE candidate
// gathering is complete. It triggers the appropriate event handlers in order to
// emulate a trickle ICE process.
func (pc *PeerConnection) signalICECandidateGatheringComplete() error {
pc.mu.Lock()
defer pc.mu.Unlock()
// Call onICECandidateHandler for all candidates.
if pc.onICECandidateHandler != nil {
candidates, err := pc.iceGatherer.GetLocalCandidates()
if err != nil {
return err
}
for i := range candidates {
go pc.onICECandidateHandler(&candidates[i])
}
// Call the handler one last time with nil. This is a signal that candidate
// gathering is complete.
go pc.onICECandidateHandler(nil)
}
pc.iceGatheringState = ICEGatheringStateComplete
// Also trigger the onICEGatheringStateChangeHandler
if pc.onICEGatheringStateChangeHandler != nil {
// Note: Gathering is already done at this point, but some clients might
// still expect the state change handler to be triggered.
go pc.onICEGatheringStateChangeHandler()
}
return nil
func (pc *PeerConnection) OnICEGatheringStateChange(f func(ICEGathererState)) {
pc.iceGatherer.OnStateChange(f)
}
// OnTrack sets an event handler which is called when remote track
@@ -464,6 +410,12 @@ func (pc *PeerConnection) CreateOffer(options *OfferOptions) (SessionDescription
return SessionDescription{}, err
}
if !pc.iceGatherer.agentIsTrickle {
if err = pc.iceGatherer.Gather(); err != nil {
return SessionDescription{}, err
}
}
candidates, err := pc.iceGatherer.GetLocalCandidates()
if err != nil {
return SessionDescription{}, err
@@ -549,10 +501,6 @@ func (pc *PeerConnection) createICEGatherer() (*ICEGatherer, error) {
return g, nil
}
func (pc *PeerConnection) gather() error {
return pc.iceGatherer.Gather()
}
func (pc *PeerConnection) createICETransport() *ICETransport {
t := pc.api.NewICETransport(pc.iceGatherer)
@@ -641,6 +589,12 @@ func (pc *PeerConnection) addAnswerMediaTransceivers(d *sdp.SessionDescription)
return nil, err
}
if !pc.iceGatherer.agentIsTrickle {
if err = pc.iceGatherer.Gather(); err != nil {
return nil, err
}
}
candidates, err := pc.iceGatherer.GetLocalCandidates()
if err != nil {
return nil, err
@@ -872,8 +826,6 @@ func (pc *PeerConnection) SetLocalDescription(desc SessionDescription) error {
}
}
// TODO: Initiate ICE candidate gathering?
desc.parsed = &sdp.SessionDescription{}
if err := desc.parsed.Unmarshal([]byte(desc.SDP)); err != nil {
return err
@@ -882,14 +834,10 @@ func (pc *PeerConnection) SetLocalDescription(desc SessionDescription) error {
return err
}
// Call the appropriate event handlers to signal that ICE candidate gathering
// is complete. In reality it completed a while ago, but triggering these
// events helps maintain API compatibility with the JavaScript/Wasm bindings.
if err := pc.signalICECandidateGatheringComplete(); err != nil {
return err
if !pc.iceGatherer.agentIsTrickle {
return nil
}
return nil
return pc.iceGatherer.Gather()
}
// LocalDescription returns pendingLocalDescription if it is not null and
@@ -897,8 +845,8 @@ func (pc *PeerConnection) SetLocalDescription(desc SessionDescription) error {
// determine if setLocalDescription has already been called.
// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-localdescription
func (pc *PeerConnection) LocalDescription() *SessionDescription {
if pc.pendingLocalDescription != nil {
return pc.pendingLocalDescription
if localDescription := pc.PendingLocalDescription(); localDescription != nil {
return localDescription
}
return pc.currentLocalDescription
}
@@ -913,6 +861,12 @@ func (pc *PeerConnection) SetRemoteDescription(desc SessionDescription) error {
return &rtcerr.InvalidStateError{Err: ErrConnectionClosed}
}
if !pc.iceGatherer.agentIsTrickle {
if err := pc.iceGatherer.Gather(); err != nil {
return err
}
}
desc.parsed = &sdp.SessionDescription{}
if err := desc.parsed.Unmarshal([]byte(desc.SDP)); err != nil {
return err
@@ -1729,18 +1683,8 @@ func (pc *PeerConnection) addTransceiverSDP(d *sdp.SessionDescription, midValue
}
media = media.WithPropertyAttribute(t.Direction.String())
for _, c := range candidates {
sdpCandidate := iceCandidateToSDP(c)
sdpCandidate.ExtensionAttributes = append(sdpCandidate.ExtensionAttributes, sdp.ICECandidateAttribute{Key: "generation", Value: "0"})
sdpCandidate.Component = 1
media.WithICECandidate(sdpCandidate)
sdpCandidate.Component = 2
media.WithICECandidate(sdpCandidate)
}
if len(candidates) != 0 {
media.WithPropertyAttribute("end-of-candidates")
}
addCandidatesToMediaDescriptions(candidates, media)
d.WithMedia(media)
return nil
@@ -1768,16 +1712,7 @@ func (pc *PeerConnection) addDataMediaSection(d *sdp.SessionDescription, midValu
WithPropertyAttribute("sctpmap:5000 webrtc-datachannel 1024").
WithICECredentials(iceParams.UsernameFragment, iceParams.Password)
for _, c := range candidates {
sdpCandidate := iceCandidateToSDP(c)
sdpCandidate.ExtensionAttributes = append(sdpCandidate.ExtensionAttributes, sdp.ICECandidateAttribute{Key: "generation", Value: "0"})
sdpCandidate.Component = 1
media.WithICECandidate(sdpCandidate)
sdpCandidate.Component = 2
media.WithICECandidate(sdpCandidate)
}
media.WithPropertyAttribute("end-of-candidates")
addCandidatesToMediaDescriptions(candidates, media)
d.WithMedia(media)
}
@@ -1812,12 +1747,39 @@ func (pc *PeerConnection) newRTPTransceiver(
return t
}
func (pc *PeerConnection) populateLocalCandidates(orig *SessionDescription) *SessionDescription {
if orig == nil {
return nil
} else if pc.iceGatherer == nil {
return orig
}
candidates, err := pc.iceGatherer.GetLocalCandidates()
if err != nil {
return orig
}
parsed := pc.pendingLocalDescription.parsed
for _, m := range parsed.MediaDescriptions {
addCandidatesToMediaDescriptions(candidates, m)
}
sdp, err := parsed.Marshal()
if err != nil {
return orig
}
return &SessionDescription{
SDP: string(sdp),
Type: pc.pendingLocalDescription.Type,
}
}
// CurrentLocalDescription represents the local description that was
// successfully negotiated the last time the PeerConnection transitioned
// into the stable state plus any local candidates that have been generated
// by the ICEAgent since the offer or answer was created.
func (pc *PeerConnection) CurrentLocalDescription() *SessionDescription {
return pc.currentLocalDescription
return pc.populateLocalCandidates(pc.currentLocalDescription)
}
// PendingLocalDescription represents a local description that is in the
@@ -1825,7 +1787,7 @@ func (pc *PeerConnection) CurrentLocalDescription() *SessionDescription {
// generated by the ICEAgent since the offer or answer was created. If the
// PeerConnection is in the stable state, the value is null.
func (pc *PeerConnection) PendingLocalDescription() *SessionDescription {
return pc.pendingLocalDescription
return pc.populateLocalCandidates(pc.pendingLocalDescription)
}
// CurrentRemoteDescription represents the last remote description that was
@@ -1854,7 +1816,14 @@ func (pc *PeerConnection) SignalingState() SignalingState {
// ICEGatheringState attribute returns the ICE gathering state of the
// PeerConnection instance.
func (pc *PeerConnection) ICEGatheringState() ICEGatheringState {
return pc.iceGatheringState
switch pc.iceGatherer.State() {
case ICEGathererStateNew:
return ICEGatheringStateNew
case ICEGathererStateGathering:
return ICEGatheringStateGathering
default:
return ICEGatheringStateComplete
}
}
// ConnectionState attribute returns the connection state of the
@@ -1862,3 +1831,17 @@ func (pc *PeerConnection) ICEGatheringState() ICEGatheringState {
func (pc *PeerConnection) ConnectionState() PeerConnectionState {
return pc.connectionState
}
func addCandidatesToMediaDescriptions(candidates []ICECandidate, m *sdp.MediaDescription) {
for _, c := range candidates {
sdpCandidate := iceCandidateToSDP(c)
sdpCandidate.ExtensionAttributes = append(sdpCandidate.ExtensionAttributes, sdp.ICECandidateAttribute{Key: "generation", Value: "0"})
sdpCandidate.Component = 1
m.WithICECandidate(sdpCandidate)
sdpCandidate.Component = 2
m.WithICECandidate(sdpCandidate)
}
if len(candidates) != 0 {
m.WithPropertyAttribute("end-of-candidates")
}
}

View File

@@ -351,7 +351,6 @@ func TestPeerConnection_PeropertyGetters(t *testing.T) {
currentRemoteDescription: &SessionDescription{},
pendingRemoteDescription: &SessionDescription{},
signalingState: SignalingStateHaveLocalOffer,
iceGatheringState: ICEGatheringStateGathering,
iceConnectionState: ICEConnectionStateChecking,
connectionState: PeerConnectionStateConnecting,
}
@@ -361,7 +360,6 @@ func TestPeerConnection_PeropertyGetters(t *testing.T) {
assert.Equal(t, pc.currentRemoteDescription, pc.CurrentRemoteDescription(), "should match")
assert.Equal(t, pc.pendingRemoteDescription, pc.PendingRemoteDescription(), "should match")
assert.Equal(t, pc.signalingState, pc.SignalingState(), "should match")
assert.Equal(t, pc.iceGatheringState, pc.ICEGatheringState(), "should match")
assert.Equal(t, pc.iceConnectionState, pc.ICEConnectionState(), "should match")
assert.Equal(t, pc.connectionState, pc.ConnectionState(), "should match")
}

View File

@@ -389,3 +389,31 @@ func TestPeerConnection_EventHandlers(t *testing.T) {
t.Fatalf("timed out waiting for one or more events handlers to be called (these *were* called: %+v)", wasCalled)
}
}
func TestMultipleOfferAnswer(t *testing.T) {
nonTricklePeerConn, err := NewPeerConnection(Configuration{})
if err != nil {
t.Errorf("New PeerConnection: got error: %v", err)
}
if _, err = nonTricklePeerConn.CreateOffer(nil); err != nil {
t.Errorf("First Offer: got error: %v", err)
}
if _, err = nonTricklePeerConn.CreateOffer(nil); err != nil {
t.Errorf("Second Offer: got error: %v", err)
}
tricklePeerConn, err := NewPeerConnection(Configuration{})
if err != nil {
t.Errorf("New PeerConnection: got error: %v", err)
}
tricklePeerConn.OnICECandidate(func(i *ICECandidate) {
})
if _, err = tricklePeerConn.CreateOffer(nil); err != nil {
t.Errorf("First Offer: got error: %v", err)
}
if _, err = tricklePeerConn.CreateOffer(nil); err != nil {
t.Errorf("Second Offer: got error: %v", err)
}
}