Fix ConnectionState being reported out of order

Before we launched a goroutine to announce every ConnectionState change
to users. These could then be sent to the user out of order.

This commit adds a connectionStateNotifier. The connectionStateNotifier
delivers them sequentially to the user.

Resolves #624
This commit is contained in:
sukun
2024-03-13 21:25:57 +05:30
committed by Sean DuBois
parent 77cc354d7f
commit 67cc918a51
3 changed files with 105 additions and 13 deletions

View File

@@ -130,7 +130,7 @@ type Agent struct {
chanCandidate chan Candidate chanCandidate chan Candidate
chanCandidatePair chan *CandidatePair chanCandidatePair chan *CandidatePair
chanState chan ConnectionState stateNotifier *connectionStateNotifier
loggerFactory logging.LoggerFactory loggerFactory logging.LoggerFactory
log logging.LeveledLogger log logging.LeveledLogger
@@ -227,7 +227,6 @@ func (a *Agent) taskLoop() {
after() after()
close(a.chanState)
close(a.chanCandidate) close(a.chanCandidate)
close(a.chanCandidatePair) close(a.chanCandidatePair)
close(a.taskLoopDone) close(a.taskLoopDone)
@@ -278,7 +277,6 @@ func NewAgent(config *AgentConfig) (*Agent, error) { //nolint:gocognit
a := &Agent{ a := &Agent{
chanTask: make(chan task), chanTask: make(chan task),
chanState: make(chan ConnectionState),
chanCandidate: make(chan Candidate), chanCandidate: make(chan Candidate),
chanCandidatePair: make(chan *CandidatePair), chanCandidatePair: make(chan *CandidatePair),
tieBreaker: globalMathRandomGenerator.Uint64(), tieBreaker: globalMathRandomGenerator.Uint64(),
@@ -322,6 +320,7 @@ func NewAgent(config *AgentConfig) (*Agent, error) { //nolint:gocognit
disableActiveTCP: config.DisableActiveTCP, disableActiveTCP: config.DisableActiveTCP,
} }
a.stateNotifier = &connectionStateNotifier{NotificationFunc: a.onConnectionStateChange}
if a.net == nil { if a.net == nil {
a.net, err = stdnet.NewNet() a.net, err = stdnet.NewNet()
@@ -369,7 +368,6 @@ func NewAgent(config *AgentConfig) (*Agent, error) { //nolint:gocognit
// Blocking one by the other one causes deadlock. // Blocking one by the other one causes deadlock.
// Hence, we call handlers from independent Goroutines. // Hence, we call handlers from independent Goroutines.
go a.candidatePairRoutine() go a.candidatePairRoutine()
go a.connectionStateRoutine()
go a.candidateRoutine() go a.candidateRoutine()
// Restart is also used to initialize the agent for the first time // Restart is also used to initialize the agent for the first time
@@ -503,12 +501,7 @@ func (a *Agent) updateConnectionState(newState ConnectionState) {
a.log.Infof("Setting new connection state: %s", newState) a.log.Infof("Setting new connection state: %s", newState)
a.connectionState = newState a.connectionState = newState
a.stateNotifier.Enqueue(newState)
// Call handler after finishing current task since we may be holding the agent lock
// and the handler may also require it
a.afterRun(func(_ context.Context) {
a.chanState <- newState
})
} }
} }

View File

@@ -3,6 +3,8 @@
package ice package ice
import "sync"
// OnConnectionStateChange sets a handler that is fired when the connection state changes // OnConnectionStateChange sets a handler that is fired when the connection state changes
func (a *Agent) OnConnectionStateChange(f func(ConnectionState)) error { func (a *Agent) OnConnectionStateChange(f func(ConnectionState)) error {
a.onConnectionStateChangeHdlr.Store(f) a.onConnectionStateChangeHdlr.Store(f)
@@ -47,9 +49,35 @@ func (a *Agent) candidatePairRoutine() {
} }
} }
func (a *Agent) connectionStateRoutine() { type connectionStateNotifier struct {
for s := range a.chanState { sync.Mutex
go a.onConnectionStateChange(s) states []ConnectionState
running bool
NotificationFunc func(ConnectionState)
}
func (c *connectionStateNotifier) Enqueue(s ConnectionState) {
c.Lock()
defer c.Unlock()
c.states = append(c.states, s)
if !c.running {
c.running = true
go c.notify()
}
}
func (c *connectionStateNotifier) notify() {
for {
c.Lock()
if len(c.states) == 0 {
c.running = false
c.Unlock()
return
}
s := c.states[0]
c.states = c.states[1:]
c.Unlock()
c.NotificationFunc(s)
} }
} }

71
agent_handlers_test.go Normal file
View File

@@ -0,0 +1,71 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT
package ice
import (
"testing"
"time"
"github.com/pion/transport/v3/test"
)
func TestConnectionStateNotifier(t *testing.T) {
t.Run("TestManyUpdates", func(t *testing.T) {
report := test.CheckRoutines(t)
defer report()
updates := make(chan struct{}, 1)
c := &connectionStateNotifier{
NotificationFunc: func(_ ConnectionState) {
updates <- struct{}{}
},
}
// Enqueue all updates upfront to ensure that it
// doesn't block
for i := 0; i < 10000; i++ {
c.Enqueue(ConnectionStateNew)
}
done := make(chan struct{})
go func() {
for i := 0; i < 10000; i++ {
<-updates
}
select {
case <-updates:
t.Errorf("received more updates than expected")
case <-time.After(1 * time.Second):
}
close(done)
}()
<-done
})
t.Run("TestUpdateOrdering", func(t *testing.T) {
report := test.CheckRoutines(t)
defer report()
updates := make(chan ConnectionState)
c := &connectionStateNotifier{
NotificationFunc: func(cs ConnectionState) {
updates <- cs
},
}
done := make(chan struct{})
go func() {
for i := 0; i < 10000; i++ {
x := <-updates
if x != ConnectionState(i) {
t.Errorf("expected %d got %d", x, i)
}
}
select {
case <-updates:
t.Errorf("received more updates than expected")
case <-time.After(1 * time.Second):
}
close(done)
}()
for i := 0; i < 10000; i++ {
c.Enqueue(ConnectionState(i))
}
<-done
})
}