improve Stats performance (#839)

Build aggregated statistics by using already-available data.
This commit is contained in:
Alessandro Ros
2025-07-26 14:01:17 +02:00
committed by GitHub
parent c36d441583
commit 63f7ffc3e3
3 changed files with 259 additions and 263 deletions

223
client.go
View File

@@ -2311,72 +2311,150 @@ func (c *Client) PacketNTP(medi *description.Media, pkt *rtp.Packet) (time.Time,
// Stats returns client statistics. // Stats returns client statistics.
func (c *Client) Stats() *ClientStats { func (c *Client) Stats() *ClientStats {
mediaStats := func() map[*description.Media]StatsSessionMedia { //nolint:dupl
ret := make(map[*description.Media]StatsSessionMedia, len(c.setuppedMedias))
for med, sm := range c.setuppedMedias {
ret[med] = StatsSessionMedia{
BytesReceived: atomic.LoadUint64(sm.bytesReceived),
BytesSent: atomic.LoadUint64(sm.bytesSent),
RTPPacketsInError: atomic.LoadUint64(sm.rtpPacketsInError),
RTCPPacketsReceived: atomic.LoadUint64(sm.rtcpPacketsReceived),
RTCPPacketsSent: atomic.LoadUint64(sm.rtcpPacketsSent),
RTCPPacketsInError: atomic.LoadUint64(sm.rtcpPacketsInError),
Formats: func() map[format.Format]StatsSessionFormat {
ret := make(map[format.Format]StatsSessionFormat, len(sm.formats))
for _, fo := range sm.formats {
recvStats := func() *rtcpreceiver.Stats {
if fo.rtcpReceiver != nil {
return fo.rtcpReceiver.Stats()
}
return nil
}()
sentStats := func() *rtcpsender.Stats {
if fo.rtcpSender != nil {
return fo.rtcpSender.Stats()
}
return nil
}()
ret[fo.format] = StatsSessionFormat{ //nolint:dupl
RTPPacketsReceived: atomic.LoadUint64(fo.rtpPacketsReceived),
RTPPacketsSent: atomic.LoadUint64(fo.rtpPacketsSent),
RTPPacketsLost: atomic.LoadUint64(fo.rtpPacketsLost),
LocalSSRC: fo.localSSRC,
RemoteSSRC: func() uint32 {
if v, ok := fo.remoteSSRC(); ok {
return v
}
return 0
}(),
RTPPacketsLastSequenceNumber: func() uint16 {
if recvStats != nil {
return recvStats.LastSequenceNumber
}
if sentStats != nil {
return sentStats.LastSequenceNumber
}
return 0
}(),
RTPPacketsLastRTP: func() uint32 {
if recvStats != nil {
return recvStats.LastRTP
}
if sentStats != nil {
return sentStats.LastRTP
}
return 0
}(),
RTPPacketsLastNTP: func() time.Time {
if recvStats != nil {
return recvStats.LastNTP
}
if sentStats != nil {
return sentStats.LastNTP
}
return time.Time{}
}(),
RTPPacketsJitter: func() float64 {
if recvStats != nil {
return recvStats.Jitter
}
return 0
}(),
}
}
return ret
}(),
}
}
return ret
}()
return &ClientStats{ return &ClientStats{
Conn: StatsConn{ Conn: StatsConn{
BytesReceived: atomic.LoadUint64(c.bytesReceived), BytesReceived: atomic.LoadUint64(c.bytesReceived),
BytesSent: atomic.LoadUint64(c.bytesSent), BytesSent: atomic.LoadUint64(c.bytesSent),
}, },
Session: StatsSession{ Session: StatsSession{ //nolint:dupl
BytesReceived: func() uint64 { BytesReceived: func() uint64 {
v := uint64(0) v := uint64(0)
for _, sm := range c.setuppedMedias { for _, ms := range mediaStats {
v += atomic.LoadUint64(sm.bytesReceived) v += ms.BytesReceived
} }
return v return v
}(), }(),
BytesSent: func() uint64 { BytesSent: func() uint64 {
v := uint64(0) v := uint64(0)
for _, sm := range c.setuppedMedias { for _, ms := range mediaStats {
v += atomic.LoadUint64(sm.bytesSent) v += ms.BytesSent
} }
return v return v
}(), }(),
RTPPacketsReceived: func() uint64 { RTPPacketsReceived: func() uint64 {
v := uint64(0) v := uint64(0)
for _, sm := range c.setuppedMedias { for _, ms := range mediaStats {
for _, f := range sm.formats { for _, f := range ms.Formats {
v += atomic.LoadUint64(f.rtpPacketsReceived) v += f.RTPPacketsReceived
} }
} }
return v return v
}(), }(),
RTPPacketsSent: func() uint64 { RTPPacketsSent: func() uint64 {
v := uint64(0) v := uint64(0)
for _, sm := range c.setuppedMedias { for _, ms := range mediaStats {
for _, f := range sm.formats { for _, f := range ms.Formats {
v += atomic.LoadUint64(f.rtpPacketsSent) v += f.RTPPacketsSent
} }
} }
return v return v
}(), }(),
RTPPacketsLost: func() uint64 { RTPPacketsLost: func() uint64 {
v := uint64(0) v := uint64(0)
for _, sm := range c.setuppedMedias { for _, ms := range mediaStats {
for _, f := range sm.formats { for _, f := range ms.Formats {
v += atomic.LoadUint64(f.rtpPacketsLost) v += f.RTPPacketsLost
} }
} }
return v return v
}(), }(),
RTPPacketsInError: func() uint64 { RTPPacketsInError: func() uint64 {
v := uint64(0) v := uint64(0)
for _, sm := range c.setuppedMedias { for _, ms := range mediaStats {
v += atomic.LoadUint64(sm.rtpPacketsInError) v += ms.RTPPacketsInError
} }
return v return v
}(), }(),
RTPPacketsJitter: func() float64 { RTPPacketsJitter: func() float64 {
v := float64(0) v := float64(0)
n := float64(0) n := float64(0)
for _, sm := range c.setuppedMedias { for _, ms := range mediaStats {
for _, fo := range sm.formats { for _, f := range ms.Formats {
if fo.rtcpReceiver != nil { v += f.RTPPacketsJitter
stats := fo.rtcpReceiver.Stats() n++
if stats != nil {
v += stats.Jitter
n++
}
}
} }
} }
if n != 0 { if n != 0 {
@@ -2386,107 +2464,26 @@ func (c *Client) Stats() *ClientStats {
}(), }(),
RTCPPacketsReceived: func() uint64 { RTCPPacketsReceived: func() uint64 {
v := uint64(0) v := uint64(0)
for _, sm := range c.setuppedMedias { for _, ms := range mediaStats {
v += atomic.LoadUint64(sm.rtcpPacketsReceived) v += ms.RTCPPacketsReceived
} }
return v return v
}(), }(),
RTCPPacketsSent: func() uint64 { RTCPPacketsSent: func() uint64 {
v := uint64(0) v := uint64(0)
for _, sm := range c.setuppedMedias { for _, ms := range mediaStats {
v += atomic.LoadUint64(sm.rtcpPacketsSent) v += ms.RTCPPacketsSent
} }
return v return v
}(), }(),
RTCPPacketsInError: func() uint64 { RTCPPacketsInError: func() uint64 {
v := uint64(0) v := uint64(0)
for _, sm := range c.setuppedMedias { for _, ms := range mediaStats {
v += atomic.LoadUint64(sm.rtcpPacketsInError) v += ms.RTCPPacketsInError
} }
return v return v
}(), }(),
Medias: func() map[*description.Media]StatsSessionMedia { //nolint:dupl Medias: mediaStats,
ret := make(map[*description.Media]StatsSessionMedia, len(c.setuppedMedias))
for med, sm := range c.setuppedMedias {
ret[med] = StatsSessionMedia{
BytesReceived: atomic.LoadUint64(sm.bytesReceived),
BytesSent: atomic.LoadUint64(sm.bytesSent),
RTPPacketsInError: atomic.LoadUint64(sm.rtpPacketsInError),
RTCPPacketsReceived: atomic.LoadUint64(sm.rtcpPacketsReceived),
RTCPPacketsSent: atomic.LoadUint64(sm.rtcpPacketsSent),
RTCPPacketsInError: atomic.LoadUint64(sm.rtcpPacketsInError),
Formats: func() map[format.Format]StatsSessionFormat {
ret := make(map[format.Format]StatsSessionFormat, len(sm.formats))
for _, fo := range sm.formats {
recvStats := func() *rtcpreceiver.Stats {
if fo.rtcpReceiver != nil {
return fo.rtcpReceiver.Stats()
}
return nil
}()
sentStats := func() *rtcpsender.Stats {
if fo.rtcpSender != nil {
return fo.rtcpSender.Stats()
}
return nil
}()
ret[fo.format] = StatsSessionFormat{ //nolint:dupl
RTPPacketsReceived: atomic.LoadUint64(fo.rtpPacketsReceived),
RTPPacketsSent: atomic.LoadUint64(fo.rtpPacketsSent),
RTPPacketsLost: atomic.LoadUint64(fo.rtpPacketsLost),
LocalSSRC: fo.localSSRC,
RemoteSSRC: func() uint32 {
if v, ok := fo.remoteSSRC(); ok {
return v
}
return 0
}(),
RTPPacketsLastSequenceNumber: func() uint16 {
if recvStats != nil {
return recvStats.LastSequenceNumber
}
if sentStats != nil {
return sentStats.LastSequenceNumber
}
return 0
}(),
RTPPacketsLastRTP: func() uint32 {
if recvStats != nil {
return recvStats.LastRTP
}
if sentStats != nil {
return sentStats.LastRTP
}
return 0
}(),
RTPPacketsLastNTP: func() time.Time {
if recvStats != nil {
return recvStats.LastNTP
}
if sentStats != nil {
return sentStats.LastNTP
}
return time.Time{}
}(),
RTPPacketsJitter: func() float64 {
if recvStats != nil {
return recvStats.Jitter
}
return 0
}(),
}
}
return ret
}(),
}
}
return ret
}(),
}, },
} }
} }

View File

@@ -561,67 +561,151 @@ func (ss *ServerSession) UserData() interface{} {
// Stats returns server session statistics. // Stats returns server session statistics.
func (ss *ServerSession) Stats() *StatsSession { func (ss *ServerSession) Stats() *StatsSession {
return &StatsSession{ mediaStats := func() map[*description.Media]StatsSessionMedia { //nolint:dupl
ret := make(map[*description.Media]StatsSessionMedia, len(ss.setuppedMedias))
for med, sm := range ss.setuppedMedias {
ret[med] = StatsSessionMedia{
BytesReceived: atomic.LoadUint64(sm.bytesReceived),
BytesSent: atomic.LoadUint64(sm.bytesSent),
RTPPacketsInError: atomic.LoadUint64(sm.rtpPacketsInError),
RTCPPacketsReceived: atomic.LoadUint64(sm.rtcpPacketsReceived),
RTCPPacketsSent: atomic.LoadUint64(sm.rtcpPacketsSent),
RTCPPacketsInError: atomic.LoadUint64(sm.rtcpPacketsInError),
Formats: func() map[format.Format]StatsSessionFormat {
ret := make(map[format.Format]StatsSessionFormat, len(sm.formats))
for _, fo := range sm.formats {
recvStats := func() *rtcpreceiver.Stats {
if fo.rtcpReceiver != nil {
return fo.rtcpReceiver.Stats()
}
return nil
}()
rtcpSender := func() *rtcpsender.RTCPSender {
if ss.setuppedStream != nil {
return ss.setuppedStream.medias[med].formats[fo.format.PayloadType()].rtcpSender
}
return nil
}()
sentStats := func() *rtcpsender.Stats {
if rtcpSender != nil {
return rtcpSender.Stats()
}
return nil
}()
ret[fo.format] = StatsSessionFormat{ //nolint:dupl
RTPPacketsReceived: atomic.LoadUint64(fo.rtpPacketsReceived),
RTPPacketsSent: atomic.LoadUint64(fo.rtpPacketsSent),
RTPPacketsLost: atomic.LoadUint64(fo.rtpPacketsLost),
LocalSSRC: fo.localSSRC,
RemoteSSRC: func() uint32 {
if v, ok := fo.remoteSSRC(); ok {
return v
}
return 0
}(),
RTPPacketsLastSequenceNumber: func() uint16 {
if recvStats != nil {
return recvStats.LastSequenceNumber
}
if sentStats != nil {
return sentStats.LastSequenceNumber
}
return 0
}(),
RTPPacketsLastRTP: func() uint32 {
if recvStats != nil {
return recvStats.LastRTP
}
if sentStats != nil {
return sentStats.LastRTP
}
return 0
}(),
RTPPacketsLastNTP: func() time.Time {
if recvStats != nil {
return recvStats.LastNTP
}
if sentStats != nil {
return sentStats.LastNTP
}
return time.Time{}
}(),
RTPPacketsJitter: func() float64 {
if recvStats != nil {
return recvStats.Jitter
}
return 0
}(),
}
}
return ret
}(),
}
}
return ret
}()
return &StatsSession{ //nolint:dupl
BytesReceived: func() uint64 { BytesReceived: func() uint64 {
v := uint64(0) v := uint64(0)
for _, sm := range ss.setuppedMedias { for _, ms := range mediaStats {
v += atomic.LoadUint64(sm.bytesReceived) v += ms.BytesReceived
} }
return v return v
}(), }(),
BytesSent: func() uint64 { BytesSent: func() uint64 {
v := uint64(0) v := uint64(0)
for _, sm := range ss.setuppedMedias { for _, ms := range mediaStats {
v += atomic.LoadUint64(sm.bytesSent) v += ms.BytesSent
} }
return v return v
}(), }(),
RTPPacketsReceived: func() uint64 { RTPPacketsReceived: func() uint64 {
v := uint64(0) v := uint64(0)
for _, sm := range ss.setuppedMedias { for _, ms := range mediaStats {
for _, f := range sm.formats { for _, f := range ms.Formats {
v += atomic.LoadUint64(f.rtpPacketsReceived) v += f.RTPPacketsReceived
} }
} }
return v return v
}(), }(),
RTPPacketsSent: func() uint64 { RTPPacketsSent: func() uint64 {
v := uint64(0) v := uint64(0)
for _, sm := range ss.setuppedMedias { for _, ms := range mediaStats {
for _, f := range sm.formats { for _, f := range ms.Formats {
v += atomic.LoadUint64(f.rtpPacketsSent) v += f.RTPPacketsSent
} }
} }
return v return v
}(), }(),
RTPPacketsLost: func() uint64 { RTPPacketsLost: func() uint64 {
v := uint64(0) v := uint64(0)
for _, sm := range ss.setuppedMedias { for _, ms := range mediaStats {
for _, f := range sm.formats { for _, f := range ms.Formats {
v += atomic.LoadUint64(f.rtpPacketsLost) v += f.RTPPacketsLost
} }
} }
return v return v
}(), }(),
RTPPacketsInError: func() uint64 { RTPPacketsInError: func() uint64 {
v := uint64(0) v := uint64(0)
for _, sm := range ss.setuppedMedias { for _, ms := range mediaStats {
v += atomic.LoadUint64(sm.rtpPacketsInError) v += ms.RTPPacketsInError
} }
return v return v
}(), }(),
RTPPacketsJitter: func() float64 { RTPPacketsJitter: func() float64 {
v := float64(0) v := float64(0)
n := float64(0) n := float64(0)
for _, sm := range ss.setuppedMedias { for _, ms := range mediaStats {
for _, fo := range sm.formats { for _, f := range ms.Formats {
if fo.rtcpReceiver != nil { v += f.RTPPacketsJitter
stats := fo.rtcpReceiver.Stats() n++
if stats != nil {
v += stats.Jitter
n++
}
}
} }
} }
if n != 0 { if n != 0 {
@@ -631,113 +715,26 @@ func (ss *ServerSession) Stats() *StatsSession {
}(), }(),
RTCPPacketsReceived: func() uint64 { RTCPPacketsReceived: func() uint64 {
v := uint64(0) v := uint64(0)
for _, sm := range ss.setuppedMedias { for _, ms := range mediaStats {
v += atomic.LoadUint64(sm.rtcpPacketsReceived) v += ms.RTCPPacketsReceived
} }
return v return v
}(), }(),
RTCPPacketsSent: func() uint64 { RTCPPacketsSent: func() uint64 {
v := uint64(0) v := uint64(0)
for _, sm := range ss.setuppedMedias { for _, ms := range mediaStats {
v += atomic.LoadUint64(sm.rtcpPacketsSent) v += ms.RTCPPacketsSent
} }
return v return v
}(), }(),
RTCPPacketsInError: func() uint64 { RTCPPacketsInError: func() uint64 {
v := uint64(0) v := uint64(0)
for _, sm := range ss.setuppedMedias { for _, ms := range mediaStats {
v += atomic.LoadUint64(sm.rtcpPacketsInError) v += ms.RTCPPacketsInError
} }
return v return v
}(), }(),
Medias: func() map[*description.Media]StatsSessionMedia { //nolint:dupl Medias: mediaStats,
ret := make(map[*description.Media]StatsSessionMedia, len(ss.setuppedMedias))
for med, sm := range ss.setuppedMedias {
ret[med] = StatsSessionMedia{
BytesReceived: atomic.LoadUint64(sm.bytesReceived),
BytesSent: atomic.LoadUint64(sm.bytesSent),
RTPPacketsInError: atomic.LoadUint64(sm.rtpPacketsInError),
RTCPPacketsReceived: atomic.LoadUint64(sm.rtcpPacketsReceived),
RTCPPacketsSent: atomic.LoadUint64(sm.rtcpPacketsSent),
RTCPPacketsInError: atomic.LoadUint64(sm.rtcpPacketsInError),
Formats: func() map[format.Format]StatsSessionFormat {
ret := make(map[format.Format]StatsSessionFormat, len(sm.formats))
for _, fo := range sm.formats {
recvStats := func() *rtcpreceiver.Stats {
if fo.rtcpReceiver != nil {
return fo.rtcpReceiver.Stats()
}
return nil
}()
rtcpSender := func() *rtcpsender.RTCPSender {
if ss.setuppedStream != nil {
return ss.setuppedStream.medias[med].formats[fo.format.PayloadType()].rtcpSender
}
return nil
}()
sentStats := func() *rtcpsender.Stats {
if rtcpSender != nil {
return rtcpSender.Stats()
}
return nil
}()
ret[fo.format] = StatsSessionFormat{ //nolint:dupl
RTPPacketsReceived: atomic.LoadUint64(fo.rtpPacketsReceived),
RTPPacketsSent: atomic.LoadUint64(fo.rtpPacketsSent),
RTPPacketsLost: atomic.LoadUint64(fo.rtpPacketsLost),
LocalSSRC: fo.localSSRC,
RemoteSSRC: func() uint32 {
if v, ok := fo.remoteSSRC(); ok {
return v
}
return 0
}(),
RTPPacketsLastSequenceNumber: func() uint16 {
if recvStats != nil {
return recvStats.LastSequenceNumber
}
if sentStats != nil {
return sentStats.LastSequenceNumber
}
return 0
}(),
RTPPacketsLastRTP: func() uint32 {
if recvStats != nil {
return recvStats.LastRTP
}
if sentStats != nil {
return sentStats.LastRTP
}
return 0
}(),
RTPPacketsLastNTP: func() time.Time {
if recvStats != nil {
return recvStats.LastNTP
}
if sentStats != nil {
return sentStats.LastNTP
}
return time.Time{}
}(),
RTPPacketsJitter: func() float64 {
if recvStats != nil {
return recvStats.Jitter
}
return 0
}(),
}
}
return ret
}(),
}
}
return ret
}(),
} }
} }

View File

@@ -111,54 +111,56 @@ func (st *ServerStream) Description() *description.Session {
// Stats returns stream statistics. // Stats returns stream statistics.
func (st *ServerStream) Stats() *ServerStreamStats { func (st *ServerStream) Stats() *ServerStreamStats {
mediaStats := func() map[*description.Media]ServerStreamStatsMedia {
ret := make(map[*description.Media]ServerStreamStatsMedia, len(st.medias))
for med, sm := range st.medias {
ret[med] = ServerStreamStatsMedia{
BytesSent: atomic.LoadUint64(sm.bytesSent),
RTCPPacketsSent: atomic.LoadUint64(sm.rtcpPacketsSent),
Formats: func() map[format.Format]ServerStreamStatsFormat {
ret := make(map[format.Format]ServerStreamStatsFormat)
for _, fo := range sm.formats {
ret[fo.format] = ServerStreamStatsFormat{
RTPPacketsSent: atomic.LoadUint64(fo.rtpPacketsSent),
LocalSSRC: fo.localSSRC,
}
}
return ret
}(),
}
}
return ret
}()
return &ServerStreamStats{ return &ServerStreamStats{
BytesSent: func() uint64 { BytesSent: func() uint64 {
v := uint64(0) v := uint64(0)
for _, me := range st.medias { for _, ms := range mediaStats {
v += atomic.LoadUint64(me.bytesSent) v += ms.BytesSent
} }
return v return v
}(), }(),
RTPPacketsSent: func() uint64 { RTPPacketsSent: func() uint64 {
v := uint64(0) v := uint64(0)
for _, me := range st.medias { for _, ms := range mediaStats {
for _, f := range me.formats { for _, f := range ms.Formats {
v += atomic.LoadUint64(f.rtpPacketsSent) v += f.RTPPacketsSent
} }
} }
return v return v
}(), }(),
RTCPPacketsSent: func() uint64 { RTCPPacketsSent: func() uint64 {
v := uint64(0) v := uint64(0)
for _, me := range st.medias { for _, ms := range mediaStats {
v += atomic.LoadUint64(me.rtcpPacketsSent) v += ms.RTCPPacketsSent
} }
return v return v
}(), }(),
Medias: func() map[*description.Media]ServerStreamStatsMedia { Medias: mediaStats,
ret := make(map[*description.Media]ServerStreamStatsMedia, len(st.medias))
for med, sm := range st.medias {
ret[med] = ServerStreamStatsMedia{
BytesSent: atomic.LoadUint64(sm.bytesSent),
RTCPPacketsSent: atomic.LoadUint64(sm.rtcpPacketsSent),
Formats: func() map[format.Format]ServerStreamStatsFormat {
ret := make(map[format.Format]ServerStreamStatsFormat)
for _, fo := range sm.formats {
ret[fo.format] = ServerStreamStatsFormat{
RTPPacketsSent: atomic.LoadUint64(fo.rtpPacketsSent),
LocalSSRC: fo.localSSRC,
}
}
return ret
}(),
}
}
return ret
}(),
} }
} }