Fire OnBufferedAmountLow in a goroutine

If a user blocks this routine it would stop inbound message handling in
SCTP. To reduce the sharp edge fire it in a goroutine so users don't
need to worry about blocking.

`data-channels-flow-control` exhibited no differences in throughput from
this change.

Resolves #846
This commit is contained in:
Sean DuBois
2025-09-10 13:37:12 -07:00
parent 1527bfa2e3
commit 634a904ba9
3 changed files with 51 additions and 7 deletions

View File

@@ -707,9 +707,13 @@ func (d *DataChannel) OnBufferedAmountLow(f func()) {
d.mu.Lock() d.mu.Lock()
defer d.mu.Unlock() defer d.mu.Unlock()
d.onBufferedAmountLow = f d.onBufferedAmountLow = func() {
go f()
}
if d.dataChannel != nil { if d.dataChannel != nil {
d.dataChannel.OnBufferedAmountLow(f) d.dataChannel.OnBufferedAmountLow(func() {
go f()
})
} }
} }

View File

@@ -277,7 +277,7 @@ func TestDataChannelBufferedAmount(t *testing.T) { //nolint:cyclop
report := test.CheckRoutines(t) report := test.CheckRoutines(t)
defer report() defer report()
var nCbs int var nCbs uint32
buf := make([]byte, 1000) buf := make([]byte, 1000)
offerPC, answerPC, err := newPair() offerPC, answerPC, err := newPair()
@@ -315,7 +315,7 @@ func TestDataChannelBufferedAmount(t *testing.T) { //nolint:cyclop
dc.SetBufferedAmountLowThreshold(1500) dc.SetBufferedAmountLowThreshold(1500)
// The callback function should directly be passed to sctp // The callback function should directly be passed to sctp
dc.OnBufferedAmountLow(func() { dc.OnBufferedAmountLow(func() {
nCbs++ atomic.AddUint32(&nCbs, 1)
}) })
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
@@ -332,7 +332,7 @@ func TestDataChannelBufferedAmount(t *testing.T) { //nolint:cyclop
closePair(t, offerPC, answerPC, done) closePair(t, offerPC, answerPC, done)
assert.True(t, nCbs > 0, "callback should be made at least once") assert.True(t, atomic.LoadUint32(&nCbs) > 0, "callback should be made at least once")
}) })
} }
@@ -813,3 +813,45 @@ func TestDataChannelMessageSize(t *testing.T) {
<-messagesSent.Done() <-messagesSent.Done()
closePairNow(t, offerPC, answerPC) closePairNow(t, offerPC, answerPC)
} }
func TestOnBufferedAmountLowDeadlock(t *testing.T) {
offerPC, answerPC, err := newPair()
assert.NoError(t, err)
offerDataChannel, err := offerPC.CreateDataChannel("", nil)
assert.NoError(t, err)
assert.NoError(t, signalPair(offerPC, answerPC))
gotAllMessages, gotAllMessagesCancel := context.WithCancel(context.Background())
offerDataChannel.OnOpen(func() {
for {
select {
case <-gotAllMessages.Done():
return
case <-time.After(5 * time.Millisecond):
assert.NoError(t, offerDataChannel.Send([]byte{0xBE, 0xEF}))
}
}
})
answerPC.OnDataChannel(func(dataChannel *DataChannel) {
dataChannel.SetBufferedAmountLowThreshold(1)
var onBufferedAmountLowFired atomic.Bool
dataChannel.OnBufferedAmountLow(func() {
onBufferedAmountLowFired.Store(true)
<-gotAllMessages.Done()
})
var onMessageCount uint32
dataChannel.OnMessage(func(msg DataChannelMessage) {
if onBufferedAmountLowFired.Load() && atomic.AddUint32(&onMessageCount, 1) == 10 {
gotAllMessagesCancel()
}
})
})
<-gotAllMessages.Done()
closePairNow(t, offerPC, answerPC)
}

View File

@@ -86,8 +86,6 @@ func createOfferer() *webrtc.PeerConnection {
// This callback is made when the current bufferedAmount becomes lower than the threshold // This callback is made when the current bufferedAmount becomes lower than the threshold
dataChannel.OnBufferedAmountLow(func() { dataChannel.OnBufferedAmountLow(func() {
// Make sure to not block this channel or perform long running operations in this callback
// This callback is executed by pion/sctp. If this callback is blocking it will stop operations
select { select {
case sendMoreCh <- struct{}{}: case sendMoreCh <- struct{}{}:
default: default: