From e8fde26d5524b9998e7464ed7c7f8643528604d7 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Mon, 31 Oct 2022 15:05:42 +0100 Subject: [PATCH] return error in OnDecodeError when there are lost RTP packets --- client_read_test.go | 26 +++++++++++++++++++++++++- clientudpl.go | 7 ++++++- pkg/rtpreorderer/reorderer.go | 22 ++++++++++++---------- pkg/rtpreorderer/reorderer_test.go | 30 +++++++++++++++++++----------- server_publish_test.go | 26 +++++++++++++++++++++++++- serverudpl.go | 11 ++++++++++- 6 files changed, 97 insertions(+), 25 deletions(-) diff --git a/client_read_test.go b/client_read_test.go index 46d5a4b0..e6feab0b 100644 --- a/client_read_test.go +++ b/client_read_test.go @@ -2717,6 +2717,7 @@ func TestClientReadDecodeErrors(t *testing.T) { for _, ca := range []string{ "invalid rtp", "invalid rtcp", + "packets lost", } { t.Run(ca, func(t *testing.T) { errorRecv := make(chan struct{}) @@ -2819,7 +2820,7 @@ func TestClientReadDecodeErrors(t *testing.T) { }) require.NoError(t, err) - switch ca { + switch ca { //nolint:dupl case "invalid rtp": l1.WriteTo([]byte{0x01, 0x02}, &net.UDPAddr{ IP: net.ParseIP("127.0.0.1"), @@ -2831,6 +2832,27 @@ func TestClientReadDecodeErrors(t *testing.T) { IP: net.ParseIP("127.0.0.1"), Port: th.ClientPorts[1], }) + + case "packets lost": + byts, _ := rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 30, + }, + }.Marshal() + l1.WriteTo(byts, &net.UDPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: th.ClientPorts[0], + }) + + byts, _ = rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 100, + }, + }.Marshal() + l1.WriteTo(byts, &net.UDPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: th.ClientPorts[0], + }) } req, err = conn.ReadRequest() @@ -2855,6 +2877,8 @@ func TestClientReadDecodeErrors(t *testing.T) { require.EqualError(t, err, "RTP header size insufficient: 2 < 4") case "invalid rtcp": require.EqualError(t, err, "rtcp: packet too short") + case "packets lost": + require.EqualError(t, err, "69 RTP packet(s) lost") } close(errorRecv) }, diff --git a/clientudpl.go b/clientudpl.go index 182b3663..d32d20ac 100644 --- a/clientudpl.go +++ b/clientudpl.go @@ -2,6 +2,7 @@ package gortsplib import ( "crypto/rand" + "fmt" "net" "strconv" "sync/atomic" @@ -197,7 +198,11 @@ func (u *clientUDPListener) processPlayRTP(now time.Time, payload []byte) { return } - packets := u.ct.reorderer.Process(pkt) + packets, missing := u.ct.reorderer.Process(pkt) + + if missing != 0 { + u.c.OnDecodeError(fmt.Errorf("%d RTP packet(s) lost", missing)) + } for _, pkt := range packets { out, err := u.ct.cleaner.Process(pkt) diff --git a/pkg/rtpreorderer/reorderer.go b/pkg/rtpreorderer/reorderer.go index 3a32965f..2f78cdd8 100644 --- a/pkg/rtpreorderer/reorderer.go +++ b/pkg/rtpreorderer/reorderer.go @@ -6,7 +6,8 @@ import ( ) const ( - bufferSize = 64 + bufferSize = 64 + negativeThreshold = 0xFFFF / 2 ) // Reorderer filters incoming RTP packets, in order to @@ -27,11 +28,12 @@ func New() *Reorderer { } // Process processes a RTP packet. -func (r *Reorderer) Process(pkt *rtp.Packet) []*rtp.Packet { +// It returns a sequence of ordered packets and the number of missing packets. +func (r *Reorderer) Process(pkt *rtp.Packet) ([]*rtp.Packet, int) { if !r.initialized { r.initialized = true r.expectedSeqNum = pkt.SequenceNumber + 1 - return []*rtp.Packet{pkt} + return []*rtp.Packet{pkt}, 0 } relPos := pkt.SequenceNumber - r.expectedSeqNum @@ -39,8 +41,8 @@ func (r *Reorderer) Process(pkt *rtp.Packet) []*rtp.Packet { // packet is a duplicate or has been sent // before the first packet processed by Reorderer. // discard. - if relPos > 0xFFF { - return nil + if relPos > negativeThreshold { + return nil, 0 } // there's a missing packet and buffer is full. @@ -72,7 +74,7 @@ func (r *Reorderer) Process(pkt *rtp.Packet) []*rtp.Packet { } r.expectedSeqNum = pkt.SequenceNumber + 1 - return ret + return ret, int(relPos) - n + 1 } // there's a missing packet @@ -81,12 +83,12 @@ func (r *Reorderer) Process(pkt *rtp.Packet) []*rtp.Packet { // current packet is a duplicate. discard if r.buffer[p] != nil { - return nil + return nil, 0 } // put current packet in buffer r.buffer[p] = pkt - return nil + return nil, 0 } // all packets have been received correctly. @@ -102,8 +104,8 @@ func (r *Reorderer) Process(pkt *rtp.Packet) []*rtp.Packet { } ret := make([]*rtp.Packet, n) - ret[0] = pkt + ret[0] = pkt r.absPos++ r.absPos &= (bufferSize - 1) @@ -115,5 +117,5 @@ func (r *Reorderer) Process(pkt *rtp.Packet) []*rtp.Packet { r.expectedSeqNum = pkt.SequenceNumber + n - return ret + return ret, 0 } diff --git a/pkg/rtpreorderer/reorderer_test.go b/pkg/rtpreorderer/reorderer_test.go index 5400460a..bc4a9b68 100644 --- a/pkg/rtpreorderer/reorderer_test.go +++ b/pkg/rtpreorderer/reorderer_test.go @@ -161,52 +161,60 @@ func TestReorder(t *testing.T) { r.absPos = 40 for _, entry := range sequence { - out := r.Process(entry.in) + out, missing := r.Process(entry.in) require.Equal(t, entry.out, out) + require.Equal(t, 0, missing) } } func TestBufferIsFull(t *testing.T) { r := New() r.absPos = 25 + sn := uint16(1564) + toMiss := 34 - out := r.Process(&rtp.Packet{ + out, missing := r.Process(&rtp.Packet{ Header: rtp.Header{ - SequenceNumber: 1, + SequenceNumber: sn, }, }) require.Equal(t, []*rtp.Packet{{ Header: rtp.Header{ - SequenceNumber: 1, + SequenceNumber: sn, }, }}, out) + require.Equal(t, 0, missing) + sn++ var expected []*rtp.Packet - for i := uint16(0); i < 63; i++ { - out := r.Process(&rtp.Packet{ + for i := 0; i < 64-toMiss; i++ { + out, missing := r.Process(&rtp.Packet{ Header: rtp.Header{ - SequenceNumber: 3 + i, + SequenceNumber: sn + uint16(toMiss), }, }) require.Equal(t, []*rtp.Packet(nil), out) + require.Equal(t, 0, missing) expected = append(expected, &rtp.Packet{ Header: rtp.Header{ - SequenceNumber: 3 + i, + SequenceNumber: sn + uint16(toMiss), }, }) + sn++ } - out = r.Process(&rtp.Packet{ + out, missing = r.Process(&rtp.Packet{ Header: rtp.Header{ - SequenceNumber: 3 + 64, + SequenceNumber: sn + uint16(toMiss), }, }) + require.Equal(t, toMiss, missing) expected = append(expected, &rtp.Packet{ Header: rtp.Header{ - SequenceNumber: 3 + 64, + SequenceNumber: sn + uint16(toMiss), }, }) diff --git a/server_publish_test.go b/server_publish_test.go index 9dbf7ac4..cc0069dc 100644 --- a/server_publish_test.go +++ b/server_publish_test.go @@ -1479,6 +1479,7 @@ func TestServerPublishDecodeErrors(t *testing.T) { for _, ca := range []string{ "invalid rtp", "invalid rtcp", + "packets lost", } { t.Run(ca, func(t *testing.T) { errorRecv := make(chan struct{}) @@ -1506,6 +1507,8 @@ func TestServerPublishDecodeErrors(t *testing.T) { require.EqualError(t, ctx.Error, "RTP header size insufficient: 2 < 4") case "invalid rtcp": require.EqualError(t, ctx.Error, "rtcp: packet too short") + case "packets lost": + require.EqualError(t, ctx.Error, "69 RTP packet(s) lost") } close(errorRecv) }, @@ -1594,7 +1597,7 @@ func TestServerPublishDecodeErrors(t *testing.T) { require.NoError(t, err) require.Equal(t, base.StatusOK, res.StatusCode) - switch ca { + switch ca { //nolint:dupl case "invalid rtp": l1.WriteTo([]byte{0x01, 0x02}, &net.UDPAddr{ IP: net.ParseIP("127.0.0.1"), @@ -1606,6 +1609,27 @@ func TestServerPublishDecodeErrors(t *testing.T) { IP: net.ParseIP("127.0.0.1"), Port: resTH.ServerPorts[1], }) + + case "packets lost": + byts, _ := rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 30, + }, + }.Marshal() + l1.WriteTo(byts, &net.UDPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: resTH.ServerPorts[0], + }) + + byts, _ = rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 100, + }, + }.Marshal() + l1.WriteTo(byts, &net.UDPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: resTH.ServerPorts[0], + }) } <-errorRecv diff --git a/serverudpl.go b/serverudpl.go index 6c8e48b7..a577ad43 100644 --- a/serverudpl.go +++ b/serverudpl.go @@ -204,7 +204,16 @@ func (u *serverUDPListener) processRTP(clientData *clientData, payload []byte) { return } - packets := clientData.track.reorderer.Process(pkt) + packets, missing := clientData.track.reorderer.Process(pkt) + + if missing != 0 { + if h, ok := clientData.ss.s.Handler.(ServerHandlerOnDecodeError); ok { + h.OnDecodeError(&ServerHandlerOnDecodeErrorCtx{ + Session: clientData.ss, + Error: fmt.Errorf("%d RTP packet(s) lost", missing), + }) + } + } for _, pkt := range packets { now := time.Now()