allow rtpreceiver and rtpsender to count received, lost and sent packets (#947)

This commit is contained in:
Alessandro Ros
2025-11-12 18:07:09 +01:00
committed by GitHub
parent ece3be55c0
commit 0e56f305d1
9 changed files with 131 additions and 94 deletions

View File

@@ -2462,10 +2462,25 @@ func (c *Client) Stats() *ClientStats {
}()
ret[fo.format] = SessionStatsFormat{ //nolint:dupl
RTPPacketsReceived: atomic.LoadUint64(fo.rtpPacketsReceived),
RTPPacketsSent: atomic.LoadUint64(fo.rtpPacketsSent),
RTPPacketsLost: atomic.LoadUint64(fo.rtpPacketsLost),
LocalSSRC: fo.localSSRC,
RTPPacketsReceived: func() uint64 {
if recvStats != nil {
return recvStats.TotalReceived
}
return 0
}(),
RTPPacketsSent: func() uint64 {
if sentStats != nil {
return sentStats.TotalSent
}
return 0
}(),
RTPPacketsLost: func() uint64 {
if recvStats != nil {
return recvStats.TotalLost
}
return 0
}(),
LocalSSRC: fo.localSSRC,
RemoteSSRC: func() uint32 {
if v, ok := fo.remoteSSRC(); ok {
return v

View File

@@ -27,16 +27,9 @@ type clientFormat struct {
rtpReceiver *rtpreceiver.Receiver // play
rtpSender *rtpsender.Sender // record or back channel
writePacketRTPInQueue func([]byte) error
rtpPacketsReceived *uint64
rtpPacketsSent *uint64
rtpPacketsLost *uint64
}
func (cf *clientFormat) initialize() {
cf.rtpPacketsReceived = new(uint64)
cf.rtpPacketsSent = new(uint64)
cf.rtpPacketsLost = new(uint64)
if cf.cm.udpRTPListener != nil {
cf.writePacketRTPInQueue = cf.writePacketRTPInQueueUDP
} else {
@@ -116,12 +109,9 @@ func (cf *clientFormat) readPacketRTP(payload []byte, header *rtp.Header, header
pkts, lost := cf.rtpReceiver.ProcessPacket2(pkt, now, cf.format.PTSEqualsDTS(pkt))
if lost != 0 {
atomic.AddUint64(cf.rtpPacketsLost, lost)
cf.cm.c.OnPacketsLost(lost)
}
atomic.AddUint64(cf.rtpPacketsReceived, uint64(len(pkts)))
for _, pkt := range pkts {
cf.onPacketRTP(pkt)
}
@@ -155,6 +145,8 @@ func (cf *clientFormat) writePacketRTP(pkt *rtp.Packet, ntp time.Time) error {
buf = encr
}
atomic.AddUint64(cf.cm.bytesSent, uint64(len(buf)))
cf.cm.c.writerMutex.RLock()
defer cf.cm.c.writerMutex.RUnlock()
@@ -173,26 +165,12 @@ func (cf *clientFormat) writePacketRTP(pkt *rtp.Packet, ntp time.Time) error {
}
func (cf *clientFormat) writePacketRTPInQueueUDP(payload []byte) error {
err := cf.cm.udpRTPListener.write(payload)
if err != nil {
return err
}
atomic.AddUint64(cf.cm.bytesSent, uint64(len(payload)))
atomic.AddUint64(cf.rtpPacketsSent, 1)
return nil
return cf.cm.udpRTPListener.write(payload)
}
func (cf *clientFormat) writePacketRTPInQueueTCP(payload []byte) error {
cf.cm.c.tcpFrame.Channel = cf.cm.tcpChannel
cf.cm.c.tcpFrame.Payload = payload
cf.cm.c.nconn.SetWriteDeadline(time.Now().Add(cf.cm.c.WriteTimeout))
err := cf.cm.c.conn.WriteInterleavedFrame(cf.cm.c.tcpFrame, cf.cm.c.tcpBuffer)
if err != nil {
return err
}
atomic.AddUint64(cf.cm.bytesSent, uint64(len(payload)))
atomic.AddUint64(cf.rtpPacketsSent, 1)
return nil
return cf.cm.c.conn.WriteInterleavedFrame(cf.cm.c.tcpFrame, cf.cm.c.tcpBuffer)
}

View File

@@ -14,7 +14,7 @@ import (
// Receiver is a utility to receive RTP packets. It is in charge of:
// - removing duplicate packets (when transport is unreliable)
// - reordering packets (when transport is unrealiable)
// - counting lost packets
// - counting received and lost packets
// - generating RTCP receiver reports
type Receiver struct {
// Track clock rate.
@@ -24,10 +24,10 @@ type Receiver struct {
LocalSSRC uint32
// Whether the transport is unrealiable.
// This enables removing duplicate packets and reordering packets.
// When this is enabled, duplicate packets are removed, and packets are reordered.
UnrealiableTransport bool
// size of the buffer for reordering packets.
// Size of the buffer for reordering packets.
// It defaults to 64.
BufferSize int
@@ -56,9 +56,11 @@ type Receiver struct {
totalLost uint32
totalLostSinceReport uint32
totalSinceReport uint32
totalReceived uint64
totalLost2 uint64
jitter float64
// data from RTCP packets
// data from RTCP sender reports
firstSenderReportReceived bool
lastSenderReportTimeNTP uint64
lastSenderReportTimeRTP uint32
@@ -212,6 +214,8 @@ func (rr *Receiver) ProcessPacket2(
rr.totalLost += uint32(lost)
rr.totalLostSinceReport += uint32(lost)
rr.totalLost2 += lost
rr.totalReceived += uint64(len(pkts))
// allow up to 24 bits
if rr.totalLost > 0xFFFFFF {
@@ -387,6 +391,8 @@ type Stats struct {
LastRTP uint32
LastNTP time.Time
Jitter float64
TotalReceived uint64
TotalLost uint64
}
// Stats returns statistics.
@@ -406,5 +412,7 @@ func (rr *Receiver) Stats() *Stats {
LastRTP: rr.lastTimeRTP,
LastNTP: ntp,
Jitter: rr.jitter,
TotalReceived: rr.totalReceived,
TotalLost: rr.totalLost2,
}
}

View File

@@ -190,6 +190,7 @@ func TestStandard(t *testing.T) {
LastRTP: 2947921603,
LastSequenceNumber: 947,
LastNTP: time.Date(2008, 5, 20, 22, 15, 21, 0, time.UTC).Local(),
TotalReceived: 2,
}, stats)
})
}
@@ -296,6 +297,7 @@ func TestZeroClockRate(t *testing.T) {
RemoteSSRC: 0xba9da416,
LastRTP: 2947921603,
LastSequenceNumber: 947,
TotalReceived: 2,
}, stats)
}
@@ -510,7 +512,7 @@ func TestReliablePacketsLost(t *testing.T) {
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 0x0120,
SequenceNumber: 288,
Timestamp: 0xafb45733,
SSRC: 0xba9da416,
},
@@ -525,7 +527,7 @@ func TestReliablePacketsLost(t *testing.T) {
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 0x0122,
SequenceNumber: 290,
Timestamp: 0xafb45733,
SSRC: 0xba9da416,
},
@@ -536,6 +538,16 @@ func TestReliablePacketsLost(t *testing.T) {
require.NoError(t, err)
<-done
stats := rr.Stats()
require.Equal(t, &Stats{
RemoteSSRC: 0xba9da416,
LastRTP: 0xafb45733,
LastSequenceNumber: 290,
LastNTP: time.Date(2020, 11, 21, 17, 44, 36, 869277776, time.UTC).Local(),
TotalReceived: 1,
TotalLost: 1,
}, stats)
}
func TestReliableOverflowAndPacketsLost(t *testing.T) {
@@ -613,6 +625,16 @@ func TestReliableOverflowAndPacketsLost(t *testing.T) {
require.NoError(t, err)
<-done
stats := rr.Stats()
require.Equal(t, &Stats{
RemoteSSRC: 0xba9da416,
LastRTP: 0xafb45733,
LastSequenceNumber: 2,
LastNTP: time.Date(2020, 11, 21, 17, 44, 36, 869277776, time.UTC).Local(),
TotalReceived: 1,
TotalLost: 2,
}, stats)
}
func TestUnrealiableReorder(t *testing.T) {
@@ -785,6 +807,14 @@ func TestUnrealiableReorder(t *testing.T) {
require.Equal(t, entry.out, out)
require.Equal(t, uint64(0), missing)
}
stats := rr.Stats()
require.Equal(t, &Stats{
RemoteSSRC: 0,
LastRTP: 0,
LastSequenceNumber: 1,
TotalReceived: 7,
}, stats)
}
func TestUnrealiableBufferFull(t *testing.T) {
@@ -869,6 +899,15 @@ func TestUnrealiableBufferFull(t *testing.T) {
require.Equal(t, expected, out)
<-rtcpReceived
stats := rr.Stats()
require.Equal(t, &Stats{
RemoteSSRC: 0,
LastRTP: 0,
LastSequenceNumber: 1629,
TotalReceived: 31,
TotalLost: 34,
}, stats)
}
func TestUnrealiableReset(t *testing.T) {
@@ -931,6 +970,14 @@ func TestUnrealiableReset(t *testing.T) {
require.Equal(t, uint64(0), missing)
<-rtcpGenerated
stats := rr.Stats()
require.Equal(t, &Stats{
RemoteSSRC: 0,
LastRTP: 0,
LastSequenceNumber: 40064,
TotalReceived: 1,
}, stats)
}
func TestUnrealiableCustomBufferSize(t *testing.T) {

View File

@@ -11,7 +11,9 @@ import (
)
// Sender is a utility to send RTP packets.
// It is in charge of generating RTCP sender reports.
// It is in charge of
// - counting sent packets
// - generating RTCP sender reports.
type Sender struct {
ClockRate int
Period time.Duration
@@ -28,6 +30,7 @@ type Sender struct {
localSSRC uint32
lastSequenceNumber uint16
packetCount uint32
packetCount2 uint64
octetCount uint32
terminate chan struct{}
@@ -109,6 +112,7 @@ func (rs *Sender) ProcessPacket(pkt *rtp.Packet, ntp time.Time, ptsEqualsDTS boo
rs.lastSequenceNumber = pkt.SequenceNumber
rs.packetCount++
rs.packetCount2++
rs.octetCount += uint32(len(pkt.Payload))
}
@@ -117,6 +121,7 @@ type Stats struct {
LastSequenceNumber uint16
LastRTP uint32
LastNTP time.Time
TotalSent uint64
}
// Stats returns statistics.
@@ -132,5 +137,6 @@ func (rs *Sender) Stats() *Stats {
LastSequenceNumber: rs.lastSequenceNumber,
LastRTP: rs.lastTimeRTP,
LastNTP: rs.lastTimeNTP,
TotalSent: rs.packetCount2,
}
}

View File

@@ -104,6 +104,7 @@ func TestSender(t *testing.T) {
LastSequenceNumber: 948,
LastRTP: 1287987768,
LastNTP: time.Date(2008, time.May, 20, 22, 15, 21, 0, time.UTC),
TotalSent: 3,
}, stats)
}
@@ -163,5 +164,6 @@ func TestSenderZeroClockRate(t *testing.T) {
LastSequenceNumber: 946,
LastRTP: 1287987768,
LastNTP: time.Date(2008, time.May, 20, 22, 15, 20, 0, time.UTC),
TotalSent: 1,
}, stats)
}

View File

@@ -494,10 +494,20 @@ func (ss *ServerSession) Stats() *SessionStats {
}()
ret[fo.format] = SessionStatsFormat{ //nolint:dupl
RTPPacketsReceived: atomic.LoadUint64(fo.rtpPacketsReceived),
RTPPacketsSent: atomic.LoadUint64(fo.rtpPacketsSent),
RTPPacketsLost: atomic.LoadUint64(fo.rtpPacketsLost),
LocalSSRC: fo.localSSRC,
RTPPacketsReceived: func() uint64 {
if recvStats != nil {
return recvStats.TotalReceived
}
return 0
}(),
RTPPacketsSent: atomic.LoadUint64(fo.rtpPacketsSent),
RTPPacketsLost: func() uint64 {
if recvStats != nil {
return recvStats.TotalLost
}
return 0
}(),
LocalSSRC: fo.localSSRC,
RemoteSSRC: func() uint32 {
if v, ok := fo.remoteSSRC(); ok {
return v

View File

@@ -26,15 +26,11 @@ type serverSessionFormat struct {
remoteSSRCValue uint32 // record
rtpReceiver *rtpreceiver.Receiver // record
writePacketRTPInQueue func([]byte) error
rtpPacketsReceived *uint64
rtpPacketsSent *uint64
rtpPacketsLost *uint64
}
func (sf *serverSessionFormat) initialize() {
sf.rtpPacketsReceived = new(uint64)
sf.rtpPacketsSent = new(uint64)
sf.rtpPacketsLost = new(uint64)
udp := sf.sm.ss.setuppedTransport.Protocol == ProtocolUDP ||
sf.sm.ss.setuppedTransport.Protocol == ProtocolUDPMulticast
@@ -102,8 +98,6 @@ func (sf *serverSessionFormat) readPacketRTP(payload []byte, header *rtp.Header,
pkts, lost := sf.rtpReceiver.ProcessPacket2(pkt, now, sf.format.PTSEqualsDTS(pkt))
if lost != 0 {
atomic.AddUint64(sf.rtpPacketsLost, lost)
if h, ok := sf.sm.ss.s.Handler.(ServerHandlerOnPacketsLost); ok {
h.OnPacketsLost(&ServerHandlerOnPacketsLostCtx{
Session: sf.sm.ss,
@@ -121,8 +115,6 @@ func (sf *serverSessionFormat) readPacketRTP(payload []byte, header *rtp.Header,
}
}
atomic.AddUint64(sf.rtpPacketsReceived, uint64(len(pkts)))
for _, pkt := range pkts {
sf.onPacketRTP(pkt)
}
@@ -161,6 +153,9 @@ func (sf *serverSessionFormat) writePacketRTP(pkt *rtp.Packet) error {
}
func (sf *serverSessionFormat) writePacketRTPEncoded(payload []byte) error {
atomic.AddUint64(sf.sm.bytesSent, uint64(len(payload)))
atomic.AddUint64(sf.rtpPacketsSent, 1)
sf.sm.ss.writerMutex.RLock()
defer sf.sm.ss.writerMutex.RUnlock()
@@ -179,26 +174,12 @@ func (sf *serverSessionFormat) writePacketRTPEncoded(payload []byte) error {
}
func (sf *serverSessionFormat) writePacketRTPInQueueUDP(payload []byte) error {
err := sf.sm.ss.s.udpRTPListener.write(payload, sf.sm.udpRTPWriteAddr)
if err != nil {
return err
}
atomic.AddUint64(sf.sm.bytesSent, uint64(len(payload)))
atomic.AddUint64(sf.rtpPacketsSent, 1)
return nil
return sf.sm.ss.s.udpRTPListener.write(payload, sf.sm.udpRTPWriteAddr)
}
func (sf *serverSessionFormat) writePacketRTPInQueueTCP(payload []byte) error {
sf.sm.ss.tcpFrame.Channel = sf.sm.tcpChannel
sf.sm.ss.tcpFrame.Payload = payload
sf.sm.ss.tcpConn.nconn.SetWriteDeadline(time.Now().Add(sf.sm.ss.s.WriteTimeout))
err := sf.sm.ss.tcpConn.conn.WriteInterleavedFrame(sf.sm.ss.tcpFrame, sf.sm.ss.tcpBuffer)
if err != nil {
return err
}
atomic.AddUint64(sf.sm.bytesSent, uint64(len(payload)))
atomic.AddUint64(sf.rtpPacketsSent, 1)
return nil
return sf.sm.ss.tcpConn.conn.WriteInterleavedFrame(sf.sm.ss.tcpFrame, sf.sm.ss.tcpBuffer)
}

View File

@@ -78,55 +78,45 @@ func (sf *serverStreamFormat) writePacketRTP(pkt *rtp.Packet, ntp time.Time) err
}
}
encrLen := uint64(len(encr))
plainLen := uint64(len(plain))
// send unicast
for r := range sf.sm.st.activeUnicastReaders {
if rsm, ok := r.setuppedMedias[sf.sm.media]; ok {
rsf := rsm.formats[pkt.PayloadType]
var buf []byte
if isSecure(r.setuppedTransport.Profile) {
err = rsf.writePacketRTPEncoded(encr)
if err != nil {
r.onStreamWriteError(err)
continue
}
atomic.AddUint64(sf.sm.bytesSent, encrLen)
buf = encr
} else {
err = rsf.writePacketRTPEncoded(plain)
if err != nil {
r.onStreamWriteError(err)
continue
}
atomic.AddUint64(sf.sm.bytesSent, plainLen)
buf = plain
}
atomic.AddUint64(sf.sm.bytesSent, uint64(len(buf)))
atomic.AddUint64(sf.rtpPacketsSent, 1)
err = rsf.writePacketRTPEncoded(buf)
if err != nil {
r.onStreamWriteError(err)
continue
}
}
}
// send multicast
if sf.sm.multicastWriter != nil {
var buf []byte
if sf.sm.srtpOutCtx != nil {
err = sf.sm.multicastWriter.writePacketRTP(encr)
if err != nil {
return err
}
atomic.AddUint64(sf.sm.bytesSent, encrLen)
buf = encr
} else {
err = sf.sm.multicastWriter.writePacketRTP(plain)
if err != nil {
return err
}
atomic.AddUint64(sf.sm.bytesSent, plainLen)
buf = plain
}
atomic.AddUint64(sf.sm.bytesSent, uint64(len(buf)))
atomic.AddUint64(sf.rtpPacketsSent, 1)
err = sf.sm.multicastWriter.writePacketRTP(buf)
if err != nil {
return err
}
}
return nil