client, server: print number of lost packets even when using TCP (#269)

This commit is contained in:
Alessandro Ros
2023-05-02 14:30:32 +02:00
committed by GitHub
parent 3bf1725e46
commit a9a7426412
7 changed files with 140 additions and 22 deletions

View File

@@ -10,6 +10,7 @@ import (
"github.com/bluenviron/gortsplib/v3/pkg/formats"
"github.com/bluenviron/gortsplib/v3/pkg/rtcpreceiver"
"github.com/bluenviron/gortsplib/v3/pkg/rtcpsender"
"github.com/bluenviron/gortsplib/v3/pkg/rtplossdetector"
"github.com/bluenviron/gortsplib/v3/pkg/rtpreorderer"
)
@@ -17,9 +18,10 @@ type clientFormat struct {
c *Client
cm *clientMedia
format formats.Format
udpReorderer *rtpreorderer.Reorderer // play
udpRTCPReceiver *rtcpreceiver.RTCPReceiver // play
rtcpSender *rtcpsender.RTCPSender // record
udpReorderer *rtpreorderer.Reorderer // play
udpRTCPReceiver *rtcpreceiver.RTCPReceiver // play
tcpLossDetector *rtplossdetector.LossDetector // play
rtcpSender *rtcpsender.RTCPSender // record
onPacketRTP func(*rtp.Packet)
}
@@ -42,6 +44,8 @@ func (ct *clientFormat) start() {
ct.format.ClockRate(), func(pkt rtcp.Packet) {
ct.cm.writePacketRTCP(pkt)
})
} else {
ct.tcpLossDetector = rtplossdetector.New()
}
} else {
ct.rtcpSender = rtcpsender.New(
@@ -93,9 +97,16 @@ func (ct *clientFormat) writePacketRTPWithNTP(pkt *rtp.Packet, ntp time.Time) er
}
func (ct *clientFormat) readRTPUDP(pkt *rtp.Packet) {
packets, missing := ct.udpReorderer.Process(pkt)
if missing != 0 {
ct.c.OnPacketLost(fmt.Errorf("%d RTP packet(s) lost", missing))
packets, lost := ct.udpReorderer.Process(pkt)
if lost != 0 {
ct.c.OnPacketLost(fmt.Errorf("%d RTP %s lost",
lost,
func() string {
if lost == 1 {
return "packet"
}
return "packets"
}()))
// do not return
}
@@ -108,5 +119,18 @@ func (ct *clientFormat) readRTPUDP(pkt *rtp.Packet) {
}
func (ct *clientFormat) readRTPTCP(pkt *rtp.Packet) {
lost := ct.tcpLossDetector.Process(pkt)
if lost != 0 {
ct.c.OnPacketLost(fmt.Errorf("%d RTP %s lost",
lost,
func() string {
if lost == 1 {
return "packet"
}
return "packets"
}()))
// do not return
}
ct.onPacketRTP(pkt)
}

View File

@@ -3085,7 +3085,7 @@ func TestClientPlayDecodeErrors(t *testing.T) {
}(),
OnPacketLost: func(err error) {
if ca.proto == "udp" && ca.name == "rtp packets lost" {
require.EqualError(t, err, "69 RTP packet(s) lost")
require.EqualError(t, err, "69 RTP packets lost")
}
close(errorRecv)
},

View File

@@ -0,0 +1,36 @@
// Package rtplossdetector implements an algorithm that detects lost packets.
package rtplossdetector
import (
"github.com/pion/rtp"
)
// LossDetector detects lost packets.
type LossDetector struct {
initialized bool
expectedSeqNum uint16
}
// New allocates a LossDetector.
func New() *LossDetector {
return &LossDetector{}
}
// Process processes a RTP packet.
// It returns the number of lost packets.
func (r *LossDetector) Process(pkt *rtp.Packet) int {
if !r.initialized {
r.initialized = true
r.expectedSeqNum = pkt.SequenceNumber + 1
return 0
}
if pkt.SequenceNumber != r.expectedSeqNum {
diff := pkt.SequenceNumber - r.expectedSeqNum
r.expectedSeqNum = pkt.SequenceNumber + 1
return int(diff)
}
r.expectedSeqNum = pkt.SequenceNumber + 1
return 0
}

View File

@@ -0,0 +1,33 @@
package rtplossdetector
import (
"testing"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
)
func TestLossDetector(t *testing.T) {
d := New()
c := d.Process(&rtp.Packet{
Header: rtp.Header{
SequenceNumber: 65530,
},
})
require.Equal(t, 0, c)
c = d.Process(&rtp.Packet{
Header: rtp.Header{
SequenceNumber: 65531,
},
})
require.Equal(t, 0, c)
c = d.Process(&rtp.Packet{
Header: rtp.Header{
SequenceNumber: 65535,
},
})
require.Equal(t, 3, c)
}

View File

@@ -29,7 +29,7 @@ func New() *Reorderer {
}
// Process processes a RTP packet.
// It returns a sequence of ordered packets and the number of missing packets.
// It returns a sequence of ordered packets and the number of lost packets.
func (r *Reorderer) Process(pkt *rtp.Packet) ([]*rtp.Packet, int) {
if !r.initialized {
r.initialized = true

View File

@@ -1218,7 +1218,7 @@ func TestServerRecordDecodeErrors(t *testing.T) {
},
onPacketLost: func(ctx *ServerHandlerOnPacketLostCtx) {
if ca.proto == "udp" && ca.name == "rtp packets lost" {
require.EqualError(t, ctx.Error, "69 RTP packet(s) lost")
require.EqualError(t, ctx.Error, "69 RTP packets lost")
}
close(errorRecv)
},

View File

@@ -9,6 +9,7 @@ import (
"github.com/bluenviron/gortsplib/v3/pkg/formats"
"github.com/bluenviron/gortsplib/v3/pkg/rtcpreceiver"
"github.com/bluenviron/gortsplib/v3/pkg/rtplossdetector"
"github.com/bluenviron/gortsplib/v3/pkg/rtpreorderer"
)
@@ -16,6 +17,7 @@ type serverSessionFormat struct {
sm *serverSessionMedia
format formats.Format
udpReorderer *rtpreorderer.Reorderer
tcpLossDetector *rtplossdetector.LossDetector
udpRTCPReceiver *rtcpreceiver.RTCPReceiver
onPacketRTP func(*rtp.Packet)
}
@@ -29,16 +31,19 @@ func newServerSessionFormat(sm *serverSessionMedia, forma formats.Format) *serve
}
func (sf *serverSessionFormat) start() {
if (*sf.sm.ss.setuppedTransport == TransportUDP || *sf.sm.ss.setuppedTransport == TransportUDPMulticast) &&
sf.sm.ss.state != ServerSessionStatePlay {
sf.udpReorderer = rtpreorderer.New()
sf.udpRTCPReceiver = rtcpreceiver.New(
sf.sm.ss.s.udpReceiverReportPeriod,
nil,
sf.format.ClockRate(),
func(pkt rtcp.Packet) {
sf.sm.ss.WritePacketRTCP(sf.sm.media, pkt)
})
if sf.sm.ss.state != ServerSessionStatePlay {
if *sf.sm.ss.setuppedTransport == TransportUDP || *sf.sm.ss.setuppedTransport == TransportUDPMulticast {
sf.udpReorderer = rtpreorderer.New()
sf.udpRTCPReceiver = rtcpreceiver.New(
sf.sm.ss.s.udpReceiverReportPeriod,
nil,
sf.format.ClockRate(),
func(pkt rtcp.Packet) {
sf.sm.ss.WritePacketRTCP(sf.sm.media, pkt)
})
} else {
sf.tcpLossDetector = rtplossdetector.New()
}
}
}
@@ -50,9 +55,16 @@ func (sf *serverSessionFormat) stop() {
}
func (sf *serverSessionFormat) readRTPUDP(pkt *rtp.Packet, now time.Time) {
packets, missing := sf.udpReorderer.Process(pkt)
if missing != 0 {
sf.sm.ss.onPacketLost(fmt.Errorf("%d RTP packet(s) lost", missing))
packets, lost := sf.udpReorderer.Process(pkt)
if lost != 0 {
sf.sm.ss.onPacketLost(fmt.Errorf("%d RTP %s lost",
lost,
func() string {
if lost == 1 {
return "packet"
}
return "packets"
}()))
// do not return
}
@@ -63,5 +75,18 @@ func (sf *serverSessionFormat) readRTPUDP(pkt *rtp.Packet, now time.Time) {
}
func (sf *serverSessionFormat) readRTPTCP(pkt *rtp.Packet) {
lost := sf.tcpLossDetector.Process(pkt)
if lost != 0 {
sf.sm.ss.onPacketLost(fmt.Errorf("%d RTP %s lost",
lost,
func() string {
if lost == 1 {
return "packet"
}
return "packets"
}()))
// do not return
}
sf.onPacketRTP(pkt)
}