Add configurable buffer size to Reorderer (#760)

The default of 64 is ~3 seconds which may be too much for some apps.
This commit is contained in:
Josh Allmann
2025-04-18 03:06:59 -07:00
committed by GitHub
parent d162df21ec
commit 03be509422
2 changed files with 63 additions and 14 deletions

View File

@@ -6,7 +6,7 @@ import (
)
const (
bufferSize = 64
defaultBufferSize = 64
)
// Reorderer filters incoming RTP packets, in order to
@@ -18,6 +18,9 @@ type Reorderer struct {
buffer []*rtp.Packet
absPos uint16
negativeCount int
// Maximum number of packets to buffer for reordering
BufferSize int
}
// New allocates a Reorderer.
@@ -31,7 +34,10 @@ func New() *Reorderer {
// Initialize initializes a Reorderer.
func (r *Reorderer) Initialize() {
r.buffer = make([]*rtp.Packet, bufferSize)
if r.BufferSize == 0 {
r.BufferSize = defaultBufferSize
}
r.buffer = make([]*rtp.Packet, r.BufferSize)
}
// Process processes a RTP packet.
@@ -52,12 +58,12 @@ func (r *Reorderer) Process(pkt *rtp.Packet) ([]*rtp.Packet, uint) {
r.negativeCount++
// stream has been resetted, therefore reset reorderer too
if r.negativeCount > bufferSize {
if r.negativeCount > r.BufferSize {
r.negativeCount = 0
// clear buffer
for i := uint16(0); i < bufferSize; i++ {
p := (r.absPos + i) & (bufferSize - 1)
for i := uint16(0); i < uint16(r.BufferSize); i++ {
p := (r.absPos + i) & (uint16(r.BufferSize) - 1)
r.buffer[p] = nil
}
@@ -72,10 +78,10 @@ func (r *Reorderer) Process(pkt *rtp.Packet) ([]*rtp.Packet, uint) {
// there's a missing packet and buffer is full.
// return entire buffer and clear it.
if relPos >= bufferSize {
if relPos >= int16(r.BufferSize) {
n := 1
for i := uint16(0); i < bufferSize; i++ {
p := (r.absPos + i) & (bufferSize - 1)
for i := uint16(0); i < uint16(r.BufferSize); i++ {
p := (r.absPos + i) & (uint16(r.BufferSize) - 1)
if r.buffer[p] != nil {
n++
}
@@ -84,8 +90,8 @@ func (r *Reorderer) Process(pkt *rtp.Packet) ([]*rtp.Packet, uint) {
ret := make([]*rtp.Packet, n)
pos := 0
for i := uint16(0); i < bufferSize; i++ {
p := (r.absPos + i) & (bufferSize - 1)
for i := uint16(0); i < uint16(r.BufferSize); i++ {
p := (r.absPos + i) & (uint16(r.BufferSize) - 1)
if r.buffer[p] != nil {
ret[pos], r.buffer[p] = r.buffer[p], nil
pos++
@@ -100,7 +106,7 @@ func (r *Reorderer) Process(pkt *rtp.Packet) ([]*rtp.Packet, uint) {
// there's a missing packet
if relPos != 0 {
p := (r.absPos + uint16(relPos)) & (bufferSize - 1)
p := (r.absPos + uint16(relPos)) & (uint16(r.BufferSize) - 1)
// current packet is a duplicate. discard
if r.buffer[p] != nil {
@@ -117,7 +123,7 @@ func (r *Reorderer) Process(pkt *rtp.Packet) ([]*rtp.Packet, uint) {
n := uint16(1)
for {
p := (r.absPos + n) & (bufferSize - 1)
p := (r.absPos + n) & (uint16(r.BufferSize) - 1)
if r.buffer[p] == nil {
break
}
@@ -128,12 +134,12 @@ func (r *Reorderer) Process(pkt *rtp.Packet) ([]*rtp.Packet, uint) {
ret[0] = pkt
r.absPos++
r.absPos &= (bufferSize - 1)
r.absPos &= (uint16(r.BufferSize) - 1)
for i := uint16(1); i < n; i++ {
ret[i], r.buffer[r.absPos] = r.buffer[r.absPos], nil
r.absPos++
r.absPos &= (bufferSize - 1)
r.absPos &= (uint16(r.BufferSize) - 1)
}
r.expectedSeqNum = pkt.SequenceNumber + n

View File

@@ -258,3 +258,46 @@ func TestReset(t *testing.T) {
}}, out)
require.Equal(t, uint(0), missing)
}
func TestCustomBufferSize(t *testing.T) {
customSize := 128
r := &Reorderer{
BufferSize: customSize,
}
r.Initialize()
// Set absPos to an arbitrary value.
r.absPos = 10
// Process first packet; behaves as usual.
firstSeq := uint16(50)
out, missing := r.Process(&rtp.Packet{
Header: rtp.Header{
SequenceNumber: firstSeq,
},
})
require.Equal(t, []*rtp.Packet{{
Header: rtp.Header{
SequenceNumber: firstSeq,
},
}}, out)
require.Equal(t, uint(0), missing)
// At this point, expectedSeqNum == firstSeq + 1 (i.e. 51).
// Now, send a packet with a gap larger than the custom buffer size.
// For BufferSize = 128, let's send a packet with SequenceNumber = 51 + 130 = 181.
nextSeq := uint16(181)
out, missing = r.Process(&rtp.Packet{
Header: rtp.Header{
SequenceNumber: nextSeq,
},
})
// Since there are no packets buffered, n remains 1.
// relPos = 181 - 51 = 130; so missing should be 130
require.Equal(t, uint(130), missing)
require.Equal(t, []*rtp.Packet{{
Header: rtp.Header{
SequenceNumber: nextSeq,
},
}}, out)
}