diff --git a/pkg/rtpreorderer/reorderer.go b/pkg/rtpreorderer/reorderer.go index 11e9a8a3..69d4e2b1 100644 --- a/pkg/rtpreorderer/reorderer.go +++ b/pkg/rtpreorderer/reorderer.go @@ -42,51 +42,77 @@ func (r *Reorderer) Process(pkt *rtp.Packet) []*rtp.Packet { return nil } - // buffer is full. clear buffer and return current packet. + // there's a missing packet and buffer is full. + // return entire buffer and clear it. if relPos >= bufferSize { + n := 1 + for i := uint16(0); i < bufferSize; i++ { + p := (r.absPos + i) & (bufferSize - 1) + if r.buffer[p] != nil { + n++ + } + } + + ret := make([]*rtp.Packet, n) + pos := 0 + + for i := uint16(0); i < bufferSize; i++ { + p := (r.absPos + i) & (bufferSize - 1) + if r.buffer[p] != nil { + ret[pos] = r.buffer[p] + pos++ + } + } + + ret[pos] = pkt + for i := 0; i < bufferSize; i++ { r.buffer[i] = nil } + r.expectedSeqNum = pkt.SequenceNumber + 1 - return []*rtp.Packet{pkt} + return ret } // there's a missing packet if relPos != 0 { p := (r.absPos + relPos) & (bufferSize - 1) - // current packet is a duplicate. discard. + // current packet is a duplicate. discard if r.buffer[p] != nil { return nil } - // put current packet in buffer. + // put current packet in buffer r.buffer[p] = pkt return nil } - count := uint16(1) + // all packets have been received correctly. + // return them + + n := uint16(1) for { - p := (r.absPos + count) & (bufferSize - 1) + p := (r.absPos + n) & (bufferSize - 1) if r.buffer[p] == nil { break } - count++ + n++ } - ret := make([]*rtp.Packet, count) + ret := make([]*rtp.Packet, n) ret[0] = pkt r.absPos++ r.absPos &= (bufferSize - 1) - for i := uint16(1); i < count; i++ { + for i := uint16(1); i < n; i++ { ret[i], r.buffer[r.absPos] = r.buffer[r.absPos], nil r.absPos++ r.absPos &= (bufferSize - 1) } - r.expectedSeqNum = pkt.SequenceNumber + count + r.expectedSeqNum = pkt.SequenceNumber + n return ret } diff --git a/pkg/rtpreorderer/reorderer_test.go b/pkg/rtpreorderer/reorderer_test.go index 12e6614f..5400460a 100644 --- a/pkg/rtpreorderer/reorderer_test.go +++ b/pkg/rtpreorderer/reorderer_test.go @@ -155,21 +155,6 @@ func TestReorder(t *testing.T) { }, }, }, - { - // buffer is full - &rtp.Packet{ - Header: rtp.Header{ - SequenceNumber: 67, - }, - }, - []*rtp.Packet{ - { - Header: rtp.Header{ - SequenceNumber: 67, - }, - }, - }, - }, } r := New() @@ -180,3 +165,50 @@ func TestReorder(t *testing.T) { require.Equal(t, entry.out, out) } } + +func TestBufferIsFull(t *testing.T) { + r := New() + r.absPos = 25 + + out := r.Process(&rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 1, + }, + }) + require.Equal(t, []*rtp.Packet{{ + Header: rtp.Header{ + SequenceNumber: 1, + }, + }}, out) + + var expected []*rtp.Packet + + for i := uint16(0); i < 63; i++ { + out := r.Process(&rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 3 + i, + }, + }) + require.Equal(t, []*rtp.Packet(nil), out) + + expected = append(expected, &rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 3 + i, + }, + }) + } + + out = r.Process(&rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 3 + 64, + }, + }) + + expected = append(expected, &rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 3 + 64, + }, + }) + + require.Equal(t, expected, out) +}