implement packet reordering for UDP / Multicast transports (#132)

* implement packet reordering for UDP transport

* rtcpreceiver: stop handling unordered / duplicate packets

* remote useless Sleep() from tests
This commit is contained in:
Alessandro Ros
2022-07-05 23:29:03 +02:00
committed by GitHub
parent c6221476fc
commit df7336b5e8
12 changed files with 376 additions and 152 deletions

View File

@@ -30,6 +30,7 @@ import (
"github.com/aler9/gortsplib/pkg/rtcpreceiver"
"github.com/aler9/gortsplib/pkg/rtcpsender"
"github.com/aler9/gortsplib/pkg/rtpcleaner"
"github.com/aler9/gortsplib/pkg/rtpreorderer"
"github.com/aler9/gortsplib/pkg/sdp"
"github.com/aler9/gortsplib/pkg/url"
)
@@ -93,6 +94,7 @@ type clientTrack struct {
// play
udpRTPPacketBuffer *rtpPacketMultiBuffer
udpRTCPReceiver *rtcpreceiver.RTCPReceiver
reorderer *rtpreorderer.Reorderer
cleaner *rtpcleaner.Cleaner
// record
@@ -717,8 +719,11 @@ func (c *Client) playRecordStart() {
if c.state == clientStatePlay {
for _, ct := range c.tracks {
if *c.effectiveTransport == TransportUDP || *c.effectiveTransport == TransportUDPMulticast {
ct.reorderer = rtpreorderer.New()
}
_, isH264 := ct.track.(*TrackH264)
ct.cleaner = rtpcleaner.NewCleaner(isH264, *c.effectiveTransport == TransportTCP)
ct.cleaner = rtpcleaner.New(isH264, *c.effectiveTransport == TransportTCP)
}
c.keepaliveTimer = time.NewTimer(c.keepalivePeriod)
@@ -817,7 +822,7 @@ func (c *Client) runReader() {
return err
}
out, err := track.cleaner.Clear(pkt)
out, err := track.cleaner.Process(pkt)
if err != nil {
return err
}
@@ -944,6 +949,7 @@ func (c *Client) playRecordStop(isClosing bool) {
for _, ct := range c.tracks {
ct.cleaner = nil
ct.reorderer = nil
}
// stop timers

View File

@@ -342,14 +342,12 @@ func TestClientRead(t *testing.T) {
// server -> client (RTP)
switch transport {
case "udp":
time.Sleep(1 * time.Second)
l1.WriteTo(testRTPPacketMarshaled, &net.UDPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: th.ClientPorts[0],
})
case "multicast":
time.Sleep(1 * time.Second)
l1.WriteTo(testRTPPacketMarshaled, &net.UDPAddr{
IP: net.ParseIP("224.1.0.1"),
Port: 25000,

View File

@@ -196,7 +196,10 @@ func (u *clientUDPListener) processPlayRTP(now time.Time, payload []byte) {
return
}
out, err := u.ct.cleaner.Clear(pkt)
packets := u.ct.reorderer.Process(pkt)
for _, pkt := range packets {
out, err := u.ct.cleaner.Process(pkt)
if err != nil {
return
}
@@ -211,6 +214,7 @@ func (u *clientUDPListener) processPlayRTP(now time.Time, payload []byte) {
H264NALUs: out0.H264NALUs,
H264PTS: out0.H264PTS,
})
}
}
func (u *clientUDPListener) processPlayRTCP(now time.Time, payload []byte) {

View File

@@ -151,8 +151,6 @@ func (rr *RTCPReceiver) ProcessPacketRTP(ts time.Time, pkt *rtp.Packet, ptsEqual
} else {
diff := int32(pkt.Header.SequenceNumber) - int32(*rr.lastSequenceNumber)
// following packet or following packet after an overflow
if diff > 0 || diff < -0x0FFF {
// overflow
if diff < -0x0FFF {
rr.sequenceNumberCycles++
@@ -193,8 +191,6 @@ func (rr *RTCPReceiver) ProcessPacketRTP(ts time.Time, pkt *rtp.Packet, ptsEqual
rr.lastRTPTimeTime = ts
}
}
// ignore invalid packets (diff = 0) or reordered packets (diff < 0)
}
}
// ProcessPacketRTCP extracts the needed data from RTCP packets.

View File

@@ -276,70 +276,6 @@ func TestRTCPReceiverOverflowPacketLost(t *testing.T) {
<-done
}
func TestRTCPReceiverReorderedPackets(t *testing.T) {
done := make(chan struct{})
now = func() time.Time {
return time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC)
}
v := uint32(0x65f83afb)
rr := New(500*time.Millisecond, &v, 90000, func(pkt rtcp.Packet) {
require.Equal(t, &rtcp.ReceiverReport{
SSRC: 0x65f83afb,
Reports: []rtcp.ReceptionReport{
{
SSRC: 0xba9da416,
LastSequenceNumber: 0x43a7,
LastSenderReport: 0x887a17ce,
Delay: 1 * 65536,
},
},
}, pkt)
close(done)
})
defer rr.Close()
srPkt := rtcp.SenderReport{
SSRC: 0xba9da416,
NTPTime: 0xe363887a17ced916,
RTPTime: 1287981738,
PacketCount: 714,
OctetCount: 859127,
}
ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
rr.ProcessPacketRTCP(ts, &srPkt)
rtpPkt := rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 0x43a7,
Timestamp: 0xafb45733,
SSRC: 0xba9da416,
},
Payload: []byte("\x00\x00"),
}
ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
rr.ProcessPacketRTP(ts, &rtpPkt, true)
rtpPkt = rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 0x43a6,
Timestamp: 0xafb45733,
SSRC: 0xba9da416,
},
Payload: []byte("\x00\x00"),
}
ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
rr.ProcessPacketRTP(ts, &rtpPkt, true)
<-done
}
func TestRTCPReceiverJitter(t *testing.T) {
done := make(chan struct{})
now = func() time.Time {

View File

@@ -35,8 +35,8 @@ type Cleaner struct {
h264Encoder *rtph264.Encoder
}
// NewCleaner allocates a Cleaner.
func NewCleaner(isH264 bool, isTCP bool) *Cleaner {
// New allocates a Cleaner.
func New(isH264 bool, isTCP bool) *Cleaner {
p := &Cleaner{
isH264: isH264,
isTCP: isTCP,
@@ -120,8 +120,8 @@ func (p *Cleaner) processH264(pkt *rtp.Packet) ([]*Output, error) {
}}, nil
}
// Clear processes a RTP packet.
func (p *Cleaner) Clear(pkt *rtp.Packet) ([]*Output, error) {
// Process processes a RTP packet.
func (p *Cleaner) Process(pkt *rtp.Packet) ([]*Output, error) {
// remove padding
pkt.Header.Padding = false
pkt.PaddingSize = 0

View File

@@ -9,9 +9,9 @@ import (
)
func TestRemovePadding(t *testing.T) {
cleaner := NewCleaner(false, false)
cleaner := New(false, false)
out, err := cleaner.Clear(&rtp.Packet{
out, err := cleaner.Process(&rtp.Packet{
Header: rtp.Header{
Version: 2,
PayloadType: 96,
@@ -38,9 +38,9 @@ func TestRemovePadding(t *testing.T) {
}
func TestGenericOversized(t *testing.T) {
cleaner := NewCleaner(false, true)
cleaner := New(false, true)
_, err := cleaner.Clear(&rtp.Packet{
_, err := cleaner.Process(&rtp.Packet{
Header: rtp.Header{
Version: 2,
PayloadType: 96,
@@ -53,9 +53,9 @@ func TestGenericOversized(t *testing.T) {
}
func TestH264Oversized(t *testing.T) {
cleaner := NewCleaner(true, true)
cleaner := New(true, true)
out, err := cleaner.Clear(&rtp.Packet{
out, err := cleaner.Process(&rtp.Packet{
Header: rtp.Header{
Version: 2,
PayloadType: 96,
@@ -70,7 +70,7 @@ func TestH264Oversized(t *testing.T) {
require.NoError(t, err)
require.Equal(t, []*Output(nil), out)
out, err = cleaner.Clear(&rtp.Packet{
out, err = cleaner.Process(&rtp.Packet{
Header: rtp.Header{
Version: 2,
PayloadType: 96,

View File

@@ -0,0 +1,92 @@
package rtpreorderer
import (
"github.com/pion/rtp"
)
const (
bufferSize = 64
)
// Reorderer filters incoming RTP packets, in order to
// - order packets
// - remove duplicate packets
type Reorderer struct {
initialized bool
expectedSeqNum uint16
buffer []*rtp.Packet
absPos uint16
}
// New allocates a Reorderer.
func New() *Reorderer {
return &Reorderer{
buffer: make([]*rtp.Packet, bufferSize),
}
}
// Process processes a RTP packet.
func (r *Reorderer) Process(pkt *rtp.Packet) []*rtp.Packet {
if !r.initialized {
r.initialized = true
r.expectedSeqNum = pkt.SequenceNumber + 1
return []*rtp.Packet{pkt}
}
relPos := pkt.SequenceNumber - r.expectedSeqNum
// packet is a duplicate or has been sent
// before the first packet processed by Reorderer.
// discard.
if relPos > 0xFFF {
return nil
}
// buffer is full. clear buffer and return current packet.
if relPos >= bufferSize {
for i := 0; i < bufferSize; i++ {
r.buffer[i] = nil
}
r.expectedSeqNum = pkt.SequenceNumber + 1
return []*rtp.Packet{pkt}
}
// there's a missing packet
if relPos != 0 {
p := (r.absPos + relPos) & (bufferSize - 1)
// current packet is a duplicate. discard.
if r.buffer[p] != nil {
return nil
}
// put current packet in buffer.
r.buffer[p] = pkt
return nil
}
count := uint16(1)
for {
p := (r.absPos + count) & (bufferSize - 1)
if r.buffer[p] == nil {
break
}
count++
}
ret := make([]*rtp.Packet, count)
ret[0] = pkt
r.absPos++
r.absPos &= (bufferSize - 1)
for i := uint16(1); i < count; i++ {
ret[i], r.buffer[r.absPos] = r.buffer[r.absPos], nil
r.absPos++
r.absPos &= (bufferSize - 1)
}
r.expectedSeqNum = pkt.SequenceNumber + count
return ret
}

View File

@@ -0,0 +1,182 @@
package rtpreorderer
import (
"testing"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
)
func TestReorder(t *testing.T) {
sequence := []struct {
in *rtp.Packet
out []*rtp.Packet
}{
{
// first packet
&rtp.Packet{
Header: rtp.Header{
SequenceNumber: 65530,
},
},
[]*rtp.Packet{{
Header: rtp.Header{
SequenceNumber: 65530,
},
}},
},
{
// packet sent before first packet
&rtp.Packet{
Header: rtp.Header{
SequenceNumber: 65529,
},
},
[]*rtp.Packet(nil),
},
{
// ok
&rtp.Packet{
Header: rtp.Header{
SequenceNumber: 65531,
},
},
[]*rtp.Packet{{
Header: rtp.Header{
SequenceNumber: 65531,
},
}},
},
{
// duplicated
&rtp.Packet{
Header: rtp.Header{
SequenceNumber: 65531,
},
},
[]*rtp.Packet(nil),
},
{
// gap
&rtp.Packet{
Header: rtp.Header{
SequenceNumber: 65535,
},
},
[]*rtp.Packet(nil),
},
{
// unordered
&rtp.Packet{
Header: rtp.Header{
SequenceNumber: 65533,
PayloadType: 96,
},
},
[]*rtp.Packet(nil),
},
{
// unordered + duplicated
&rtp.Packet{
Header: rtp.Header{
SequenceNumber: 65533,
PayloadType: 97,
},
},
[]*rtp.Packet(nil),
},
{
// unordered
&rtp.Packet{
Header: rtp.Header{
SequenceNumber: 65532,
},
},
[]*rtp.Packet{
{
Header: rtp.Header{
SequenceNumber: 65532,
},
},
{
Header: rtp.Header{
SequenceNumber: 65533,
PayloadType: 96,
},
},
},
},
{
// unordered
&rtp.Packet{
Header: rtp.Header{
SequenceNumber: 65534,
},
},
[]*rtp.Packet{
{
Header: rtp.Header{
SequenceNumber: 65534,
},
},
{
Header: rtp.Header{
SequenceNumber: 65535,
},
},
},
},
{
// overflow + gap
&rtp.Packet{
Header: rtp.Header{
SequenceNumber: 1,
},
},
[]*rtp.Packet(nil),
},
{
// unordered
&rtp.Packet{
Header: rtp.Header{
SequenceNumber: 0,
},
},
[]*rtp.Packet{
{
Header: rtp.Header{
SequenceNumber: 0,
},
},
{
Header: rtp.Header{
SequenceNumber: 1,
},
},
},
},
{
// buffer is full
&rtp.Packet{
Header: rtp.Header{
SequenceNumber: 67,
},
},
[]*rtp.Packet{
{
Header: rtp.Header{
SequenceNumber: 67,
},
},
},
},
}
r := New()
r.absPos = 40
for _, entry := range sequence {
out := r.Process(entry.in)
require.Equal(t, entry.out, out)
}
}

View File

@@ -257,7 +257,7 @@ func (sc *ServerConn) readFuncTCP(readRequest chan readReq) error {
return err
}
out, err := sc.session.setuppedTracks[trackID].cleaner.Clear(pkt)
out, err := sc.session.setuppedTracks[trackID].cleaner.Process(pkt)
if err != nil {
return err
}

View File

@@ -19,6 +19,7 @@ import (
"github.com/aler9/gortsplib/pkg/ringbuffer"
"github.com/aler9/gortsplib/pkg/rtcpreceiver"
"github.com/aler9/gortsplib/pkg/rtpcleaner"
"github.com/aler9/gortsplib/pkg/rtpreorderer"
"github.com/aler9/gortsplib/pkg/url"
)
@@ -151,6 +152,7 @@ type ServerSessionSetuppedTrack struct {
// publish
udpRTCPReceiver *rtcpreceiver.RTCPReceiver
reorderer *rtpreorderer.Reorderer
cleaner *rtpcleaner.Cleaner
}
@@ -974,8 +976,11 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
ss.state = ServerSessionStateRecord
for trackID, st := range ss.setuppedTracks {
if *ss.setuppedTransport == TransportUDP {
st.reorderer = rtpreorderer.New()
}
_, isH264 := ss.announcedTracks[trackID].(*TrackH264)
st.cleaner = rtpcleaner.NewCleaner(isH264, *ss.setuppedTransport == TransportTCP)
st.cleaner = rtpcleaner.New(isH264, *ss.setuppedTransport == TransportTCP)
}
switch *ss.setuppedTransport {
@@ -1100,6 +1105,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
for _, st := range ss.setuppedTracks {
st.cleaner = nil
st.reorderer = nil
}
ss.state = ServerSessionStatePreRecord

View File

@@ -198,10 +198,13 @@ func (u *serverUDPListener) processRTP(clientData *clientData, payload []byte) {
return
}
packets := clientData.track.reorderer.Process(pkt)
for _, pkt := range packets {
now := time.Now()
atomic.StoreInt64(clientData.ss.udpLastFrameTime, now.Unix())
out, err := clientData.track.cleaner.Clear(pkt)
out, err := clientData.track.cleaner.Process(pkt)
if err != nil {
return
}
@@ -219,6 +222,7 @@ func (u *serverUDPListener) processRTP(clientData *clientData, payload []byte) {
H264PTS: out0.H264PTS,
})
}
}
}
func (u *serverUDPListener) processRTCP(clientData *clientData, payload []byte) {