Fix race issue caused by multiple gathering

resolves #707
This commit is contained in:
Konstantin Itskov
2019-06-07 17:54:09 -04:00
committed by Sean DuBois
parent e357abf415
commit 8c7f769e86
7 changed files with 70 additions and 42 deletions

View File

@@ -4,6 +4,7 @@ package webrtc
import ( import (
"github.com/pion/sdp/v2" "github.com/pion/sdp/v2"
"github.com/pion/webrtc/v2/internal/ice" "github.com/pion/webrtc/v2/internal/ice"
) )
@@ -46,6 +47,7 @@ func (api *API) NewICEGatherer(opts ICEGatherOptions) (*ICEGatherer, error) {
api.settingEngine.timeout.ICEPrflxAcceptanceMinWait, api.settingEngine.timeout.ICEPrflxAcceptanceMinWait,
api.settingEngine.timeout.ICERelayAcceptanceMinWait, api.settingEngine.timeout.ICERelayAcceptanceMinWait,
api.settingEngine.LoggerFactory, api.settingEngine.LoggerFactory,
api.settingEngine.candidates.ICETrickle,
api.settingEngine.candidates.ICENetworkTypes, api.settingEngine.candidates.ICENetworkTypes,
opts, opts,
) )

View File

@@ -3,7 +3,6 @@
package ice package ice
import ( import (
"errors"
"sync" "sync"
"time" "time"
@@ -54,6 +53,7 @@ func NewGatherer(
prflxAcceptanceMinWait, prflxAcceptanceMinWait,
relayAcceptanceMinWait *time.Duration, relayAcceptanceMinWait *time.Duration,
loggerFactory logging.LoggerFactory, loggerFactory logging.LoggerFactory,
agentIsTrickle bool,
networkTypes []NetworkType, networkTypes []NetworkType,
opts GatherOptions, opts GatherOptions,
) (*Gatherer, error) { ) (*Gatherer, error) {
@@ -68,7 +68,7 @@ func NewGatherer(
} }
} }
candidateTypes := []ice.CandidateType{} candidateTypes := make([]ice.CandidateType, 0)
if opts.ICEGatherPolicy == TransportPolicyRelay { if opts.ICEGatherPolicy == TransportPolicyRelay {
candidateTypes = append(candidateTypes, ice.CandidateTypeRelay) candidateTypes = append(candidateTypes, ice.CandidateTypeRelay)
} }
@@ -82,6 +82,7 @@ func NewGatherer(
keepaliveInterval: keepaliveInterval, keepaliveInterval: keepaliveInterval,
loggerFactory: loggerFactory, loggerFactory: loggerFactory,
log: loggerFactory.NewLogger("ice"), log: loggerFactory.NewLogger("ice"),
agentIsTrickle: agentIsTrickle,
networkTypes: networkTypes, networkTypes: networkTypes,
candidateTypes: candidateTypes, candidateTypes: candidateTypes,
candidateSelectionTimeout: candidateSelectionTimeout, candidateSelectionTimeout: candidateSelectionTimeout,
@@ -95,18 +96,13 @@ func NewGatherer(
func (g *Gatherer) createAgent() error { func (g *Gatherer) createAgent() error {
g.lock.Lock() g.lock.Lock()
defer g.lock.Unlock() defer g.lock.Unlock()
agentIsTrickle := g.onLocalCandidateHdlr != nil || g.onStateChangeHdlr != nil
if g.agent != nil { if g.agent != nil {
if !g.agentIsTrickle && agentIsTrickle {
return errors.New("ICEAgent created without OnCandidate or StateChange handler, but now has one set")
}
return nil return nil
} }
config := &ice.AgentConfig{ config := &ice.AgentConfig{
Trickle: agentIsTrickle, Trickle: g.agentIsTrickle,
Urls: g.validatedServers, Urls: g.validatedServers,
PortMin: g.portMin, PortMin: g.portMin,
PortMax: g.portMax, PortMax: g.portMax,
@@ -136,8 +132,7 @@ func (g *Gatherer) createAgent() error {
} }
g.agent = agent g.agent = agent
g.agentIsTrickle = agentIsTrickle if !g.agentIsTrickle {
if !agentIsTrickle {
g.state = GathererStateComplete g.state = GathererStateComplete
} }
@@ -262,6 +257,29 @@ func (g *Gatherer) getAgent() *ice.Agent {
return g.agent return g.agent
} }
// SignalCandidates imitates gathering process to backward support old tricle
// false behavior.
func (g *Gatherer) SignalCandidates() error {
candidates, err := g.GetLocalCandidates()
if err != nil {
return err
}
g.lock.Lock()
onLocalCandidateHdlr := g.onLocalCandidateHdlr
g.lock.Unlock()
if onLocalCandidateHdlr != nil {
for i := range candidates {
go onLocalCandidateHdlr(&candidates[i])
}
// Call the handler one last time with nil. This is a signal that candidate
// gathering is complete.
go onLocalCandidateHdlr(nil)
}
return nil
}
// AgentIsTrickle returns true if agent is in trickle mode. // AgentIsTrickle returns true if agent is in trickle mode.
func (g *Gatherer) AgentIsTrickle() bool { func (g *Gatherer) AgentIsTrickle() bool {
return g.agentIsTrickle return g.agentIsTrickle

View File

@@ -22,7 +22,7 @@ func TestGatherer_Success(t *testing.T) {
ICEServers: []Server{{URLs: []string{"stun:stun.l.google.com:19302"}}}, ICEServers: []Server{{URLs: []string{"stun:stun.l.google.com:19302"}}},
} }
gatherer, err := NewGatherer(0, 0, nil, nil, nil, nil, nil, nil, nil, logging.NewDefaultLoggerFactory(), nil, opts) gatherer, err := NewGatherer(0, 0, nil, nil, nil, nil, nil, nil, nil, logging.NewDefaultLoggerFactory(), false, nil, opts)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }

View File

@@ -9,6 +9,7 @@ import (
"github.com/pion/ice" "github.com/pion/ice"
"github.com/pion/logging" "github.com/pion/logging"
"github.com/pion/webrtc/v2/internal/mux" "github.com/pion/webrtc/v2/internal/mux"
) )
@@ -260,8 +261,13 @@ func (t *Transport) NewEndpoint(f mux.MatchFunc) *mux.Endpoint {
} }
func (t *Transport) ensureGatherer() error { func (t *Transport) ensureGatherer() error {
if t.gatherer == nil || t.gatherer.getAgent() == nil { if t.gatherer == nil {
return errors.New("gatherer not started") return errors.New("gatherer not started")
} else if t.gatherer.getAgent() == nil && t.gatherer.AgentIsTrickle() {
// Special case for trickle=true. (issue-707)
if err := t.gatherer.createAgent(); err != nil {
return err
}
} }
return nil return nil

View File

@@ -7,12 +7,11 @@ import (
"crypto/ecdsa" "crypto/ecdsa"
"crypto/elliptic" "crypto/elliptic"
"crypto/rand" "crypto/rand"
mathRand "math/rand"
"regexp"
"fmt" "fmt"
"io" "io"
mathRand "math/rand"
"net" "net"
"regexp"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@@ -22,6 +21,7 @@ import (
"github.com/pion/logging" "github.com/pion/logging"
"github.com/pion/rtcp" "github.com/pion/rtcp"
"github.com/pion/sdp/v2" "github.com/pion/sdp/v2"
"github.com/pion/webrtc/v2/internal/util" "github.com/pion/webrtc/v2/internal/util"
"github.com/pion/webrtc/v2/pkg/rtcerr" "github.com/pion/webrtc/v2/pkg/rtcerr"
) )
@@ -120,11 +120,16 @@ func (api *API) NewPeerConnection(configuration Configuration) (*PeerConnection,
return nil, err return nil, err
} }
gatherer, err := pc.createICEGatherer() pc.iceGatherer, err = pc.createICEGatherer()
if err != nil { if err != nil {
return nil, err return nil, err
} }
pc.iceGatherer = gatherer
if !pc.iceGatherer.AgentIsTrickle() {
if err = pc.iceGatherer.Gather(); err != nil {
return nil, err
}
}
// Create the ice transport // Create the ice transport
iceTransport := pc.createICETransport() iceTransport := pc.createICETransport()
@@ -410,12 +415,6 @@ func (pc *PeerConnection) CreateOffer(options *OfferOptions) (SessionDescription
return SessionDescription{}, err return SessionDescription{}, err
} }
if !pc.iceGatherer.AgentIsTrickle() {
if err = pc.iceGatherer.Gather(); err != nil {
return SessionDescription{}, err
}
}
candidates, err := pc.iceGatherer.GetLocalCandidates() candidates, err := pc.iceGatherer.GetLocalCandidates()
if err != nil { if err != nil {
return SessionDescription{}, err return SessionDescription{}, err
@@ -429,8 +428,8 @@ func (pc *PeerConnection) CreateOffer(options *OfferOptions) (SessionDescription
} }
if pc.configuration.SDPSemantics == SDPSemanticsPlanB { if pc.configuration.SDPSemantics == SDPSemanticsPlanB {
video := []*RTPTransceiver{} video := make([]*RTPTransceiver, 0)
audio := []*RTPTransceiver{} audio := make([]*RTPTransceiver, 0)
for _, t := range pc.GetTransceivers() { for _, t := range pc.GetTransceivers() {
switch t.kind { switch t.kind {
case RTPCodecTypeVideo: case RTPCodecTypeVideo:
@@ -475,14 +474,14 @@ func (pc *PeerConnection) CreateOffer(options *OfferOptions) (SessionDescription
m.WithPropertyAttribute("setup:actpass") m.WithPropertyAttribute("setup:actpass")
} }
sdp, err := d.Marshal() sdpBytes, err := d.Marshal()
if err != nil { if err != nil {
return SessionDescription{}, err return SessionDescription{}, err
} }
desc := SessionDescription{ desc := SessionDescription{
Type: SDPTypeOffer, Type: SDPTypeOffer,
SDP: string(sdp), SDP: string(sdpBytes),
parsed: d, parsed: d,
} }
pc.lastOffer = desc.SDP pc.lastOffer = desc.SDP
@@ -589,12 +588,6 @@ func (pc *PeerConnection) addAnswerMediaTransceivers(d *sdp.SessionDescription)
return nil, err return nil, err
} }
if !pc.iceGatherer.AgentIsTrickle() {
if err = pc.iceGatherer.Gather(); err != nil {
return nil, err
}
}
candidates, err := pc.iceGatherer.GetLocalCandidates() candidates, err := pc.iceGatherer.GetLocalCandidates()
if err != nil { if err != nil {
return nil, err return nil, err
@@ -693,14 +686,14 @@ func (pc *PeerConnection) CreateAnswer(options *AnswerOptions) (SessionDescripti
return SessionDescription{}, err return SessionDescription{}, err
} }
sdp, err := d.Marshal() sdpBytes, err := d.Marshal()
if err != nil { if err != nil {
return SessionDescription{}, err return SessionDescription{}, err
} }
desc := SessionDescription{ desc := SessionDescription{
Type: SDPTypeAnswer, Type: SDPTypeAnswer,
SDP: string(sdp), SDP: string(sdpBytes),
parsed: d, parsed: d,
} }
pc.lastAnswer = desc.SDP pc.lastAnswer = desc.SDP
@@ -835,6 +828,13 @@ func (pc *PeerConnection) SetLocalDescription(desc SessionDescription) error {
} }
if !pc.iceGatherer.AgentIsTrickle() { if !pc.iceGatherer.AgentIsTrickle() {
// To support all unittests which are following the future trickle=true
// setup while also support the old trickle=false synchronous gathering
// process this is necessary to avoid calling Garther() in multiple
// pleces; which causes race conditions. (issue-707)
if err := pc.iceGatherer.SignalCandidates(); err != nil {
return err
}
return nil return nil
} }
return pc.iceGatherer.Gather() return pc.iceGatherer.Gather()
@@ -861,12 +861,6 @@ func (pc *PeerConnection) SetRemoteDescription(desc SessionDescription) error {
return &rtcerr.InvalidStateError{Err: ErrConnectionClosed} return &rtcerr.InvalidStateError{Err: ErrConnectionClosed}
} }
if !pc.iceGatherer.AgentIsTrickle() {
if err := pc.iceGatherer.Gather(); err != nil {
return err
}
}
desc.parsed = &sdp.SessionDescription{} desc.parsed = &sdp.SessionDescription{}
if err := desc.parsed.Unmarshal([]byte(desc.SDP)); err != nil { if err := desc.parsed.Unmarshal([]byte(desc.SDP)); err != nil {
return err return err

View File

@@ -7,8 +7,9 @@ import (
"testing" "testing"
"time" "time"
"github.com/pion/webrtc/v2/pkg/rtcerr"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/pion/webrtc/v2/pkg/rtcerr"
) )
// newPair creates two new peer connections (an offerer and an answerer) // newPair creates two new peer connections (an offerer and an answerer)

View File

@@ -30,6 +30,7 @@ type SettingEngine struct {
ICERelayAcceptanceMinWait *time.Duration ICERelayAcceptanceMinWait *time.Duration
} }
candidates struct { candidates struct {
ICETrickle bool
ICENetworkTypes []NetworkType ICENetworkTypes []NetworkType
} }
LoggerFactory logging.LoggerFactory LoggerFactory logging.LoggerFactory
@@ -62,6 +63,12 @@ func (e *SettingEngine) SetEphemeralUDPPortRange(portMin, portMax uint16) error
return nil return nil
} }
// SetTrickle configures whether or not the ice agent should gather candidates
// via the trickle method or synchronously.
func (e *SettingEngine) SetTrickle(trickle bool) {
e.candidates.ICETrickle = trickle
}
// SetNetworkTypes configures what types of candidate networks are supported // SetNetworkTypes configures what types of candidate networks are supported
// during local and server reflexive gathering. // during local and server reflexive gathering.
func (e *SettingEngine) SetNetworkTypes(candidateTypes []NetworkType) { func (e *SettingEngine) SetNetworkTypes(candidateTypes []NetworkType) {