From 34545becc33d7c127119a43e5fbd4afa1b1f92a1 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Mon, 14 Nov 2022 18:39:11 +0100 Subject: [PATCH] rtpreorder: do not freeze in case the stream timestamp resets --- pkg/rtpreorderer/reorderer.go | 19 ++++++++++++++++ pkg/rtpreorderer/reorderer_test.go | 35 ++++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+) diff --git a/pkg/rtpreorderer/reorderer.go b/pkg/rtpreorderer/reorderer.go index ee437abe..e5235c22 100644 --- a/pkg/rtpreorderer/reorderer.go +++ b/pkg/rtpreorderer/reorderer.go @@ -18,6 +18,7 @@ type Reorderer struct { expectedSeqNum uint16 buffer []*rtp.Packet absPos uint16 + negativeCount int } // New allocates a Reorderer. @@ -42,8 +43,26 @@ func (r *Reorderer) Process(pkt *rtp.Packet) ([]*rtp.Packet, int) { // before the first packet processed by Reorderer. // discard. if relPos > negativeThreshold { + r.negativeCount++ + + // stream has been resetted, therefore reset reorderer too + if r.negativeCount > bufferSize { + r.negativeCount = 0 + + // clear buffer + for i := uint16(0); i < bufferSize; i++ { + p := (r.absPos + i) & (bufferSize - 1) + r.buffer[p] = nil + } + + // reset position + r.expectedSeqNum = pkt.SequenceNumber + 1 + return []*rtp.Packet{pkt}, 0 + } + return nil, 0 } + r.negativeCount = 0 // there's a missing packet and buffer is full. // return entire buffer and clear it. diff --git a/pkg/rtpreorderer/reorderer_test.go b/pkg/rtpreorderer/reorderer_test.go index bc4a9b68..8017de8f 100644 --- a/pkg/rtpreorderer/reorderer_test.go +++ b/pkg/rtpreorderer/reorderer_test.go @@ -220,3 +220,38 @@ func TestBufferIsFull(t *testing.T) { require.Equal(t, expected, out) } + +func TestReset(t *testing.T) { + r := New() + sn := uint16(1234) + + r.Process(&rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: sn, + }, + }) + + sn = 0xF234 + for i := 0; i < 64; i++ { + out, missing := r.Process(&rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: sn, + }, + }) + require.Equal(t, []*rtp.Packet(nil), out) + require.Equal(t, 0, missing) + sn++ + } + + out, missing := r.Process(&rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: sn, + }, + }) + require.Equal(t, []*rtp.Packet{{ + Header: rtp.Header{ + SequenceNumber: sn, + }, + }}, out) + require.Equal(t, 0, missing) +}