diff --git a/client.go b/client.go index 7e620ba4..128c471c 100644 --- a/client.go +++ b/client.go @@ -2311,72 +2311,150 @@ func (c *Client) PacketNTP(medi *description.Media, pkt *rtp.Packet) (time.Time, // Stats returns client statistics. func (c *Client) Stats() *ClientStats { + mediaStats := func() map[*description.Media]StatsSessionMedia { //nolint:dupl + ret := make(map[*description.Media]StatsSessionMedia, len(c.setuppedMedias)) + + for med, sm := range c.setuppedMedias { + ret[med] = StatsSessionMedia{ + BytesReceived: atomic.LoadUint64(sm.bytesReceived), + BytesSent: atomic.LoadUint64(sm.bytesSent), + RTPPacketsInError: atomic.LoadUint64(sm.rtpPacketsInError), + RTCPPacketsReceived: atomic.LoadUint64(sm.rtcpPacketsReceived), + RTCPPacketsSent: atomic.LoadUint64(sm.rtcpPacketsSent), + RTCPPacketsInError: atomic.LoadUint64(sm.rtcpPacketsInError), + Formats: func() map[format.Format]StatsSessionFormat { + ret := make(map[format.Format]StatsSessionFormat, len(sm.formats)) + + for _, fo := range sm.formats { + recvStats := func() *rtcpreceiver.Stats { + if fo.rtcpReceiver != nil { + return fo.rtcpReceiver.Stats() + } + return nil + }() + sentStats := func() *rtcpsender.Stats { + if fo.rtcpSender != nil { + return fo.rtcpSender.Stats() + } + return nil + }() + + ret[fo.format] = StatsSessionFormat{ //nolint:dupl + RTPPacketsReceived: atomic.LoadUint64(fo.rtpPacketsReceived), + RTPPacketsSent: atomic.LoadUint64(fo.rtpPacketsSent), + RTPPacketsLost: atomic.LoadUint64(fo.rtpPacketsLost), + LocalSSRC: fo.localSSRC, + RemoteSSRC: func() uint32 { + if v, ok := fo.remoteSSRC(); ok { + return v + } + return 0 + }(), + RTPPacketsLastSequenceNumber: func() uint16 { + if recvStats != nil { + return recvStats.LastSequenceNumber + } + if sentStats != nil { + return sentStats.LastSequenceNumber + } + return 0 + }(), + RTPPacketsLastRTP: func() uint32 { + if recvStats != nil { + return recvStats.LastRTP + } + if sentStats != nil { + return sentStats.LastRTP + } + return 0 + }(), + RTPPacketsLastNTP: func() time.Time { + if recvStats != nil { + return recvStats.LastNTP + } + if sentStats != nil { + return sentStats.LastNTP + } + return time.Time{} + }(), + RTPPacketsJitter: func() float64 { + if recvStats != nil { + return recvStats.Jitter + } + return 0 + }(), + } + } + + return ret + }(), + } + } + + return ret + }() + return &ClientStats{ Conn: StatsConn{ BytesReceived: atomic.LoadUint64(c.bytesReceived), BytesSent: atomic.LoadUint64(c.bytesSent), }, - Session: StatsSession{ + Session: StatsSession{ //nolint:dupl BytesReceived: func() uint64 { v := uint64(0) - for _, sm := range c.setuppedMedias { - v += atomic.LoadUint64(sm.bytesReceived) + for _, ms := range mediaStats { + v += ms.BytesReceived } return v }(), BytesSent: func() uint64 { v := uint64(0) - for _, sm := range c.setuppedMedias { - v += atomic.LoadUint64(sm.bytesSent) + for _, ms := range mediaStats { + v += ms.BytesSent } return v }(), RTPPacketsReceived: func() uint64 { v := uint64(0) - for _, sm := range c.setuppedMedias { - for _, f := range sm.formats { - v += atomic.LoadUint64(f.rtpPacketsReceived) + for _, ms := range mediaStats { + for _, f := range ms.Formats { + v += f.RTPPacketsReceived } } return v }(), RTPPacketsSent: func() uint64 { v := uint64(0) - for _, sm := range c.setuppedMedias { - for _, f := range sm.formats { - v += atomic.LoadUint64(f.rtpPacketsSent) + for _, ms := range mediaStats { + for _, f := range ms.Formats { + v += f.RTPPacketsSent } } return v }(), RTPPacketsLost: func() uint64 { v := uint64(0) - for _, sm := range c.setuppedMedias { - for _, f := range sm.formats { - v += atomic.LoadUint64(f.rtpPacketsLost) + for _, ms := range mediaStats { + for _, f := range ms.Formats { + v += f.RTPPacketsLost } } return v }(), RTPPacketsInError: func() uint64 { v := uint64(0) - for _, sm := range c.setuppedMedias { - v += atomic.LoadUint64(sm.rtpPacketsInError) + for _, ms := range mediaStats { + v += ms.RTPPacketsInError } return v }(), RTPPacketsJitter: func() float64 { v := float64(0) n := float64(0) - for _, sm := range c.setuppedMedias { - for _, fo := range sm.formats { - if fo.rtcpReceiver != nil { - stats := fo.rtcpReceiver.Stats() - if stats != nil { - v += stats.Jitter - n++ - } - } + for _, ms := range mediaStats { + for _, f := range ms.Formats { + v += f.RTPPacketsJitter + n++ } } if n != 0 { @@ -2386,107 +2464,26 @@ func (c *Client) Stats() *ClientStats { }(), RTCPPacketsReceived: func() uint64 { v := uint64(0) - for _, sm := range c.setuppedMedias { - v += atomic.LoadUint64(sm.rtcpPacketsReceived) + for _, ms := range mediaStats { + v += ms.RTCPPacketsReceived } return v }(), RTCPPacketsSent: func() uint64 { v := uint64(0) - for _, sm := range c.setuppedMedias { - v += atomic.LoadUint64(sm.rtcpPacketsSent) + for _, ms := range mediaStats { + v += ms.RTCPPacketsSent } return v }(), RTCPPacketsInError: func() uint64 { v := uint64(0) - for _, sm := range c.setuppedMedias { - v += atomic.LoadUint64(sm.rtcpPacketsInError) + for _, ms := range mediaStats { + v += ms.RTCPPacketsInError } return v }(), - Medias: func() map[*description.Media]StatsSessionMedia { //nolint:dupl - ret := make(map[*description.Media]StatsSessionMedia, len(c.setuppedMedias)) - - for med, sm := range c.setuppedMedias { - ret[med] = StatsSessionMedia{ - BytesReceived: atomic.LoadUint64(sm.bytesReceived), - BytesSent: atomic.LoadUint64(sm.bytesSent), - RTPPacketsInError: atomic.LoadUint64(sm.rtpPacketsInError), - RTCPPacketsReceived: atomic.LoadUint64(sm.rtcpPacketsReceived), - RTCPPacketsSent: atomic.LoadUint64(sm.rtcpPacketsSent), - RTCPPacketsInError: atomic.LoadUint64(sm.rtcpPacketsInError), - Formats: func() map[format.Format]StatsSessionFormat { - ret := make(map[format.Format]StatsSessionFormat, len(sm.formats)) - - for _, fo := range sm.formats { - recvStats := func() *rtcpreceiver.Stats { - if fo.rtcpReceiver != nil { - return fo.rtcpReceiver.Stats() - } - return nil - }() - sentStats := func() *rtcpsender.Stats { - if fo.rtcpSender != nil { - return fo.rtcpSender.Stats() - } - return nil - }() - - ret[fo.format] = StatsSessionFormat{ //nolint:dupl - RTPPacketsReceived: atomic.LoadUint64(fo.rtpPacketsReceived), - RTPPacketsSent: atomic.LoadUint64(fo.rtpPacketsSent), - RTPPacketsLost: atomic.LoadUint64(fo.rtpPacketsLost), - LocalSSRC: fo.localSSRC, - RemoteSSRC: func() uint32 { - if v, ok := fo.remoteSSRC(); ok { - return v - } - return 0 - }(), - RTPPacketsLastSequenceNumber: func() uint16 { - if recvStats != nil { - return recvStats.LastSequenceNumber - } - if sentStats != nil { - return sentStats.LastSequenceNumber - } - return 0 - }(), - RTPPacketsLastRTP: func() uint32 { - if recvStats != nil { - return recvStats.LastRTP - } - if sentStats != nil { - return sentStats.LastRTP - } - return 0 - }(), - RTPPacketsLastNTP: func() time.Time { - if recvStats != nil { - return recvStats.LastNTP - } - if sentStats != nil { - return sentStats.LastNTP - } - return time.Time{} - }(), - RTPPacketsJitter: func() float64 { - if recvStats != nil { - return recvStats.Jitter - } - return 0 - }(), - } - } - - return ret - }(), - } - } - - return ret - }(), + Medias: mediaStats, }, } } diff --git a/server_session.go b/server_session.go index ee59bfba..7be6490a 100644 --- a/server_session.go +++ b/server_session.go @@ -561,67 +561,151 @@ func (ss *ServerSession) UserData() interface{} { // Stats returns server session statistics. func (ss *ServerSession) Stats() *StatsSession { - return &StatsSession{ + mediaStats := func() map[*description.Media]StatsSessionMedia { //nolint:dupl + ret := make(map[*description.Media]StatsSessionMedia, len(ss.setuppedMedias)) + + for med, sm := range ss.setuppedMedias { + ret[med] = StatsSessionMedia{ + BytesReceived: atomic.LoadUint64(sm.bytesReceived), + BytesSent: atomic.LoadUint64(sm.bytesSent), + RTPPacketsInError: atomic.LoadUint64(sm.rtpPacketsInError), + RTCPPacketsReceived: atomic.LoadUint64(sm.rtcpPacketsReceived), + RTCPPacketsSent: atomic.LoadUint64(sm.rtcpPacketsSent), + RTCPPacketsInError: atomic.LoadUint64(sm.rtcpPacketsInError), + Formats: func() map[format.Format]StatsSessionFormat { + ret := make(map[format.Format]StatsSessionFormat, len(sm.formats)) + + for _, fo := range sm.formats { + recvStats := func() *rtcpreceiver.Stats { + if fo.rtcpReceiver != nil { + return fo.rtcpReceiver.Stats() + } + return nil + }() + rtcpSender := func() *rtcpsender.RTCPSender { + if ss.setuppedStream != nil { + return ss.setuppedStream.medias[med].formats[fo.format.PayloadType()].rtcpSender + } + return nil + }() + sentStats := func() *rtcpsender.Stats { + if rtcpSender != nil { + return rtcpSender.Stats() + } + return nil + }() + + ret[fo.format] = StatsSessionFormat{ //nolint:dupl + RTPPacketsReceived: atomic.LoadUint64(fo.rtpPacketsReceived), + RTPPacketsSent: atomic.LoadUint64(fo.rtpPacketsSent), + RTPPacketsLost: atomic.LoadUint64(fo.rtpPacketsLost), + LocalSSRC: fo.localSSRC, + RemoteSSRC: func() uint32 { + if v, ok := fo.remoteSSRC(); ok { + return v + } + return 0 + }(), + RTPPacketsLastSequenceNumber: func() uint16 { + if recvStats != nil { + return recvStats.LastSequenceNumber + } + if sentStats != nil { + return sentStats.LastSequenceNumber + } + return 0 + }(), + RTPPacketsLastRTP: func() uint32 { + if recvStats != nil { + return recvStats.LastRTP + } + if sentStats != nil { + return sentStats.LastRTP + } + return 0 + }(), + RTPPacketsLastNTP: func() time.Time { + if recvStats != nil { + return recvStats.LastNTP + } + if sentStats != nil { + return sentStats.LastNTP + } + return time.Time{} + }(), + RTPPacketsJitter: func() float64 { + if recvStats != nil { + return recvStats.Jitter + } + return 0 + }(), + } + } + + return ret + }(), + } + } + + return ret + }() + + return &StatsSession{ //nolint:dupl BytesReceived: func() uint64 { v := uint64(0) - for _, sm := range ss.setuppedMedias { - v += atomic.LoadUint64(sm.bytesReceived) + for _, ms := range mediaStats { + v += ms.BytesReceived } return v }(), BytesSent: func() uint64 { v := uint64(0) - for _, sm := range ss.setuppedMedias { - v += atomic.LoadUint64(sm.bytesSent) + for _, ms := range mediaStats { + v += ms.BytesSent } return v }(), RTPPacketsReceived: func() uint64 { v := uint64(0) - for _, sm := range ss.setuppedMedias { - for _, f := range sm.formats { - v += atomic.LoadUint64(f.rtpPacketsReceived) + for _, ms := range mediaStats { + for _, f := range ms.Formats { + v += f.RTPPacketsReceived } } return v }(), RTPPacketsSent: func() uint64 { v := uint64(0) - for _, sm := range ss.setuppedMedias { - for _, f := range sm.formats { - v += atomic.LoadUint64(f.rtpPacketsSent) + for _, ms := range mediaStats { + for _, f := range ms.Formats { + v += f.RTPPacketsSent } } return v }(), RTPPacketsLost: func() uint64 { v := uint64(0) - for _, sm := range ss.setuppedMedias { - for _, f := range sm.formats { - v += atomic.LoadUint64(f.rtpPacketsLost) + for _, ms := range mediaStats { + for _, f := range ms.Formats { + v += f.RTPPacketsLost } } return v }(), RTPPacketsInError: func() uint64 { v := uint64(0) - for _, sm := range ss.setuppedMedias { - v += atomic.LoadUint64(sm.rtpPacketsInError) + for _, ms := range mediaStats { + v += ms.RTPPacketsInError } return v }(), RTPPacketsJitter: func() float64 { v := float64(0) n := float64(0) - for _, sm := range ss.setuppedMedias { - for _, fo := range sm.formats { - if fo.rtcpReceiver != nil { - stats := fo.rtcpReceiver.Stats() - if stats != nil { - v += stats.Jitter - n++ - } - } + for _, ms := range mediaStats { + for _, f := range ms.Formats { + v += f.RTPPacketsJitter + n++ } } if n != 0 { @@ -631,113 +715,26 @@ func (ss *ServerSession) Stats() *StatsSession { }(), RTCPPacketsReceived: func() uint64 { v := uint64(0) - for _, sm := range ss.setuppedMedias { - v += atomic.LoadUint64(sm.rtcpPacketsReceived) + for _, ms := range mediaStats { + v += ms.RTCPPacketsReceived } return v }(), RTCPPacketsSent: func() uint64 { v := uint64(0) - for _, sm := range ss.setuppedMedias { - v += atomic.LoadUint64(sm.rtcpPacketsSent) + for _, ms := range mediaStats { + v += ms.RTCPPacketsSent } return v }(), RTCPPacketsInError: func() uint64 { v := uint64(0) - for _, sm := range ss.setuppedMedias { - v += atomic.LoadUint64(sm.rtcpPacketsInError) + for _, ms := range mediaStats { + v += ms.RTCPPacketsInError } return v }(), - Medias: func() map[*description.Media]StatsSessionMedia { //nolint:dupl - ret := make(map[*description.Media]StatsSessionMedia, len(ss.setuppedMedias)) - - for med, sm := range ss.setuppedMedias { - ret[med] = StatsSessionMedia{ - BytesReceived: atomic.LoadUint64(sm.bytesReceived), - BytesSent: atomic.LoadUint64(sm.bytesSent), - RTPPacketsInError: atomic.LoadUint64(sm.rtpPacketsInError), - RTCPPacketsReceived: atomic.LoadUint64(sm.rtcpPacketsReceived), - RTCPPacketsSent: atomic.LoadUint64(sm.rtcpPacketsSent), - RTCPPacketsInError: atomic.LoadUint64(sm.rtcpPacketsInError), - Formats: func() map[format.Format]StatsSessionFormat { - ret := make(map[format.Format]StatsSessionFormat, len(sm.formats)) - - for _, fo := range sm.formats { - recvStats := func() *rtcpreceiver.Stats { - if fo.rtcpReceiver != nil { - return fo.rtcpReceiver.Stats() - } - return nil - }() - rtcpSender := func() *rtcpsender.RTCPSender { - if ss.setuppedStream != nil { - return ss.setuppedStream.medias[med].formats[fo.format.PayloadType()].rtcpSender - } - return nil - }() - sentStats := func() *rtcpsender.Stats { - if rtcpSender != nil { - return rtcpSender.Stats() - } - return nil - }() - - ret[fo.format] = StatsSessionFormat{ //nolint:dupl - RTPPacketsReceived: atomic.LoadUint64(fo.rtpPacketsReceived), - RTPPacketsSent: atomic.LoadUint64(fo.rtpPacketsSent), - RTPPacketsLost: atomic.LoadUint64(fo.rtpPacketsLost), - LocalSSRC: fo.localSSRC, - RemoteSSRC: func() uint32 { - if v, ok := fo.remoteSSRC(); ok { - return v - } - return 0 - }(), - RTPPacketsLastSequenceNumber: func() uint16 { - if recvStats != nil { - return recvStats.LastSequenceNumber - } - if sentStats != nil { - return sentStats.LastSequenceNumber - } - return 0 - }(), - RTPPacketsLastRTP: func() uint32 { - if recvStats != nil { - return recvStats.LastRTP - } - if sentStats != nil { - return sentStats.LastRTP - } - return 0 - }(), - RTPPacketsLastNTP: func() time.Time { - if recvStats != nil { - return recvStats.LastNTP - } - if sentStats != nil { - return sentStats.LastNTP - } - return time.Time{} - }(), - RTPPacketsJitter: func() float64 { - if recvStats != nil { - return recvStats.Jitter - } - return 0 - }(), - } - } - - return ret - }(), - } - } - - return ret - }(), + Medias: mediaStats, } } diff --git a/server_stream.go b/server_stream.go index 4278b4eb..3f3c7171 100644 --- a/server_stream.go +++ b/server_stream.go @@ -111,54 +111,56 @@ func (st *ServerStream) Description() *description.Session { // Stats returns stream statistics. func (st *ServerStream) Stats() *ServerStreamStats { + mediaStats := func() map[*description.Media]ServerStreamStatsMedia { + ret := make(map[*description.Media]ServerStreamStatsMedia, len(st.medias)) + + for med, sm := range st.medias { + ret[med] = ServerStreamStatsMedia{ + BytesSent: atomic.LoadUint64(sm.bytesSent), + RTCPPacketsSent: atomic.LoadUint64(sm.rtcpPacketsSent), + Formats: func() map[format.Format]ServerStreamStatsFormat { + ret := make(map[format.Format]ServerStreamStatsFormat) + + for _, fo := range sm.formats { + ret[fo.format] = ServerStreamStatsFormat{ + RTPPacketsSent: atomic.LoadUint64(fo.rtpPacketsSent), + LocalSSRC: fo.localSSRC, + } + } + + return ret + }(), + } + } + + return ret + }() + return &ServerStreamStats{ BytesSent: func() uint64 { v := uint64(0) - for _, me := range st.medias { - v += atomic.LoadUint64(me.bytesSent) + for _, ms := range mediaStats { + v += ms.BytesSent } return v }(), RTPPacketsSent: func() uint64 { v := uint64(0) - for _, me := range st.medias { - for _, f := range me.formats { - v += atomic.LoadUint64(f.rtpPacketsSent) + for _, ms := range mediaStats { + for _, f := range ms.Formats { + v += f.RTPPacketsSent } } return v }(), RTCPPacketsSent: func() uint64 { v := uint64(0) - for _, me := range st.medias { - v += atomic.LoadUint64(me.rtcpPacketsSent) + for _, ms := range mediaStats { + v += ms.RTCPPacketsSent } return v }(), - Medias: func() map[*description.Media]ServerStreamStatsMedia { - ret := make(map[*description.Media]ServerStreamStatsMedia, len(st.medias)) - - for med, sm := range st.medias { - ret[med] = ServerStreamStatsMedia{ - BytesSent: atomic.LoadUint64(sm.bytesSent), - RTCPPacketsSent: atomic.LoadUint64(sm.rtcpPacketsSent), - Formats: func() map[format.Format]ServerStreamStatsFormat { - ret := make(map[format.Format]ServerStreamStatsFormat) - - for _, fo := range sm.formats { - ret[fo.format] = ServerStreamStatsFormat{ - RTPPacketsSent: atomic.LoadUint64(fo.rtpPacketsSent), - LocalSSRC: fo.localSSRC, - } - } - - return ret - }(), - } - } - - return ret - }(), + Medias: mediaStats, } }