From 634a904ba9d5e0a71ad62b33d4a2983bd9599104 Mon Sep 17 00:00:00 2001 From: Sean DuBois Date: Wed, 10 Sep 2025 13:37:12 -0700 Subject: [PATCH] Fire OnBufferedAmountLow in a goroutine If a user blocks this routine it would stop inbound message handling in SCTP. To reduce the sharp edge fire it in a goroutine so users don't need to worry about blocking. `data-channels-flow-control` exhibited no differences in throughput from this change. Resolves #846 --- datachannel.go | 8 +++- datachannel_go_test.go | 48 +++++++++++++++++++-- examples/data-channels-flow-control/main.go | 2 - 3 files changed, 51 insertions(+), 7 deletions(-) diff --git a/datachannel.go b/datachannel.go index c8cf50da..dbaa88c8 100644 --- a/datachannel.go +++ b/datachannel.go @@ -707,9 +707,13 @@ func (d *DataChannel) OnBufferedAmountLow(f func()) { d.mu.Lock() defer d.mu.Unlock() - d.onBufferedAmountLow = f + d.onBufferedAmountLow = func() { + go f() + } if d.dataChannel != nil { - d.dataChannel.OnBufferedAmountLow(f) + d.dataChannel.OnBufferedAmountLow(func() { + go f() + }) } } diff --git a/datachannel_go_test.go b/datachannel_go_test.go index 6d4f4edc..82ea0754 100644 --- a/datachannel_go_test.go +++ b/datachannel_go_test.go @@ -277,7 +277,7 @@ func TestDataChannelBufferedAmount(t *testing.T) { //nolint:cyclop report := test.CheckRoutines(t) defer report() - var nCbs int + var nCbs uint32 buf := make([]byte, 1000) offerPC, answerPC, err := newPair() @@ -315,7 +315,7 @@ func TestDataChannelBufferedAmount(t *testing.T) { //nolint:cyclop dc.SetBufferedAmountLowThreshold(1500) // The callback function should directly be passed to sctp dc.OnBufferedAmountLow(func() { - nCbs++ + atomic.AddUint32(&nCbs, 1) }) for i := 0; i < 10; i++ { @@ -332,7 +332,7 @@ func TestDataChannelBufferedAmount(t *testing.T) { //nolint:cyclop closePair(t, offerPC, answerPC, done) - assert.True(t, nCbs > 0, "callback should be made at least once") + assert.True(t, atomic.LoadUint32(&nCbs) > 0, "callback should be made at least once") }) } @@ -813,3 +813,45 @@ func TestDataChannelMessageSize(t *testing.T) { <-messagesSent.Done() closePairNow(t, offerPC, answerPC) } + +func TestOnBufferedAmountLowDeadlock(t *testing.T) { + offerPC, answerPC, err := newPair() + assert.NoError(t, err) + + offerDataChannel, err := offerPC.CreateDataChannel("", nil) + assert.NoError(t, err) + + assert.NoError(t, signalPair(offerPC, answerPC)) + + gotAllMessages, gotAllMessagesCancel := context.WithCancel(context.Background()) + offerDataChannel.OnOpen(func() { + for { + select { + case <-gotAllMessages.Done(): + return + case <-time.After(5 * time.Millisecond): + assert.NoError(t, offerDataChannel.Send([]byte{0xBE, 0xEF})) + } + } + }) + + answerPC.OnDataChannel(func(dataChannel *DataChannel) { + dataChannel.SetBufferedAmountLowThreshold(1) + + var onBufferedAmountLowFired atomic.Bool + dataChannel.OnBufferedAmountLow(func() { + onBufferedAmountLowFired.Store(true) + <-gotAllMessages.Done() + }) + + var onMessageCount uint32 + dataChannel.OnMessage(func(msg DataChannelMessage) { + if onBufferedAmountLowFired.Load() && atomic.AddUint32(&onMessageCount, 1) == 10 { + gotAllMessagesCancel() + } + }) + }) + + <-gotAllMessages.Done() + closePairNow(t, offerPC, answerPC) +} diff --git a/examples/data-channels-flow-control/main.go b/examples/data-channels-flow-control/main.go index 38d8ba93..052dec15 100644 --- a/examples/data-channels-flow-control/main.go +++ b/examples/data-channels-flow-control/main.go @@ -86,8 +86,6 @@ func createOfferer() *webrtc.PeerConnection { // This callback is made when the current bufferedAmount becomes lower than the threshold dataChannel.OnBufferedAmountLow(func() { - // Make sure to not block this channel or perform long running operations in this callback - // This callback is executed by pion/sctp. If this callback is blocking it will stop operations select { case sendMoreCh <- struct{}{}: default: