Move pkg/ice to internal/ice

Avoid exposing any non-standard API.
This commit is contained in:
Michael MacDonald
2019-03-01 07:39:19 -05:00
parent a202ccbf2f
commit dc966e04a9
17 changed files with 2647 additions and 0 deletions

787
agent.go Normal file
View File

@@ -0,0 +1,787 @@
// Package ice implements the Interactive Connectivity Establishment (ICE)
// protocol defined in rfc5245.
package ice
import (
"fmt"
"math/rand"
"net"
"sort"
"sync"
"time"
"github.com/pions/stun"
"github.com/pions/webrtc/internal/util"
"github.com/pkg/errors"
)
const (
// taskLoopInterval is the interval at which the agent performs checks
taskLoopInterval = 2 * time.Second
// keepaliveInterval used to keep candidates alive
defaultKeepaliveInterval = 10 * time.Second
// defaultConnectionTimeout used to declare a connection dead
defaultConnectionTimeout = 30 * time.Second
)
// Agent represents the ICE agent
type Agent struct {
onConnectionStateChangeHdlr func(ConnectionState)
onSelectedCandidatePairChangeHdlr func(*Candidate, *Candidate)
// Used to block double Dial/Accept
opened bool
// State owned by the taskLoop
taskChan chan task
onConnected chan struct{}
onConnectedOnce sync.Once
connectivityTicker *time.Ticker
connectivityChan <-chan time.Time
tieBreaker uint64
connectionState ConnectionState
gatheringState GatheringState
haveStarted bool
isControlling bool
portmin uint16
portmax uint16
//How long should a pair stay quiet before we declare it dead?
//0 means never timeout
connectionTimeout time.Duration
//How often should we send keepalive packets?
//0 means never
keepaliveInterval time.Duration
localUfrag string
localPwd string
localCandidates map[NetworkType][]*Candidate
remoteUfrag string
remotePwd string
remoteCandidates map[NetworkType][]*Candidate
selectedPair *candidatePair
validPairs []*candidatePair
// Channel for reading
rcvCh chan *bufIn
// State for closing
done chan struct{}
err atomicError
}
type bufIn struct {
buf []byte
size chan int
}
func (a *Agent) ok() error {
select {
case <-a.done:
return a.getErr()
default:
}
return nil
}
func (a *Agent) getErr() error {
err := a.err.Load()
if err != nil {
return err
}
return ErrClosed
}
// AgentConfig collects the arguments to ice.Agent construction into
// a single structure, for future-proofness of the interface
type AgentConfig struct {
Urls []*URL
// PortMin and PortMax are optional. Leave them 0 for the default UDP port allocation strategy.
PortMin uint16
PortMax uint16
// ConnectionTimeout defaults to 30 seconds when this property is nil.
// If the duration is 0, we will never timeout this connection.
ConnectionTimeout *time.Duration
// KeepaliveInterval determines how often should we send ICE
// keepalives (should be less then connectiontimeout above)
// when this is nil, it defaults to 10 seconds.
// A keepalive interval of 0 means we never send keepalive packets
KeepaliveInterval *time.Duration
}
// NewAgent creates a new Agent
func NewAgent(config *AgentConfig) (*Agent, error) {
if config.PortMax < config.PortMin {
return nil, ErrPort
}
a := &Agent{
tieBreaker: rand.New(rand.NewSource(time.Now().UnixNano())).Uint64(),
gatheringState: GatheringStateComplete, // TODO trickle-ice
connectionState: ConnectionStateNew,
localCandidates: make(map[NetworkType][]*Candidate),
remoteCandidates: make(map[NetworkType][]*Candidate),
localUfrag: util.RandSeq(16),
localPwd: util.RandSeq(32),
taskChan: make(chan task),
onConnected: make(chan struct{}),
rcvCh: make(chan *bufIn),
done: make(chan struct{}),
portmin: config.PortMin,
portmax: config.PortMax,
}
// connectionTimeout used to declare a connection dead
if config.ConnectionTimeout == nil {
a.connectionTimeout = defaultConnectionTimeout
} else {
a.connectionTimeout = *config.ConnectionTimeout
}
if config.KeepaliveInterval == nil {
a.keepaliveInterval = defaultKeepaliveInterval
} else {
a.keepaliveInterval = *config.KeepaliveInterval
}
// Initialize local candidates
a.gatherCandidatesLocal()
a.gatherCandidatesReflective(config.Urls)
go a.taskLoop()
return a, nil
}
// OnConnectionStateChange sets a handler that is fired when the connection state changes
func (a *Agent) OnConnectionStateChange(f func(ConnectionState)) error {
return a.run(func(agent *Agent) {
agent.onConnectionStateChangeHdlr = f
})
}
// OnSelectedCandidatePairChange sets a handler that is fired when the final candidate
// pair is selected
func (a *Agent) OnSelectedCandidatePairChange(f func(*Candidate, *Candidate)) error {
return a.run(func(agent *Agent) {
agent.onSelectedCandidatePairChangeHdlr = f
})
}
func (a *Agent) onSelectedCandidatePairChange(p *candidatePair) {
if p != nil {
if a.onSelectedCandidatePairChangeHdlr != nil {
a.onSelectedCandidatePairChangeHdlr(p.local, p.remote)
}
}
}
func (a *Agent) listenUDP(network string, laddr *net.UDPAddr) (*net.UDPConn, error) {
if (laddr.Port != 0) || ((a.portmin == 0) && (a.portmax == 0)) {
return net.ListenUDP(network, laddr)
}
var i, j int
i = int(a.portmin)
if i == 0 {
i = 1
}
j = int(a.portmax)
if j == 0 {
j = 0xFFFF
}
for i <= j {
c, e := net.ListenUDP(network, &net.UDPAddr{IP: laddr.IP, Port: i})
if e == nil {
return c, e
}
i++
}
return nil, ErrPort
}
func (a *Agent) gatherCandidatesLocal() {
localIPs := localInterfaces()
for _, ip := range localIPs {
for _, network := range supportedNetworks {
conn, err := a.listenUDP(network, &net.UDPAddr{IP: ip, Port: 0})
if err != nil {
iceLog.Warnf("could not listen %s %s\n", network, ip)
continue
}
port := conn.LocalAddr().(*net.UDPAddr).Port
c, err := NewCandidateHost(network, ip, port, ComponentRTP)
if err != nil {
iceLog.Warnf("Failed to create host candidate: %s %s %d: %v\n", network, ip, port, err)
continue
}
networkType := c.NetworkType
set := a.localCandidates[networkType]
set = append(set, c)
a.localCandidates[networkType] = set
c.start(a, conn)
}
}
}
func (a *Agent) gatherCandidatesReflective(urls []*URL) {
for _, networkType := range supportedNetworkTypes {
network := networkType.String()
for _, url := range urls {
switch url.Scheme {
case SchemeTypeSTUN:
laddr, xoraddr, err := allocateUDP(network, url)
if err != nil {
iceLog.Warnf("could not allocate %s %s: %v\n", network, url, err)
continue
}
conn, err := net.ListenUDP(network, laddr)
if err != nil {
iceLog.Warnf("could not listen %s %s: %v\n", network, laddr, err)
}
ip := xoraddr.IP
port := xoraddr.Port
relIP := laddr.IP.String()
relPort := laddr.Port
c, err := NewCandidateServerReflexive(network, ip, port, ComponentRTP, relIP, relPort)
if err != nil {
iceLog.Warnf("Failed to create server reflexive candidate: %s %s %d: %v\n", network, ip, port, err)
continue
}
networkType := c.NetworkType
set := a.localCandidates[networkType]
set = append(set, c)
a.localCandidates[networkType] = set
c.start(a, conn)
default:
iceLog.Warnf("scheme %s is not implemented\n", url.Scheme)
continue
}
}
}
}
func allocateUDP(network string, url *URL) (*net.UDPAddr, *stun.XorAddress, error) {
// TODO Do we want the timeout to be configurable?
client, err := stun.NewClient(network, fmt.Sprintf("%s:%d", url.Host, url.Port), time.Second*5)
if err != nil {
return nil, nil, errors.Wrapf(err, "Failed to create STUN client")
}
localAddr, ok := client.LocalAddr().(*net.UDPAddr)
if !ok {
return nil, nil, errors.Errorf("Failed to cast STUN client to UDPAddr")
}
resp, err := client.Request()
if err != nil {
return nil, nil, errors.Wrapf(err, "Failed to make STUN request")
}
if err = client.Close(); err != nil {
return nil, nil, errors.Wrapf(err, "Failed to close STUN client")
}
attr, ok := resp.GetOneAttribute(stun.AttrXORMappedAddress)
if !ok {
return nil, nil, errors.Errorf("Got respond from STUN server that did not contain XORAddress")
}
var addr stun.XorAddress
if err = addr.Unpack(resp, attr); err != nil {
return nil, nil, errors.Wrapf(err, "Failed to unpack STUN XorAddress response")
}
return localAddr, &addr, nil
}
func (a *Agent) startConnectivityChecks(isControlling bool, remoteUfrag, remotePwd string) error {
switch {
case a.haveStarted:
return errors.Errorf("Attempted to start agent twice")
case remoteUfrag == "":
return errors.Errorf("remoteUfrag is empty")
case remotePwd == "":
return errors.Errorf("remotePwd is empty")
}
iceLog.Debugf("Started agent: isControlling? %t, remoteUfrag: %q, remotePwd: %q", isControlling, remoteUfrag, remotePwd)
return a.run(func(agent *Agent) {
agent.isControlling = isControlling
agent.remoteUfrag = remoteUfrag
agent.remotePwd = remotePwd
// TODO this should be dynamic, and grow when the connection is stable
t := time.NewTicker(taskLoopInterval)
agent.connectivityTicker = t
agent.connectivityChan = t.C
agent.updateConnectionState(ConnectionStateChecking)
})
}
func (a *Agent) pingCandidate(local, remote *Candidate) {
var msg *stun.Message
var err error
// The controlling agent MUST include the USE-CANDIDATE attribute in
// order to nominate a candidate pair (Section 8.1.1). The controlled
// agent MUST NOT include the USE-CANDIDATE attribute in a Binding
// request.
if a.isControlling {
msg, err = stun.Build(stun.ClassRequest, stun.MethodBinding, stun.GenerateTransactionID(),
&stun.Username{Username: a.remoteUfrag + ":" + a.localUfrag},
&stun.UseCandidate{},
&stun.IceControlling{TieBreaker: a.tieBreaker},
&stun.Priority{Priority: uint32(local.Priority())},
&stun.MessageIntegrity{
Key: []byte(a.remotePwd),
},
&stun.Fingerprint{},
)
} else {
msg, err = stun.Build(stun.ClassRequest, stun.MethodBinding, stun.GenerateTransactionID(),
&stun.Username{Username: a.remoteUfrag + ":" + a.localUfrag},
&stun.IceControlled{TieBreaker: a.tieBreaker},
&stun.Priority{Priority: uint32(local.Priority())},
&stun.MessageIntegrity{
Key: []byte(a.remotePwd),
},
&stun.Fingerprint{},
)
}
if err != nil {
iceLog.Debug(err.Error())
return
}
iceLog.Tracef("ping STUN from %s to %s\n", local.String(), remote.String())
a.sendSTUN(msg, local, remote)
}
func (a *Agent) updateConnectionState(newState ConnectionState) {
if a.connectionState != newState {
iceLog.Infof("Setting new connection state: %s", newState)
a.connectionState = newState
hdlr := a.onConnectionStateChangeHdlr
if hdlr != nil {
// Call handler async since we may be holding the agent lock
// and the handler may also require it
go hdlr(newState)
}
}
}
type candidatePairs []*candidatePair
func (cp candidatePairs) Len() int { return len(cp) }
func (cp candidatePairs) Swap(i, j int) { cp[i], cp[j] = cp[j], cp[i] }
type byPairPriority struct{ candidatePairs }
// NB: Reverse sort so our candidates start at highest priority
func (bp byPairPriority) Less(i, j int) bool {
return bp.candidatePairs[i].Priority() > bp.candidatePairs[j].Priority()
}
func (a *Agent) setValidPair(local, remote *Candidate, selected, controlling bool) {
// TODO: avoid duplicates
p := newCandidatePair(local, remote, controlling)
iceLog.Tracef("Found valid candidate pair: %s (selected? %t)", p, selected)
if selected {
// Notify when the selected pair changes
if !a.selectedPair.Equal(p) {
a.onSelectedCandidatePairChange(p)
}
a.selectedPair = p
a.validPairs = nil
// TODO: only set state to connected on selecting final pair?
a.updateConnectionState(ConnectionStateConnected)
} else {
// keep track of pairs with succesfull bindings since any of them
// can be used for communication until the final pair is selected:
// https://tools.ietf.org/html/draft-ietf-ice-rfc5245bis-20#section-12
a.validPairs = append(a.validPairs, p)
// Sort the candidate pairs by priority of the remotes
sort.Sort(byPairPriority{a.validPairs})
}
// Signal connected
a.onConnectedOnce.Do(func() { close(a.onConnected) })
}
// A task is a
type task func(*Agent)
func (a *Agent) run(t task) error {
err := a.ok()
if err != nil {
return err
}
select {
case <-a.done:
return a.getErr()
case a.taskChan <- t:
}
return nil
}
func (a *Agent) taskLoop() {
for {
select {
case <-a.connectivityChan:
if a.validateSelectedPair() {
iceLog.Trace("checking keepalive")
a.checkKeepalive()
} else {
iceLog.Trace("pinging all candidates")
a.pingAllCandidates()
}
case t := <-a.taskChan:
// Run the task
t(a)
case <-a.done:
return
}
}
}
// validateSelectedPair checks if the selected pair is (still) valid
// Note: the caller should hold the agent lock.
func (a *Agent) validateSelectedPair() bool {
if a.selectedPair == nil {
// Not valid since not selected
return false
}
if (a.connectionTimeout != 0) &&
(time.Since(a.selectedPair.remote.LastReceived()) > a.connectionTimeout) {
a.selectedPair = nil
a.updateConnectionState(ConnectionStateDisconnected)
return false
}
return true
}
// checkKeepalive sends STUN Binding Indications to the selected pair
// if no packet has been sent on that pair in the last keepaliveInterval
// Note: the caller should hold the agent lock.
func (a *Agent) checkKeepalive() {
if a.selectedPair == nil {
return
}
if (a.keepaliveInterval != 0) &&
(time.Since(a.selectedPair.local.LastSent()) > a.keepaliveInterval) {
a.keepaliveCandidate(a.selectedPair.local, a.selectedPair.remote)
}
}
// pingAllCandidates sends STUN Binding Requests to all candidates
// Note: the caller should hold the agent lock.
func (a *Agent) pingAllCandidates() {
for networkType, localCandidates := range a.localCandidates {
if remoteCandidates, ok := a.remoteCandidates[networkType]; ok {
for _, localCandidate := range localCandidates {
for _, remoteCandidate := range remoteCandidates {
a.pingCandidate(localCandidate, remoteCandidate)
}
}
}
}
}
// AddRemoteCandidate adds a new remote candidate
func (a *Agent) AddRemoteCandidate(c *Candidate) error {
return a.run(func(agent *Agent) {
agent.addRemoteCandidate(c)
})
}
// addRemoteCandidate assumes you are holding the lock (must be execute using a.run)
func (a *Agent) addRemoteCandidate(c *Candidate) {
networkType := c.NetworkType
set := a.remoteCandidates[networkType]
for _, candidate := range set {
if candidate.Equal(c) {
return
}
}
set = append(set, c)
a.remoteCandidates[networkType] = set
}
// GetLocalCandidates returns the local candidates
func (a *Agent) GetLocalCandidates() ([]*Candidate, error) {
res := make(chan []*Candidate)
err := a.run(func(agent *Agent) {
var candidates []*Candidate
for _, set := range agent.localCandidates {
candidates = append(candidates, set...)
}
res <- candidates
})
if err != nil {
return nil, err
}
return <-res, nil
}
// GetLocalUserCredentials returns the local user credentials
func (a *Agent) GetLocalUserCredentials() (frag string, pwd string) {
return a.localUfrag, a.localPwd
}
// Close cleans up the Agent
func (a *Agent) Close() error {
done := make(chan struct{})
err := a.run(func(agent *Agent) {
defer func() {
close(done)
}()
agent.err.Store(ErrClosed)
close(agent.done)
// Cleanup all candidates
for net, cs := range agent.localCandidates {
for _, c := range cs {
err := c.close()
if err != nil {
iceLog.Warnf("Failed to close candidate %s: %v", c, err)
}
}
delete(agent.localCandidates, net)
}
for net, cs := range agent.remoteCandidates {
for _, c := range cs {
err := c.close()
if err != nil {
iceLog.Warnf("Failed to close candidate %s: %v", c, err)
}
}
delete(agent.remoteCandidates, net)
}
})
if err != nil {
return err
}
<-done
return nil
}
func (a *Agent) findRemoteCandidate(networkType NetworkType, addr net.Addr) *Candidate {
var ip net.IP
var port int
switch a := addr.(type) {
case *net.UDPAddr:
ip = a.IP
port = a.Port
case *net.TCPAddr:
ip = a.IP
port = a.Port
default:
iceLog.Warnf("unsupported address type %T", a)
return nil
}
set := a.remoteCandidates[networkType]
for _, c := range set {
base := c
if base.IP.Equal(ip) &&
base.Port == port {
return c
}
}
return nil
}
func (a *Agent) sendBindingSuccess(m *stun.Message, local, remote *Candidate) {
base := remote
if out, err := stun.Build(stun.ClassSuccessResponse, stun.MethodBinding, m.TransactionID,
&stun.XorMappedAddress{
XorAddress: stun.XorAddress{
IP: base.IP,
Port: base.Port,
},
},
&stun.MessageIntegrity{
Key: []byte(a.localPwd),
},
&stun.Fingerprint{},
); err != nil {
iceLog.Warnf("Failed to handle inbound ICE from: %s to: %s error: %s", local, remote, err)
} else {
a.sendSTUN(out, local, remote)
}
}
func (a *Agent) handleInboundControlled(m *stun.Message, localCandidate, remoteCandidate *Candidate) {
if _, isControlled := m.GetOneAttribute(stun.AttrIceControlled); isControlled && !a.isControlling {
iceLog.Debug("inbound isControlled && a.isControlling == false")
return
}
successResponse := m.Method == stun.MethodBinding && m.Class == stun.ClassSuccessResponse
_, usepair := m.GetOneAttribute(stun.AttrUseCandidate)
iceLog.Tracef("got controlled message (success? %t, usepair? %t)", successResponse, usepair)
// Remember the working pair and select it when marked with usepair
a.setValidPair(localCandidate, remoteCandidate, usepair, false)
if !successResponse {
// Send success response
a.sendBindingSuccess(m, localCandidate, remoteCandidate)
}
}
func (a *Agent) handleInboundControlling(m *stun.Message, localCandidate, remoteCandidate *Candidate) {
if _, isControlling := m.GetOneAttribute(stun.AttrIceControlling); isControlling && a.isControlling {
iceLog.Debug("inbound isControlling && a.isControlling == true")
return
} else if _, useCandidate := m.GetOneAttribute(stun.AttrUseCandidate); useCandidate && a.isControlling {
iceLog.Debug("useCandidate && a.isControlling == true")
return
}
iceLog.Tracef("got controlling message: %#v", m)
successResponse := m.Method == stun.MethodBinding && m.Class == stun.ClassSuccessResponse
// Remember the working pair and select it when receiving a success response
a.setValidPair(localCandidate, remoteCandidate, successResponse, true)
if !successResponse {
// Send success response
a.sendBindingSuccess(m, localCandidate, remoteCandidate)
// We received a ping from the controlled agent. We know the pair works so now we ping with use-candidate set:
a.pingCandidate(localCandidate, remoteCandidate)
}
}
// handleNewPeerReflexiveCandidate adds an unseen remote transport address
// to the remote candidate list as a peer-reflexive candidate.
func (a *Agent) handleNewPeerReflexiveCandidate(local *Candidate, remote net.Addr) error {
var ip net.IP
var port int
switch addr := remote.(type) {
case *net.UDPAddr:
ip = addr.IP
port = addr.Port
case *net.TCPAddr:
ip = addr.IP
port = addr.Port
default:
return errors.Errorf("unsupported address type %T", addr)
}
pflxCandidate, err := NewCandidatePeerReflexive(
local.NetworkType.String(), // assume, same as that of local
ip,
port,
local.Component,
"", // unknown at this moment. TODO: need a review
0, // unknown at this moment. TODO: need a review
)
if err != nil {
return errors.Wrapf(err, "failed to create peer-reflexive candidate: %v", remote)
}
// Add pflxCandidate to the remote candidate list
a.addRemoteCandidate(pflxCandidate)
return nil
}
// handleInbound processes STUN traffic from a remote candidate
func (a *Agent) handleInbound(m *stun.Message, local *Candidate, remote net.Addr) {
iceLog.Tracef("inbound STUN from %s to %s", remote.String(), local.String())
remoteCandidate := a.findRemoteCandidate(local.NetworkType, remote)
if remoteCandidate == nil {
iceLog.Debugf("detected a new peer-reflexive candiate: %s ", remote)
err := a.handleNewPeerReflexiveCandidate(local, remote)
if err != nil {
// Log warning, then move on..
iceLog.Warn(err.Error())
}
return
}
remoteCandidate.seen(false)
if m.Class == stun.ClassIndication {
return
}
if a.isControlling {
a.handleInboundControlling(m, local, remoteCandidate)
} else {
a.handleInboundControlled(m, local, remoteCandidate)
}
}
// noSTUNSeen processes non STUN traffic from a remote candidate
func (a *Agent) noSTUNSeen(local *Candidate, remote net.Addr) {
remoteCandidate := a.findRemoteCandidate(local.NetworkType, remote)
if remoteCandidate != nil {
remoteCandidate.seen(false)
}
}
func (a *Agent) getBestPair() (*candidatePair, error) {
res := make(chan *candidatePair)
err := a.run(func(agent *Agent) {
if agent.selectedPair != nil {
res <- agent.selectedPair
return
}
for _, p := range agent.validPairs {
res <- p
return
}
res <- nil
})
if err != nil {
return nil, err
}
out := <-res
if out == nil {
return nil, errors.New("No Valid Candidate Pairs Available")
}
return out, nil
}

314
agent_test.go Normal file
View File

@@ -0,0 +1,314 @@
package ice
import (
"net"
"testing"
"time"
"github.com/pions/transport/test"
)
func TestPairSearch(t *testing.T) {
// Limit runtime in case of deadlocks
lim := test.TimeOut(time.Second * 10)
defer lim.Stop()
var config AgentConfig
a, err := NewAgent(&config)
if err != nil {
t.Fatalf("Error constructing ice.Agent")
}
if len(a.validPairs) != 0 {
t.Fatalf("TestPairSearch is only a valid test if a.validPairs is empty on construction")
}
cp, err := a.getBestPair()
if cp != nil {
t.Fatalf("No Candidate pairs should exist")
}
if err == nil {
t.Fatalf("An error should have been reported (with no available candidate pairs)")
}
err = a.Close()
if err != nil {
t.Fatalf("Close agent emits error %v", err)
}
}
func TestPairPriority(t *testing.T) {
// avoid deadlocks?
defer test.TimeOut(1 * time.Second).Stop()
a, err := NewAgent(&AgentConfig{})
if err != nil {
t.Fatalf("Failed to create agent: %s", err)
}
hostLocal, err := NewCandidateHost(
"udp",
net.ParseIP("192.168.1.1"), 19216,
1,
)
if err != nil {
t.Fatalf("Failed to construct local host candidate: %s", err)
}
relayRemote, err := NewCandidateRelay(
"udp",
net.ParseIP("1.2.3.4"), 12340,
1,
"4.3.2.1", 43210,
)
if err != nil {
t.Fatalf("Failed to construct remote relay candidate: %s", err)
}
srflxRemote, err := NewCandidateServerReflexive(
"udp",
net.ParseIP("10.10.10.2"), 19218,
1,
"4.3.2.1", 43212,
)
if err != nil {
t.Fatalf("Failed to construct remote srflx candidate: %s", err)
}
prflxRemote, err := NewCandidatePeerReflexive(
"udp",
net.ParseIP("10.10.10.2"), 19217,
1,
"4.3.2.1", 43211,
)
if err != nil {
t.Fatalf("Failed to construct remote prflx candidate: %s", err)
}
hostRemote, err := NewCandidateHost(
"udp",
net.ParseIP("1.2.3.5"), 12350,
1,
)
if err != nil {
t.Fatalf("Failed to construct remote host candidate: %s", err)
}
for _, remote := range []*Candidate{relayRemote, srflxRemote, prflxRemote, hostRemote} {
a.setValidPair(hostLocal, remote, false, false)
bestPair, err := a.getBestPair()
if err != nil {
t.Fatalf("Failed to get best candidate pair: %s", err)
}
if bestPair.String() != (&candidatePair{remote: remote, local: hostLocal}).String() {
t.Fatalf("Unexpected bestPair %s (expected remote: %s)", bestPair, remote)
}
}
if err := a.Close(); err != nil {
t.Fatalf("Error on agent.Close(): %s", err)
}
}
func TestOnSelectedCandidatePairChange(t *testing.T) {
// avoid deadlocks?
defer test.TimeOut(1 * time.Second).Stop()
a, err := NewAgent(&AgentConfig{})
if err != nil {
t.Fatalf("Failed to create agent: %s", err)
}
callbackCalled := make(chan struct{}, 1)
if err = a.OnSelectedCandidatePairChange(func(local, remote *Candidate) {
close(callbackCalled)
}); err != nil {
t.Fatalf("Failed to set agent OnCandidatePairChange callback: %s", err)
}
hostLocal, err := NewCandidateHost(
"udp",
net.ParseIP("192.168.1.1"), 19216,
1,
)
if err != nil {
t.Fatalf("Failed to construct local host candidate: %s", err)
}
relayRemote, err := NewCandidateRelay(
"udp",
net.ParseIP("1.2.3.4"), 12340,
1,
"4.3.2.1", 43210,
)
if err != nil {
t.Fatalf("Failed to construct remote relay candidate: %s", err)
}
// select the pair
if err = a.run(func(agent *Agent) {
agent.setValidPair(hostLocal, relayRemote, true, false)
}); err != nil {
t.Fatalf("Failed to setValidPair(): %s", err)
}
// ensure that the callback fired on setting the pair
<-callbackCalled
// set the same pair; this should not invoke the callback
// if the callback is invoked now it will panic due
// to second close of the channel
if err = a.run(func(agent *Agent) {
agent.setValidPair(hostLocal, relayRemote, true, false)
}); err != nil {
t.Fatalf("Failed to setValidPair(): %s", err)
}
if err := a.Close(); err != nil {
t.Fatalf("Error on agent.Close(): %s", err)
}
}
type BadAddr struct{}
func (ba *BadAddr) Network() string {
return "xxx"
}
func (ba *BadAddr) String() string {
return "yyy"
}
func TestHandlePeerReflexive(t *testing.T) {
// Limit runtime in case of deadlocks
lim := test.TimeOut(time.Second * 2)
defer lim.Stop()
t.Run("UDP pflx candidate from handleInboud()", func(t *testing.T) {
var config AgentConfig
a, err := NewAgent(&config)
if err != nil {
t.Fatalf("Error constructing ice.Agent")
}
ip := net.ParseIP("192.168.0.2")
local, err := NewCandidateHost("udp", ip, 777, 1)
if err != nil {
t.Fatalf("failed to create a new candidate: %v", err)
}
remote := &net.UDPAddr{IP: net.ParseIP("172.17.0.3"), Port: 999}
a.handleInbound(nil, local, remote)
// length of remote candidate list must be one now
if len(a.remoteCandidates) != 1 {
t.Fatal("failed to add a network type to the remote candidate list")
}
// length of remote candidate list for a network type must be 1
set := a.remoteCandidates[local.NetworkType]
if len(set) != 1 {
t.Fatal("failed to add prflx candidate to remote candidate list")
}
c := set[0]
if c.Type != CandidateTypePeerReflexive {
t.Fatal("candidate type must be prflx")
}
if !c.IP.Equal(net.ParseIP("172.17.0.3")) {
t.Fatal("IP address mismatch")
}
if c.Port != 999 {
t.Fatal("Port number mismatch")
}
err = a.Close()
if err != nil {
t.Fatalf("Close agent emits error %v", err)
}
})
t.Run("Bad network type with handleInbound()", func(t *testing.T) {
var config AgentConfig
a, err := NewAgent(&config)
if err != nil {
t.Fatal("Error constructing ice.Agent")
}
ip := net.ParseIP("192.168.0.2")
local, err := NewCandidateHost("tcp", ip, 777, 1)
if err != nil {
t.Fatalf("failed to create a new candidate: %v", err)
}
remote := &BadAddr{}
a.handleInbound(nil, local, remote)
if len(a.remoteCandidates) != 0 {
t.Fatal("bad address should not be added to the remote candidate list")
}
err = a.Close()
if err != nil {
t.Fatalf("Close agent emits error %v", err)
}
})
t.Run("TCP prflx with handleNewPeerReflexiveCandidate()", func(t *testing.T) {
var config AgentConfig
a, err := NewAgent(&config)
if err != nil {
t.Fatal("Error constructing ice.Agent")
}
ip := net.ParseIP("192.168.0.2")
local, err := NewCandidateHost("tcp", ip, 777, 1)
if err != nil {
t.Fatalf("failed to create a new candidate: %v", err)
}
remote := &net.TCPAddr{IP: net.ParseIP("172.17.0.3"), Port: 999}
err = a.handleNewPeerReflexiveCandidate(local, remote)
if err != nil {
t.Fatalf("handleNewPeerReflexiveCandidate() should not fail: %v", err)
}
// length of remote candidate list must be one now
if len(a.remoteCandidates) != 1 {
t.Fatal("failed to add a network type to the remote candidate list")
}
// length of remote candidate list for a network type must be 1
set := a.remoteCandidates[local.NetworkType]
if len(set) != 1 {
t.Fatal("failed to add prflx candidate to remote candidate list")
}
c := set[0]
if c.Type != CandidateTypePeerReflexive {
t.Fatal("candidate type must be prflx")
}
if !c.IP.Equal(net.ParseIP("172.17.0.3")) {
t.Fatal("IP address mismatch")
}
if c.Port != 999 {
t.Fatal("Port number mismatch")
}
err = a.Close()
if err != nil {
t.Fatalf("Close agent emits error %v", err)
}
})
}

272
candidate.go Normal file
View File

@@ -0,0 +1,272 @@
package ice
import (
"fmt"
"net"
"sync"
"time"
"github.com/pions/stun"
)
const (
receiveMTU = 8192
defaultLocalPreference = 65535
// ComponentRTP indicates that the candidate is used for RTP
ComponentRTP uint16 = 1
// ComponentRTCP indicates that the candidate is used for RTCP
ComponentRTCP
)
// Candidate represents an ICE candidate
type Candidate struct {
NetworkType
Type CandidateType
LocalPreference uint16
Component uint16
IP net.IP
Port int
RelatedAddress *CandidateRelatedAddress
lock sync.RWMutex
lastSent time.Time
lastReceived time.Time
agent *Agent
conn net.PacketConn
closeCh chan struct{}
closedCh chan struct{}
}
// NewCandidateHost creates a new host candidate
func NewCandidateHost(network string, ip net.IP, port int, component uint16) (*Candidate, error) {
networkType, err := determineNetworkType(network, ip)
if err != nil {
return nil, err
}
return &Candidate{
Type: CandidateTypeHost,
NetworkType: networkType,
IP: ip,
Port: port,
LocalPreference: defaultLocalPreference,
Component: component,
}, nil
}
// NewCandidateServerReflexive creates a new server reflective candidate
func NewCandidateServerReflexive(network string, ip net.IP, port int, component uint16, relAddr string, relPort int) (*Candidate, error) {
networkType, err := determineNetworkType(network, ip)
if err != nil {
return nil, err
}
return &Candidate{
Type: CandidateTypeServerReflexive,
NetworkType: networkType,
IP: ip,
Port: port,
LocalPreference: defaultLocalPreference,
Component: component,
RelatedAddress: &CandidateRelatedAddress{
Address: relAddr,
Port: relPort,
},
}, nil
}
// NewCandidatePeerReflexive creates a new peer reflective candidate
func NewCandidatePeerReflexive(network string, ip net.IP, port int, component uint16, relAddr string, relPort int) (*Candidate, error) {
networkType, err := determineNetworkType(network, ip)
if err != nil {
return nil, err
}
return &Candidate{
Type: CandidateTypePeerReflexive,
NetworkType: networkType,
IP: ip,
Port: port,
LocalPreference: defaultLocalPreference,
Component: component,
RelatedAddress: &CandidateRelatedAddress{
Address: relAddr,
Port: relPort,
},
}, nil
}
// NewCandidateRelay creates a new relay candidate
func NewCandidateRelay(network string, ip net.IP, port int, component uint16, relAddr string, relPort int) (*Candidate, error) {
networkType, err := determineNetworkType(network, ip)
if err != nil {
return nil, err
}
return &Candidate{
Type: CandidateTypeRelay,
NetworkType: networkType,
IP: ip,
Port: port,
LocalPreference: defaultLocalPreference,
Component: component,
RelatedAddress: &CandidateRelatedAddress{
Address: relAddr,
Port: relPort,
},
}, nil
}
// start runs the candidate using the provided connection
func (c *Candidate) start(a *Agent, conn net.PacketConn) {
c.agent = a
c.conn = conn
c.closeCh = make(chan struct{})
c.closedCh = make(chan struct{})
go c.recvLoop()
}
func (c *Candidate) recvLoop() {
defer func() {
close(c.closedCh)
}()
buffer := make([]byte, receiveMTU)
for {
n, srcAddr, err := c.conn.ReadFrom(buffer)
if err != nil {
return
}
if stun.IsSTUN(buffer[:n]) {
m, err := stun.NewMessage(buffer[:n])
if err != nil {
iceLog.Warnf("Failed to handle decode ICE from %s to %s: %v", c.addr(), srcAddr, err)
continue
}
err = c.agent.run(func(agent *Agent) {
agent.handleInbound(m, c, srcAddr)
})
if err != nil {
iceLog.Warnf("Failed to handle message: %v", err)
}
continue
} else {
err := c.agent.run(func(agent *Agent) {
agent.noSTUNSeen(c, srcAddr)
})
if err != nil {
iceLog.Warnf("Failed to handle message: %v", err)
}
}
select {
case bufin := <-c.agent.rcvCh:
copy(bufin.buf, buffer[:n]) // TODO: avoid copy in common case?
bufin.size <- n
case <-c.closeCh:
return
}
}
}
// close stops the recvLoop
func (c *Candidate) close() error {
c.lock.Lock()
defer c.lock.Unlock()
if c.conn != nil {
// Unblock recvLoop
close(c.closeCh)
// Close the conn
err := c.conn.Close()
if err != nil {
return err
}
// Wait until the recvLoop is closed
<-c.closedCh
}
return nil
}
func (c *Candidate) writeTo(raw []byte, dst *Candidate) (int, error) {
n, err := c.conn.WriteTo(raw, dst.addr())
if err != nil {
return n, fmt.Errorf("failed to send packet: %v", err)
}
c.seen(true)
return n, nil
}
// Priority computes the priority for this ICE Candidate
func (c *Candidate) Priority() uint16 {
// The local preference MUST be an integer from 0 (lowest preference) to
// 65535 (highest preference) inclusive. When there is only a single IP
// address, this value SHOULD be set to 65535. If there are multiple
// candidates for a particular component for a particular data stream
// that have the same type, the local preference MUST be unique for each
// one.
return (2^24)*c.Type.Preference() +
(2^8)*c.LocalPreference +
(2^0)*(256-c.Component)
}
// Equal is used to compare two CandidateBases
func (c *Candidate) Equal(other *Candidate) bool {
return c.NetworkType == other.NetworkType &&
c.Type == other.Type &&
c.IP.Equal(other.IP) &&
c.Port == other.Port &&
c.RelatedAddress.Equal(other.RelatedAddress)
}
// String makes the CandidateHost printable
func (c *Candidate) String() string {
return fmt.Sprintf("%s %s:%d%s", c.Type, c.IP, c.Port, c.RelatedAddress)
}
// LastReceived returns a time.Time indicating the last time
// this candidate was received
func (c *Candidate) LastReceived() time.Time {
c.lock.RLock()
defer c.lock.RUnlock()
return c.lastReceived
}
func (c *Candidate) setLastReceived(t time.Time) {
c.lock.Lock()
defer c.lock.Unlock()
c.lastReceived = t
}
// LastSent returns a time.Time indicating the last time
// this candidate was sent
func (c *Candidate) LastSent() time.Time {
c.lock.RLock()
defer c.lock.RUnlock()
return c.lastSent
}
func (c *Candidate) setLastSent(t time.Time) {
c.lock.Lock()
defer c.lock.Unlock()
c.lastSent = t
}
func (c *Candidate) seen(outbound bool) {
if outbound {
c.setLastSent(time.Now())
} else {
c.setLastReceived(time.Now())
}
}
func (c *Candidate) addr() net.Addr {
return &net.UDPAddr{
IP: c.IP,
Port: c.Port,
}
}

106
candidatepair.go Normal file
View File

@@ -0,0 +1,106 @@
package ice
import (
"fmt"
"github.com/pions/stun"
)
func newCandidatePair(local, remote *Candidate, controlling bool) *candidatePair {
return &candidatePair{
iceRoleControlling: controlling,
remote: remote,
local: local,
}
}
// candidatePair represents a combination of a local and remote candidate
type candidatePair struct {
iceRoleControlling bool
remote *Candidate
local *Candidate
}
func (p *candidatePair) String() string {
return fmt.Sprintf("prio %d (local, prio %d) %s <-> %s (remote, prio %d)",
p.Priority(), p.local.Priority(), p.local, p.remote, p.remote.Priority())
}
func (p *candidatePair) Equal(other *candidatePair) bool {
if p == nil && other == nil {
return true
}
if p == nil || other == nil {
return false
}
return p.local.Equal(other.local) && p.remote.Equal(other.remote)
}
// RFC 5245 - 5.7.2. Computing Pair Priority and Ordering Pairs
// Let G be the priority for the candidate provided by the controlling
// agent. Let D be the priority for the candidate provided by the
// controlled agent.
// pair priority = 2^32*MIN(G,D) + 2*MAX(G,D) + (G>D?1:0)
func (p *candidatePair) Priority() uint32 {
var g uint32
var d uint32
if p.iceRoleControlling {
g = uint32(p.local.Priority())
d = uint32(p.remote.Priority())
} else {
g = uint32(p.remote.Priority())
d = uint32(p.local.Priority())
}
// Just implement these here rather
// than fooling around with the math package
min := func(x, y uint32) uint32 {
if x < y {
return x
}
return y
}
max := func(x, y uint32) uint32 {
if x > y {
return x
}
return y
}
cmp := func(x, y uint32) uint32 {
if x > y {
return 1
}
return 0
}
return (2^32)*min(g, d) + 2*max(g, d) + cmp(g, d)
}
func (p *candidatePair) Write(b []byte) (int, error) {
return p.local.writeTo(b, p.remote)
}
// keepaliveCandidate sends a STUN Binding Indication to the remote candidate
func (a *Agent) keepaliveCandidate(local, remote *Candidate) {
msg, err := stun.Build(stun.ClassIndication, stun.MethodBinding, stun.GenerateTransactionID(),
&stun.Username{Username: a.remoteUfrag + ":" + a.localUfrag},
&stun.MessageIntegrity{
Key: []byte(a.remotePwd),
},
&stun.Fingerprint{},
)
if err != nil {
iceLog.Warn(err.Error())
return
}
a.sendSTUN(msg, local, remote)
}
func (a *Agent) sendSTUN(msg *stun.Message, local, remote *Candidate) {
_, err := local.writeTo(msg.Pack(), remote)
if err != nil {
iceLog.Tracef("failed to send STUN message: %s", err)
}
}

View File

@@ -0,0 +1,30 @@
package ice
import "fmt"
// CandidateRelatedAddress convey transport addresses related to the
// candidate, useful for diagnostics and other purposes.
type CandidateRelatedAddress struct {
Address string
Port int
}
// String makes CandidateRelatedAddress printable
func (c *CandidateRelatedAddress) String() string {
if c == nil {
return ""
}
return fmt.Sprintf(" related %s:%d", c.Address, c.Port)
}
// Equal allows comparing two CandidateRelatedAddresses.
// The CandidateRelatedAddress are allowed to be nil.
func (c *CandidateRelatedAddress) Equal(other *CandidateRelatedAddress) bool {
if c == nil && other == nil {
return true
}
return c != nil && other != nil &&
c.Address == other.Address &&
c.Port == other.Port
}

45
candidatetype.go Normal file
View File

@@ -0,0 +1,45 @@
package ice
// CandidateType represents the type of candidate
type CandidateType byte
// CandidateType enum
const (
CandidateTypeHost CandidateType = iota + 1
CandidateTypeServerReflexive
CandidateTypePeerReflexive
CandidateTypeRelay
)
// String makes CandidateType printable
func (c CandidateType) String() string {
switch c {
case CandidateTypeHost:
return "host"
case CandidateTypeServerReflexive:
return "srflx"
case CandidateTypePeerReflexive:
return "prflx"
case CandidateTypeRelay:
return "relay"
}
return "Unknown candidate type"
}
// Preference returns the preference weight of a CandidateType
//
// 4.1.2.2. Guidelines for Choosing Type and Local Preferences
// The RECOMMENDED values are 126 for host candidates, 100
// for server reflexive candidates, 110 for peer reflexive candidates,
// and 0 for relayed candidates.
func (c CandidateType) Preference() uint16 {
switch c {
case CandidateTypeHost:
return 126
case CandidateTypePeerReflexive:
return 110
case CandidateTypeServerReflexive:
return 100
}
return 0
}

31
errors.go Normal file
View File

@@ -0,0 +1,31 @@
package ice
import (
"github.com/pkg/errors"
)
var (
// ErrUnknownType indicates an error with Unknown info.
ErrUnknownType = errors.New("Unknown")
// ErrSchemeType indicates the scheme type could not be parsed.
ErrSchemeType = errors.New("unknown scheme type")
// ErrSTUNQuery indicates query arguments are provided in a STUN URL.
ErrSTUNQuery = errors.New("queries not supported in stun address")
// ErrInvalidQuery indicates an malformed query is provided.
ErrInvalidQuery = errors.New("invalid query")
// ErrHost indicates malformed hostname is provided.
ErrHost = errors.New("invalid hostname")
// ErrPort indicates malformed port is provided.
ErrPort = errors.New("invalid port")
// ErrProtoType indicates an unsupported transport type was provided.
ErrProtoType = errors.New("invalid transport protocol type")
// ErrClosed indicates the agent is closed
ErrClosed = errors.New("the agent is closed")
)

76
ice.go Normal file
View File

@@ -0,0 +1,76 @@
package ice
// ConnectionState is an enum showing the state of a ICE Connection
type ConnectionState int
// List of supported States
const (
// ConnectionStateNew ICE agent is gathering addresses
ConnectionStateNew = iota + 1
// ConnectionStateChecking ICE agent has been given local and remote candidates, and is attempting to find a match
ConnectionStateChecking
// ConnectionStateConnected ICE agent has a pairing, but is still checking other pairs
ConnectionStateConnected
// ConnectionStateCompleted ICE agent has finished
ConnectionStateCompleted
// ConnectionStateFailed ICE agent never could successfully connect
ConnectionStateFailed
// ConnectionStateDisconnected ICE agent connected successfully, but has entered a failed state
ConnectionStateDisconnected
// ConnectionStateClosed ICE agent has finished and is no longer handling requests
ConnectionStateClosed
)
func (c ConnectionState) String() string {
switch c {
case ConnectionStateNew:
return "New"
case ConnectionStateChecking:
return "Checking"
case ConnectionStateConnected:
return "Connected"
case ConnectionStateCompleted:
return "Completed"
case ConnectionStateFailed:
return "Failed"
case ConnectionStateDisconnected:
return "Disconnected"
case ConnectionStateClosed:
return "Closed"
default:
return "Invalid"
}
}
// GatheringState describes the state of the candidate gathering process
type GatheringState int
const (
// GatheringStateNew indicates candidate gatering is not yet started
GatheringStateNew GatheringState = iota + 1
// GatheringStateGathering indicates candidate gatering is ongoing
GatheringStateGathering
// GatheringStateComplete indicates candidate gatering has been completed
GatheringStateComplete
)
func (t GatheringState) String() string {
switch t {
case GatheringStateNew:
return "new"
case GatheringStateGathering:
return "gathering"
case GatheringStateComplete:
return "complete"
default:
return ErrUnknownType.Error()
}
}

51
ice_test.go Normal file
View File

@@ -0,0 +1,51 @@
package ice
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestConnectedState_String(t *testing.T) {
testCases := []struct {
connectionState ConnectionState
expectedString string
}{
{ConnectionState(Unknown), "Invalid"},
{ConnectionStateNew, "New"},
{ConnectionStateChecking, "Checking"},
{ConnectionStateConnected, "Connected"},
{ConnectionStateCompleted, "Completed"},
{ConnectionStateFailed, "Failed"},
{ConnectionStateDisconnected, "Disconnected"},
{ConnectionStateClosed, "Closed"},
}
for i, testCase := range testCases {
assert.Equal(t,
testCase.expectedString,
testCase.connectionState.String(),
"testCase: %d %v", i, testCase,
)
}
}
func TestGatheringState_String(t *testing.T) {
testCases := []struct {
gatheringState GatheringState
expectedString string
}{
{GatheringState(Unknown), ErrUnknownType.Error()},
{GatheringStateNew, "new"},
{GatheringStateGathering, "gathering"},
{GatheringStateComplete, "complete"},
}
for i, testCase := range testCases {
assert.Equal(t,
testCase.expectedString,
testCase.gatheringState.String(),
"testCase: %d %v", i, testCase,
)
}
}

5
logging.go Normal file
View File

@@ -0,0 +1,5 @@
package ice
import "github.com/pions/webrtc/pkg/logging"
var iceLog = logging.NewScopedLogger("ice")

102
networktype.go Normal file
View File

@@ -0,0 +1,102 @@
package ice
import (
"fmt"
"net"
"strings"
)
const (
udp = "udp"
tcp = "tcp"
)
var supportedNetworks = []string{
udp,
// tcp, // Not supported yet
}
var supportedNetworkTypes = []NetworkType{
NetworkTypeUDP4,
NetworkTypeUDP6,
// NetworkTypeTCP4, // Not supported yet
// NetworkTypeTCP6, // Not supported yet
}
// NetworkType represents the type of network
type NetworkType int
const (
// NetworkTypeUDP4 indicates UDP over IPv4.
NetworkTypeUDP4 NetworkType = iota + 1
// NetworkTypeUDP6 indicates UDP over IPv4.
NetworkTypeUDP6
// NetworkTypeTCP4 indicates TCP over IPv4.
NetworkTypeTCP4
// NetworkTypeTCP6 indicates TCP over IPv4.
NetworkTypeTCP6
)
func (t NetworkType) String() string {
switch t {
case NetworkTypeUDP4:
return "udp4"
case NetworkTypeUDP6:
return "udp6"
case NetworkTypeTCP4:
return "tcp4"
case NetworkTypeTCP6:
return "tcp6"
default:
return ErrUnknownType.Error()
}
}
// NetworkShort returns the short network description
func (t NetworkType) NetworkShort() string {
switch t {
case NetworkTypeUDP4, NetworkTypeUDP6:
return udp
case NetworkTypeTCP4, NetworkTypeTCP6:
return tcp
default:
return ErrUnknownType.Error()
}
}
// IsReliable returns true if the network is reliable
func (t NetworkType) IsReliable() bool {
switch t {
case NetworkTypeUDP4, NetworkTypeUDP6:
return false
case NetworkTypeTCP4, NetworkTypeTCP6:
return true
}
return false
}
// determineNetworkType determines the type of network based on
// the short network string and an IP address.
func determineNetworkType(network string, ip net.IP) (NetworkType, error) {
ipv4 := ip.To4() != nil
switch {
case strings.HasPrefix(strings.ToLower(network), udp):
if ipv4 {
return NetworkTypeUDP4, nil
}
return NetworkTypeUDP6, nil
case strings.HasPrefix(strings.ToLower(network), tcp):
if ipv4 {
return NetworkTypeTCP4, nil
}
return NetworkTypeTCP6, nil
}
return NetworkType(0), fmt.Errorf("unable to determine networkType from %s %s", network, ip)
}

74
networktype_test.go Normal file
View File

@@ -0,0 +1,74 @@
package ice
import (
"net"
"testing"
)
func TestNetworkTypeParsing_Success(t *testing.T) {
ipv4 := net.ParseIP("192.168.0.1")
ipv6 := net.ParseIP("fe80::a3:6ff:fec4:5454")
for _, test := range []struct {
name string
inNetwork string
inIP net.IP
expected NetworkType
}{
{
"lowercase UDP4",
"udp",
ipv4,
NetworkTypeUDP4,
},
{
"uppercase UDP4",
"UDP",
ipv4,
NetworkTypeUDP4,
},
{
"lowercase UDP6",
"udp",
ipv6,
NetworkTypeUDP6,
},
{
"uppercase UDP6",
"UDP",
ipv6,
NetworkTypeUDP6,
},
} {
actual, err := determineNetworkType(test.inNetwork, test.inIP)
if err != nil {
t.Errorf("NetworkTypeParsing failed: %v", err)
}
if actual != test.expected {
t.Errorf("NetworkTypeParsing: '%s' -- input:%s expected:%s actual:%s",
test.name, test.inNetwork, test.expected, actual)
}
}
}
func TestNetworkTypeParsing_Failure(t *testing.T) {
ipv6 := net.ParseIP("fe80::a3:6ff:fec4:5454")
for _, test := range []struct {
name string
inNetwork string
inIP net.IP
}{
{
"invalid network",
"junkNetwork",
ipv6,
},
} {
actual, err := determineNetworkType(test.inNetwork, test.inIP)
if err == nil {
t.Errorf("NetworkTypeParsing should fail: '%s' -- input:%s actual:%s",
test.name, test.inNetwork, actual)
}
}
}

124
transport.go Normal file
View File

@@ -0,0 +1,124 @@
package ice
import (
"context"
"errors"
"net"
"time"
"github.com/pions/stun"
)
// Dial connects to the remote agent, acting as the controlling ice agent.
// Dial blocks until at least one ice candidate pair has successfully connected.
func (a *Agent) Dial(ctx context.Context, remoteUfrag, remotePwd string) (*Conn, error) {
return a.connect(ctx, true, remoteUfrag, remotePwd)
}
// Accept connects to the remote agent, acting as the controlled ice agent.
// Accept blocks until at least one ice candidate pair has successfully connected.
func (a *Agent) Accept(ctx context.Context, remoteUfrag, remotePwd string) (*Conn, error) {
return a.connect(ctx, false, remoteUfrag, remotePwd)
}
// Conn represents the ICE connection.
// At the moment the lifetime of the Conn is equal to the Agent.
type Conn struct {
agent *Agent
}
func (a *Agent) connect(ctx context.Context, isControlling bool, remoteUfrag, remotePwd string) (*Conn, error) {
err := a.ok()
if err != nil {
return nil, err
}
if a.opened {
return nil, errors.New("a connection is already opened")
}
err = a.startConnectivityChecks(isControlling, remoteUfrag, remotePwd)
if err != nil {
return nil, err
}
// block until pair selected
select {
case <-ctx.Done():
// TODO: Stop connectivity checks?
return nil, errors.New("connecting canceled by caller")
case <-a.onConnected:
}
return &Conn{
agent: a,
}, nil
}
// Read implements the Conn Read method.
func (c *Conn) Read(p []byte) (int, error) {
err := c.agent.ok()
if err != nil {
return 0, err
}
resN := make(chan int)
select {
case c.agent.rcvCh <- &bufIn{p, resN}:
n := <-resN
return n, nil
case <-c.agent.done:
return 0, c.agent.getErr()
}
}
// Write implements the Conn Write method.
func (c *Conn) Write(p []byte) (int, error) {
err := c.agent.ok()
if err != nil {
return 0, err
}
if stun.IsSTUN(p) {
return 0, errors.New("the ICE conn can't write STUN messages")
}
pair, err := c.agent.getBestPair()
if err != nil {
return 0, err
}
return pair.Write(p)
}
// Close implements the Conn Close method. It is used to close
// the connection. Any calls to Read and Write will be unblocked and return an error.
func (c *Conn) Close() error {
return c.agent.Close()
}
// TODO: Maybe just switch to using io.ReadWriteCloser?
// LocalAddr is a stub
func (c *Conn) LocalAddr() net.Addr {
return nil
}
// RemoteAddr is a stub
func (c *Conn) RemoteAddr() net.Addr {
return nil
}
// SetDeadline is a stub
func (c *Conn) SetDeadline(t time.Time) error {
return nil
}
// SetReadDeadline is a stub
func (c *Conn) SetReadDeadline(t time.Time) error {
return nil
}
// SetWriteDeadline is a stub
func (c *Conn) SetWriteDeadline(t time.Time) error {
return nil
}

281
transport_test.go Normal file
View File

@@ -0,0 +1,281 @@
package ice
import (
"context"
"testing"
"time"
"github.com/pions/transport/test"
)
func TestStressDuplex(t *testing.T) {
// Limit runtime in case of deadlocks
lim := test.TimeOut(time.Second * 20)
defer lim.Stop()
// Check for leaking routines
report := test.CheckRoutines(t)
defer report()
// Run the test
stressDuplex(t)
}
func testTimeout(t *testing.T, c *Conn, timeout time.Duration) {
const pollrate = 100 * time.Millisecond
statechan := make(chan ConnectionState)
ticker := time.NewTicker(pollrate)
for cnt := time.Duration(0); cnt <= timeout+taskLoopInterval; cnt += pollrate {
<-ticker.C
err := c.agent.run(func(agent *Agent) {
statechan <- agent.connectionState
})
if err != nil {
//we should never get here.
panic(err)
}
cs := <-statechan
if cs != ConnectionStateConnected {
if cnt < timeout {
t.Fatalf("Connection timed out early. (after %d ms)", cnt/time.Millisecond)
} else {
return
}
}
}
t.Fatalf("Connection failed to time out in time.")
}
func TestTimeout(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode.")
}
ca, cb := pipe()
err := cb.Close()
if err != nil {
//we should never get here.
panic(err)
}
testTimeout(t, ca, 30*time.Second)
ca, cb = pipeWithTimeout(5*time.Second, 3*time.Second)
err = cb.Close()
if err != nil {
//we should never get here.
panic(err)
}
testTimeout(t, ca, 5*time.Second)
}
func TestReadClosed(t *testing.T) {
ca, cb := pipe()
err := ca.Close()
if err != nil {
//we should never get here.
panic(err)
}
err = cb.Close()
if err != nil {
//we should never get here.
panic(err)
}
empty := make([]byte, 10)
_, err = ca.Read(empty)
if err == nil {
t.Fatalf("Reading from a closed channel should return an error")
}
}
func stressDuplex(t *testing.T) {
ca, cb := pipe()
defer func() {
err := ca.Close()
if err != nil {
t.Fatal(err)
}
err = cb.Close()
if err != nil {
t.Fatal(err)
}
}()
opt := test.Options{
MsgSize: 10,
MsgCount: 1, // Order not reliable due to UDP & potentially multiple candidate pairs.
}
err := test.StressDuplex(ca, cb, opt)
if err != nil {
t.Fatal(err)
}
}
func Benchmark(b *testing.B) {
ca, cb := pipe()
defer func() {
err := ca.Close()
check(err)
err = cb.Close()
check(err)
}()
b.ResetTimer()
opt := test.Options{
MsgSize: 128,
MsgCount: b.N,
}
err := test.StressDuplex(ca, cb, opt)
check(err)
}
func check(err error) {
if err != nil {
panic(err)
}
}
func connect(aAgent, bAgent *Agent) (*Conn, *Conn) {
// Manual signaling
aUfrag, aPwd := aAgent.GetLocalUserCredentials()
bUfrag, bPwd := bAgent.GetLocalUserCredentials()
candidates, err := aAgent.GetLocalCandidates()
check(err)
for _, c := range candidates {
check(bAgent.AddRemoteCandidate(copyCandidate(c)))
}
candidates, err = bAgent.GetLocalCandidates()
check(err)
for _, c := range candidates {
check(aAgent.AddRemoteCandidate(copyCandidate(c)))
}
accepted := make(chan struct{})
var aConn *Conn
go func() {
var acceptErr error
aConn, acceptErr = aAgent.Accept(context.TODO(), bUfrag, bPwd)
check(acceptErr)
close(accepted)
}()
bConn, err := bAgent.Dial(context.TODO(), aUfrag, aPwd)
check(err)
// Ensure accepted
<-accepted
return aConn, bConn
}
func pipe() (*Conn, *Conn) {
var urls []*URL
aNotifier, aConnected := onConnected()
bNotifier, bConnected := onConnected()
aAgent, err := NewAgent(&AgentConfig{Urls: urls})
if err != nil {
panic(err)
}
err = aAgent.OnConnectionStateChange(aNotifier)
if err != nil {
panic(err)
}
bAgent, err := NewAgent(&AgentConfig{Urls: urls})
if err != nil {
panic(err)
}
err = bAgent.OnConnectionStateChange(bNotifier)
if err != nil {
panic(err)
}
aConn, bConn := connect(aAgent, bAgent)
// Ensure pair selected
// Note: this assumes ConnectionStateConnected is thrown after selecting the final pair
<-aConnected
<-bConnected
return aConn, bConn
}
func pipeWithTimeout(iceTimeout time.Duration, iceKeepalive time.Duration) (*Conn, *Conn) {
var urls []*URL
aNotifier, aConnected := onConnected()
bNotifier, bConnected := onConnected()
aAgent, err := NewAgent(&AgentConfig{Urls: urls, ConnectionTimeout: &iceTimeout, KeepaliveInterval: &iceKeepalive})
if err != nil {
panic(err)
}
err = aAgent.OnConnectionStateChange(aNotifier)
if err != nil {
panic(err)
}
bAgent, err := NewAgent(&AgentConfig{Urls: urls, ConnectionTimeout: &iceTimeout, KeepaliveInterval: &iceKeepalive})
if err != nil {
panic(err)
}
err = bAgent.OnConnectionStateChange(bNotifier)
if err != nil {
panic(err)
}
aConn, bConn := connect(aAgent, bAgent)
// Ensure pair selected
// Note: this assumes ConnectionStateConnected is thrown after selecting the final pair
<-aConnected
<-bConnected
return aConn, bConn
}
func copyCandidate(orig *Candidate) *Candidate {
c := &Candidate{
Type: orig.Type,
NetworkType: orig.NetworkType,
IP: orig.IP,
Port: orig.Port,
}
if orig.RelatedAddress != nil {
c.RelatedAddress = &CandidateRelatedAddress{
Address: orig.RelatedAddress.Address,
Port: orig.RelatedAddress.Port,
}
}
return c
}
func onConnected() (func(ConnectionState), chan struct{}) {
done := make(chan struct{})
return func(state ConnectionState) {
if state == ConnectionStateConnected {
close(done)
}
}, done
}

227
url.go Normal file
View File

@@ -0,0 +1,227 @@
package ice
import (
"net"
"net/url"
"strconv"
"github.com/pions/webrtc/pkg/rtcerr"
)
// TODO: Migrate address parsing to STUN/TURN
// SchemeType indicates the type of server used in the ice.URL structure.
type SchemeType int
// Unknown defines default public constant to use for "enum" like struct
// comparisons when no value was defined.
const Unknown = iota
const (
// SchemeTypeSTUN indicates the URL represents a STUN server.
SchemeTypeSTUN SchemeType = iota + 1
// SchemeTypeSTUNS indicates the URL represents a STUNS (secure) server.
SchemeTypeSTUNS
// SchemeTypeTURN indicates the URL represents a TURN server.
SchemeTypeTURN
// SchemeTypeTURNS indicates the URL represents a TURNS (secure) server.
SchemeTypeTURNS
)
// NewSchemeType defines a procedure for creating a new SchemeType from a raw
// string naming the scheme type.
func NewSchemeType(raw string) SchemeType {
switch raw {
case "stun":
return SchemeTypeSTUN
case "stuns":
return SchemeTypeSTUNS
case "turn":
return SchemeTypeTURN
case "turns":
return SchemeTypeTURNS
default:
return SchemeType(Unknown)
}
}
func (t SchemeType) String() string {
switch t {
case SchemeTypeSTUN:
return "stun"
case SchemeTypeSTUNS:
return "stuns"
case SchemeTypeTURN:
return "turn"
case SchemeTypeTURNS:
return "turns"
default:
return ErrUnknownType.Error()
}
}
// ProtoType indicates the transport protocol type that is used in the ice.URL
// structure.
type ProtoType int
const (
// ProtoTypeUDP indicates the URL uses a UDP transport.
ProtoTypeUDP ProtoType = iota + 1
// ProtoTypeTCP indicates the URL uses a TCP transport.
ProtoTypeTCP
)
// NewProtoType defines a procedure for creating a new ProtoType from a raw
// string naming the transport protocol type.
func NewProtoType(raw string) ProtoType {
switch raw {
case "udp":
return ProtoTypeUDP
case "tcp":
return ProtoTypeTCP
default:
return ProtoType(Unknown)
}
}
func (t ProtoType) String() string {
switch t {
case ProtoTypeUDP:
return "udp"
case ProtoTypeTCP:
return "tcp"
default:
return ErrUnknownType.Error()
}
}
// URL represents a STUN (rfc7064) or TURN (rfc7065) URL
type URL struct {
Scheme SchemeType
Host string
Port int
Proto ProtoType
}
// ParseURL parses a STUN or TURN urls following the ABNF syntax described in
// https://tools.ietf.org/html/rfc7064 and https://tools.ietf.org/html/rfc7065
// respectively.
func ParseURL(raw string) (*URL, error) {
rawParts, err := url.Parse(raw)
if err != nil {
return nil, &rtcerr.UnknownError{Err: err}
}
var u URL
u.Scheme = NewSchemeType(rawParts.Scheme)
if u.Scheme == SchemeType(Unknown) {
return nil, &rtcerr.SyntaxError{Err: ErrSchemeType}
}
var rawPort string
if u.Host, rawPort, err = net.SplitHostPort(rawParts.Opaque); err != nil {
if e, ok := err.(*net.AddrError); ok {
if e.Err == "missing port in address" {
nextRawURL := u.Scheme.String() + ":" + rawParts.Opaque
switch {
case u.Scheme == SchemeTypeSTUN || u.Scheme == SchemeTypeTURN:
nextRawURL += ":3478"
if rawParts.RawQuery != "" {
nextRawURL += "?" + rawParts.RawQuery
}
return ParseURL(nextRawURL)
case u.Scheme == SchemeTypeSTUNS || u.Scheme == SchemeTypeTURNS:
nextRawURL += ":5349"
if rawParts.RawQuery != "" {
nextRawURL += "?" + rawParts.RawQuery
}
return ParseURL(nextRawURL)
}
}
}
return nil, &rtcerr.UnknownError{Err: err}
}
if u.Host == "" {
return nil, &rtcerr.SyntaxError{Err: ErrHost}
}
if u.Port, err = strconv.Atoi(rawPort); err != nil {
return nil, &rtcerr.SyntaxError{Err: ErrPort}
}
switch {
case u.Scheme == SchemeTypeSTUN:
qArgs, err := url.ParseQuery(rawParts.RawQuery)
if err != nil || (err == nil && len(qArgs) > 0) {
return nil, &rtcerr.SyntaxError{Err: ErrSTUNQuery}
}
u.Proto = ProtoTypeUDP
case u.Scheme == SchemeTypeSTUNS:
qArgs, err := url.ParseQuery(rawParts.RawQuery)
if err != nil || (err == nil && len(qArgs) > 0) {
return nil, &rtcerr.SyntaxError{Err: ErrSTUNQuery}
}
u.Proto = ProtoTypeTCP
case u.Scheme == SchemeTypeTURN:
proto, err := parseProto(rawParts.RawQuery)
if err != nil {
return nil, err
}
u.Proto = proto
if u.Proto == ProtoType(Unknown) {
u.Proto = ProtoTypeUDP
}
case u.Scheme == SchemeTypeTURNS:
proto, err := parseProto(rawParts.RawQuery)
if err != nil {
return nil, err
}
u.Proto = proto
if u.Proto == ProtoType(Unknown) {
u.Proto = ProtoTypeTCP
}
}
return &u, nil
}
func parseProto(raw string) (ProtoType, error) {
qArgs, err := url.ParseQuery(raw)
if err != nil || len(qArgs) > 1 {
return ProtoType(Unknown), &rtcerr.SyntaxError{Err: ErrInvalidQuery}
}
var proto ProtoType
if rawProto := qArgs.Get("transport"); rawProto != "" {
if proto = NewProtoType(rawProto); proto == ProtoType(0) {
return ProtoType(Unknown), &rtcerr.NotSupportedError{Err: ErrProtoType}
}
return proto, nil
}
if len(qArgs) > 0 {
return ProtoType(Unknown), &rtcerr.SyntaxError{Err: ErrInvalidQuery}
}
return proto, nil
}
func (u URL) String() string {
rawURL := u.Scheme.String() + ":" + net.JoinHostPort(u.Host, strconv.Itoa(u.Port))
if u.Scheme == SchemeTypeTURN || u.Scheme == SchemeTypeTURNS {
rawURL += "?transport=" + u.Proto.String()
}
return rawURL
}
// IsSecure returns whether the this URL's scheme describes secure scheme or not.
func (u URL) IsSecure() bool {
return u.Scheme == SchemeTypeSTUNS || u.Scheme == SchemeTypeTURNS
}

72
url_test.go Normal file
View File

@@ -0,0 +1,72 @@
package ice
import (
"testing"
"github.com/pions/webrtc/pkg/rtcerr"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)
func TestParseURL(t *testing.T) {
t.Run("Success", func(t *testing.T) {
testCases := []struct {
rawURL string
expectedURLString string
expectedScheme SchemeType
expectedSecure bool
expectedHost string
expectedPort int
expectedProto ProtoType
}{
{"stun:google.de", "stun:google.de:3478", SchemeTypeSTUN, false, "google.de", 3478, ProtoTypeUDP},
{"stun:google.de:1234", "stun:google.de:1234", SchemeTypeSTUN, false, "google.de", 1234, ProtoTypeUDP},
{"stuns:google.de", "stuns:google.de:5349", SchemeTypeSTUNS, true, "google.de", 5349, ProtoTypeTCP},
{"stun:[::1]:123", "stun:[::1]:123", SchemeTypeSTUN, false, "::1", 123, ProtoTypeUDP},
{"turn:google.de", "turn:google.de:3478?transport=udp", SchemeTypeTURN, false, "google.de", 3478, ProtoTypeUDP},
{"turns:google.de", "turns:google.de:5349?transport=tcp", SchemeTypeTURNS, true, "google.de", 5349, ProtoTypeTCP},
{"turn:google.de?transport=udp", "turn:google.de:3478?transport=udp", SchemeTypeTURN, false, "google.de", 3478, ProtoTypeUDP},
{"turns:google.de?transport=tcp", "turns:google.de:5349?transport=tcp", SchemeTypeTURNS, true, "google.de", 5349, ProtoTypeTCP},
}
for i, testCase := range testCases {
url, err := ParseURL(testCase.rawURL)
assert.Nil(t, err, "testCase: %d %v", i, testCase)
if err != nil {
return
}
assert.Equal(t, testCase.expectedScheme, url.Scheme, "testCase: %d %v", i, testCase)
assert.Equal(t, testCase.expectedURLString, url.String(), "testCase: %d %v", i, testCase)
assert.Equal(t, testCase.expectedSecure, url.IsSecure(), "testCase: %d %v", i, testCase)
assert.Equal(t, testCase.expectedHost, url.Host, "testCase: %d %v", i, testCase)
assert.Equal(t, testCase.expectedPort, url.Port, "testCase: %d %v", i, testCase)
assert.Equal(t, testCase.expectedProto, url.Proto, "testCase: %d %v", i, testCase)
}
})
t.Run("Failure", func(t *testing.T) {
testCases := []struct {
rawURL string
expectedErr error
}{
{"", &rtcerr.SyntaxError{Err: ErrSchemeType}},
{":::", &rtcerr.UnknownError{Err: errors.New("parse :::: missing protocol scheme")}},
{"stun:[::1]:123:", &rtcerr.UnknownError{Err: errors.New("address [::1]:123:: too many colons in address")}},
{"stun:[::1]:123a", &rtcerr.SyntaxError{Err: ErrPort}},
{"google.de", &rtcerr.SyntaxError{Err: ErrSchemeType}},
{"stun:", &rtcerr.SyntaxError{Err: ErrHost}},
{"stun:google.de:abc", &rtcerr.SyntaxError{Err: ErrPort}},
{"stun:google.de?transport=udp", &rtcerr.SyntaxError{Err: ErrSTUNQuery}},
{"stuns:google.de?transport=udp", &rtcerr.SyntaxError{Err: ErrSTUNQuery}},
{"turn:google.de?trans=udp", &rtcerr.SyntaxError{Err: ErrInvalidQuery}},
{"turns:google.de?trans=udp", &rtcerr.SyntaxError{Err: ErrInvalidQuery}},
{"turns:google.de?transport=udp&another=1", &rtcerr.SyntaxError{Err: ErrInvalidQuery}},
{"turn:google.de?transport=ip", &rtcerr.NotSupportedError{Err: ErrProtoType}},
}
for i, testCase := range testCases {
_, err := ParseURL(testCase.rawURL)
assert.EqualError(t, err, testCase.expectedErr.Error(), "testCase: %d %v", i, testCase)
}
})
}

50
util.go Normal file
View File

@@ -0,0 +1,50 @@
package ice
import (
"net"
"sync/atomic"
)
func localInterfaces() (ips []net.IP) {
ifaces, err := net.Interfaces()
if err != nil {
return ips
}
for _, iface := range ifaces {
if iface.Flags&net.FlagUp == 0 {
continue // interface down
}
if iface.Flags&net.FlagLoopback != 0 {
continue // loopback interface
}
addrs, err := iface.Addrs()
if err != nil {
return ips
}
for _, addr := range addrs {
var ip net.IP
switch v := addr.(type) {
case *net.IPNet:
ip = v.IP
case *net.IPAddr:
ip = v.IP
}
if ip == nil || ip.IsLoopback() {
continue
}
ips = append(ips, ip)
}
}
return ips
}
type atomicError struct{ v atomic.Value }
func (a *atomicError) Store(err error) {
a.v.Store(struct{ error }{err})
}
func (a *atomicError) Load() error {
err, _ := a.v.Load().(struct{ error })
return err.error
}