Add OnSelectedCandidatePairChange callback

Invoked when the selected ICE candidate pair changes.
This commit is contained in:
Michael MacDonald
2019-01-28 11:16:46 -05:00
parent 71053c1e16
commit b8f3e36b00
10 changed files with 248 additions and 5 deletions

View File

@@ -69,6 +69,14 @@ func (api *API) NewDTLSTransport(transport *ICETransport, certificates []Certifi
return t, nil
}
// ICETransport returns the currently-configured *ICETransport or nil
// if one has not been configured
func (t *DTLSTransport) ICETransport() *ICETransport {
t.lock.RLock()
defer t.lock.RUnlock()
return t.iceTransport
}
// GetLocalParameters returns the DTLS parameters of the local DTLSTransport upon construction.
func (t *DTLSTransport) GetLocalParameters() DTLSParameters {
fingerprints := []DTLSFingerprint{}

View File

@@ -141,3 +141,11 @@ func convertTypeFromICE(t ice.CandidateType) (ICECandidateType, error) {
return ICECandidateType(t), fmt.Errorf("unknown ICE candidate type: %s", t)
}
}
func (c ICECandidate) String() string {
ic, err := c.toICE()
if err != nil {
return fmt.Sprintf("%#v failed to convert to ICE: %s", c, err)
}
return ic.String()
}

24
icecandidatepair.go Normal file
View File

@@ -0,0 +1,24 @@
package webrtc
import "fmt"
type (
// ICECandidatePair represents an ICE Candidate pair
ICECandidatePair struct {
Local *ICECandidate
Remote *ICECandidate
}
)
func (p *ICECandidatePair) String() string {
return fmt.Sprintf("(local) %s <-> (remote) %s", p.Local, p.Remote)
}
// NewICECandidatePair returns an initialized *ICECandidatePair
// for the given pair of ICECandidate instances
func NewICECandidatePair(local, remote *ICECandidate) *ICECandidatePair {
return &ICECandidatePair{
Local: local,
Remote: remote,
}
}

View File

@@ -19,7 +19,8 @@ type ICETransport struct {
// State ICETransportState
// gatheringState ICEGathererState
onConnectionStateChangeHdlr func(ICETransportState)
onConnectionStateChangeHdlr func(ICETransportState)
onSelectedCandidatePairChangeHdlr func(*ICECandidatePair)
gatherer *ICEGatherer
conn *ice.Conn
@@ -67,10 +68,19 @@ func (t *ICETransport) Start(gatherer *ICEGatherer, params ICEParameters, role *
}
agent := t.gatherer.agent
err := agent.OnConnectionStateChange(func(iceState ice.ConnectionState) {
if err := agent.OnConnectionStateChange(func(iceState ice.ConnectionState) {
t.onConnectionStateChange(newICETransportStateFromICE(iceState))
})
if err != nil {
}); err != nil {
return err
}
if err := agent.OnSelectedCandidatePairChange(func(local, remote *ice.Candidate) {
candidates, err := newICECandidatesFromICE([]*ice.Candidate{local, remote})
if err != nil {
pcLog.Warnf("Unable to convert ICE candidates to ICECandidates: %s", err)
return
}
t.onSelectedCandidatePairChange(NewICECandidatePair(&candidates[0], &candidates[1]))
}); err != nil {
return err
}
@@ -85,6 +95,7 @@ func (t *ICETransport) Start(gatherer *ICEGatherer, params ICEParameters, role *
t.lock.Unlock()
var iceConn *ice.Conn
var err error
switch *role {
case ICERoleControlling:
iceConn, err = agent.Dial(context.TODO(),
@@ -124,6 +135,23 @@ func (t *ICETransport) Stop() error {
return nil
}
// OnSelectedCandidatePairChange sets a handler that is invoked when a new
// ICE candidate pair is selected
func (t *ICETransport) OnSelectedCandidatePairChange(f func(*ICECandidatePair)) {
t.lock.Lock()
defer t.lock.Unlock()
t.onSelectedCandidatePairChangeHdlr = f
}
func (t *ICETransport) onSelectedCandidatePairChange(pair *ICECandidatePair) {
t.lock.RLock()
hdlr := t.onSelectedCandidatePairChangeHdlr
t.lock.RUnlock()
if hdlr != nil {
hdlr(pair)
}
}
// OnConnectionStateChange sets a handler that is fired when the ICE
// connection state changes.
func (t *ICETransport) OnConnectionStateChange(f func(ICETransportState)) {

View File

@@ -5,6 +5,7 @@ import (
"fmt"
"math/rand"
"sync"
"sync/atomic"
"testing"
"time"
@@ -257,3 +258,74 @@ func TestPeerConnection_Media_Shutdown(t *testing.T) {
}
onTrackFiredLock.Unlock()
}
func TestPeerConnection_Media_Sender_Transports_OnSelectedCandidatePairChange(t *testing.T) {
iceComplete := make(chan bool)
api := NewAPI()
lim := test.TimeOut(time.Second * 30)
defer lim.Stop()
api.mediaEngine.RegisterDefaultCodecs()
pcOffer, pcAnswer, err := api.newPair()
if err != nil {
t.Fatal(err)
}
opusTrack, err := pcOffer.NewTrack(DefaultPayloadTypeOpus, rand.Uint32(), "audio", "pion1")
if err != nil {
t.Fatal(err)
}
vp8Track, err := pcOffer.NewTrack(DefaultPayloadTypeVP8, rand.Uint32(), "video", "pion2")
if err != nil {
t.Fatal(err)
}
if _, err = pcOffer.AddTrack(opusTrack); err != nil {
t.Fatal(err)
} else if _, err = pcAnswer.AddTrack(vp8Track); err != nil {
t.Fatal(err)
}
pcAnswer.OnICEConnectionStateChange(func(iceState ICEConnectionState) {
if iceState == ICEConnectionStateConnected {
go func() {
time.Sleep(3 * time.Second) // TODO PeerConnection.Close() doesn't block for all subsystems
close(iceComplete)
}()
}
})
senderCalledCandidateChange := int32(0)
for _, sender := range pcOffer.GetSenders() {
dtlsTransport := sender.Transport()
if dtlsTransport == nil {
continue
}
if iceTransport := dtlsTransport.ICETransport(); iceTransport != nil {
iceTransport.OnSelectedCandidatePairChange(func(pair *ICECandidatePair) {
atomic.StoreInt32(&senderCalledCandidateChange, 1)
})
}
}
err = signalPair(pcOffer, pcAnswer)
if err != nil {
t.Fatal(err)
}
<-iceComplete
if atomic.LoadInt32(&senderCalledCandidateChange) == 0 {
t.Fatalf("Sender ICETransport OnSelectedCandidateChange was never called")
}
err = pcOffer.Close()
if err != nil {
t.Fatal(err)
}
err = pcAnswer.Close()
if err != nil {
t.Fatal(err)
}
}

View File

@@ -28,7 +28,8 @@ const (
// Agent represents the ICE agent
type Agent struct {
onConnectionStateChangeHdlr func(ConnectionState)
onConnectionStateChangeHdlr func(ConnectionState)
onSelectedCandidatePairChangeHdlr func(*Candidate, *Candidate)
// Used to block double Dial/Accept
opened bool
@@ -170,6 +171,22 @@ func (a *Agent) OnConnectionStateChange(f func(ConnectionState)) error {
})
}
// OnSelectedCandidatePairChange sets a handler that is fired when the final candidate
// pair is selected
func (a *Agent) OnSelectedCandidatePairChange(f func(*Candidate, *Candidate)) error {
return a.run(func(agent *Agent) {
agent.onSelectedCandidatePairChangeHdlr = f
})
}
func (a *Agent) onSelectedCandidatePairChange(p *candidatePair) {
if p != nil {
if a.onSelectedCandidatePairChangeHdlr != nil {
a.onSelectedCandidatePairChangeHdlr(p.local, p.remote)
}
}
}
func (a *Agent) listenUDP(network string, laddr *net.UDPAddr) (*net.UDPConn, error) {
if (laddr.Port != 0) || ((a.portmin == 0) && (a.portmax == 0)) {
return net.ListenUDP(network, laddr)
@@ -362,6 +379,7 @@ func (a *Agent) pingCandidate(local, remote *Candidate) {
func (a *Agent) updateConnectionState(newState ConnectionState) {
if a.connectionState != newState {
iceLog.Infof("Setting new connection state: %s", newState)
a.connectionState = newState
hdlr := a.onConnectionStateChangeHdlr
if hdlr != nil {
@@ -390,6 +408,10 @@ func (a *Agent) setValidPair(local, remote *Candidate, selected, controlling boo
iceLog.Tracef("Found valid candidate pair: %s (selected? %t)", p, selected)
if selected {
// Notify when the selected pair changes
if !a.selectedPair.Equal(p) {
a.onSelectedCandidatePairChange(p)
}
a.selectedPair = p
a.validPairs = nil
// TODO: only set state to connected on selecting final pair?

View File

@@ -114,6 +114,61 @@ func TestPairPriority(t *testing.T) {
}
}
func TestOnSelectedCandidatePairChange(t *testing.T) {
// avoid deadlocks?
defer test.TimeOut(1 * time.Second).Stop()
a, err := NewAgent(&AgentConfig{})
if err != nil {
t.Fatalf("Failed to create agent: %s", err)
}
callbackCalled := make(chan struct{}, 1)
if err = a.OnSelectedCandidatePairChange(func(local, remote *Candidate) {
close(callbackCalled)
}); err != nil {
t.Fatalf("Failed to set agent OnCandidatePairChange callback: %s", err)
}
hostLocal, err := NewCandidateHost(
"udp",
net.ParseIP("192.168.1.1"), 19216,
1,
)
if err != nil {
t.Fatalf("Failed to construct local host candidate: %s", err)
}
relayRemote, err := NewCandidateRelay(
"udp",
net.ParseIP("1.2.3.4"), 12340,
1,
"4.3.2.1", 43210,
)
if err != nil {
t.Fatalf("Failed to construct remote relay candidate: %s", err)
}
// select the pair
if err = a.run(func(agent *Agent) {
agent.setValidPair(hostLocal, relayRemote, true, false)
}); err != nil {
t.Fatalf("Failed to setValidPair(): %s", err)
}
// ensure that the callback fired on setting the pair
<-callbackCalled
// set the same pair; this should not invoke the callback
// if the callback is invoked now it will panic due
// to second close of the channel
if err = a.run(func(agent *Agent) {
agent.setValidPair(hostLocal, relayRemote, true, false)
}); err != nil {
t.Fatalf("Failed to setValidPair(): %s", err)
}
if err := a.Close(); err != nil {
t.Fatalf("Error on agent.Close(): %s", err)
}
}
type BadAddr struct{}
func (ba *BadAddr) Network() string {

View File

@@ -26,6 +26,16 @@ func (p *candidatePair) String() string {
p.Priority(), p.local.Priority(), p.local, p.remote, p.remote.Priority())
}
func (p *candidatePair) Equal(other *candidatePair) bool {
if p == nil && other == nil {
return true
}
if p == nil || other == nil {
return false
}
return p.local.Equal(other.local) && p.remote.Equal(other.remote)
}
// RFC 5245 - 5.7.2. Computing Pair Priority and Ordering Pairs
// Let G be the priority for the candidate provided by the controlling
// agent. Let D be the priority for the candidate provided by the

View File

@@ -38,6 +38,14 @@ func (api *API) NewRTPReceiver(kind RTPCodecType, transport *DTLSTransport) (*RT
}, nil
}
// Transport returns the currently-configured *DTLSTransport or nil
// if one has not yet been configured
func (r *RTPReceiver) Transport() *DTLSTransport {
r.mu.RLock()
defer r.mu.RUnlock()
return r.transport
}
// Track returns the RTCRtpTransceiver track
func (r *RTPReceiver) Track() *Track {
r.mu.RLock()

View File

@@ -44,6 +44,14 @@ func (api *API) NewRTPSender(track *Track, transport *DTLSTransport) (*RTPSender
}, nil
}
// Transport returns the currently-configured *DTLSTransport or nil
// if one has not yet been configured
func (r *RTPSender) Transport() *DTLSTransport {
r.mu.RLock()
defer r.mu.RUnlock()
return r.transport
}
// Send Attempts to set the parameters controlling the sending of media.
func (r *RTPSender) Send(parameters RTPSendParameters) error {
r.mu.Lock()