From c0c275e6a6c448e90c8850a58e89dacd0ddcd9e5 Mon Sep 17 00:00:00 2001 From: Alessandro Ros Date: Mon, 24 Mar 2025 16:39:55 +0100 Subject: [PATCH] expose number of lost packets without passing through an error (#735) --- client.go | 29 +++++++++++++++++-- client_format.go | 7 ++--- internal/rtplossdetector/lossdetector.go | 4 +-- internal/rtplossdetector/lossdetector_test.go | 6 ++-- internal/rtpreorderer/reorderer.go | 4 +-- internal/rtpreorderer/reorderer_test.go | 14 ++++----- pkg/format/opus.go | 1 + pkg/liberrors/client.go | 2 ++ pkg/liberrors/server.go | 2 ++ pkg/rtplossdetector/lossdetector.go | 2 +- pkg/rtpreorderer/reorderer.go | 3 +- server_handler.go | 6 +++- server_session_format.go | 16 +++++++--- 13 files changed, 68 insertions(+), 28 deletions(-) diff --git a/client.go b/client.go index 41eb1531..bdfad137 100644 --- a/client.go +++ b/client.go @@ -217,8 +217,13 @@ type ClientOnResponseFunc func(*base.Response) type ClientOnTransportSwitchFunc func(err error) // ClientOnPacketLostFunc is the prototype of Client.OnPacketLost. +// +// Deprecated: replaced by ClientOnPacketLost2Func type ClientOnPacketLostFunc func(err error) +// ClientOnPacketLost2Func is the prototype of Client.OnPacketLost2. +type ClientOnPacketLost2Func func(lost uint64) + // ClientOnDecodeErrorFunc is the prototype of Client.OnDecodeError. type ClientOnDecodeErrorFunc func(err error) @@ -276,9 +281,11 @@ type Client struct { // explicitly request back channels to the server. RequestBackChannels bool // pointer to a variable that stores received bytes. + // // Deprecated: use Client.Stats() BytesReceived *uint64 // pointer to a variable that stores sent bytes. + // // Deprecated: use Client.Stats() BytesSent *uint64 @@ -306,7 +313,11 @@ type Client struct { // called when the transport protocol changes. OnTransportSwitch ClientOnTransportSwitchFunc // called when the client detects lost packets. + // + // Deprecated: replaced by OnPacketLost2. OnPacketLost ClientOnPacketLostFunc + // called when the client detects lost packets. + OnPacketLost2 ClientOnPacketLost2Func // called when a non-fatal decode error occurs. OnDecodeError ClientOnDecodeErrorFunc @@ -423,9 +434,21 @@ func (c *Client) Start(scheme string, host string) error { log.Println(err.Error()) } } - if c.OnPacketLost == nil { - c.OnPacketLost = func(err error) { - log.Println(err.Error()) + if c.OnPacketLost != nil { + c.OnPacketLost2 = func(lost uint64) { + c.OnPacketLost(liberrors.ErrClientRTPPacketsLost{Lost: uint(lost)}) //nolint:staticcheck + } + } + if c.OnPacketLost2 == nil { + c.OnPacketLost2 = func(lost uint64) { + log.Printf("%d RTP %s lost", + lost, + func() string { + if lost == 1 { + return "packet" + } + return "packets" + }()) } } if c.OnDecodeError == nil { diff --git a/client_format.go b/client_format.go index ceb7013d..828e1cbd 100644 --- a/client_format.go +++ b/client_format.go @@ -12,7 +12,6 @@ import ( "github.com/bluenviron/gortsplib/v4/internal/rtplossdetector" "github.com/bluenviron/gortsplib/v4/internal/rtpreorderer" "github.com/bluenviron/gortsplib/v4/pkg/format" - "github.com/bluenviron/gortsplib/v4/pkg/liberrors" ) type clientFormat struct { @@ -129,9 +128,9 @@ func (cf *clientFormat) handlePacketRTP(pkt *rtp.Packet, now time.Time) { cf.onPacketRTP(pkt) } -func (cf *clientFormat) onPacketRTPLost(lost uint) { - atomic.AddUint64(cf.rtpPacketsLost, uint64(lost)) - cf.cm.c.OnPacketLost(liberrors.ErrClientRTPPacketsLost{Lost: lost}) +func (cf *clientFormat) onPacketRTPLost(lost uint64) { + atomic.AddUint64(cf.rtpPacketsLost, lost) + cf.cm.c.OnPacketLost2(lost) } func (cf *clientFormat) writePacketRTPInQueueUDP(payload []byte) error { diff --git a/internal/rtplossdetector/lossdetector.go b/internal/rtplossdetector/lossdetector.go index 6261e14d..a8f28bf3 100644 --- a/internal/rtplossdetector/lossdetector.go +++ b/internal/rtplossdetector/lossdetector.go @@ -13,7 +13,7 @@ type LossDetector struct { // Process processes a RTP packet. // It returns the number of lost packets. -func (r *LossDetector) Process(pkt *rtp.Packet) uint { +func (r *LossDetector) Process(pkt *rtp.Packet) uint64 { if !r.initialized { r.initialized = true r.expectedSeqNum = pkt.SequenceNumber + 1 @@ -23,7 +23,7 @@ func (r *LossDetector) Process(pkt *rtp.Packet) uint { if pkt.SequenceNumber != r.expectedSeqNum { diff := pkt.SequenceNumber - r.expectedSeqNum r.expectedSeqNum = pkt.SequenceNumber + 1 - return uint(diff) + return uint64(diff) } r.expectedSeqNum = pkt.SequenceNumber + 1 diff --git a/internal/rtplossdetector/lossdetector_test.go b/internal/rtplossdetector/lossdetector_test.go index 527ddfe9..4e489440 100644 --- a/internal/rtplossdetector/lossdetector_test.go +++ b/internal/rtplossdetector/lossdetector_test.go @@ -15,19 +15,19 @@ func TestLossDetector(t *testing.T) { SequenceNumber: 65530, }, }) - require.Equal(t, uint(0), c) + require.Equal(t, uint64(0), c) c = d.Process(&rtp.Packet{ Header: rtp.Header{ SequenceNumber: 65531, }, }) - require.Equal(t, uint(0), c) + require.Equal(t, uint64(0), c) c = d.Process(&rtp.Packet{ Header: rtp.Header{ SequenceNumber: 65535, }, }) - require.Equal(t, uint(3), c) + require.Equal(t, uint64(3), c) } diff --git a/internal/rtpreorderer/reorderer.go b/internal/rtpreorderer/reorderer.go index be79d9bc..7624d1d3 100644 --- a/internal/rtpreorderer/reorderer.go +++ b/internal/rtpreorderer/reorderer.go @@ -27,7 +27,7 @@ func (r *Reorderer) Initialize() { // Process processes a RTP packet. // It returns a sequence of ordered packets and the number of lost packets. -func (r *Reorderer) Process(pkt *rtp.Packet) ([]*rtp.Packet, uint) { +func (r *Reorderer) Process(pkt *rtp.Packet) ([]*rtp.Packet, uint64) { if !r.initialized { r.initialized = true r.expectedSeqNum = pkt.SequenceNumber + 1 @@ -86,7 +86,7 @@ func (r *Reorderer) Process(pkt *rtp.Packet) ([]*rtp.Packet, uint) { ret[pos] = pkt r.expectedSeqNum = pkt.SequenceNumber + 1 - return ret, uint(int(relPos) - n + 1) + return ret, uint64(int(relPos) - n + 1) } // there's a missing packet diff --git a/internal/rtpreorderer/reorderer_test.go b/internal/rtpreorderer/reorderer_test.go index c80af535..6e1eb318 100644 --- a/internal/rtpreorderer/reorderer_test.go +++ b/internal/rtpreorderer/reorderer_test.go @@ -164,7 +164,7 @@ func TestReorder(t *testing.T) { for _, entry := range sequence { out, missing := r.Process(entry.in) require.Equal(t, entry.out, out) - require.Equal(t, uint(0), missing) + require.Equal(t, uint64(0), missing) } } @@ -173,7 +173,7 @@ func TestBufferIsFull(t *testing.T) { r.Initialize() r.absPos = 25 sn := uint16(1564) - toMiss := uint(34) + toMiss := uint64(34) out, missing := r.Process(&rtp.Packet{ Header: rtp.Header{ @@ -185,19 +185,19 @@ func TestBufferIsFull(t *testing.T) { SequenceNumber: sn, }, }}, out) - require.Equal(t, uint(0), missing) + require.Equal(t, uint64(0), missing) sn++ var expected []*rtp.Packet - for i := uint(0); i < 64-toMiss; i++ { + for i := uint64(0); i < 64-toMiss; i++ { out, missing = r.Process(&rtp.Packet{ Header: rtp.Header{ SequenceNumber: sn + uint16(toMiss), }, }) require.Equal(t, []*rtp.Packet(nil), out) - require.Equal(t, uint(0), missing) + require.Equal(t, uint64(0), missing) expected = append(expected, &rtp.Packet{ Header: rtp.Header{ @@ -242,7 +242,7 @@ func TestReset(t *testing.T) { }, }) require.Equal(t, []*rtp.Packet(nil), out) - require.Equal(t, uint(0), missing) + require.Equal(t, uint64(0), missing) sn++ } @@ -256,5 +256,5 @@ func TestReset(t *testing.T) { SequenceNumber: sn, }, }}, out) - require.Equal(t, uint(0), missing) + require.Equal(t, uint64(0), missing) } diff --git a/pkg/format/opus.go b/pkg/format/opus.go index a4030375..09902a4a 100644 --- a/pkg/format/opus.go +++ b/pkg/format/opus.go @@ -17,6 +17,7 @@ type Opus struct { PayloadTyp uint8 ChannelCount int + // // Deprecated: replaced by ChannelCount. IsStereo bool } diff --git a/pkg/liberrors/client.go b/pkg/liberrors/client.go index f8cc3def..41805722 100644 --- a/pkg/liberrors/client.go +++ b/pkg/liberrors/client.go @@ -250,6 +250,8 @@ func (e ErrClientWriteQueueFull) Error() string { } // ErrClientRTPPacketsLost is an error that can be returned by a client. +// +// Deprecated: will be removed in next version. type ErrClientRTPPacketsLost struct { Lost uint } diff --git a/pkg/liberrors/server.go b/pkg/liberrors/server.go index 610faa63..932f58d3 100644 --- a/pkg/liberrors/server.go +++ b/pkg/liberrors/server.go @@ -239,6 +239,8 @@ func (e ErrServerUnexpectedResponse) Error() string { type ErrServerWriteQueueFull = ErrClientWriteQueueFull // ErrServerRTPPacketsLost is an error that can be returned by a server. +// +// Deprecated: will be removed in next version. type ErrServerRTPPacketsLost = ErrClientRTPPacketsLost // ErrServerRTPPacketUnknownPayloadType is an error that can be returned by a server. diff --git a/pkg/rtplossdetector/lossdetector.go b/pkg/rtplossdetector/lossdetector.go index 7fdca19d..d0f9aabe 100644 --- a/pkg/rtplossdetector/lossdetector.go +++ b/pkg/rtplossdetector/lossdetector.go @@ -19,5 +19,5 @@ func New() *LossDetector { // Process processes a RTP packet. // It returns the number of lost packets. func (r *LossDetector) Process(pkt *rtp.Packet) uint { - return (*rtplossdetector.LossDetector)(r).Process(pkt) + return uint((*rtplossdetector.LossDetector)(r).Process(pkt)) } diff --git a/pkg/rtpreorderer/reorderer.go b/pkg/rtpreorderer/reorderer.go index 692de444..ba63dda1 100644 --- a/pkg/rtpreorderer/reorderer.go +++ b/pkg/rtpreorderer/reorderer.go @@ -23,5 +23,6 @@ func New() *Reorderer { // Process processes a RTP packet. // It returns a sequence of ordered packets and the number of lost packets. func (r *Reorderer) Process(pkt *rtp.Packet) ([]*rtp.Packet, uint) { - return (*rtpreorderer.Reorderer)(r).Process(pkt) + v1, v2 := (*rtpreorderer.Reorderer)(r).Process(pkt) + return v1, uint(v2) } diff --git a/server_handler.go b/server_handler.go index f6f764c0..7706aec4 100644 --- a/server_handler.go +++ b/server_handler.go @@ -195,7 +195,11 @@ type ServerHandlerOnSetParameter interface { // ServerHandlerOnPacketLostCtx is the context of OnPacketLost. type ServerHandlerOnPacketLostCtx struct { Session *ServerSession - Error error + Lost uint64 + + // + // Deprecated: replaced by Lost + Error error } // ServerHandlerOnPacketLost can be implemented by a ServerHandler. diff --git a/server_session_format.go b/server_session_format.go index f693c289..106a8195 100644 --- a/server_session_format.go +++ b/server_session_format.go @@ -112,16 +112,24 @@ func (sf *serverSessionFormat) handlePacketRTP(pkt *rtp.Packet, now time.Time) { sf.onPacketRTP(pkt) } -func (sf *serverSessionFormat) onPacketRTPLost(lost uint) { - atomic.AddUint64(sf.rtpPacketsLost, uint64(lost)) +func (sf *serverSessionFormat) onPacketRTPLost(lost uint64) { + atomic.AddUint64(sf.rtpPacketsLost, lost) if h, ok := sf.sm.ss.s.Handler.(ServerHandlerOnPacketLost); ok { h.OnPacketLost(&ServerHandlerOnPacketLostCtx{ Session: sf.sm.ss, - Error: liberrors.ErrServerRTPPacketsLost{Lost: lost}, + Lost: lost, + Error: liberrors.ErrServerRTPPacketsLost{Lost: uint(lost)}, //nolint:staticcheck }) } else { - log.Println(liberrors.ErrServerRTPPacketsLost{Lost: lost}.Error()) + log.Printf("%d RTP %s lost", + lost, + func() string { + if lost == 1 { + return "packet" + } + return "packets" + }()) } }