rtpreorderer: return all buffered packets when the buffer is full

(https://github.com/aler9/rtsp-simple-server/issues/1049)
This commit is contained in:
aler9
2022-07-23 13:10:20 +02:00
parent 937b36cfb1
commit 2d5211f734
2 changed files with 83 additions and 25 deletions

View File

@@ -42,51 +42,77 @@ func (r *Reorderer) Process(pkt *rtp.Packet) []*rtp.Packet {
return nil return nil
} }
// buffer is full. clear buffer and return current packet. // there's a missing packet and buffer is full.
// return entire buffer and clear it.
if relPos >= bufferSize { if relPos >= bufferSize {
n := 1
for i := uint16(0); i < bufferSize; i++ {
p := (r.absPos + i) & (bufferSize - 1)
if r.buffer[p] != nil {
n++
}
}
ret := make([]*rtp.Packet, n)
pos := 0
for i := uint16(0); i < bufferSize; i++ {
p := (r.absPos + i) & (bufferSize - 1)
if r.buffer[p] != nil {
ret[pos] = r.buffer[p]
pos++
}
}
ret[pos] = pkt
for i := 0; i < bufferSize; i++ { for i := 0; i < bufferSize; i++ {
r.buffer[i] = nil r.buffer[i] = nil
} }
r.expectedSeqNum = pkt.SequenceNumber + 1 r.expectedSeqNum = pkt.SequenceNumber + 1
return []*rtp.Packet{pkt} return ret
} }
// there's a missing packet // there's a missing packet
if relPos != 0 { if relPos != 0 {
p := (r.absPos + relPos) & (bufferSize - 1) p := (r.absPos + relPos) & (bufferSize - 1)
// current packet is a duplicate. discard. // current packet is a duplicate. discard
if r.buffer[p] != nil { if r.buffer[p] != nil {
return nil return nil
} }
// put current packet in buffer. // put current packet in buffer
r.buffer[p] = pkt r.buffer[p] = pkt
return nil return nil
} }
count := uint16(1) // all packets have been received correctly.
// return them
n := uint16(1)
for { for {
p := (r.absPos + count) & (bufferSize - 1) p := (r.absPos + n) & (bufferSize - 1)
if r.buffer[p] == nil { if r.buffer[p] == nil {
break break
} }
count++ n++
} }
ret := make([]*rtp.Packet, count) ret := make([]*rtp.Packet, n)
ret[0] = pkt ret[0] = pkt
r.absPos++ r.absPos++
r.absPos &= (bufferSize - 1) r.absPos &= (bufferSize - 1)
for i := uint16(1); i < count; i++ { for i := uint16(1); i < n; i++ {
ret[i], r.buffer[r.absPos] = r.buffer[r.absPos], nil ret[i], r.buffer[r.absPos] = r.buffer[r.absPos], nil
r.absPos++ r.absPos++
r.absPos &= (bufferSize - 1) r.absPos &= (bufferSize - 1)
} }
r.expectedSeqNum = pkt.SequenceNumber + count r.expectedSeqNum = pkt.SequenceNumber + n
return ret return ret
} }

View File

@@ -155,21 +155,6 @@ func TestReorder(t *testing.T) {
}, },
}, },
}, },
{
// buffer is full
&rtp.Packet{
Header: rtp.Header{
SequenceNumber: 67,
},
},
[]*rtp.Packet{
{
Header: rtp.Header{
SequenceNumber: 67,
},
},
},
},
} }
r := New() r := New()
@@ -180,3 +165,50 @@ func TestReorder(t *testing.T) {
require.Equal(t, entry.out, out) require.Equal(t, entry.out, out)
} }
} }
func TestBufferIsFull(t *testing.T) {
r := New()
r.absPos = 25
out := r.Process(&rtp.Packet{
Header: rtp.Header{
SequenceNumber: 1,
},
})
require.Equal(t, []*rtp.Packet{{
Header: rtp.Header{
SequenceNumber: 1,
},
}}, out)
var expected []*rtp.Packet
for i := uint16(0); i < 63; i++ {
out := r.Process(&rtp.Packet{
Header: rtp.Header{
SequenceNumber: 3 + i,
},
})
require.Equal(t, []*rtp.Packet(nil), out)
expected = append(expected, &rtp.Packet{
Header: rtp.Header{
SequenceNumber: 3 + i,
},
})
}
out = r.Process(&rtp.Packet{
Header: rtp.Header{
SequenceNumber: 3 + 64,
},
})
expected = append(expected, &rtp.Packet{
Header: rtp.Header{
SequenceNumber: 3 + 64,
},
})
require.Equal(t, expected, out)
}