mirror of
https://github.com/pion/ice.git
synced 2025-09-27 03:45:54 +08:00
Fix ConnectionState being reported out of order
Before we launched a goroutine to announce every ConnectionState change to users. These could then be sent to the user out of order. This commit adds a connectionStateNotifier. The connectionStateNotifier delivers them sequentially to the user. Resolves #624
This commit is contained in:
13
agent.go
13
agent.go
@@ -130,7 +130,7 @@ type Agent struct {
|
||||
|
||||
chanCandidate chan Candidate
|
||||
chanCandidatePair chan *CandidatePair
|
||||
chanState chan ConnectionState
|
||||
stateNotifier *connectionStateNotifier
|
||||
|
||||
loggerFactory logging.LoggerFactory
|
||||
log logging.LeveledLogger
|
||||
@@ -227,7 +227,6 @@ func (a *Agent) taskLoop() {
|
||||
|
||||
after()
|
||||
|
||||
close(a.chanState)
|
||||
close(a.chanCandidate)
|
||||
close(a.chanCandidatePair)
|
||||
close(a.taskLoopDone)
|
||||
@@ -278,7 +277,6 @@ func NewAgent(config *AgentConfig) (*Agent, error) { //nolint:gocognit
|
||||
|
||||
a := &Agent{
|
||||
chanTask: make(chan task),
|
||||
chanState: make(chan ConnectionState),
|
||||
chanCandidate: make(chan Candidate),
|
||||
chanCandidatePair: make(chan *CandidatePair),
|
||||
tieBreaker: globalMathRandomGenerator.Uint64(),
|
||||
@@ -322,6 +320,7 @@ func NewAgent(config *AgentConfig) (*Agent, error) { //nolint:gocognit
|
||||
|
||||
disableActiveTCP: config.DisableActiveTCP,
|
||||
}
|
||||
a.stateNotifier = &connectionStateNotifier{NotificationFunc: a.onConnectionStateChange}
|
||||
|
||||
if a.net == nil {
|
||||
a.net, err = stdnet.NewNet()
|
||||
@@ -369,7 +368,6 @@ func NewAgent(config *AgentConfig) (*Agent, error) { //nolint:gocognit
|
||||
// Blocking one by the other one causes deadlock.
|
||||
// Hence, we call handlers from independent Goroutines.
|
||||
go a.candidatePairRoutine()
|
||||
go a.connectionStateRoutine()
|
||||
go a.candidateRoutine()
|
||||
|
||||
// Restart is also used to initialize the agent for the first time
|
||||
@@ -503,12 +501,7 @@ func (a *Agent) updateConnectionState(newState ConnectionState) {
|
||||
|
||||
a.log.Infof("Setting new connection state: %s", newState)
|
||||
a.connectionState = newState
|
||||
|
||||
// Call handler after finishing current task since we may be holding the agent lock
|
||||
// and the handler may also require it
|
||||
a.afterRun(func(_ context.Context) {
|
||||
a.chanState <- newState
|
||||
})
|
||||
a.stateNotifier.Enqueue(newState)
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -3,6 +3,8 @@
|
||||
|
||||
package ice
|
||||
|
||||
import "sync"
|
||||
|
||||
// OnConnectionStateChange sets a handler that is fired when the connection state changes
|
||||
func (a *Agent) OnConnectionStateChange(f func(ConnectionState)) error {
|
||||
a.onConnectionStateChangeHdlr.Store(f)
|
||||
@@ -47,9 +49,35 @@ func (a *Agent) candidatePairRoutine() {
|
||||
}
|
||||
}
|
||||
|
||||
func (a *Agent) connectionStateRoutine() {
|
||||
for s := range a.chanState {
|
||||
go a.onConnectionStateChange(s)
|
||||
type connectionStateNotifier struct {
|
||||
sync.Mutex
|
||||
states []ConnectionState
|
||||
running bool
|
||||
NotificationFunc func(ConnectionState)
|
||||
}
|
||||
|
||||
func (c *connectionStateNotifier) Enqueue(s ConnectionState) {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
c.states = append(c.states, s)
|
||||
if !c.running {
|
||||
c.running = true
|
||||
go c.notify()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *connectionStateNotifier) notify() {
|
||||
for {
|
||||
c.Lock()
|
||||
if len(c.states) == 0 {
|
||||
c.running = false
|
||||
c.Unlock()
|
||||
return
|
||||
}
|
||||
s := c.states[0]
|
||||
c.states = c.states[1:]
|
||||
c.Unlock()
|
||||
c.NotificationFunc(s)
|
||||
}
|
||||
}
|
||||
|
||||
|
71
agent_handlers_test.go
Normal file
71
agent_handlers_test.go
Normal file
@@ -0,0 +1,71 @@
|
||||
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package ice
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/pion/transport/v3/test"
|
||||
)
|
||||
|
||||
func TestConnectionStateNotifier(t *testing.T) {
|
||||
t.Run("TestManyUpdates", func(t *testing.T) {
|
||||
report := test.CheckRoutines(t)
|
||||
defer report()
|
||||
updates := make(chan struct{}, 1)
|
||||
c := &connectionStateNotifier{
|
||||
NotificationFunc: func(_ ConnectionState) {
|
||||
updates <- struct{}{}
|
||||
},
|
||||
}
|
||||
// Enqueue all updates upfront to ensure that it
|
||||
// doesn't block
|
||||
for i := 0; i < 10000; i++ {
|
||||
c.Enqueue(ConnectionStateNew)
|
||||
}
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
for i := 0; i < 10000; i++ {
|
||||
<-updates
|
||||
}
|
||||
select {
|
||||
case <-updates:
|
||||
t.Errorf("received more updates than expected")
|
||||
case <-time.After(1 * time.Second):
|
||||
}
|
||||
close(done)
|
||||
}()
|
||||
<-done
|
||||
})
|
||||
t.Run("TestUpdateOrdering", func(t *testing.T) {
|
||||
report := test.CheckRoutines(t)
|
||||
defer report()
|
||||
updates := make(chan ConnectionState)
|
||||
c := &connectionStateNotifier{
|
||||
NotificationFunc: func(cs ConnectionState) {
|
||||
updates <- cs
|
||||
},
|
||||
}
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
for i := 0; i < 10000; i++ {
|
||||
x := <-updates
|
||||
if x != ConnectionState(i) {
|
||||
t.Errorf("expected %d got %d", x, i)
|
||||
}
|
||||
}
|
||||
select {
|
||||
case <-updates:
|
||||
t.Errorf("received more updates than expected")
|
||||
case <-time.After(1 * time.Second):
|
||||
}
|
||||
close(done)
|
||||
}()
|
||||
for i := 0; i < 10000; i++ {
|
||||
c.Enqueue(ConnectionState(i))
|
||||
}
|
||||
<-done
|
||||
})
|
||||
}
|
Reference in New Issue
Block a user