Add trickle support

Resolves #51
This commit is contained in:
Konstantin Itskov
2019-05-25 16:22:03 -04:00
committed by Sean DuBois
parent f88c7d2cfa
commit e928d7b932
5 changed files with 195 additions and 31 deletions

View File

@@ -55,6 +55,7 @@ type bindingRequest struct {
type Agent struct { type Agent struct {
onConnectionStateChangeHdlr func(ConnectionState) onConnectionStateChangeHdlr func(ConnectionState)
onSelectedCandidatePairChangeHdlr func(Candidate, Candidate) onSelectedCandidatePairChangeHdlr func(Candidate, Candidate)
onCandidateHdlr func(Candidate)
// Used to block double Dial/Accept // Used to block double Dial/Accept
opened bool opened bool
@@ -68,6 +69,7 @@ type Agent struct {
// force candidate to be contacted immediately (instead of waiting for connectivityChan) // force candidate to be contacted immediately (instead of waiting for connectivityChan)
forceCandidateContact chan bool forceCandidateContact chan bool
trickle bool
tieBreaker uint64 tieBreaker uint64
connectionState ConnectionState connectionState ConnectionState
gatheringState GatheringState gatheringState GatheringState
@@ -78,12 +80,12 @@ type Agent struct {
portmin uint16 portmin uint16
portmax uint16 portmax uint16
//How long should a pair stay quiet before we declare it dead? // How long should a pair stay quiet before we declare it dead?
//0 means never timeout // 0 means never timeout
connectionTimeout time.Duration connectionTimeout time.Duration
//How often should we send keepalive packets? // How often should we send keepalive packets?
//0 means never // 0 means never
keepaliveInterval time.Duration keepaliveInterval time.Duration
// How after should we run our internal taskLoop // How after should we run our internal taskLoop
@@ -139,6 +141,10 @@ type AgentConfig struct {
PortMin uint16 PortMin uint16
PortMax uint16 PortMax uint16
// Trickle specifies whether or not ice agent should trickle candidates or
// work perform synchronous gathering.
Trickle bool
// ConnectionTimeout defaults to 30 seconds when this property is nil. // ConnectionTimeout defaults to 30 seconds when this property is nil.
// If the duration is 0, we will never timeout this connection. // If the duration is 0, we will never timeout this connection.
ConnectionTimeout *time.Duration ConnectionTimeout *time.Duration
@@ -177,7 +183,7 @@ func NewAgent(config *AgentConfig) (*Agent, error) {
a := &Agent{ a := &Agent{
tieBreaker: rand.New(rand.NewSource(time.Now().UnixNano())).Uint64(), tieBreaker: rand.New(rand.NewSource(time.Now().UnixNano())).Uint64(),
gatheringState: GatheringStateComplete, // TODO trickle-ice gatheringState: GatheringStateNew,
connectionState: ConnectionStateNew, connectionState: ConnectionStateNew,
localCandidates: make(map[NetworkType][]Candidate), localCandidates: make(map[NetworkType][]Candidate),
remoteCandidates: make(map[NetworkType][]Candidate), remoteCandidates: make(map[NetworkType][]Candidate),
@@ -191,6 +197,7 @@ func NewAgent(config *AgentConfig) (*Agent, error) {
done: make(chan struct{}), done: make(chan struct{}),
portmin: config.PortMin, portmin: config.PortMin,
portmax: config.PortMax, portmax: config.PortMax,
trickle: config.Trickle,
log: loggerFactory.NewLogger("ice"), log: loggerFactory.NewLogger("ice"),
forceCandidateContact: make(chan bool, 1), forceCandidateContact: make(chan bool, 1),
@@ -221,12 +228,12 @@ func NewAgent(config *AgentConfig) (*Agent, error) {
a.taskLoopInterval = config.taskLoopInterval a.taskLoopInterval = config.taskLoopInterval
} }
// Initialize local candidates
gatherCandidatesLocal(a, config.NetworkTypes)
gatherCandidatesReflective(a, config.Urls, config.NetworkTypes)
go a.taskLoop() go a.taskLoop()
// Initialize local candidates
if !a.trickle {
a.gatherCandidates(config)
}
return a, nil return a, nil
} }
@@ -245,6 +252,14 @@ func (a *Agent) OnSelectedCandidatePairChange(f func(Candidate, Candidate)) erro
}) })
} }
// OnCandidate sets a handler that is fired when new candidates gathered. When
// the gathering process complete the last candidate is nil.
func (a *Agent) OnCandidate(f func(Candidate)) error {
return a.run(func(agent *Agent) {
agent.onCandidateHdlr = f
})
}
func (a *Agent) onSelectedCandidatePairChange(p *candidatePair) { func (a *Agent) onSelectedCandidatePairChange(p *candidatePair) {
if p != nil { if p != nil {
if a.onSelectedCandidatePairChangeHdlr != nil { if a.onSelectedCandidatePairChangeHdlr != nil {

View File

@@ -3,6 +3,7 @@ package ice
import ( import (
"context" "context"
"net" "net"
"sync"
"testing" "testing"
"time" "time"
@@ -318,6 +319,7 @@ func TestConnectivityOnStartup(t *testing.T) {
cfg := &AgentConfig{ cfg := &AgentConfig{
Urls: []*URL{}, Urls: []*URL{},
Trickle: false,
NetworkTypes: supportedNetworkTypes, NetworkTypes: supportedNetworkTypes,
taskLoopInterval: time.Hour, taskLoopInterval: time.Hour,
LoggerFactory: logging.NewDefaultLoggerFactory(), LoggerFactory: logging.NewDefaultLoggerFactory(),
@@ -513,10 +515,14 @@ func TestConnectionStateCallback(t *testing.T) {
lim := test.TimeOut(time.Second * 5) lim := test.TimeOut(time.Second * 5)
defer lim.Stop() defer lim.Stop()
var wg sync.WaitGroup
wg.Add(2)
timeoutDuration := time.Second timeoutDuration := time.Second
KeepaliveInterval := time.Duration(0) KeepaliveInterval := time.Duration(0)
cfg := &AgentConfig{ cfg := &AgentConfig{
Urls: []*URL{}, Urls: []*URL{},
Trickle: true,
NetworkTypes: supportedNetworkTypes, NetworkTypes: supportedNetworkTypes,
ConnectionTimeout: &timeoutDuration, ConnectionTimeout: &timeoutDuration,
KeepaliveInterval: &KeepaliveInterval, KeepaliveInterval: &KeepaliveInterval,
@@ -527,11 +533,35 @@ func TestConnectionStateCallback(t *testing.T) {
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
err = aAgent.OnCandidate(func(candidate Candidate) {
if candidate == nil {
wg.Done()
}
})
if err != nil {
panic(err)
}
err = aAgent.GatherCandidates(cfg.Urls, cfg.NetworkTypes)
if err != nil {
panic(err)
}
bAgent, err := NewAgent(cfg) bAgent, err := NewAgent(cfg)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
err = bAgent.OnCandidate(func(candidate Candidate) {
if candidate == nil {
wg.Done()
}
})
if err != nil {
panic(err)
}
err = bAgent.GatherCandidates(cfg.Urls, cfg.NetworkTypes)
if err != nil {
panic(err)
}
isChecking := make(chan interface{}) isChecking := make(chan interface{})
isConnected := make(chan interface{}) isConnected := make(chan interface{})
@@ -553,6 +583,7 @@ func TestConnectionStateCallback(t *testing.T) {
t.Error(err) t.Error(err)
} }
wg.Wait()
connect(aAgent, bAgent) connect(aAgent, bAgent)
<-isChecking <-isChecking

View File

@@ -42,6 +42,10 @@ var (
// ErrRemotePwdEmpty indicates agent was started with an empty remote pwd // ErrRemotePwdEmpty indicates agent was started with an empty remote pwd
ErrRemotePwdEmpty = errors.New("remote pwd is empty") ErrRemotePwdEmpty = errors.New("remote pwd is empty")
// ErrNoOnCandidateHandler indicates agent was started without OnCandidate
// while running in trickle mode.
ErrNoOnCandidateHandler = errors.New("no OnCandidate provided")
// ErrUsernameEmpty indicates agent was give TURN URL with an empty Username // ErrUsernameEmpty indicates agent was give TURN URL with an empty Username
ErrUsernameEmpty = errors.New("username is empty") ErrUsernameEmpty = errors.New("username is empty")

View File

@@ -90,7 +90,38 @@ func listenUDP(portMax, portMin int, network string, laddr *net.UDPAddr) (*net.U
return nil, ErrPort return nil, ErrPort
} }
func gatherCandidatesLocal(a *Agent, networkTypes []NetworkType) { // GatherCandidates initiates the trickle based gathering process.
func (a *Agent) GatherCandidates(urls []*URL, networkTypes []NetworkType) error {
return a.run(func(agent *Agent) {
if a.gatheringState == GatheringStateGathering {
a.log.Warnf("Attempting to gather candidates during gathering state\n")
return
}
go a.gatherCandidates(&AgentConfig{
Urls: urls,
NetworkTypes: networkTypes,
})
})
}
func (a *Agent) gatherCandidates(config *AgentConfig) {
a.gatheringState = GatheringStateGathering
a.gatherCandidatesLocal(config.NetworkTypes)
a.gatherCandidatesSrflx(config.Urls, config.NetworkTypes)
if err := a.run(func(agent *Agent) {
if a.onCandidateHdlr != nil {
go a.onCandidateHdlr(nil)
}
}); err != nil {
a.log.Warnf("Failed to run onCandidateHdlr task: %v\n", err)
return
}
a.gatheringState = GatheringStateComplete
}
func (a *Agent) gatherCandidatesLocal(networkTypes []NetworkType) {
localIPs := localInterfaces(networkTypes) localIPs := localInterfaces(networkTypes)
for _, ip := range localIPs { for _, ip := range localIPs {
for _, network := range supportedNetworks { for _, network := range supportedNetworks {
@@ -107,16 +138,30 @@ func gatherCandidatesLocal(a *Agent, networkTypes []NetworkType) {
continue continue
} }
set := a.localCandidates[c.NetworkType()] if err := a.run(func(agent *Agent) {
set = append(set, c) set := a.localCandidates[c.NetworkType()]
a.localCandidates[c.NetworkType()] = set set = append(set, c)
a.localCandidates[c.NetworkType()] = set
}); err != nil {
a.log.Warnf("Failed to append to localCandidates: %v\n", err)
continue
}
c.start(a, conn) c.start(a, conn)
if err := a.run(func(agent *Agent) {
if a.onCandidateHdlr != nil {
go a.onCandidateHdlr(c)
}
}); err != nil {
a.log.Warnf("Failed to run onCandidateHdlr task: %v\n", err)
continue
}
} }
} }
} }
func gatherCandidatesReflective(a *Agent, urls []*URL, networkTypes []NetworkType) { func (a *Agent) gatherCandidatesSrflx(urls []*URL, networkTypes []NetworkType) {
localIPs := localInterfaces(networkTypes) localIPs := localInterfaces(networkTypes)
for _, networkType := range networkTypes { for _, networkType := range networkTypes {
network := networkType.String() network := networkType.String()
@@ -153,12 +198,26 @@ func gatherCandidatesReflective(a *Agent, urls []*URL, networkTypes []NetworkTyp
continue continue
} }
set := a.localCandidates[c.NetworkType()] if err := a.run(func(agent *Agent) {
set = append(set, c) set := a.localCandidates[c.NetworkType()]
a.localCandidates[c.NetworkType()] = set set = append(set, c)
a.localCandidates[c.NetworkType()] = set
}); err != nil {
a.log.Warnf("Failed to append to localCandidates: %v\n", err)
continue
}
c.start(a, conn) c.start(a, conn)
if err := a.run(func(agent *Agent) {
if a.onCandidateHdlr != nil {
go a.onCandidateHdlr(c)
}
}); err != nil {
a.log.Warnf("Failed to run onCandidateHdlr task: %v\n", err)
continue
}
default: default:
a.log.Warnf("scheme %s is not implemented\n", url.Scheme) a.log.Warnf("scheme %s is not implemented\n", url.Scheme)
continue continue

View File

@@ -2,6 +2,7 @@ package ice
import ( import (
"context" "context"
"sync"
"testing" "testing"
"time" "time"
@@ -196,10 +197,16 @@ func pipe() (*Conn, *Conn) {
aNotifier, aConnected := onConnected() aNotifier, aConnected := onConnected()
bNotifier, bConnected := onConnected() bNotifier, bConnected := onConnected()
aAgent, err := NewAgent(&AgentConfig{ var wg sync.WaitGroup
wg.Add(2)
cfg := &AgentConfig{
Urls: urls, Urls: urls,
Trickle: true,
NetworkTypes: supportedNetworkTypes, NetworkTypes: supportedNetworkTypes,
}) }
aAgent, err := NewAgent(cfg)
if err != nil { if err != nil {
panic(err) panic(err)
} }
@@ -207,19 +214,41 @@ func pipe() (*Conn, *Conn) {
if err != nil { if err != nil {
panic(err) panic(err)
} }
err = aAgent.OnCandidate(func(candidate Candidate) {
bAgent, err := NewAgent(&AgentConfig{ if candidate == nil {
Urls: urls, wg.Done()
NetworkTypes: supportedNetworkTypes, }
}) })
if err != nil { if err != nil {
panic(err) panic(err)
} }
err = aAgent.GatherCandidates(cfg.Urls, cfg.NetworkTypes)
if err != nil {
panic(err)
}
bAgent, err := NewAgent(cfg)
if err != nil {
panic(err)
}
err = bAgent.OnConnectionStateChange(bNotifier) err = bAgent.OnConnectionStateChange(bNotifier)
if err != nil { if err != nil {
panic(err) panic(err)
} }
err = bAgent.OnCandidate(func(candidate Candidate) {
if candidate == nil {
wg.Done()
}
})
if err != nil {
panic(err)
}
err = bAgent.GatherCandidates(cfg.Urls, cfg.NetworkTypes)
if err != nil {
panic(err)
}
wg.Wait()
aConn, bConn := connect(aAgent, bAgent) aConn, bConn := connect(aAgent, bAgent)
// Ensure pair selected // Ensure pair selected
@@ -236,12 +265,18 @@ func pipeWithTimeout(iceTimeout time.Duration, iceKeepalive time.Duration) (*Con
aNotifier, aConnected := onConnected() aNotifier, aConnected := onConnected()
bNotifier, bConnected := onConnected() bNotifier, bConnected := onConnected()
aAgent, err := NewAgent(&AgentConfig{ var wg sync.WaitGroup
wg.Add(2)
cfg := &AgentConfig{
Urls: urls, Urls: urls,
Trickle: true,
ConnectionTimeout: &iceTimeout, ConnectionTimeout: &iceTimeout,
KeepaliveInterval: &iceKeepalive, KeepaliveInterval: &iceKeepalive,
NetworkTypes: supportedNetworkTypes, NetworkTypes: supportedNetworkTypes,
}) }
aAgent, err := NewAgent(cfg)
if err != nil { if err != nil {
panic(err) panic(err)
} }
@@ -249,21 +284,41 @@ func pipeWithTimeout(iceTimeout time.Duration, iceKeepalive time.Duration) (*Con
if err != nil { if err != nil {
panic(err) panic(err)
} }
err = aAgent.OnCandidate(func(candidate Candidate) {
bAgent, err := NewAgent(&AgentConfig{ if candidate == nil {
Urls: urls, wg.Done()
ConnectionTimeout: &iceTimeout, }
KeepaliveInterval: &iceKeepalive,
NetworkTypes: supportedNetworkTypes,
}) })
if err != nil { if err != nil {
panic(err) panic(err)
} }
err = aAgent.GatherCandidates(cfg.Urls, cfg.NetworkTypes)
if err != nil {
panic(err)
}
bAgent, err := NewAgent(cfg)
if err != nil {
panic(err)
}
err = bAgent.OnConnectionStateChange(bNotifier) err = bAgent.OnConnectionStateChange(bNotifier)
if err != nil { if err != nil {
panic(err) panic(err)
} }
err = bAgent.OnCandidate(func(candidate Candidate) {
if candidate == nil {
wg.Done()
}
})
if err != nil {
panic(err)
}
err = bAgent.GatherCandidates(cfg.Urls, cfg.NetworkTypes)
if err != nil {
panic(err)
}
wg.Wait()
aConn, bConn := connect(aAgent, bAgent) aConn, bConn := connect(aAgent, bAgent)
// Ensure pair selected // Ensure pair selected