Make onNegotiationNeeded conform to spec

- Removes non-canon logic
This commit is contained in:
Eric Daniels
2024-06-24 10:54:41 -04:00
parent 4430f4194c
commit b3856ffcfc
5 changed files with 66 additions and 96 deletions

View File

@@ -16,11 +16,19 @@ type operations struct {
mu sync.Mutex mu sync.Mutex
busy bool busy bool
ops *list.List ops *list.List
updateNegotiationNeededFlagOnEmptyChain *atomicBool
onNegotiationNeeded func()
} }
func newOperations() *operations { func newOperations(
updateNegotiationNeededFlagOnEmptyChain *atomicBool,
onNegotiationNeeded func(),
) *operations {
return &operations{ return &operations{
ops: list.New(), ops: list.New(),
updateNegotiationNeededFlagOnEmptyChain: updateNegotiationNeededFlagOnEmptyChain,
onNegotiationNeeded: onNegotiationNeeded,
} }
} }
@@ -93,4 +101,9 @@ func (o *operations) start() {
fn() fn()
fn = o.pop() fn = o.pop()
} }
if !o.updateNegotiationNeededFlagOnEmptyChain.get() {
return
}
o.updateNegotiationNeededFlagOnEmptyChain.set(false)
o.onNegotiationNeeded()
} }

View File

@@ -4,19 +4,31 @@
package webrtc package webrtc
import ( import (
"sync"
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
func TestOperations_Enqueue(t *testing.T) { func TestOperations_Enqueue(t *testing.T) {
ops := newOperations() updateNegotiationNeededFlagOnEmptyChain := &atomicBool{}
for i := 0; i < 100; i++ { onNegotiationNeededCalledCount := 0
var onNegotiationNeededCalledCountMu sync.Mutex
ops := newOperations(updateNegotiationNeededFlagOnEmptyChain, func() {
onNegotiationNeededCalledCountMu.Lock()
onNegotiationNeededCalledCount++
onNegotiationNeededCalledCountMu.Unlock()
})
for resultSet := 0; resultSet < 100; resultSet++ {
results := make([]int, 16) results := make([]int, 16)
resultSetCopy := resultSet
for i := range results { for i := range results {
func(j int) { func(j int) {
ops.Enqueue(func() { ops.Enqueue(func() {
results[j] = j * j results[j] = j * j
if resultSetCopy > 50 {
updateNegotiationNeededFlagOnEmptyChain.set(true)
}
}) })
}(i) }(i)
} }
@@ -26,9 +38,13 @@ func TestOperations_Enqueue(t *testing.T) {
assert.Equal(t, len(expected), len(results)) assert.Equal(t, len(expected), len(results))
assert.Equal(t, expected, results) assert.Equal(t, expected, results)
} }
onNegotiationNeededCalledCountMu.Lock()
defer onNegotiationNeededCalledCountMu.Unlock()
assert.NotEqual(t, onNegotiationNeededCalledCount, 0)
} }
func TestOperations_Done(*testing.T) { func TestOperations_Done(*testing.T) {
ops := newOperations() ops := newOperations(&atomicBool{}, func() {
})
ops.Done() ops.Done()
} }

View File

@@ -57,7 +57,7 @@ type PeerConnection struct {
isClosed *atomicBool isClosed *atomicBool
isNegotiationNeeded *atomicBool isNegotiationNeeded *atomicBool
negotiationNeededState negotiationNeededState updateNegotiationNeededFlagOnEmptyChain *atomicBool
lastOffer string lastOffer string
lastAnswer string lastAnswer string
@@ -115,6 +115,7 @@ func (api *API) NewPeerConnection(configuration Configuration) (*PeerConnection,
// https://w3c.github.io/webrtc-pc/#constructor (Step #2) // https://w3c.github.io/webrtc-pc/#constructor (Step #2)
// Some variables defined explicitly despite their implicit zero values to // Some variables defined explicitly despite their implicit zero values to
// allow better readability to understand what is happening. // allow better readability to understand what is happening.
pc := &PeerConnection{ pc := &PeerConnection{
statsID: fmt.Sprintf("PeerConnection-%d", time.Now().UnixNano()), statsID: fmt.Sprintf("PeerConnection-%d", time.Now().UnixNano()),
configuration: Configuration{ configuration: Configuration{
@@ -125,10 +126,9 @@ func (api *API) NewPeerConnection(configuration Configuration) (*PeerConnection,
Certificates: []Certificate{}, Certificates: []Certificate{},
ICECandidatePoolSize: 0, ICECandidatePoolSize: 0,
}, },
ops: newOperations(),
isClosed: &atomicBool{}, isClosed: &atomicBool{},
isNegotiationNeeded: &atomicBool{}, isNegotiationNeeded: &atomicBool{},
negotiationNeededState: negotiationNeededStateEmpty, updateNegotiationNeededFlagOnEmptyChain: &atomicBool{},
lastOffer: "", lastOffer: "",
lastAnswer: "", lastAnswer: "",
greaterMid: -1, greaterMid: -1,
@@ -137,6 +137,8 @@ func (api *API) NewPeerConnection(configuration Configuration) (*PeerConnection,
api: api, api: api,
log: api.settingEngine.LoggerFactory.NewLogger("pc"), log: api.settingEngine.LoggerFactory.NewLogger("pc"),
} }
pc.ops = newOperations(pc.updateNegotiationNeededFlagOnEmptyChain, pc.onNegotiationNeeded)
pc.iceConnectionState.Store(ICEConnectionStateNew) pc.iceConnectionState.Store(ICEConnectionStateNew)
pc.connectionState.Store(PeerConnectionStateNew) pc.connectionState.Store(PeerConnectionStateNew)
@@ -293,66 +295,54 @@ func (pc *PeerConnection) OnNegotiationNeeded(f func()) {
// onNegotiationNeeded enqueues negotiationNeededOp if necessary // onNegotiationNeeded enqueues negotiationNeededOp if necessary
// caller of this method should hold `pc.mu` lock // caller of this method should hold `pc.mu` lock
// https://www.w3.org/TR/webrtc/#dfn-update-the-negotiation-needed-flag
func (pc *PeerConnection) onNegotiationNeeded() { func (pc *PeerConnection) onNegotiationNeeded() {
// https://w3c.github.io/webrtc-pc/#updating-the-negotiation-needed-flag // 4.7.3.1 If the length of connection.[[Operations]] is not 0, then set
// non-canon step 1 // connection.[[UpdateNegotiationNeededFlagOnEmptyChain]] to true, and abort these steps.
if pc.negotiationNeededState == negotiationNeededStateRun { if !pc.ops.IsEmpty() {
pc.negotiationNeededState = negotiationNeededStateQueue pc.updateNegotiationNeededFlagOnEmptyChain.set(true)
return
} else if pc.negotiationNeededState == negotiationNeededStateQueue {
return return
} }
pc.negotiationNeededState = negotiationNeededStateRun
pc.ops.Enqueue(pc.negotiationNeededOp) pc.ops.Enqueue(pc.negotiationNeededOp)
} }
// https://www.w3.org/TR/webrtc/#dfn-update-the-negotiation-needed-flag
func (pc *PeerConnection) negotiationNeededOp() { func (pc *PeerConnection) negotiationNeededOp() {
// non-canon, reset needed state machine and run again if there was a request // 4.7.3.2.1 If connection.[[IsClosed]] is true, abort these steps.
defer func() {
pc.mu.Lock()
defer pc.mu.Unlock()
if pc.negotiationNeededState == negotiationNeededStateQueue {
defer pc.onNegotiationNeeded()
}
pc.negotiationNeededState = negotiationNeededStateEmpty
}()
// Don't run NegotiatedNeeded checks if OnNegotiationNeeded is not set
if handler, ok := pc.onNegotiationNeededHandler.Load().(func()); !ok || handler == nil {
return
}
// https://www.w3.org/TR/webrtc/#updating-the-negotiation-needed-flag
// Step 2.1
if pc.isClosed.get() { if pc.isClosed.get() {
return return
} }
// non-canon step 2.2
// 4.7.3.2.2 If the length of connection.[[Operations]] is not 0,
// then set connection.[[UpdateNegotiationNeededFlagOnEmptyChain]] to
// true, and abort these steps.
if !pc.ops.IsEmpty() { if !pc.ops.IsEmpty() {
pc.ops.Enqueue(pc.negotiationNeededOp) pc.updateNegotiationNeededFlagOnEmptyChain.set(true)
return return
} }
// Step 2.3 // 4.7.3.2.3 If connection's signaling state is not "stable", abort these steps.
if pc.SignalingState() != SignalingStateStable { if pc.SignalingState() != SignalingStateStable {
return return
} }
// Step 2.4 // 4.7.3.2.4 If the result of checking if negotiation is needed is false,
// clear the negotiation-needed flag by setting connection.[[NegotiationNeeded]]
// to false, and abort these steps.
if !pc.checkNegotiationNeeded() { if !pc.checkNegotiationNeeded() {
pc.isNegotiationNeeded.set(false) pc.isNegotiationNeeded.set(false)
return return
} }
// Step 2.5 // 4.7.3.2.5 If connection.[[NegotiationNeeded]] is already true, abort these steps.
if pc.isNegotiationNeeded.get() { if pc.isNegotiationNeeded.get() {
return return
} }
// Step 2.6 // 4.7.3.2.6 Set connection.[[NegotiationNeeded]] to true.
pc.isNegotiationNeeded.set(true) pc.isNegotiationNeeded.set(true)
// Step 2.7 // 4.7.3.2.7 Fire an event named negotiationneeded at connection.
if handler, ok := pc.onNegotiationNeededHandler.Load().(func()); ok && handler != nil { if handler, ok := pc.onNegotiationNeededHandler.Load().(func()); ok && handler != nil {
handler() handler()
} }

View File

@@ -1623,41 +1623,3 @@ func TestPeerConnectionState(t *testing.T) {
assert.NoError(t, pc.Close()) assert.NoError(t, pc.Close())
assert.Equal(t, PeerConnectionStateClosed, pc.ConnectionState()) assert.Equal(t, PeerConnectionStateClosed, pc.ConnectionState())
} }
// See https://github.com/pion/webrtc/issues/2774
func TestNegotiationNeededAddedAfterOpQueueDone(t *testing.T) {
lim := test.TimeOut(time.Second * 30)
defer lim.Stop()
report := test.CheckRoutines(t)
defer report()
pc, err := NewPeerConnection(Configuration{})
if err != nil {
t.Error(err.Error())
}
var wg sync.WaitGroup
wg.Add(1)
_, err = pc.CreateDataChannel("initial_data_channel", nil)
assert.NoError(t, err)
// after there are no ops left in the queue, a previously faulty version
// of negotiationNeededOp would keep the negotiation needed state in
// negotiationNeededStateQueue which will cause all subsequent
// onNegotiationNeeded calls to never queue again, only if
// OnNegotiationNeeded has not been set yet.
for !pc.ops.IsEmpty() {
time.Sleep(time.Millisecond)
}
pc.OnNegotiationNeeded(wg.Done)
_, err = pc.CreateDataChannel("another_data_channel", nil)
assert.NoError(t, err)
wg.Wait()
assert.NoError(t, pc.Close())
}

View File

@@ -84,14 +84,3 @@ func (t PeerConnectionState) String() string {
return ErrUnknownType.Error() return ErrUnknownType.Error()
} }
} }
type negotiationNeededState int
const (
// NegotiationNeededStateEmpty not running and queue is empty
negotiationNeededStateEmpty = iota
// NegotiationNeededStateEmpty running and queue is empty
negotiationNeededStateRun
// NegotiationNeededStateEmpty running and queue
negotiationNeededStateQueue
)