From 03be5094221cf5351c63161881a5cf1c75d6ccbb Mon Sep 17 00:00:00 2001 From: Josh Allmann Date: Fri, 18 Apr 2025 03:06:59 -0700 Subject: [PATCH] Add configurable buffer size to Reorderer (#760) The default of 64 is ~3 seconds which may be too much for some apps. --- pkg/rtpreorderer/reorderer.go | 34 +++++++++++++---------- pkg/rtpreorderer/reorderer_test.go | 43 ++++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 14 deletions(-) diff --git a/pkg/rtpreorderer/reorderer.go b/pkg/rtpreorderer/reorderer.go index 00043b0f..016fe376 100644 --- a/pkg/rtpreorderer/reorderer.go +++ b/pkg/rtpreorderer/reorderer.go @@ -6,7 +6,7 @@ import ( ) const ( - bufferSize = 64 + defaultBufferSize = 64 ) // Reorderer filters incoming RTP packets, in order to @@ -18,6 +18,9 @@ type Reorderer struct { buffer []*rtp.Packet absPos uint16 negativeCount int + + // Maximum number of packets to buffer for reordering + BufferSize int } // New allocates a Reorderer. @@ -31,7 +34,10 @@ func New() *Reorderer { // Initialize initializes a Reorderer. func (r *Reorderer) Initialize() { - r.buffer = make([]*rtp.Packet, bufferSize) + if r.BufferSize == 0 { + r.BufferSize = defaultBufferSize + } + r.buffer = make([]*rtp.Packet, r.BufferSize) } // Process processes a RTP packet. @@ -52,12 +58,12 @@ func (r *Reorderer) Process(pkt *rtp.Packet) ([]*rtp.Packet, uint) { r.negativeCount++ // stream has been resetted, therefore reset reorderer too - if r.negativeCount > bufferSize { + if r.negativeCount > r.BufferSize { r.negativeCount = 0 // clear buffer - for i := uint16(0); i < bufferSize; i++ { - p := (r.absPos + i) & (bufferSize - 1) + for i := uint16(0); i < uint16(r.BufferSize); i++ { + p := (r.absPos + i) & (uint16(r.BufferSize) - 1) r.buffer[p] = nil } @@ -72,10 +78,10 @@ func (r *Reorderer) Process(pkt *rtp.Packet) ([]*rtp.Packet, uint) { // there's a missing packet and buffer is full. // return entire buffer and clear it. - if relPos >= bufferSize { + if relPos >= int16(r.BufferSize) { n := 1 - for i := uint16(0); i < bufferSize; i++ { - p := (r.absPos + i) & (bufferSize - 1) + for i := uint16(0); i < uint16(r.BufferSize); i++ { + p := (r.absPos + i) & (uint16(r.BufferSize) - 1) if r.buffer[p] != nil { n++ } @@ -84,8 +90,8 @@ func (r *Reorderer) Process(pkt *rtp.Packet) ([]*rtp.Packet, uint) { ret := make([]*rtp.Packet, n) pos := 0 - for i := uint16(0); i < bufferSize; i++ { - p := (r.absPos + i) & (bufferSize - 1) + for i := uint16(0); i < uint16(r.BufferSize); i++ { + p := (r.absPos + i) & (uint16(r.BufferSize) - 1) if r.buffer[p] != nil { ret[pos], r.buffer[p] = r.buffer[p], nil pos++ @@ -100,7 +106,7 @@ func (r *Reorderer) Process(pkt *rtp.Packet) ([]*rtp.Packet, uint) { // there's a missing packet if relPos != 0 { - p := (r.absPos + uint16(relPos)) & (bufferSize - 1) + p := (r.absPos + uint16(relPos)) & (uint16(r.BufferSize) - 1) // current packet is a duplicate. discard if r.buffer[p] != nil { @@ -117,7 +123,7 @@ func (r *Reorderer) Process(pkt *rtp.Packet) ([]*rtp.Packet, uint) { n := uint16(1) for { - p := (r.absPos + n) & (bufferSize - 1) + p := (r.absPos + n) & (uint16(r.BufferSize) - 1) if r.buffer[p] == nil { break } @@ -128,12 +134,12 @@ func (r *Reorderer) Process(pkt *rtp.Packet) ([]*rtp.Packet, uint) { ret[0] = pkt r.absPos++ - r.absPos &= (bufferSize - 1) + r.absPos &= (uint16(r.BufferSize) - 1) for i := uint16(1); i < n; i++ { ret[i], r.buffer[r.absPos] = r.buffer[r.absPos], nil r.absPos++ - r.absPos &= (bufferSize - 1) + r.absPos &= (uint16(r.BufferSize) - 1) } r.expectedSeqNum = pkt.SequenceNumber + n diff --git a/pkg/rtpreorderer/reorderer_test.go b/pkg/rtpreorderer/reorderer_test.go index c80af535..3dfb3d26 100644 --- a/pkg/rtpreorderer/reorderer_test.go +++ b/pkg/rtpreorderer/reorderer_test.go @@ -258,3 +258,46 @@ func TestReset(t *testing.T) { }}, out) require.Equal(t, uint(0), missing) } + +func TestCustomBufferSize(t *testing.T) { + customSize := 128 + r := &Reorderer{ + BufferSize: customSize, + } + r.Initialize() + + // Set absPos to an arbitrary value. + r.absPos = 10 + + // Process first packet; behaves as usual. + firstSeq := uint16(50) + out, missing := r.Process(&rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: firstSeq, + }, + }) + require.Equal(t, []*rtp.Packet{{ + Header: rtp.Header{ + SequenceNumber: firstSeq, + }, + }}, out) + require.Equal(t, uint(0), missing) + + // At this point, expectedSeqNum == firstSeq + 1 (i.e. 51). + // Now, send a packet with a gap larger than the custom buffer size. + // For BufferSize = 128, let's send a packet with SequenceNumber = 51 + 130 = 181. + nextSeq := uint16(181) + out, missing = r.Process(&rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: nextSeq, + }, + }) + // Since there are no packets buffered, n remains 1. + // relPos = 181 - 51 = 130; so missing should be 130 + require.Equal(t, uint(130), missing) + require.Equal(t, []*rtp.Packet{{ + Header: rtp.Header{ + SequenceNumber: nextSeq, + }, + }}, out) +}