diff --git a/agent.go b/agent.go index ba66be2..daf58af 100644 --- a/agent.go +++ b/agent.go @@ -55,6 +55,7 @@ type bindingRequest struct { type Agent struct { onConnectionStateChangeHdlr func(ConnectionState) onSelectedCandidatePairChangeHdlr func(Candidate, Candidate) + onCandidateHdlr func(Candidate) // Used to block double Dial/Accept opened bool @@ -68,6 +69,7 @@ type Agent struct { // force candidate to be contacted immediately (instead of waiting for connectivityChan) forceCandidateContact chan bool + trickle bool tieBreaker uint64 connectionState ConnectionState gatheringState GatheringState @@ -78,12 +80,12 @@ type Agent struct { portmin uint16 portmax uint16 - //How long should a pair stay quiet before we declare it dead? - //0 means never timeout + // How long should a pair stay quiet before we declare it dead? + // 0 means never timeout connectionTimeout time.Duration - //How often should we send keepalive packets? - //0 means never + // How often should we send keepalive packets? + // 0 means never keepaliveInterval time.Duration // How after should we run our internal taskLoop @@ -139,6 +141,10 @@ type AgentConfig struct { PortMin uint16 PortMax uint16 + // Trickle specifies whether or not ice agent should trickle candidates or + // work perform synchronous gathering. + Trickle bool + // ConnectionTimeout defaults to 30 seconds when this property is nil. // If the duration is 0, we will never timeout this connection. ConnectionTimeout *time.Duration @@ -177,7 +183,7 @@ func NewAgent(config *AgentConfig) (*Agent, error) { a := &Agent{ tieBreaker: rand.New(rand.NewSource(time.Now().UnixNano())).Uint64(), - gatheringState: GatheringStateComplete, // TODO trickle-ice + gatheringState: GatheringStateNew, connectionState: ConnectionStateNew, localCandidates: make(map[NetworkType][]Candidate), remoteCandidates: make(map[NetworkType][]Candidate), @@ -191,6 +197,7 @@ func NewAgent(config *AgentConfig) (*Agent, error) { done: make(chan struct{}), portmin: config.PortMin, portmax: config.PortMax, + trickle: config.Trickle, log: loggerFactory.NewLogger("ice"), forceCandidateContact: make(chan bool, 1), @@ -221,12 +228,12 @@ func NewAgent(config *AgentConfig) (*Agent, error) { a.taskLoopInterval = config.taskLoopInterval } - // Initialize local candidates - gatherCandidatesLocal(a, config.NetworkTypes) - gatherCandidatesReflective(a, config.Urls, config.NetworkTypes) - go a.taskLoop() + // Initialize local candidates + if !a.trickle { + a.gatherCandidates(config) + } return a, nil } @@ -245,6 +252,14 @@ func (a *Agent) OnSelectedCandidatePairChange(f func(Candidate, Candidate)) erro }) } +// OnCandidate sets a handler that is fired when new candidates gathered. When +// the gathering process complete the last candidate is nil. +func (a *Agent) OnCandidate(f func(Candidate)) error { + return a.run(func(agent *Agent) { + agent.onCandidateHdlr = f + }) +} + func (a *Agent) onSelectedCandidatePairChange(p *candidatePair) { if p != nil { if a.onSelectedCandidatePairChangeHdlr != nil { diff --git a/agent_test.go b/agent_test.go index b7cf04a..7d3ba42 100644 --- a/agent_test.go +++ b/agent_test.go @@ -3,6 +3,7 @@ package ice import ( "context" "net" + "sync" "testing" "time" @@ -318,6 +319,7 @@ func TestConnectivityOnStartup(t *testing.T) { cfg := &AgentConfig{ Urls: []*URL{}, + Trickle: false, NetworkTypes: supportedNetworkTypes, taskLoopInterval: time.Hour, LoggerFactory: logging.NewDefaultLoggerFactory(), @@ -513,10 +515,14 @@ func TestConnectionStateCallback(t *testing.T) { lim := test.TimeOut(time.Second * 5) defer lim.Stop() + var wg sync.WaitGroup + wg.Add(2) + timeoutDuration := time.Second KeepaliveInterval := time.Duration(0) cfg := &AgentConfig{ Urls: []*URL{}, + Trickle: true, NetworkTypes: supportedNetworkTypes, ConnectionTimeout: &timeoutDuration, KeepaliveInterval: &KeepaliveInterval, @@ -527,11 +533,35 @@ func TestConnectionStateCallback(t *testing.T) { if err != nil { t.Error(err) } + err = aAgent.OnCandidate(func(candidate Candidate) { + if candidate == nil { + wg.Done() + } + }) + if err != nil { + panic(err) + } + err = aAgent.GatherCandidates(cfg.Urls, cfg.NetworkTypes) + if err != nil { + panic(err) + } bAgent, err := NewAgent(cfg) if err != nil { t.Error(err) } + err = bAgent.OnCandidate(func(candidate Candidate) { + if candidate == nil { + wg.Done() + } + }) + if err != nil { + panic(err) + } + err = bAgent.GatherCandidates(cfg.Urls, cfg.NetworkTypes) + if err != nil { + panic(err) + } isChecking := make(chan interface{}) isConnected := make(chan interface{}) @@ -553,6 +583,7 @@ func TestConnectionStateCallback(t *testing.T) { t.Error(err) } + wg.Wait() connect(aAgent, bAgent) <-isChecking diff --git a/errors.go b/errors.go index 96a850a..f5e98ce 100644 --- a/errors.go +++ b/errors.go @@ -42,6 +42,10 @@ var ( // ErrRemotePwdEmpty indicates agent was started with an empty remote pwd ErrRemotePwdEmpty = errors.New("remote pwd is empty") + // ErrNoOnCandidateHandler indicates agent was started without OnCandidate + // while running in trickle mode. + ErrNoOnCandidateHandler = errors.New("no OnCandidate provided") + // ErrUsernameEmpty indicates agent was give TURN URL with an empty Username ErrUsernameEmpty = errors.New("username is empty") diff --git a/gather.go b/gather.go index 32bb484..727c07d 100644 --- a/gather.go +++ b/gather.go @@ -90,7 +90,38 @@ func listenUDP(portMax, portMin int, network string, laddr *net.UDPAddr) (*net.U return nil, ErrPort } -func gatherCandidatesLocal(a *Agent, networkTypes []NetworkType) { +// GatherCandidates initiates the trickle based gathering process. +func (a *Agent) GatherCandidates(urls []*URL, networkTypes []NetworkType) error { + return a.run(func(agent *Agent) { + if a.gatheringState == GatheringStateGathering { + a.log.Warnf("Attempting to gather candidates during gathering state\n") + return + } + + go a.gatherCandidates(&AgentConfig{ + Urls: urls, + NetworkTypes: networkTypes, + }) + }) +} + +func (a *Agent) gatherCandidates(config *AgentConfig) { + a.gatheringState = GatheringStateGathering + a.gatherCandidatesLocal(config.NetworkTypes) + a.gatherCandidatesSrflx(config.Urls, config.NetworkTypes) + if err := a.run(func(agent *Agent) { + if a.onCandidateHdlr != nil { + go a.onCandidateHdlr(nil) + } + }); err != nil { + a.log.Warnf("Failed to run onCandidateHdlr task: %v\n", err) + return + } + a.gatheringState = GatheringStateComplete + +} + +func (a *Agent) gatherCandidatesLocal(networkTypes []NetworkType) { localIPs := localInterfaces(networkTypes) for _, ip := range localIPs { for _, network := range supportedNetworks { @@ -107,16 +138,30 @@ func gatherCandidatesLocal(a *Agent, networkTypes []NetworkType) { continue } - set := a.localCandidates[c.NetworkType()] - set = append(set, c) - a.localCandidates[c.NetworkType()] = set + if err := a.run(func(agent *Agent) { + set := a.localCandidates[c.NetworkType()] + set = append(set, c) + a.localCandidates[c.NetworkType()] = set + }); err != nil { + a.log.Warnf("Failed to append to localCandidates: %v\n", err) + continue + } c.start(a, conn) + + if err := a.run(func(agent *Agent) { + if a.onCandidateHdlr != nil { + go a.onCandidateHdlr(c) + } + }); err != nil { + a.log.Warnf("Failed to run onCandidateHdlr task: %v\n", err) + continue + } } } } -func gatherCandidatesReflective(a *Agent, urls []*URL, networkTypes []NetworkType) { +func (a *Agent) gatherCandidatesSrflx(urls []*URL, networkTypes []NetworkType) { localIPs := localInterfaces(networkTypes) for _, networkType := range networkTypes { network := networkType.String() @@ -153,12 +198,26 @@ func gatherCandidatesReflective(a *Agent, urls []*URL, networkTypes []NetworkTyp continue } - set := a.localCandidates[c.NetworkType()] - set = append(set, c) - a.localCandidates[c.NetworkType()] = set + if err := a.run(func(agent *Agent) { + set := a.localCandidates[c.NetworkType()] + set = append(set, c) + a.localCandidates[c.NetworkType()] = set + }); err != nil { + a.log.Warnf("Failed to append to localCandidates: %v\n", err) + continue + } c.start(a, conn) + if err := a.run(func(agent *Agent) { + if a.onCandidateHdlr != nil { + go a.onCandidateHdlr(c) + } + }); err != nil { + a.log.Warnf("Failed to run onCandidateHdlr task: %v\n", err) + continue + } + default: a.log.Warnf("scheme %s is not implemented\n", url.Scheme) continue diff --git a/transport_test.go b/transport_test.go index 27f40a8..86fde19 100644 --- a/transport_test.go +++ b/transport_test.go @@ -2,6 +2,7 @@ package ice import ( "context" + "sync" "testing" "time" @@ -196,10 +197,16 @@ func pipe() (*Conn, *Conn) { aNotifier, aConnected := onConnected() bNotifier, bConnected := onConnected() - aAgent, err := NewAgent(&AgentConfig{ + var wg sync.WaitGroup + wg.Add(2) + + cfg := &AgentConfig{ Urls: urls, + Trickle: true, NetworkTypes: supportedNetworkTypes, - }) + } + + aAgent, err := NewAgent(cfg) if err != nil { panic(err) } @@ -207,19 +214,41 @@ func pipe() (*Conn, *Conn) { if err != nil { panic(err) } - - bAgent, err := NewAgent(&AgentConfig{ - Urls: urls, - NetworkTypes: supportedNetworkTypes, + err = aAgent.OnCandidate(func(candidate Candidate) { + if candidate == nil { + wg.Done() + } }) if err != nil { panic(err) } + err = aAgent.GatherCandidates(cfg.Urls, cfg.NetworkTypes) + if err != nil { + panic(err) + } + + bAgent, err := NewAgent(cfg) + if err != nil { + panic(err) + } err = bAgent.OnConnectionStateChange(bNotifier) if err != nil { panic(err) } + err = bAgent.OnCandidate(func(candidate Candidate) { + if candidate == nil { + wg.Done() + } + }) + if err != nil { + panic(err) + } + err = bAgent.GatherCandidates(cfg.Urls, cfg.NetworkTypes) + if err != nil { + panic(err) + } + wg.Wait() aConn, bConn := connect(aAgent, bAgent) // Ensure pair selected @@ -236,12 +265,18 @@ func pipeWithTimeout(iceTimeout time.Duration, iceKeepalive time.Duration) (*Con aNotifier, aConnected := onConnected() bNotifier, bConnected := onConnected() - aAgent, err := NewAgent(&AgentConfig{ + var wg sync.WaitGroup + wg.Add(2) + + cfg := &AgentConfig{ Urls: urls, + Trickle: true, ConnectionTimeout: &iceTimeout, KeepaliveInterval: &iceKeepalive, NetworkTypes: supportedNetworkTypes, - }) + } + + aAgent, err := NewAgent(cfg) if err != nil { panic(err) } @@ -249,21 +284,41 @@ func pipeWithTimeout(iceTimeout time.Duration, iceKeepalive time.Duration) (*Con if err != nil { panic(err) } - - bAgent, err := NewAgent(&AgentConfig{ - Urls: urls, - ConnectionTimeout: &iceTimeout, - KeepaliveInterval: &iceKeepalive, - NetworkTypes: supportedNetworkTypes, + err = aAgent.OnCandidate(func(candidate Candidate) { + if candidate == nil { + wg.Done() + } }) if err != nil { panic(err) } + err = aAgent.GatherCandidates(cfg.Urls, cfg.NetworkTypes) + if err != nil { + panic(err) + } + + bAgent, err := NewAgent(cfg) + if err != nil { + panic(err) + } err = bAgent.OnConnectionStateChange(bNotifier) if err != nil { panic(err) } + err = bAgent.OnCandidate(func(candidate Candidate) { + if candidate == nil { + wg.Done() + } + }) + if err != nil { + panic(err) + } + err = bAgent.GatherCandidates(cfg.Urls, cfg.NetworkTypes) + if err != nil { + panic(err) + } + wg.Wait() aConn, bConn := connect(aAgent, bAgent) // Ensure pair selected