mirror of
https://github.com/pion/ice.git
synced 2025-09-27 20:02:07 +08:00
Fix ConnectionState being reported out of order
Before we launched a goroutine to announce every ConnectionState change to users. These could then be sent to the user out of order. This commit adds a connectionStateNotifier. The connectionStateNotifier delivers them sequentially to the user. Resolves #624
This commit is contained in:
13
agent.go
13
agent.go
@@ -130,7 +130,7 @@ type Agent struct {
|
|||||||
|
|
||||||
chanCandidate chan Candidate
|
chanCandidate chan Candidate
|
||||||
chanCandidatePair chan *CandidatePair
|
chanCandidatePair chan *CandidatePair
|
||||||
chanState chan ConnectionState
|
stateNotifier *connectionStateNotifier
|
||||||
|
|
||||||
loggerFactory logging.LoggerFactory
|
loggerFactory logging.LoggerFactory
|
||||||
log logging.LeveledLogger
|
log logging.LeveledLogger
|
||||||
@@ -227,7 +227,6 @@ func (a *Agent) taskLoop() {
|
|||||||
|
|
||||||
after()
|
after()
|
||||||
|
|
||||||
close(a.chanState)
|
|
||||||
close(a.chanCandidate)
|
close(a.chanCandidate)
|
||||||
close(a.chanCandidatePair)
|
close(a.chanCandidatePair)
|
||||||
close(a.taskLoopDone)
|
close(a.taskLoopDone)
|
||||||
@@ -278,7 +277,6 @@ func NewAgent(config *AgentConfig) (*Agent, error) { //nolint:gocognit
|
|||||||
|
|
||||||
a := &Agent{
|
a := &Agent{
|
||||||
chanTask: make(chan task),
|
chanTask: make(chan task),
|
||||||
chanState: make(chan ConnectionState),
|
|
||||||
chanCandidate: make(chan Candidate),
|
chanCandidate: make(chan Candidate),
|
||||||
chanCandidatePair: make(chan *CandidatePair),
|
chanCandidatePair: make(chan *CandidatePair),
|
||||||
tieBreaker: globalMathRandomGenerator.Uint64(),
|
tieBreaker: globalMathRandomGenerator.Uint64(),
|
||||||
@@ -322,6 +320,7 @@ func NewAgent(config *AgentConfig) (*Agent, error) { //nolint:gocognit
|
|||||||
|
|
||||||
disableActiveTCP: config.DisableActiveTCP,
|
disableActiveTCP: config.DisableActiveTCP,
|
||||||
}
|
}
|
||||||
|
a.stateNotifier = &connectionStateNotifier{NotificationFunc: a.onConnectionStateChange}
|
||||||
|
|
||||||
if a.net == nil {
|
if a.net == nil {
|
||||||
a.net, err = stdnet.NewNet()
|
a.net, err = stdnet.NewNet()
|
||||||
@@ -369,7 +368,6 @@ func NewAgent(config *AgentConfig) (*Agent, error) { //nolint:gocognit
|
|||||||
// Blocking one by the other one causes deadlock.
|
// Blocking one by the other one causes deadlock.
|
||||||
// Hence, we call handlers from independent Goroutines.
|
// Hence, we call handlers from independent Goroutines.
|
||||||
go a.candidatePairRoutine()
|
go a.candidatePairRoutine()
|
||||||
go a.connectionStateRoutine()
|
|
||||||
go a.candidateRoutine()
|
go a.candidateRoutine()
|
||||||
|
|
||||||
// Restart is also used to initialize the agent for the first time
|
// Restart is also used to initialize the agent for the first time
|
||||||
@@ -503,12 +501,7 @@ func (a *Agent) updateConnectionState(newState ConnectionState) {
|
|||||||
|
|
||||||
a.log.Infof("Setting new connection state: %s", newState)
|
a.log.Infof("Setting new connection state: %s", newState)
|
||||||
a.connectionState = newState
|
a.connectionState = newState
|
||||||
|
a.stateNotifier.Enqueue(newState)
|
||||||
// Call handler after finishing current task since we may be holding the agent lock
|
|
||||||
// and the handler may also require it
|
|
||||||
a.afterRun(func(_ context.Context) {
|
|
||||||
a.chanState <- newState
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -3,6 +3,8 @@
|
|||||||
|
|
||||||
package ice
|
package ice
|
||||||
|
|
||||||
|
import "sync"
|
||||||
|
|
||||||
// OnConnectionStateChange sets a handler that is fired when the connection state changes
|
// OnConnectionStateChange sets a handler that is fired when the connection state changes
|
||||||
func (a *Agent) OnConnectionStateChange(f func(ConnectionState)) error {
|
func (a *Agent) OnConnectionStateChange(f func(ConnectionState)) error {
|
||||||
a.onConnectionStateChangeHdlr.Store(f)
|
a.onConnectionStateChangeHdlr.Store(f)
|
||||||
@@ -47,9 +49,35 @@ func (a *Agent) candidatePairRoutine() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *Agent) connectionStateRoutine() {
|
type connectionStateNotifier struct {
|
||||||
for s := range a.chanState {
|
sync.Mutex
|
||||||
go a.onConnectionStateChange(s)
|
states []ConnectionState
|
||||||
|
running bool
|
||||||
|
NotificationFunc func(ConnectionState)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *connectionStateNotifier) Enqueue(s ConnectionState) {
|
||||||
|
c.Lock()
|
||||||
|
defer c.Unlock()
|
||||||
|
c.states = append(c.states, s)
|
||||||
|
if !c.running {
|
||||||
|
c.running = true
|
||||||
|
go c.notify()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *connectionStateNotifier) notify() {
|
||||||
|
for {
|
||||||
|
c.Lock()
|
||||||
|
if len(c.states) == 0 {
|
||||||
|
c.running = false
|
||||||
|
c.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
s := c.states[0]
|
||||||
|
c.states = c.states[1:]
|
||||||
|
c.Unlock()
|
||||||
|
c.NotificationFunc(s)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
71
agent_handlers_test.go
Normal file
71
agent_handlers_test.go
Normal file
@@ -0,0 +1,71 @@
|
|||||||
|
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
|
||||||
|
// SPDX-License-Identifier: MIT
|
||||||
|
|
||||||
|
package ice
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/pion/transport/v3/test"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestConnectionStateNotifier(t *testing.T) {
|
||||||
|
t.Run("TestManyUpdates", func(t *testing.T) {
|
||||||
|
report := test.CheckRoutines(t)
|
||||||
|
defer report()
|
||||||
|
updates := make(chan struct{}, 1)
|
||||||
|
c := &connectionStateNotifier{
|
||||||
|
NotificationFunc: func(_ ConnectionState) {
|
||||||
|
updates <- struct{}{}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
// Enqueue all updates upfront to ensure that it
|
||||||
|
// doesn't block
|
||||||
|
for i := 0; i < 10000; i++ {
|
||||||
|
c.Enqueue(ConnectionStateNew)
|
||||||
|
}
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
for i := 0; i < 10000; i++ {
|
||||||
|
<-updates
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-updates:
|
||||||
|
t.Errorf("received more updates than expected")
|
||||||
|
case <-time.After(1 * time.Second):
|
||||||
|
}
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
<-done
|
||||||
|
})
|
||||||
|
t.Run("TestUpdateOrdering", func(t *testing.T) {
|
||||||
|
report := test.CheckRoutines(t)
|
||||||
|
defer report()
|
||||||
|
updates := make(chan ConnectionState)
|
||||||
|
c := &connectionStateNotifier{
|
||||||
|
NotificationFunc: func(cs ConnectionState) {
|
||||||
|
updates <- cs
|
||||||
|
},
|
||||||
|
}
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
for i := 0; i < 10000; i++ {
|
||||||
|
x := <-updates
|
||||||
|
if x != ConnectionState(i) {
|
||||||
|
t.Errorf("expected %d got %d", x, i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-updates:
|
||||||
|
t.Errorf("received more updates than expected")
|
||||||
|
case <-time.After(1 * time.Second):
|
||||||
|
}
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
for i := 0; i < 10000; i++ {
|
||||||
|
c.Enqueue(ConnectionState(i))
|
||||||
|
}
|
||||||
|
<-done
|
||||||
|
})
|
||||||
|
}
|
Reference in New Issue
Block a user