return error in OnDecodeError when there are lost RTP packets

This commit is contained in:
aler9
2022-10-31 15:05:42 +01:00
parent 30e029011b
commit e8fde26d55
6 changed files with 97 additions and 25 deletions

View File

@@ -2717,6 +2717,7 @@ func TestClientReadDecodeErrors(t *testing.T) {
for _, ca := range []string{
"invalid rtp",
"invalid rtcp",
"packets lost",
} {
t.Run(ca, func(t *testing.T) {
errorRecv := make(chan struct{})
@@ -2819,7 +2820,7 @@ func TestClientReadDecodeErrors(t *testing.T) {
})
require.NoError(t, err)
switch ca {
switch ca { //nolint:dupl
case "invalid rtp":
l1.WriteTo([]byte{0x01, 0x02}, &net.UDPAddr{
IP: net.ParseIP("127.0.0.1"),
@@ -2831,6 +2832,27 @@ func TestClientReadDecodeErrors(t *testing.T) {
IP: net.ParseIP("127.0.0.1"),
Port: th.ClientPorts[1],
})
case "packets lost":
byts, _ := rtp.Packet{
Header: rtp.Header{
SequenceNumber: 30,
},
}.Marshal()
l1.WriteTo(byts, &net.UDPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: th.ClientPorts[0],
})
byts, _ = rtp.Packet{
Header: rtp.Header{
SequenceNumber: 100,
},
}.Marshal()
l1.WriteTo(byts, &net.UDPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: th.ClientPorts[0],
})
}
req, err = conn.ReadRequest()
@@ -2855,6 +2877,8 @@ func TestClientReadDecodeErrors(t *testing.T) {
require.EqualError(t, err, "RTP header size insufficient: 2 < 4")
case "invalid rtcp":
require.EqualError(t, err, "rtcp: packet too short")
case "packets lost":
require.EqualError(t, err, "69 RTP packet(s) lost")
}
close(errorRecv)
},

View File

@@ -2,6 +2,7 @@ package gortsplib
import (
"crypto/rand"
"fmt"
"net"
"strconv"
"sync/atomic"
@@ -197,7 +198,11 @@ func (u *clientUDPListener) processPlayRTP(now time.Time, payload []byte) {
return
}
packets := u.ct.reorderer.Process(pkt)
packets, missing := u.ct.reorderer.Process(pkt)
if missing != 0 {
u.c.OnDecodeError(fmt.Errorf("%d RTP packet(s) lost", missing))
}
for _, pkt := range packets {
out, err := u.ct.cleaner.Process(pkt)

View File

@@ -6,7 +6,8 @@ import (
)
const (
bufferSize = 64
bufferSize = 64
negativeThreshold = 0xFFFF / 2
)
// Reorderer filters incoming RTP packets, in order to
@@ -27,11 +28,12 @@ func New() *Reorderer {
}
// Process processes a RTP packet.
func (r *Reorderer) Process(pkt *rtp.Packet) []*rtp.Packet {
// It returns a sequence of ordered packets and the number of missing packets.
func (r *Reorderer) Process(pkt *rtp.Packet) ([]*rtp.Packet, int) {
if !r.initialized {
r.initialized = true
r.expectedSeqNum = pkt.SequenceNumber + 1
return []*rtp.Packet{pkt}
return []*rtp.Packet{pkt}, 0
}
relPos := pkt.SequenceNumber - r.expectedSeqNum
@@ -39,8 +41,8 @@ func (r *Reorderer) Process(pkt *rtp.Packet) []*rtp.Packet {
// packet is a duplicate or has been sent
// before the first packet processed by Reorderer.
// discard.
if relPos > 0xFFF {
return nil
if relPos > negativeThreshold {
return nil, 0
}
// there's a missing packet and buffer is full.
@@ -72,7 +74,7 @@ func (r *Reorderer) Process(pkt *rtp.Packet) []*rtp.Packet {
}
r.expectedSeqNum = pkt.SequenceNumber + 1
return ret
return ret, int(relPos) - n + 1
}
// there's a missing packet
@@ -81,12 +83,12 @@ func (r *Reorderer) Process(pkt *rtp.Packet) []*rtp.Packet {
// current packet is a duplicate. discard
if r.buffer[p] != nil {
return nil
return nil, 0
}
// put current packet in buffer
r.buffer[p] = pkt
return nil
return nil, 0
}
// all packets have been received correctly.
@@ -102,8 +104,8 @@ func (r *Reorderer) Process(pkt *rtp.Packet) []*rtp.Packet {
}
ret := make([]*rtp.Packet, n)
ret[0] = pkt
ret[0] = pkt
r.absPos++
r.absPos &= (bufferSize - 1)
@@ -115,5 +117,5 @@ func (r *Reorderer) Process(pkt *rtp.Packet) []*rtp.Packet {
r.expectedSeqNum = pkt.SequenceNumber + n
return ret
return ret, 0
}

View File

@@ -161,52 +161,60 @@ func TestReorder(t *testing.T) {
r.absPos = 40
for _, entry := range sequence {
out := r.Process(entry.in)
out, missing := r.Process(entry.in)
require.Equal(t, entry.out, out)
require.Equal(t, 0, missing)
}
}
func TestBufferIsFull(t *testing.T) {
r := New()
r.absPos = 25
sn := uint16(1564)
toMiss := 34
out := r.Process(&rtp.Packet{
out, missing := r.Process(&rtp.Packet{
Header: rtp.Header{
SequenceNumber: 1,
SequenceNumber: sn,
},
})
require.Equal(t, []*rtp.Packet{{
Header: rtp.Header{
SequenceNumber: 1,
SequenceNumber: sn,
},
}}, out)
require.Equal(t, 0, missing)
sn++
var expected []*rtp.Packet
for i := uint16(0); i < 63; i++ {
out := r.Process(&rtp.Packet{
for i := 0; i < 64-toMiss; i++ {
out, missing := r.Process(&rtp.Packet{
Header: rtp.Header{
SequenceNumber: 3 + i,
SequenceNumber: sn + uint16(toMiss),
},
})
require.Equal(t, []*rtp.Packet(nil), out)
require.Equal(t, 0, missing)
expected = append(expected, &rtp.Packet{
Header: rtp.Header{
SequenceNumber: 3 + i,
SequenceNumber: sn + uint16(toMiss),
},
})
sn++
}
out = r.Process(&rtp.Packet{
out, missing = r.Process(&rtp.Packet{
Header: rtp.Header{
SequenceNumber: 3 + 64,
SequenceNumber: sn + uint16(toMiss),
},
})
require.Equal(t, toMiss, missing)
expected = append(expected, &rtp.Packet{
Header: rtp.Header{
SequenceNumber: 3 + 64,
SequenceNumber: sn + uint16(toMiss),
},
})

View File

@@ -1479,6 +1479,7 @@ func TestServerPublishDecodeErrors(t *testing.T) {
for _, ca := range []string{
"invalid rtp",
"invalid rtcp",
"packets lost",
} {
t.Run(ca, func(t *testing.T) {
errorRecv := make(chan struct{})
@@ -1506,6 +1507,8 @@ func TestServerPublishDecodeErrors(t *testing.T) {
require.EqualError(t, ctx.Error, "RTP header size insufficient: 2 < 4")
case "invalid rtcp":
require.EqualError(t, ctx.Error, "rtcp: packet too short")
case "packets lost":
require.EqualError(t, ctx.Error, "69 RTP packet(s) lost")
}
close(errorRecv)
},
@@ -1594,7 +1597,7 @@ func TestServerPublishDecodeErrors(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode)
switch ca {
switch ca { //nolint:dupl
case "invalid rtp":
l1.WriteTo([]byte{0x01, 0x02}, &net.UDPAddr{
IP: net.ParseIP("127.0.0.1"),
@@ -1606,6 +1609,27 @@ func TestServerPublishDecodeErrors(t *testing.T) {
IP: net.ParseIP("127.0.0.1"),
Port: resTH.ServerPorts[1],
})
case "packets lost":
byts, _ := rtp.Packet{
Header: rtp.Header{
SequenceNumber: 30,
},
}.Marshal()
l1.WriteTo(byts, &net.UDPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: resTH.ServerPorts[0],
})
byts, _ = rtp.Packet{
Header: rtp.Header{
SequenceNumber: 100,
},
}.Marshal()
l1.WriteTo(byts, &net.UDPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: resTH.ServerPorts[0],
})
}
<-errorRecv

View File

@@ -204,7 +204,16 @@ func (u *serverUDPListener) processRTP(clientData *clientData, payload []byte) {
return
}
packets := clientData.track.reorderer.Process(pkt)
packets, missing := clientData.track.reorderer.Process(pkt)
if missing != 0 {
if h, ok := clientData.ss.s.Handler.(ServerHandlerOnDecodeError); ok {
h.OnDecodeError(&ServerHandlerOnDecodeErrorCtx{
Session: clientData.ss,
Error: fmt.Errorf("%d RTP packet(s) lost", missing),
})
}
}
for _, pkt := range packets {
now := time.Now()