From a9a74264127b0b98b0b2ffd16d6ebddde4321212 Mon Sep 17 00:00:00 2001 From: Alessandro Ros Date: Tue, 2 May 2023 14:30:32 +0200 Subject: [PATCH] client, server: print number of lost packets even when using TCP (#269) --- client_format.go | 36 ++++++++++++++--- client_play_test.go | 2 +- pkg/rtplossdetector/lossdetector.go | 36 +++++++++++++++++ pkg/rtplossdetector/lossdetector_test.go | 33 +++++++++++++++ pkg/rtpreorderer/reorderer.go | 2 +- server_record_test.go | 2 +- server_session_format.go | 51 ++++++++++++++++++------ 7 files changed, 140 insertions(+), 22 deletions(-) create mode 100644 pkg/rtplossdetector/lossdetector.go create mode 100644 pkg/rtplossdetector/lossdetector_test.go diff --git a/client_format.go b/client_format.go index 65e3f587..cb086916 100644 --- a/client_format.go +++ b/client_format.go @@ -10,6 +10,7 @@ import ( "github.com/bluenviron/gortsplib/v3/pkg/formats" "github.com/bluenviron/gortsplib/v3/pkg/rtcpreceiver" "github.com/bluenviron/gortsplib/v3/pkg/rtcpsender" + "github.com/bluenviron/gortsplib/v3/pkg/rtplossdetector" "github.com/bluenviron/gortsplib/v3/pkg/rtpreorderer" ) @@ -17,9 +18,10 @@ type clientFormat struct { c *Client cm *clientMedia format formats.Format - udpReorderer *rtpreorderer.Reorderer // play - udpRTCPReceiver *rtcpreceiver.RTCPReceiver // play - rtcpSender *rtcpsender.RTCPSender // record + udpReorderer *rtpreorderer.Reorderer // play + udpRTCPReceiver *rtcpreceiver.RTCPReceiver // play + tcpLossDetector *rtplossdetector.LossDetector // play + rtcpSender *rtcpsender.RTCPSender // record onPacketRTP func(*rtp.Packet) } @@ -42,6 +44,8 @@ func (ct *clientFormat) start() { ct.format.ClockRate(), func(pkt rtcp.Packet) { ct.cm.writePacketRTCP(pkt) }) + } else { + ct.tcpLossDetector = rtplossdetector.New() } } else { ct.rtcpSender = rtcpsender.New( @@ -93,9 +97,16 @@ func (ct *clientFormat) writePacketRTPWithNTP(pkt *rtp.Packet, ntp time.Time) er } func (ct *clientFormat) readRTPUDP(pkt *rtp.Packet) { - packets, missing := ct.udpReorderer.Process(pkt) - if missing != 0 { - ct.c.OnPacketLost(fmt.Errorf("%d RTP packet(s) lost", missing)) + packets, lost := ct.udpReorderer.Process(pkt) + if lost != 0 { + ct.c.OnPacketLost(fmt.Errorf("%d RTP %s lost", + lost, + func() string { + if lost == 1 { + return "packet" + } + return "packets" + }())) // do not return } @@ -108,5 +119,18 @@ func (ct *clientFormat) readRTPUDP(pkt *rtp.Packet) { } func (ct *clientFormat) readRTPTCP(pkt *rtp.Packet) { + lost := ct.tcpLossDetector.Process(pkt) + if lost != 0 { + ct.c.OnPacketLost(fmt.Errorf("%d RTP %s lost", + lost, + func() string { + if lost == 1 { + return "packet" + } + return "packets" + }())) + // do not return + } + ct.onPacketRTP(pkt) } diff --git a/client_play_test.go b/client_play_test.go index ef9d56fa..01460311 100644 --- a/client_play_test.go +++ b/client_play_test.go @@ -3085,7 +3085,7 @@ func TestClientPlayDecodeErrors(t *testing.T) { }(), OnPacketLost: func(err error) { if ca.proto == "udp" && ca.name == "rtp packets lost" { - require.EqualError(t, err, "69 RTP packet(s) lost") + require.EqualError(t, err, "69 RTP packets lost") } close(errorRecv) }, diff --git a/pkg/rtplossdetector/lossdetector.go b/pkg/rtplossdetector/lossdetector.go new file mode 100644 index 00000000..bb4e4bfb --- /dev/null +++ b/pkg/rtplossdetector/lossdetector.go @@ -0,0 +1,36 @@ +// Package rtplossdetector implements an algorithm that detects lost packets. +package rtplossdetector + +import ( + "github.com/pion/rtp" +) + +// LossDetector detects lost packets. +type LossDetector struct { + initialized bool + expectedSeqNum uint16 +} + +// New allocates a LossDetector. +func New() *LossDetector { + return &LossDetector{} +} + +// Process processes a RTP packet. +// It returns the number of lost packets. +func (r *LossDetector) Process(pkt *rtp.Packet) int { + if !r.initialized { + r.initialized = true + r.expectedSeqNum = pkt.SequenceNumber + 1 + return 0 + } + + if pkt.SequenceNumber != r.expectedSeqNum { + diff := pkt.SequenceNumber - r.expectedSeqNum + r.expectedSeqNum = pkt.SequenceNumber + 1 + return int(diff) + } + + r.expectedSeqNum = pkt.SequenceNumber + 1 + return 0 +} diff --git a/pkg/rtplossdetector/lossdetector_test.go b/pkg/rtplossdetector/lossdetector_test.go new file mode 100644 index 00000000..1baaebf2 --- /dev/null +++ b/pkg/rtplossdetector/lossdetector_test.go @@ -0,0 +1,33 @@ +package rtplossdetector + +import ( + "testing" + + "github.com/pion/rtp" + "github.com/stretchr/testify/require" +) + +func TestLossDetector(t *testing.T) { + d := New() + + c := d.Process(&rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 65530, + }, + }) + require.Equal(t, 0, c) + + c = d.Process(&rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 65531, + }, + }) + require.Equal(t, 0, c) + + c = d.Process(&rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 65535, + }, + }) + require.Equal(t, 3, c) +} diff --git a/pkg/rtpreorderer/reorderer.go b/pkg/rtpreorderer/reorderer.go index e5235c22..e114ae15 100644 --- a/pkg/rtpreorderer/reorderer.go +++ b/pkg/rtpreorderer/reorderer.go @@ -29,7 +29,7 @@ func New() *Reorderer { } // Process processes a RTP packet. -// It returns a sequence of ordered packets and the number of missing packets. +// It returns a sequence of ordered packets and the number of lost packets. func (r *Reorderer) Process(pkt *rtp.Packet) ([]*rtp.Packet, int) { if !r.initialized { r.initialized = true diff --git a/server_record_test.go b/server_record_test.go index 5d7b94b8..eba73ed0 100644 --- a/server_record_test.go +++ b/server_record_test.go @@ -1218,7 +1218,7 @@ func TestServerRecordDecodeErrors(t *testing.T) { }, onPacketLost: func(ctx *ServerHandlerOnPacketLostCtx) { if ca.proto == "udp" && ca.name == "rtp packets lost" { - require.EqualError(t, ctx.Error, "69 RTP packet(s) lost") + require.EqualError(t, ctx.Error, "69 RTP packets lost") } close(errorRecv) }, diff --git a/server_session_format.go b/server_session_format.go index a96baa16..53f4aa1f 100644 --- a/server_session_format.go +++ b/server_session_format.go @@ -9,6 +9,7 @@ import ( "github.com/bluenviron/gortsplib/v3/pkg/formats" "github.com/bluenviron/gortsplib/v3/pkg/rtcpreceiver" + "github.com/bluenviron/gortsplib/v3/pkg/rtplossdetector" "github.com/bluenviron/gortsplib/v3/pkg/rtpreorderer" ) @@ -16,6 +17,7 @@ type serverSessionFormat struct { sm *serverSessionMedia format formats.Format udpReorderer *rtpreorderer.Reorderer + tcpLossDetector *rtplossdetector.LossDetector udpRTCPReceiver *rtcpreceiver.RTCPReceiver onPacketRTP func(*rtp.Packet) } @@ -29,16 +31,19 @@ func newServerSessionFormat(sm *serverSessionMedia, forma formats.Format) *serve } func (sf *serverSessionFormat) start() { - if (*sf.sm.ss.setuppedTransport == TransportUDP || *sf.sm.ss.setuppedTransport == TransportUDPMulticast) && - sf.sm.ss.state != ServerSessionStatePlay { - sf.udpReorderer = rtpreorderer.New() - sf.udpRTCPReceiver = rtcpreceiver.New( - sf.sm.ss.s.udpReceiverReportPeriod, - nil, - sf.format.ClockRate(), - func(pkt rtcp.Packet) { - sf.sm.ss.WritePacketRTCP(sf.sm.media, pkt) - }) + if sf.sm.ss.state != ServerSessionStatePlay { + if *sf.sm.ss.setuppedTransport == TransportUDP || *sf.sm.ss.setuppedTransport == TransportUDPMulticast { + sf.udpReorderer = rtpreorderer.New() + sf.udpRTCPReceiver = rtcpreceiver.New( + sf.sm.ss.s.udpReceiverReportPeriod, + nil, + sf.format.ClockRate(), + func(pkt rtcp.Packet) { + sf.sm.ss.WritePacketRTCP(sf.sm.media, pkt) + }) + } else { + sf.tcpLossDetector = rtplossdetector.New() + } } } @@ -50,9 +55,16 @@ func (sf *serverSessionFormat) stop() { } func (sf *serverSessionFormat) readRTPUDP(pkt *rtp.Packet, now time.Time) { - packets, missing := sf.udpReorderer.Process(pkt) - if missing != 0 { - sf.sm.ss.onPacketLost(fmt.Errorf("%d RTP packet(s) lost", missing)) + packets, lost := sf.udpReorderer.Process(pkt) + if lost != 0 { + sf.sm.ss.onPacketLost(fmt.Errorf("%d RTP %s lost", + lost, + func() string { + if lost == 1 { + return "packet" + } + return "packets" + }())) // do not return } @@ -63,5 +75,18 @@ func (sf *serverSessionFormat) readRTPUDP(pkt *rtp.Packet, now time.Time) { } func (sf *serverSessionFormat) readRTPTCP(pkt *rtp.Packet) { + lost := sf.tcpLossDetector.Process(pkt) + if lost != 0 { + sf.sm.ss.onPacketLost(fmt.Errorf("%d RTP %s lost", + lost, + func() string { + if lost == 1 { + return "packet" + } + return "packets" + }())) + // do not return + } + sf.onPacketRTP(pkt) }