add statistics to Client, ServerSession, ServerConn, ServerStream (#556) (#656)

This commit is contained in:
Alessandro Ros
2024-12-25 13:30:08 +01:00
committed by GitHub
parent 8c4a3ca018
commit 87c6d81053
34 changed files with 1246 additions and 404 deletions

279
client.go
View File

@@ -19,6 +19,8 @@ import (
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/bluenviron/gortsplib/v4/internal/rtcpreceiver"
"github.com/bluenviron/gortsplib/v4/internal/rtcpsender"
"github.com/bluenviron/gortsplib/v4/pkg/auth"
"github.com/bluenviron/gortsplib/v4/pkg/base"
"github.com/bluenviron/gortsplib/v4/pkg/bytecounter"
@@ -269,8 +271,10 @@ type Client struct {
// explicitly request back channels to the server.
RequestBackChannels bool
// pointer to a variable that stores received bytes.
// Deprecated: use Client.Stats()
BytesReceived *uint64
// pointer to a variable that stores sent bytes.
// Deprecated: use Client.Stats()
BytesSent *uint64
//
@@ -326,7 +330,7 @@ type Client struct {
effectiveTransport *Transport
backChannelSetupped bool
stdChannelSetupped bool
medias map[*description.Media]*clientMedia
setuppedMedias map[*description.Media]*clientMedia
tcpCallbackByChannel map[int]readFunc
lastRange *headers.Range
checkTimeoutTimer *time.Timer
@@ -341,6 +345,8 @@ type Client struct {
mustClose bool
tcpFrame *base.InterleavedFrame
tcpBuffer []byte
bytesReceived *uint64
bytesSent *uint64
// in
chOptions chan optionsReq
@@ -380,12 +386,6 @@ func (c *Client) Start(scheme string, host string) error {
if c.UserAgent == "" {
c.UserAgent = "gortsplib"
}
if c.BytesReceived == nil {
c.BytesReceived = new(uint64)
}
if c.BytesSent == nil {
c.BytesSent = new(uint64)
}
// system functions
if c.DialContext == nil {
@@ -454,6 +454,18 @@ func (c *Client) Start(scheme string, host string) error {
c.checkTimeoutTimer = emptyTimer()
c.keepalivePeriod = 30 * time.Second
c.keepaliveTimer = emptyTimer()
if c.BytesReceived != nil {
c.bytesReceived = c.BytesReceived
} else {
c.bytesReceived = new(uint64)
}
if c.BytesSent != nil {
c.bytesSent = c.BytesSent
} else {
c.bytesSent = new(uint64)
}
c.chOptions = make(chan optionsReq)
c.chDescribe = make(chan describeReq)
c.chAnnounce = make(chan announceReq)
@@ -739,7 +751,7 @@ func (c *Client) doClose() {
c.conn = nil
}
for _, cm := range c.medias {
for _, cm := range c.setuppedMedias {
cm.close()
}
}
@@ -757,7 +769,7 @@ func (c *Client) reset() {
c.effectiveTransport = nil
c.backChannelSetupped = false
c.stdChannelSetupped = false
c.medias = nil
c.setuppedMedias = nil
c.tcpCallbackByChannel = nil
}
@@ -781,7 +793,7 @@ func (c *Client) trySwitchingProtocol() error {
prevConnURL := c.connURL
prevBaseURL := c.baseURL
prevMedias := c.medias
prevMedias := c.setuppedMedias
c.reset()
@@ -801,9 +813,9 @@ func (c *Client) trySwitchingProtocol() error {
return err
}
c.medias[i].onPacketRTCP = cm.onPacketRTCP
c.setuppedMedias[i].onPacketRTCP = cm.onPacketRTCP
for j, tr := range cm.formats {
c.medias[i].formats[j].onPacketRTP = tr.onPacketRTP
c.setuppedMedias[i].formats[j].onPacketRTP = tr.onPacketRTP
}
}
@@ -854,7 +866,7 @@ func (c *Client) startTransportRoutines() {
c.timeDecoder = rtptime.NewGlobalDecoder2()
for _, cm := range c.medias {
for _, cm := range c.setuppedMedias {
cm.start()
}
@@ -894,7 +906,7 @@ func (c *Client) stopTransportRoutines() {
c.checkTimeoutTimer = emptyTimer()
c.keepaliveTimer = emptyTimer()
for _, cm := range c.medias {
for _, cm := range c.setuppedMedias {
cm.stop()
}
@@ -935,7 +947,7 @@ func (c *Client) connOpen() error {
}
c.nconn = nconn
bc := bytecounter.New(c.nconn, c.BytesReceived, c.BytesSent)
bc := bytecounter.New(c.nconn, c.bytesReceived, c.bytesSent)
c.conn = conn.NewConn(bc)
c.reader = &clientReader{
c: c,
@@ -1021,7 +1033,7 @@ func (c *Client) do(req *base.Request, skipResponse bool) (*base.Response, error
}
func (c *Client) atLeastOneUDPPacketHasBeenReceived() bool {
for _, ct := range c.medias {
for _, ct := range c.setuppedMedias {
lft := atomic.LoadInt64(ct.udpRTPListener.lastPacketTime)
if lft != 0 {
return true
@@ -1037,7 +1049,7 @@ func (c *Client) atLeastOneUDPPacketHasBeenReceived() bool {
func (c *Client) isInUDPTimeout() bool {
now := c.timeNow()
for _, ct := range c.medias {
for _, ct := range c.setuppedMedias {
lft := time.Unix(atomic.LoadInt64(ct.udpRTPListener.lastPacketTime), 0)
if now.Sub(lft) < c.ReadTimeout {
return false
@@ -1347,9 +1359,10 @@ func (c *Client) doSetup(
}
cm := &clientMedia{
c: c,
onPacketRTCP: func(rtcp.Packet) {},
c: c,
media: medi,
}
cm.initialize()
if c.effectiveTransport == nil {
if c.connURL.Scheme == "rtsps" { // always use TCP if encrypted
@@ -1583,12 +1596,11 @@ func (c *Client) doSetup(
cm.tcpChannel = thRes.InterleavedIDs[0]
}
if c.medias == nil {
c.medias = make(map[*description.Media]*clientMedia)
if c.setuppedMedias == nil {
c.setuppedMedias = make(map[*description.Media]*clientMedia)
}
c.medias[medi] = cm
cm.setMedia(medi)
c.setuppedMedias[medi] = cm
c.baseURL = baseURL
c.effectiveTransport = &desiredTransport
@@ -1607,7 +1619,7 @@ func (c *Client) doSetup(
}
func (c *Client) isChannelPairInUse(channel int) bool {
for _, cm := range c.medias {
for _, cm := range c.setuppedMedias {
if (cm.tcpChannel+1) == channel || cm.tcpChannel == channel || cm.tcpChannel == (channel+1) {
return true
}
@@ -1712,7 +1724,7 @@ func (c *Client) doPlay(ra *headers.Range) (*base.Response, error) {
// don't do this with multicast, otherwise the RTP packet is going to be broadcasted
// to all listeners, including us, messing up the stream.
if *c.effectiveTransport == TransportUDP {
for _, cm := range c.medias {
for _, cm := range c.setuppedMedias {
byts, _ := (&rtp.Packet{Header: rtp.Header{Version: 2}}).Marshal()
cm.udpRTPListener.write(byts) //nolint:errcheck
@@ -1852,9 +1864,9 @@ func (c *Client) Seek(ra *headers.Range) (*base.Response, error) {
return c.Play(ra)
}
// OnPacketRTPAny sets the callback that is called when a RTP packet is read from any setupped media.
// OnPacketRTPAny sets a callback that is called when a RTP packet is read from any setupped media.
func (c *Client) OnPacketRTPAny(cb OnPacketRTPAnyFunc) {
for _, cm := range c.medias {
for _, cm := range c.setuppedMedias {
cmedia := cm.media
for _, forma := range cm.media.Formats {
c.OnPacketRTP(cm.media, forma, func(pkt *rtp.Packet) {
@@ -1864,9 +1876,9 @@ func (c *Client) OnPacketRTPAny(cb OnPacketRTPAnyFunc) {
}
}
// OnPacketRTCPAny sets the callback that is called when a RTCP packet is read from any setupped media.
// OnPacketRTCPAny sets a callback that is called when a RTCP packet is read from any setupped media.
func (c *Client) OnPacketRTCPAny(cb OnPacketRTCPAnyFunc) {
for _, cm := range c.medias {
for _, cm := range c.setuppedMedias {
cmedia := cm.media
c.OnPacketRTCP(cm.media, func(pkt rtcp.Packet) {
cb(cmedia, pkt)
@@ -1874,16 +1886,16 @@ func (c *Client) OnPacketRTCPAny(cb OnPacketRTCPAnyFunc) {
}
}
// OnPacketRTP sets the callback that is called when a RTP packet is read.
// OnPacketRTP sets a callback that is called when a RTP packet is read.
func (c *Client) OnPacketRTP(medi *description.Media, forma format.Format, cb OnPacketRTPFunc) {
cm := c.medias[medi]
cm := c.setuppedMedias[medi]
ct := cm.formats[forma.PayloadType()]
ct.onPacketRTP = cb
}
// OnPacketRTCP sets the callback that is called when a RTCP packet is read.
// OnPacketRTCP sets a callback that is called when a RTCP packet is read.
func (c *Client) OnPacketRTCP(medi *description.Media, cb OnPacketRTCPFunc) {
cm := c.medias[medi]
cm := c.setuppedMedias[medi]
cm.onPacketRTCP = cb
}
@@ -1908,13 +1920,13 @@ func (c *Client) WritePacketRTPWithNTP(medi *description.Media, pkt *rtp.Packet,
default:
}
cm := c.medias[medi]
cm := c.setuppedMedias[medi]
cf := cm.formats[pkt.PayloadType]
cf.rtcpSender.ProcessPacket(pkt, ntp, cf.format.PTSEqualsDTS(pkt))
cf.rtcpSender.ProcessPacketRTP(pkt, ntp, cf.format.PTSEqualsDTS(pkt))
ok := c.writer.push(func() error {
return cm.writePacketRTPInQueue(byts)
return cf.writePacketRTPInQueue(byts)
})
if !ok {
return liberrors.ErrClientWriteQueueFull{}
@@ -1936,7 +1948,7 @@ func (c *Client) WritePacketRTCP(medi *description.Media, pkt rtcp.Packet) error
default:
}
cm := c.medias[medi]
cm := c.setuppedMedias[medi]
ok := c.writer.push(func() error {
return cm.writePacketRTCPInQueue(byts)
@@ -1953,7 +1965,7 @@ func (c *Client) WritePacketRTCP(medi *description.Media, pkt rtcp.Packet) error
//
// Deprecated: replaced by PacketPTS2.
func (c *Client) PacketPTS(medi *description.Media, pkt *rtp.Packet) (time.Duration, bool) {
cm := c.medias[medi]
cm := c.setuppedMedias[medi]
ct := cm.formats[pkt.PayloadType]
v, ok := c.timeDecoder.Decode(ct.format, pkt)
@@ -1967,7 +1979,7 @@ func (c *Client) PacketPTS(medi *description.Media, pkt *rtp.Packet) (time.Durat
// PacketPTS2 returns the PTS of an incoming RTP packet.
// It is computed by decoding the packet timestamp and sychronizing it with other tracks.
func (c *Client) PacketPTS2(medi *description.Media, pkt *rtp.Packet) (int64, bool) {
cm := c.medias[medi]
cm := c.setuppedMedias[medi]
ct := cm.formats[pkt.PayloadType]
return c.timeDecoder.Decode(ct.format, pkt)
}
@@ -1975,7 +1987,194 @@ func (c *Client) PacketPTS2(medi *description.Media, pkt *rtp.Packet) (int64, bo
// PacketNTP returns the NTP timestamp of an incoming RTP packet.
// The NTP timestamp is computed from RTCP sender reports.
func (c *Client) PacketNTP(medi *description.Media, pkt *rtp.Packet) (time.Time, bool) {
cm := c.medias[medi]
cm := c.setuppedMedias[medi]
ct := cm.formats[pkt.PayloadType]
return ct.rtcpReceiver.PacketNTP(pkt.Timestamp)
}
// Stats returns client statistics.
func (c *Client) Stats() *ClientStats {
return &ClientStats{
Conn: StatsConn{
BytesReceived: atomic.LoadUint64(c.bytesReceived),
BytesSent: atomic.LoadUint64(c.bytesSent),
},
Session: StatsSession{
BytesReceived: func() uint64 {
v := uint64(0)
for _, sm := range c.setuppedMedias {
v += atomic.LoadUint64(sm.bytesReceived)
}
return v
}(),
BytesSent: func() uint64 {
v := uint64(0)
for _, sm := range c.setuppedMedias {
v += atomic.LoadUint64(sm.bytesSent)
}
return v
}(),
RTPPacketsReceived: func() uint64 {
v := uint64(0)
for _, sm := range c.setuppedMedias {
for _, f := range sm.formats {
v += atomic.LoadUint64(f.rtpPacketsReceived)
}
}
return v
}(),
RTPPacketsSent: func() uint64 {
v := uint64(0)
for _, sm := range c.setuppedMedias {
for _, f := range sm.formats {
v += atomic.LoadUint64(f.rtpPacketsSent)
}
}
return v
}(),
RTPPacketsLost: func() uint64 {
v := uint64(0)
for _, sm := range c.setuppedMedias {
for _, f := range sm.formats {
v += atomic.LoadUint64(f.rtpPacketsLost)
}
}
return v
}(),
RTPPacketsInError: func() uint64 {
v := uint64(0)
for _, sm := range c.setuppedMedias {
v += atomic.LoadUint64(sm.rtpPacketsInError)
}
return v
}(),
RTPJitter: func() float64 {
v := float64(0)
n := float64(0)
for _, sm := range c.setuppedMedias {
for _, fo := range sm.formats {
if fo.rtcpReceiver != nil {
stats := fo.rtcpReceiver.Stats()
if stats != nil {
v += stats.Jitter
n++
}
}
}
}
return v / n
}(),
RTCPPacketsReceived: func() uint64 {
v := uint64(0)
for _, sm := range c.setuppedMedias {
v += atomic.LoadUint64(sm.rtcpPacketsReceived)
}
return v
}(),
RTCPPacketsSent: func() uint64 {
v := uint64(0)
for _, sm := range c.setuppedMedias {
v += atomic.LoadUint64(sm.rtcpPacketsSent)
}
return v
}(),
RTCPPacketsInError: func() uint64 {
v := uint64(0)
for _, sm := range c.setuppedMedias {
v += atomic.LoadUint64(sm.rtcpPacketsInError)
}
return v
}(),
Medias: func() map[*description.Media]StatsSessionMedia { //nolint:dupl
ret := make(map[*description.Media]StatsSessionMedia, len(c.setuppedMedias))
for med, sm := range c.setuppedMedias {
ret[med] = StatsSessionMedia{
BytesReceived: atomic.LoadUint64(sm.bytesReceived),
BytesSent: atomic.LoadUint64(sm.bytesSent),
RTPPacketsInError: atomic.LoadUint64(sm.rtpPacketsInError),
RTCPPacketsReceived: atomic.LoadUint64(sm.rtcpPacketsReceived),
RTCPPacketsSent: atomic.LoadUint64(sm.rtcpPacketsSent),
RTCPPacketsInError: atomic.LoadUint64(sm.rtcpPacketsInError),
Formats: func() map[format.Format]StatsSessionFormat {
ret := make(map[format.Format]StatsSessionFormat, len(sm.formats))
for _, fo := range sm.formats {
recvStats := func() *rtcpreceiver.Stats {
if fo.rtcpReceiver != nil {
return fo.rtcpReceiver.Stats()
}
return nil
}()
sentStats := func() *rtcpsender.Stats {
if fo.rtcpSender != nil {
return fo.rtcpSender.Stats()
}
return nil
}()
ret[fo.format] = StatsSessionFormat{ //nolint:dupl
RTPPacketsReceived: atomic.LoadUint64(fo.rtpPacketsReceived),
RTPPacketsSent: atomic.LoadUint64(fo.rtpPacketsSent),
RTPPacketsLost: atomic.LoadUint64(fo.rtpPacketsLost),
LocalSSRC: func() uint32 {
if fo.rtcpReceiver != nil {
return *fo.rtcpReceiver.LocalSSRC
}
if sentStats != nil {
return sentStats.LocalSSRC
}
return 0
}(),
RemoteSSRC: func() uint32 {
if recvStats != nil {
return recvStats.RemoteSSRC
}
return 0
}(),
RTPPacketsLastSequenceNumber: func() uint16 {
if recvStats != nil {
return recvStats.LastSequenceNumber
}
if sentStats != nil {
return sentStats.LastSequenceNumber
}
return 0
}(),
RTPPacketsLastRTP: func() uint32 {
if recvStats != nil {
return recvStats.LastRTP
}
if sentStats != nil {
return sentStats.LastRTP
}
return 0
}(),
RTPPacketsLastNTP: func() time.Time {
if recvStats != nil {
return recvStats.LastNTP
}
if sentStats != nil {
return sentStats.LastNTP
}
return time.Time{}
}(),
RTPJitter: func() float64 {
if recvStats != nil {
return recvStats.Jitter
}
return 0
}(),
}
}
return ret
}(),
}
}
return ret
}(),
},
}
}

View File

@@ -1,6 +1,9 @@
package gortsplib
import (
"sync/atomic"
"time"
"github.com/pion/rtcp"
"github.com/pion/rtp"
@@ -17,13 +20,29 @@ type clientFormat struct {
format format.Format
onPacketRTP OnPacketRTPFunc
udpReorderer *rtpreorderer.Reorderer // play
tcpLossDetector *rtplossdetector.LossDetector // play
rtcpReceiver *rtcpreceiver.RTCPReceiver // play
rtcpSender *rtcpsender.RTCPSender // record or back channel
udpReorderer *rtpreorderer.Reorderer // play
tcpLossDetector *rtplossdetector.LossDetector // play
rtcpReceiver *rtcpreceiver.RTCPReceiver // play
rtcpSender *rtcpsender.RTCPSender // record or back channel
writePacketRTPInQueue func([]byte) error
rtpPacketsReceived *uint64
rtpPacketsSent *uint64
rtpPacketsLost *uint64
}
func (cf *clientFormat) initialize() {
cf.rtpPacketsReceived = new(uint64)
cf.rtpPacketsSent = new(uint64)
cf.rtpPacketsLost = new(uint64)
}
func (cf *clientFormat) start() {
if cf.cm.udpRTPListener != nil {
cf.writePacketRTPInQueue = cf.writePacketRTPInQueueUDP
} else {
cf.writePacketRTPInQueue = cf.writePacketRTPInQueueTCP
}
if cf.cm.c.state == clientStateRecord || cf.cm.media.IsBackChannel {
cf.rtcpSender = &rtcpsender.RTCPSender{
ClockRate: cf.format.ClockRate(),
@@ -72,40 +91,70 @@ func (cf *clientFormat) stop() {
}
}
func (cf *clientFormat) readRTPUDP(pkt *rtp.Packet) {
func (cf *clientFormat) readPacketRTPUDP(pkt *rtp.Packet) {
packets, lost := cf.udpReorderer.Process(pkt)
if lost != 0 {
cf.cm.c.OnPacketLost(liberrors.ErrClientRTPPacketsLost{Lost: lost})
cf.onPacketRTPLost(lost)
// do not return
}
now := cf.cm.c.timeNow()
for _, pkt := range packets {
err := cf.rtcpReceiver.ProcessPacket(pkt, now, cf.format.PTSEqualsDTS(pkt))
if err != nil {
cf.cm.c.OnDecodeError(err)
continue
}
cf.onPacketRTP(pkt)
cf.handlePacketRTP(pkt, now)
}
}
func (cf *clientFormat) readRTPTCP(pkt *rtp.Packet) {
func (cf *clientFormat) readPacketRTPTCP(pkt *rtp.Packet) {
lost := cf.tcpLossDetector.Process(pkt)
if lost != 0 {
cf.cm.c.OnPacketLost(liberrors.ErrClientRTPPacketsLost{Lost: lost})
cf.onPacketRTPLost(lost)
// do not return
}
now := cf.cm.c.timeNow()
err := cf.rtcpReceiver.ProcessPacket(pkt, now, cf.format.PTSEqualsDTS(pkt))
cf.handlePacketRTP(pkt, now)
}
func (cf *clientFormat) handlePacketRTP(pkt *rtp.Packet, now time.Time) {
err := cf.rtcpReceiver.ProcessPacketRTP(pkt, now, cf.format.PTSEqualsDTS(pkt))
if err != nil {
cf.cm.c.OnDecodeError(err)
cf.cm.onPacketRTPDecodeError(err)
return
}
atomic.AddUint64(cf.rtpPacketsReceived, 1)
cf.onPacketRTP(pkt)
}
func (cf *clientFormat) onPacketRTPLost(lost uint) {
atomic.AddUint64(cf.rtpPacketsLost, uint64(lost))
cf.cm.c.OnPacketLost(liberrors.ErrClientRTPPacketsLost{Lost: lost})
}
func (cf *clientFormat) writePacketRTPInQueueUDP(payload []byte) error {
err := cf.cm.udpRTPListener.write(payload)
if err != nil {
return err
}
atomic.AddUint64(cf.cm.bytesSent, uint64(len(payload)))
atomic.AddUint64(cf.rtpPacketsSent, 1)
return nil
}
func (cf *clientFormat) writePacketRTPInQueueTCP(payload []byte) error {
cf.cm.c.tcpFrame.Channel = cf.cm.tcpChannel
cf.cm.c.tcpFrame.Payload = payload
cf.cm.c.nconn.SetWriteDeadline(time.Now().Add(cf.cm.c.WriteTimeout))
err := cf.cm.c.conn.WriteInterleavedFrame(cf.cm.c.tcpFrame, cf.cm.c.tcpBuffer)
if err != nil {
return err
}
atomic.AddUint64(cf.cm.bytesSent, uint64(len(payload)))
atomic.AddUint64(cf.rtpPacketsSent, 1)
return nil
}

View File

@@ -13,16 +13,43 @@ import (
)
type clientMedia struct {
c *Client
onPacketRTCP OnPacketRTCPFunc
c *Client
media *description.Media
media *description.Media
onPacketRTCP OnPacketRTCPFunc
formats map[uint8]*clientFormat
tcpChannel int
udpRTPListener *clientUDPListener
udpRTCPListener *clientUDPListener
writePacketRTPInQueue func([]byte) error
writePacketRTCPInQueue func([]byte) error
bytesReceived *uint64
bytesSent *uint64
rtpPacketsInError *uint64
rtcpPacketsReceived *uint64
rtcpPacketsSent *uint64
rtcpPacketsInError *uint64
}
func (cm *clientMedia) initialize() {
cm.onPacketRTCP = func(rtcp.Packet) {}
cm.bytesReceived = new(uint64)
cm.bytesSent = new(uint64)
cm.rtpPacketsInError = new(uint64)
cm.rtcpPacketsReceived = new(uint64)
cm.rtcpPacketsSent = new(uint64)
cm.rtcpPacketsInError = new(uint64)
cm.formats = make(map[uint8]*clientFormat)
for _, forma := range cm.media.Formats {
f := &clientFormat{
cm: cm,
format: forma,
onPacketRTP: func(*rtp.Packet) {},
}
f.initialize()
cm.formats[forma.PayloadType()] = f
}
}
func (cm *clientMedia) close() {
@@ -71,33 +98,18 @@ func (cm *clientMedia) allocateUDPListeners(
return err
}
func (cm *clientMedia) setMedia(medi *description.Media) {
cm.media = medi
cm.formats = make(map[uint8]*clientFormat)
for _, forma := range medi.Formats {
cm.formats[forma.PayloadType()] = &clientFormat{
cm: cm,
format: forma,
onPacketRTP: func(*rtp.Packet) {},
}
}
}
func (cm *clientMedia) start() {
if cm.udpRTPListener != nil {
cm.writePacketRTPInQueue = cm.writePacketRTPInQueueUDP
cm.writePacketRTCPInQueue = cm.writePacketRTCPInQueueUDP
if cm.c.state == clientStateRecord || cm.media.IsBackChannel {
cm.udpRTPListener.readFunc = cm.readRTPUDPRecord
cm.udpRTCPListener.readFunc = cm.readRTCPUDPRecord
cm.udpRTPListener.readFunc = cm.readPacketRTPUDPRecord
cm.udpRTCPListener.readFunc = cm.readPacketRTCPUDPRecord
} else {
cm.udpRTPListener.readFunc = cm.readRTPUDPPlay
cm.udpRTCPListener.readFunc = cm.readRTCPUDPPlay
cm.udpRTPListener.readFunc = cm.readPacketRTPUDPPlay
cm.udpRTCPListener.readFunc = cm.readPacketRTCPUDPPlay
}
} else {
cm.writePacketRTPInQueue = cm.writePacketRTPInQueueTCP
cm.writePacketRTCPInQueue = cm.writePacketRTCPInQueueTCP
if cm.c.tcpCallbackByChannel == nil {
@@ -105,11 +117,11 @@ func (cm *clientMedia) start() {
}
if cm.c.state == clientStateRecord || cm.media.IsBackChannel {
cm.c.tcpCallbackByChannel[cm.tcpChannel] = cm.readRTPTCPRecord
cm.c.tcpCallbackByChannel[cm.tcpChannel+1] = cm.readRTCPTCPRecord
cm.c.tcpCallbackByChannel[cm.tcpChannel] = cm.readPacketRTPTCPRecord
cm.c.tcpCallbackByChannel[cm.tcpChannel+1] = cm.readPacketRTCPTCPRecord
} else {
cm.c.tcpCallbackByChannel[cm.tcpChannel] = cm.readRTPTCPPlay
cm.c.tcpCallbackByChannel[cm.tcpChannel+1] = cm.readRTCPTCPPlay
cm.c.tcpCallbackByChannel[cm.tcpChannel] = cm.readPacketRTPTCPPlay
cm.c.tcpCallbackByChannel[cm.tcpChannel+1] = cm.readPacketRTCPTCPPlay
}
}
@@ -136,73 +148,82 @@ func (cm *clientMedia) stop() {
func (cm *clientMedia) findFormatWithSSRC(ssrc uint32) *clientFormat {
for _, format := range cm.formats {
tssrc, ok := format.rtcpReceiver.SenderSSRC()
if ok && tssrc == ssrc {
stats := format.rtcpReceiver.Stats()
if stats != nil && stats.RemoteSSRC == ssrc {
return format
}
}
return nil
}
func (cm *clientMedia) writePacketRTPInQueueUDP(payload []byte) error {
return cm.udpRTPListener.write(payload)
}
func (cm *clientMedia) writePacketRTCPInQueueUDP(payload []byte) error {
return cm.udpRTCPListener.write(payload)
}
err := cm.udpRTCPListener.write(payload)
if err != nil {
return err
}
func (cm *clientMedia) writePacketRTPInQueueTCP(payload []byte) error {
cm.c.tcpFrame.Channel = cm.tcpChannel
cm.c.tcpFrame.Payload = payload
cm.c.nconn.SetWriteDeadline(time.Now().Add(cm.c.WriteTimeout))
return cm.c.conn.WriteInterleavedFrame(cm.c.tcpFrame, cm.c.tcpBuffer)
atomic.AddUint64(cm.bytesSent, uint64(len(payload)))
atomic.AddUint64(cm.rtcpPacketsSent, 1)
return nil
}
func (cm *clientMedia) writePacketRTCPInQueueTCP(payload []byte) error {
cm.c.tcpFrame.Channel = cm.tcpChannel + 1
cm.c.tcpFrame.Payload = payload
cm.c.nconn.SetWriteDeadline(time.Now().Add(cm.c.WriteTimeout))
return cm.c.conn.WriteInterleavedFrame(cm.c.tcpFrame, cm.c.tcpBuffer)
err := cm.c.conn.WriteInterleavedFrame(cm.c.tcpFrame, cm.c.tcpBuffer)
if err != nil {
return err
}
atomic.AddUint64(cm.bytesSent, uint64(len(payload)))
atomic.AddUint64(cm.rtcpPacketsSent, 1)
return nil
}
func (cm *clientMedia) readRTPTCPPlay(payload []byte) bool {
func (cm *clientMedia) readPacketRTPTCPPlay(payload []byte) bool {
atomic.AddUint64(cm.bytesReceived, uint64(len(payload)))
now := cm.c.timeNow()
atomic.StoreInt64(cm.c.tcpLastFrameTime, now.Unix())
pkt := &rtp.Packet{}
err := pkt.Unmarshal(payload)
if err != nil {
cm.c.OnDecodeError(err)
cm.onPacketRTPDecodeError(err)
return false
}
forma, ok := cm.formats[pkt.PayloadType]
if !ok {
cm.c.OnDecodeError(liberrors.ErrClientRTPPacketUnknownPayloadType{PayloadType: pkt.PayloadType})
cm.onPacketRTPDecodeError(liberrors.ErrClientRTPPacketUnknownPayloadType{PayloadType: pkt.PayloadType})
return false
}
forma.readRTPTCP(pkt)
forma.readPacketRTPTCP(pkt)
return true
}
func (cm *clientMedia) readRTCPTCPPlay(payload []byte) bool {
func (cm *clientMedia) readPacketRTCPTCPPlay(payload []byte) bool {
atomic.AddUint64(cm.bytesReceived, uint64(len(payload)))
now := cm.c.timeNow()
atomic.StoreInt64(cm.c.tcpLastFrameTime, now.Unix())
if len(payload) > udpMaxPayloadSize {
cm.c.OnDecodeError(liberrors.ErrClientRTCPPacketTooBig{L: len(payload), Max: udpMaxPayloadSize})
cm.onPacketRTCPDecodeError(liberrors.ErrClientRTCPPacketTooBig{L: len(payload), Max: udpMaxPayloadSize})
return false
}
packets, err := rtcp.Unmarshal(payload)
if err != nil {
cm.c.OnDecodeError(err)
cm.onPacketRTCPDecodeError(err)
return false
}
atomic.AddUint64(cm.rtcpPacketsReceived, uint64(len(packets)))
for _, pkt := range packets {
if sr, ok := pkt.(*rtcp.SenderReport); ok {
format := cm.findFormatWithSSRC(sr.SSRC)
@@ -217,22 +238,26 @@ func (cm *clientMedia) readRTCPTCPPlay(payload []byte) bool {
return true
}
func (cm *clientMedia) readRTPTCPRecord(_ []byte) bool {
func (cm *clientMedia) readPacketRTPTCPRecord(_ []byte) bool {
return false
}
func (cm *clientMedia) readRTCPTCPRecord(payload []byte) bool {
func (cm *clientMedia) readPacketRTCPTCPRecord(payload []byte) bool {
atomic.AddUint64(cm.bytesReceived, uint64(len(payload)))
if len(payload) > udpMaxPayloadSize {
cm.c.OnDecodeError(liberrors.ErrClientRTCPPacketTooBig{L: len(payload), Max: udpMaxPayloadSize})
cm.onPacketRTCPDecodeError(liberrors.ErrClientRTCPPacketTooBig{L: len(payload), Max: udpMaxPayloadSize})
return false
}
packets, err := rtcp.Unmarshal(payload)
if err != nil {
cm.c.OnDecodeError(err)
cm.onPacketRTCPDecodeError(err)
return false
}
atomic.AddUint64(cm.rtcpPacketsReceived, uint64(len(packets)))
for _, pkt := range packets {
cm.onPacketRTCP(pkt)
}
@@ -240,47 +265,50 @@ func (cm *clientMedia) readRTCPTCPRecord(payload []byte) bool {
return true
}
func (cm *clientMedia) readRTPUDPPlay(payload []byte) bool {
plen := len(payload)
func (cm *clientMedia) readPacketRTPUDPPlay(payload []byte) bool {
atomic.AddUint64(cm.bytesReceived, uint64(len(payload)))
if plen == (udpMaxPayloadSize + 1) {
cm.c.OnDecodeError(liberrors.ErrClientRTPPacketTooBigUDP{})
if len(payload) == (udpMaxPayloadSize + 1) {
cm.onPacketRTPDecodeError(liberrors.ErrClientRTPPacketTooBigUDP{})
return false
}
pkt := &rtp.Packet{}
err := pkt.Unmarshal(payload)
if err != nil {
cm.c.OnDecodeError(err)
cm.onPacketRTPDecodeError(err)
return false
}
forma, ok := cm.formats[pkt.PayloadType]
if !ok {
cm.c.OnDecodeError(liberrors.ErrClientRTPPacketUnknownPayloadType{PayloadType: pkt.PayloadType})
cm.onPacketRTPDecodeError(liberrors.ErrClientRTPPacketUnknownPayloadType{PayloadType: pkt.PayloadType})
return false
}
forma.readRTPUDP(pkt)
forma.readPacketRTPUDP(pkt)
return true
}
func (cm *clientMedia) readRTCPUDPPlay(payload []byte) bool {
now := cm.c.timeNow()
plen := len(payload)
func (cm *clientMedia) readPacketRTCPUDPPlay(payload []byte) bool {
atomic.AddUint64(cm.bytesReceived, uint64(len(payload)))
if plen == (udpMaxPayloadSize + 1) {
cm.c.OnDecodeError(liberrors.ErrClientRTCPPacketTooBigUDP{})
if len(payload) == (udpMaxPayloadSize + 1) {
cm.onPacketRTCPDecodeError(liberrors.ErrClientRTCPPacketTooBigUDP{})
return false
}
packets, err := rtcp.Unmarshal(payload)
if err != nil {
cm.c.OnDecodeError(err)
cm.onPacketRTCPDecodeError(err)
return false
}
now := cm.c.timeNow()
atomic.AddUint64(cm.rtcpPacketsReceived, uint64(len(packets)))
for _, pkt := range packets {
if sr, ok := pkt.(*rtcp.SenderReport); ok {
format := cm.findFormatWithSSRC(sr.SSRC)
@@ -295,27 +323,39 @@ func (cm *clientMedia) readRTCPUDPPlay(payload []byte) bool {
return true
}
func (cm *clientMedia) readRTPUDPRecord(_ []byte) bool {
func (cm *clientMedia) readPacketRTPUDPRecord(_ []byte) bool {
return false
}
func (cm *clientMedia) readRTCPUDPRecord(payload []byte) bool {
plen := len(payload)
func (cm *clientMedia) readPacketRTCPUDPRecord(payload []byte) bool {
atomic.AddUint64(cm.bytesReceived, uint64(len(payload)))
if plen == (udpMaxPayloadSize + 1) {
cm.c.OnDecodeError(liberrors.ErrClientRTCPPacketTooBigUDP{})
if len(payload) == (udpMaxPayloadSize + 1) {
cm.onPacketRTCPDecodeError(liberrors.ErrClientRTCPPacketTooBigUDP{})
return false
}
packets, err := rtcp.Unmarshal(payload)
if err != nil {
cm.c.OnDecodeError(err)
cm.onPacketRTCPDecodeError(err)
return false
}
atomic.AddUint64(cm.rtcpPacketsReceived, uint64(len(packets)))
for _, pkt := range packets {
cm.onPacketRTCP(pkt)
}
return true
}
func (cm *clientMedia) onPacketRTPDecodeError(err error) {
atomic.AddUint64(cm.rtpPacketsInError, 1)
cm.c.OnDecodeError(err)
}
func (cm *clientMedia) onPacketRTCPDecodeError(err error) {
atomic.AddUint64(cm.rtcpPacketsInError, 1)
cm.c.OnDecodeError(err)
}

View File

@@ -545,10 +545,11 @@ func TestClientPlay(t *testing.T) {
<-packetRecv
require.Greater(t, atomic.LoadUint64(c.BytesSent), uint64(620))
require.Less(t, atomic.LoadUint64(c.BytesSent), uint64(850))
require.Greater(t, atomic.LoadUint64(c.BytesReceived), uint64(580))
require.Less(t, atomic.LoadUint64(c.BytesReceived), uint64(650))
s := c.Stats()
require.Greater(t, s.Session.BytesSent, uint64(19))
require.Less(t, s.Session.BytesSent, uint64(41))
require.Greater(t, s.Session.BytesReceived, uint64(31))
require.Less(t, s.Session.BytesReceived, uint64(37))
})
}
}

View File

@@ -7,7 +7,6 @@ import (
"net"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
@@ -336,10 +335,11 @@ func TestClientRecord(t *testing.T) {
<-recvDone
require.Greater(t, atomic.LoadUint64(c.BytesSent), uint64(730))
require.Less(t, atomic.LoadUint64(c.BytesSent), uint64(760))
require.Greater(t, atomic.LoadUint64(c.BytesReceived), uint64(180))
require.Less(t, atomic.LoadUint64(c.BytesReceived), uint64(210))
s := c.Stats()
require.Greater(t, s.Session.BytesSent, uint64(15))
require.Less(t, s.Session.BytesSent, uint64(17))
require.Greater(t, s.Session.BytesReceived, uint64(19))
require.Less(t, s.Session.BytesReceived, uint64(21))
c.Close()
<-done

7
client_stats.go Normal file
View File

@@ -0,0 +1,7 @@
package gortsplib
// ClientStats are client statistics
type ClientStats struct {
Conn StatsConn
Session StatsSession
}

View File

@@ -173,8 +173,6 @@ func (u *clientUDPListener) run() {
now := u.c.timeNow()
atomic.StoreInt64(u.lastPacketTime, now.Unix())
atomic.AddUint64(u.c.BytesReceived, uint64(n))
if u.readFunc(buf[:n]) {
createNewBuffer()
}
@@ -182,8 +180,6 @@ func (u *clientUDPListener) run() {
}
func (u *clientUDPListener) write(payload []byte) error {
atomic.AddUint64(u.c.BytesSent, uint64(len(payload)))
// no mutex is needed here since Write() has an internal lock.
// https://github.com/golang/go/issues/27203#issuecomment-534386117
u.pc.SetWriteDeadline(time.Now().Add(u.c.WriteTimeout))

View File

@@ -30,7 +30,7 @@ func randUint32() (uint32, error) {
// RTCPReceiver is a utility to generate RTCP receiver reports.
type RTCPReceiver struct {
ClockRate int
ReceiverSSRC *uint32
LocalSSRC *uint32
Period time.Duration
TimeNow func() time.Time
WritePacketRTCP func(rtcp.Packet)
@@ -42,7 +42,7 @@ type RTCPReceiver struct {
timeInitialized bool
sequenceNumberCycles uint16
lastSequenceNumber uint16
senderSSRC uint32
remoteSSRC uint32
lastTimeRTP uint32
lastTimeSystem time.Time
totalLost uint32
@@ -62,12 +62,12 @@ type RTCPReceiver struct {
// Initialize initializes RTCPReceiver.
func (rr *RTCPReceiver) Initialize() error {
if rr.ReceiverSSRC == nil {
if rr.LocalSSRC == nil {
v, err := randUint32()
if err != nil {
return err
}
rr.ReceiverSSRC = &v
rr.LocalSSRC = &v
}
if rr.TimeNow == nil {
@@ -119,10 +119,10 @@ func (rr *RTCPReceiver) report() rtcp.Packet {
system := rr.TimeNow()
report := &rtcp.ReceiverReport{
SSRC: *rr.ReceiverSSRC,
SSRC: *rr.LocalSSRC,
Reports: []rtcp.ReceptionReport{
{
SSRC: rr.senderSSRC,
SSRC: rr.remoteSSRC,
LastSequenceNumber: uint32(rr.sequenceNumberCycles)<<16 | uint32(rr.lastSequenceNumber),
// equivalent to taking the integer part after multiplying the
// loss fraction by 256
@@ -149,8 +149,8 @@ func (rr *RTCPReceiver) report() rtcp.Packet {
return report
}
// ProcessPacket extracts the needed data from RTP packets.
func (rr *RTCPReceiver) ProcessPacket(pkt *rtp.Packet, system time.Time, ptsEqualsDTS bool) error {
// ProcessPacketRTP extracts the needed data from RTP packets.
func (rr *RTCPReceiver) ProcessPacketRTP(pkt *rtp.Packet, system time.Time, ptsEqualsDTS bool) error {
rr.mutex.Lock()
defer rr.mutex.Unlock()
@@ -159,7 +159,7 @@ func (rr *RTCPReceiver) ProcessPacket(pkt *rtp.Packet, system time.Time, ptsEqua
rr.firstRTPPacketReceived = true
rr.totalSinceReport = 1
rr.lastSequenceNumber = pkt.SequenceNumber
rr.senderSSRC = pkt.SSRC
rr.remoteSSRC = pkt.SSRC
if ptsEqualsDTS {
rr.timeInitialized = true
@@ -169,8 +169,8 @@ func (rr *RTCPReceiver) ProcessPacket(pkt *rtp.Packet, system time.Time, ptsEqua
// subsequent packets
} else {
if pkt.SSRC != rr.senderSSRC {
return fmt.Errorf("received packet with wrong SSRC %d, expected %d", pkt.SSRC, rr.senderSSRC)
if pkt.SSRC != rr.remoteSSRC {
return fmt.Errorf("received packet with wrong SSRC %d, expected %d", pkt.SSRC, rr.remoteSSRC)
}
diff := int32(pkt.SequenceNumber) - int32(rr.lastSequenceNumber)
@@ -229,11 +229,7 @@ func (rr *RTCPReceiver) ProcessSenderReport(sr *rtcp.SenderReport, system time.T
rr.lastSenderReportTimeSystem = system
}
// PacketNTP returns the NTP timestamp of the packet.
func (rr *RTCPReceiver) PacketNTP(ts uint32) (time.Time, bool) {
rr.mutex.Lock()
defer rr.mutex.Unlock()
func (rr *RTCPReceiver) packetNTPUnsafe(ts uint32) (time.Time, bool) {
if !rr.firstSenderReportReceived {
return time.Time{}, false
}
@@ -244,9 +240,39 @@ func (rr *RTCPReceiver) PacketNTP(ts uint32) (time.Time, bool) {
return ntpTimeRTCPToGo(rr.lastSenderReportTimeNTP).Add(timeDiffGo), true
}
// SenderSSRC returns the SSRC of outgoing RTP packets.
func (rr *RTCPReceiver) SenderSSRC() (uint32, bool) {
// PacketNTP returns the NTP timestamp of the packet.
func (rr *RTCPReceiver) PacketNTP(ts uint32) (time.Time, bool) {
rr.mutex.Lock()
defer rr.mutex.Unlock()
return rr.packetNTPUnsafe(ts)
}
// Stats are statistics.
type Stats struct {
RemoteSSRC uint32
LastSequenceNumber uint16
LastRTP uint32
LastNTP time.Time
Jitter float64
}
// Stats returns statistics.
func (rr *RTCPReceiver) Stats() *Stats {
rr.mutex.RLock()
defer rr.mutex.RUnlock()
return rr.senderSSRC, rr.firstRTPPacketReceived
if !rr.firstRTPPacketReceived {
return nil
}
ntp, _ := rr.packetNTPUnsafe(rr.lastTimeRTP)
return &Stats{
RemoteSSRC: rr.remoteSSRC,
LastSequenceNumber: rr.lastSequenceNumber,
LastRTP: rr.lastTimeRTP,
LastNTP: ntp,
Jitter: rr.jitter,
}
}

View File

@@ -17,9 +17,9 @@ func TestRTCPReceiverBase(t *testing.T) {
done := make(chan struct{})
rr := &RTCPReceiver{
ClockRate: 90000,
ReceiverSSRC: uint32Ptr(0x65f83afb),
Period: 500 * time.Millisecond,
ClockRate: 90000,
LocalSSRC: uint32Ptr(0x65f83afb),
Period: 500 * time.Millisecond,
TimeNow: func() time.Time {
return time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC)
},
@@ -64,7 +64,7 @@ func TestRTCPReceiverBase(t *testing.T) {
Payload: []byte("\x00\x00"),
}
ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
err = rr.ProcessPacket(&rtpPkt, ts, true)
err = rr.ProcessPacketRTP(&rtpPkt, ts, true)
require.NoError(t, err)
rtpPkt = rtp.Packet{
@@ -79,7 +79,7 @@ func TestRTCPReceiverBase(t *testing.T) {
Payload: []byte("\x00\x00"),
}
ts = time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC)
err = rr.ProcessPacket(&rtpPkt, ts, true)
err = rr.ProcessPacketRTP(&rtpPkt, ts, true)
require.NoError(t, err)
<-done
@@ -89,9 +89,9 @@ func TestRTCPReceiverOverflow(t *testing.T) {
done := make(chan struct{})
rr := &RTCPReceiver{
ClockRate: 90000,
ReceiverSSRC: uint32Ptr(0x65f83afb),
Period: 250 * time.Millisecond,
ClockRate: 90000,
LocalSSRC: uint32Ptr(0x65f83afb),
Period: 250 * time.Millisecond,
TimeNow: func() time.Time {
return time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC)
},
@@ -138,7 +138,7 @@ func TestRTCPReceiverOverflow(t *testing.T) {
Payload: []byte("\x00\x00"),
}
ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
err = rr.ProcessPacket(&rtpPkt, ts, true)
err = rr.ProcessPacketRTP(&rtpPkt, ts, true)
require.NoError(t, err)
rtpPkt = rtp.Packet{
@@ -153,7 +153,7 @@ func TestRTCPReceiverOverflow(t *testing.T) {
Payload: []byte("\x00\x00"),
}
ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
err = rr.ProcessPacket(&rtpPkt, ts, true)
err = rr.ProcessPacketRTP(&rtpPkt, ts, true)
require.NoError(t, err)
<-done
@@ -163,9 +163,9 @@ func TestRTCPReceiverPacketLost(t *testing.T) {
done := make(chan struct{})
rr := &RTCPReceiver{
ClockRate: 90000,
ReceiverSSRC: uint32Ptr(0x65f83afb),
Period: 500 * time.Millisecond,
ClockRate: 90000,
LocalSSRC: uint32Ptr(0x65f83afb),
Period: 500 * time.Millisecond,
TimeNow: func() time.Time {
return time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC)
},
@@ -215,7 +215,7 @@ func TestRTCPReceiverPacketLost(t *testing.T) {
Payload: []byte("\x00\x00"),
}
ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
err = rr.ProcessPacket(&rtpPkt, ts, true)
err = rr.ProcessPacketRTP(&rtpPkt, ts, true)
require.NoError(t, err)
rtpPkt = rtp.Packet{
@@ -230,7 +230,7 @@ func TestRTCPReceiverPacketLost(t *testing.T) {
Payload: []byte("\x00\x00"),
}
ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
err = rr.ProcessPacket(&rtpPkt, ts, true)
err = rr.ProcessPacketRTP(&rtpPkt, ts, true)
require.NoError(t, err)
<-done
@@ -240,9 +240,9 @@ func TestRTCPReceiverOverflowPacketLost(t *testing.T) {
done := make(chan struct{})
rr := &RTCPReceiver{
ClockRate: 90000,
ReceiverSSRC: uint32Ptr(0x65f83afb),
Period: 500 * time.Millisecond,
ClockRate: 90000,
LocalSSRC: uint32Ptr(0x65f83afb),
Period: 500 * time.Millisecond,
TimeNow: func() time.Time {
return time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC)
},
@@ -292,7 +292,7 @@ func TestRTCPReceiverOverflowPacketLost(t *testing.T) {
Payload: []byte("\x00\x00"),
}
ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
err = rr.ProcessPacket(&rtpPkt, ts, true)
err = rr.ProcessPacketRTP(&rtpPkt, ts, true)
require.NoError(t, err)
rtpPkt = rtp.Packet{
@@ -307,7 +307,7 @@ func TestRTCPReceiverOverflowPacketLost(t *testing.T) {
Payload: []byte("\x00\x00"),
}
ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
err = rr.ProcessPacket(&rtpPkt, ts, true)
err = rr.ProcessPacketRTP(&rtpPkt, ts, true)
require.NoError(t, err)
<-done
@@ -317,9 +317,9 @@ func TestRTCPReceiverJitter(t *testing.T) {
done := make(chan struct{})
rr := &RTCPReceiver{
ClockRate: 90000,
ReceiverSSRC: uint32Ptr(0x65f83afb),
Period: 500 * time.Millisecond,
ClockRate: 90000,
LocalSSRC: uint32Ptr(0x65f83afb),
Period: 500 * time.Millisecond,
TimeNow: func() time.Time {
return time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC)
},
@@ -365,7 +365,7 @@ func TestRTCPReceiverJitter(t *testing.T) {
Payload: []byte("\x00\x00"),
}
ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
err = rr.ProcessPacket(&rtpPkt, ts, true)
err = rr.ProcessPacketRTP(&rtpPkt, ts, true)
require.NoError(t, err)
rtpPkt = rtp.Packet{
@@ -380,7 +380,7 @@ func TestRTCPReceiverJitter(t *testing.T) {
Payload: []byte("\x00\x00"),
}
ts = time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC)
err = rr.ProcessPacket(&rtpPkt, ts, true)
err = rr.ProcessPacketRTP(&rtpPkt, ts, true)
require.NoError(t, err)
rtpPkt = rtp.Packet{
@@ -395,7 +395,7 @@ func TestRTCPReceiverJitter(t *testing.T) {
Payload: []byte("\x00\x00"),
}
ts = time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC)
err = rr.ProcessPacket(&rtpPkt, ts, false)
err = rr.ProcessPacketRTP(&rtpPkt, ts, false)
require.NoError(t, err)
<-done

View File

@@ -26,11 +26,11 @@ type RTCPSender struct {
mutex sync.RWMutex
// data from RTP packets
initialized bool
firstRTPPacketSent bool
lastTimeRTP uint32
lastTimeNTP time.Time
lastTimeSystem time.Time
senderSSRC uint32
localSSRC uint32
lastSequenceNumber uint16
packetCount uint32
octetCount uint32
@@ -81,7 +81,7 @@ func (rs *RTCPSender) report() rtcp.Packet {
rs.mutex.Lock()
defer rs.mutex.Unlock()
if !rs.initialized {
if !rs.firstRTPPacketSent {
return nil
}
@@ -90,7 +90,7 @@ func (rs *RTCPSender) report() rtcp.Packet {
rtpTime := rs.lastTimeRTP + uint32(systemTimeDiff.Seconds()*float64(rs.ClockRate))
return &rtcp.SenderReport{
SSRC: rs.senderSSRC,
SSRC: rs.localSSRC,
NTPTime: ntpTimeGoToRTCP(ntpTime),
RTPTime: rtpTime,
PacketCount: rs.packetCount,
@@ -98,17 +98,17 @@ func (rs *RTCPSender) report() rtcp.Packet {
}
}
// ProcessPacket extracts data from RTP packets.
func (rs *RTCPSender) ProcessPacket(pkt *rtp.Packet, ntp time.Time, ptsEqualsDTS bool) {
// ProcessPacketRTP extracts data from RTP packets.
func (rs *RTCPSender) ProcessPacketRTP(pkt *rtp.Packet, ntp time.Time, ptsEqualsDTS bool) {
rs.mutex.Lock()
defer rs.mutex.Unlock()
if ptsEqualsDTS {
rs.initialized = true
rs.firstRTPPacketSent = true
rs.lastTimeRTP = pkt.Timestamp
rs.lastTimeNTP = ntp
rs.lastTimeSystem = rs.TimeNow()
rs.senderSSRC = pkt.SSRC
rs.localSSRC = pkt.SSRC
}
rs.lastSequenceNumber = pkt.SequenceNumber
@@ -117,16 +117,27 @@ func (rs *RTCPSender) ProcessPacket(pkt *rtp.Packet, ntp time.Time, ptsEqualsDTS
rs.octetCount += uint32(len(pkt.Payload))
}
// SenderSSRC returns the SSRC of outgoing RTP packets.
func (rs *RTCPSender) SenderSSRC() (uint32, bool) {
rs.mutex.RLock()
defer rs.mutex.RUnlock()
return rs.senderSSRC, rs.initialized
// Stats are statistics.
type Stats struct {
LocalSSRC uint32
LastSequenceNumber uint16
LastRTP uint32
LastNTP time.Time
}
// LastPacketData returns metadata of the last RTP packet.
func (rs *RTCPSender) LastPacketData() (uint16, uint32, time.Time, bool) {
// Stats returns statistics.
func (rs *RTCPSender) Stats() *Stats {
rs.mutex.RLock()
defer rs.mutex.RUnlock()
return rs.lastSequenceNumber, rs.lastTimeRTP, rs.lastTimeNTP, rs.initialized
if !rs.firstRTPPacketSent {
return nil
}
return &Stats{
LocalSSRC: rs.localSSRC,
LastSequenceNumber: rs.lastSequenceNumber,
LastRTP: rs.lastTimeRTP,
LastNTP: rs.lastTimeNTP,
}
}

View File

@@ -63,7 +63,7 @@ func TestRTCPSender(t *testing.T) {
Payload: []byte("\x00\x00"),
}
ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
rs.ProcessPacket(&rtpPkt, ts, true)
rs.ProcessPacketRTP(&rtpPkt, ts, true)
setCurTime(time.Date(2008, 5, 20, 22, 16, 22, 0, time.UTC))
rtpPkt = rtp.Packet{
@@ -78,7 +78,7 @@ func TestRTCPSender(t *testing.T) {
Payload: []byte("\x00\x00"),
}
ts = time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC)
rs.ProcessPacket(&rtpPkt, ts, true)
rs.ProcessPacketRTP(&rtpPkt, ts, true)
rtpPkt = rtp.Packet{
Header: rtp.Header{
@@ -92,7 +92,7 @@ func TestRTCPSender(t *testing.T) {
Payload: []byte("\x00\x00"),
}
ts = time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC)
rs.ProcessPacket(&rtpPkt, ts, false)
rs.ProcessPacketRTP(&rtpPkt, ts, false)
setCurTime(time.Date(2008, 5, 20, 22, 16, 24, 0, time.UTC))

View File

@@ -13,7 +13,7 @@ type LossDetector struct {
// Process processes a RTP packet.
// It returns the number of lost packets.
func (r *LossDetector) Process(pkt *rtp.Packet) int {
func (r *LossDetector) Process(pkt *rtp.Packet) uint {
if !r.initialized {
r.initialized = true
r.expectedSeqNum = pkt.SequenceNumber + 1
@@ -23,7 +23,7 @@ func (r *LossDetector) Process(pkt *rtp.Packet) int {
if pkt.SequenceNumber != r.expectedSeqNum {
diff := pkt.SequenceNumber - r.expectedSeqNum
r.expectedSeqNum = pkt.SequenceNumber + 1
return int(diff)
return uint(diff)
}
r.expectedSeqNum = pkt.SequenceNumber + 1

View File

@@ -15,19 +15,19 @@ func TestLossDetector(t *testing.T) {
SequenceNumber: 65530,
},
})
require.Equal(t, 0, c)
require.Equal(t, uint(0), c)
c = d.Process(&rtp.Packet{
Header: rtp.Header{
SequenceNumber: 65531,
},
})
require.Equal(t, 0, c)
require.Equal(t, uint(0), c)
c = d.Process(&rtp.Packet{
Header: rtp.Header{
SequenceNumber: 65535,
},
})
require.Equal(t, 3, c)
require.Equal(t, uint(3), c)
}

View File

@@ -27,7 +27,7 @@ func (r *Reorderer) Initialize() {
// Process processes a RTP packet.
// It returns a sequence of ordered packets and the number of lost packets.
func (r *Reorderer) Process(pkt *rtp.Packet) ([]*rtp.Packet, int) {
func (r *Reorderer) Process(pkt *rtp.Packet) ([]*rtp.Packet, uint) {
if !r.initialized {
r.initialized = true
r.expectedSeqNum = pkt.SequenceNumber + 1
@@ -86,7 +86,7 @@ func (r *Reorderer) Process(pkt *rtp.Packet) ([]*rtp.Packet, int) {
ret[pos] = pkt
r.expectedSeqNum = pkt.SequenceNumber + 1
return ret, int(relPos) - n + 1
return ret, uint(int(relPos) - n + 1)
}
// there's a missing packet

View File

@@ -164,7 +164,7 @@ func TestReorder(t *testing.T) {
for _, entry := range sequence {
out, missing := r.Process(entry.in)
require.Equal(t, entry.out, out)
require.Equal(t, 0, missing)
require.Equal(t, uint(0), missing)
}
}
@@ -173,7 +173,7 @@ func TestBufferIsFull(t *testing.T) {
r.Initialize()
r.absPos = 25
sn := uint16(1564)
toMiss := 34
toMiss := uint(34)
out, missing := r.Process(&rtp.Packet{
Header: rtp.Header{
@@ -185,19 +185,19 @@ func TestBufferIsFull(t *testing.T) {
SequenceNumber: sn,
},
}}, out)
require.Equal(t, 0, missing)
require.Equal(t, uint(0), missing)
sn++
var expected []*rtp.Packet
for i := 0; i < 64-toMiss; i++ {
for i := uint(0); i < 64-toMiss; i++ {
out, missing = r.Process(&rtp.Packet{
Header: rtp.Header{
SequenceNumber: sn + uint16(toMiss),
},
})
require.Equal(t, []*rtp.Packet(nil), out)
require.Equal(t, 0, missing)
require.Equal(t, uint(0), missing)
expected = append(expected, &rtp.Packet{
Header: rtp.Header{
@@ -242,7 +242,7 @@ func TestReset(t *testing.T) {
},
})
require.Equal(t, []*rtp.Packet(nil), out)
require.Equal(t, 0, missing)
require.Equal(t, uint(0), missing)
sn++
}
@@ -256,5 +256,5 @@ func TestReset(t *testing.T) {
SequenceNumber: sn,
},
}}, out)
require.Equal(t, 0, missing)
require.Equal(t, uint(0), missing)
}

View File

@@ -251,7 +251,7 @@ func (e ErrClientWriteQueueFull) Error() string {
// ErrClientRTPPacketsLost is an error that can be returned by a client.
type ErrClientRTPPacketsLost struct {
Lost int
Lost uint
}
// Error implements the error interface.

View File

@@ -24,7 +24,7 @@ func New(
) (*RTCPReceiver, error) {
rr := &rtcpreceiver.RTCPReceiver{
ClockRate: clockRate,
ReceiverSSRC: receiverSSRC,
LocalSSRC: receiverSSRC,
Period: period,
TimeNow: timeNow,
WritePacketRTCP: writePacketRTCP,
@@ -44,7 +44,7 @@ func (rr *RTCPReceiver) Close() {
// ProcessPacket extracts the needed data from RTP packets.
func (rr *RTCPReceiver) ProcessPacket(pkt *rtp.Packet, system time.Time, ptsEqualsDTS bool) error {
return (*rtcpreceiver.RTCPReceiver)(rr).ProcessPacket(pkt, system, ptsEqualsDTS)
return (*rtcpreceiver.RTCPReceiver)(rr).ProcessPacketRTP(pkt, system, ptsEqualsDTS)
}
// ProcessSenderReport extracts the needed data from RTCP sender reports.
@@ -59,5 +59,9 @@ func (rr *RTCPReceiver) PacketNTP(ts uint32) (time.Time, bool) {
// SenderSSRC returns the SSRC of outgoing RTP packets.
func (rr *RTCPReceiver) SenderSSRC() (uint32, bool) {
return (*rtcpreceiver.RTCPReceiver)(rr).SenderSSRC()
stats := (*rtcpreceiver.RTCPReceiver)(rr).Stats()
if stats == nil {
return 0, false
}
return stats.RemoteSSRC, true
}

View File

@@ -39,15 +39,25 @@ func (rs *RTCPSender) Close() {
// ProcessPacket extracts data from RTP packets.
func (rs *RTCPSender) ProcessPacket(pkt *rtp.Packet, ntp time.Time, ptsEqualsDTS bool) {
(*rtcpsender.RTCPSender)(rs).ProcessPacket(pkt, ntp, ptsEqualsDTS)
(*rtcpsender.RTCPSender)(rs).ProcessPacketRTP(pkt, ntp, ptsEqualsDTS)
}
// SenderSSRC returns the SSRC of outgoing RTP packets.
func (rs *RTCPSender) SenderSSRC() (uint32, bool) {
return (*rtcpsender.RTCPSender)(rs).SenderSSRC()
stats := (*rtcpsender.RTCPSender)(rs).Stats()
if stats == nil {
return 0, false
}
return stats.LocalSSRC, true
}
// LastPacketData returns metadata of the last RTP packet.
func (rs *RTCPSender) LastPacketData() (uint16, uint32, time.Time, bool) {
return (*rtcpsender.RTCPSender)(rs).LastPacketData()
stats := (*rtcpsender.RTCPSender)(rs).Stats()
if stats == nil {
return 0, 0, time.Time{}, false
}
return stats.LastSequenceNumber, stats.LastRTP, stats.LastNTP, true
}

View File

@@ -18,6 +18,6 @@ func New() *LossDetector {
// Process processes a RTP packet.
// It returns the number of lost packets.
func (r *LossDetector) Process(pkt *rtp.Packet) int {
func (r *LossDetector) Process(pkt *rtp.Packet) uint {
return (*rtplossdetector.LossDetector)(r).Process(pkt)
}

View File

@@ -15,19 +15,19 @@ func TestLossDetector(t *testing.T) {
SequenceNumber: 65530,
},
})
require.Equal(t, 0, c)
require.Equal(t, uint(0), c)
c = d.Process(&rtp.Packet{
Header: rtp.Header{
SequenceNumber: 65531,
},
})
require.Equal(t, 0, c)
require.Equal(t, uint(0), c)
c = d.Process(&rtp.Packet{
Header: rtp.Header{
SequenceNumber: 65535,
},
})
require.Equal(t, 3, c)
require.Equal(t, uint(3), c)
}

View File

@@ -22,6 +22,6 @@ func New() *Reorderer {
// Process processes a RTP packet.
// It returns a sequence of ordered packets and the number of lost packets.
func (r *Reorderer) Process(pkt *rtp.Packet) ([]*rtp.Packet, int) {
func (r *Reorderer) Process(pkt *rtp.Packet) ([]*rtp.Packet, uint) {
return (*rtpreorderer.Reorderer)(r).Process(pkt)
}

View File

@@ -101,11 +101,15 @@ func (sc *ServerConn) NetConn() net.Conn {
}
// BytesReceived returns the number of read bytes.
//
// Deprecated: replaced by Stats()
func (sc *ServerConn) BytesReceived() uint64 {
return sc.bc.BytesReceived()
}
// BytesSent returns the number of written bytes.
//
// Deprecated: replaced by Stats()
func (sc *ServerConn) BytesSent() uint64 {
return sc.bc.BytesSent()
}
@@ -120,6 +124,14 @@ func (sc *ServerConn) UserData() interface{} {
return sc.userData
}
// Stats returns connection statistics.
func (sc *ServerConn) Stats() *StatsConn {
return &StatsConn{
BytesReceived: sc.bc.BytesReceived(),
BytesSent: sc.bc.BytesSent(),
}
}
func (sc *ServerConn) ip() net.IP {
return sc.remoteAddr.IP
}

View File

@@ -3,7 +3,6 @@ package gortsplib
import (
"errors"
"fmt"
"sync/atomic"
"time"
"github.com/bluenviron/gortsplib/v4/pkg/base"
@@ -131,8 +130,6 @@ func (cr *serverConnReader) readFuncTCP() error {
return liberrors.ErrServerUnexpectedResponse{}
case *base.InterleavedFrame:
atomic.AddUint64(cr.sc.session.bytesReceived, uint64(len(what.Payload)))
if cb, ok := cr.sc.session.tcpCallbackByChannel[what.Channel]; ok {
cb(what.Payload)
}

View File

@@ -609,10 +609,11 @@ func TestServerPlay(t *testing.T) {
close(nconnOpened)
},
onConnClose: func(ctx *ServerHandlerOnConnCloseCtx) {
require.Greater(t, ctx.Conn.BytesSent(), uint64(810))
require.Less(t, ctx.Conn.BytesSent(), uint64(1150))
require.Greater(t, ctx.Conn.BytesReceived(), uint64(440))
require.Less(t, ctx.Conn.BytesReceived(), uint64(660))
s := ctx.Conn.Stats()
require.Greater(t, s.BytesSent, uint64(810))
require.Less(t, s.BytesSent, uint64(1150))
require.Greater(t, s.BytesReceived, uint64(440))
require.Less(t, s.BytesReceived, uint64(660))
close(nconnClosed)
},
@@ -621,10 +622,11 @@ func TestServerPlay(t *testing.T) {
},
onSessionClose: func(ctx *ServerHandlerOnSessionCloseCtx) {
if transport != "multicast" {
require.Greater(t, ctx.Session.BytesSent(), uint64(50))
require.Less(t, ctx.Session.BytesSent(), uint64(60))
require.Greater(t, ctx.Session.BytesReceived(), uint64(15))
require.Less(t, ctx.Session.BytesReceived(), uint64(25))
s := ctx.Session.Stats()
require.Greater(t, s.BytesSent, uint64(50))
require.Less(t, s.BytesSent, uint64(60))
require.Greater(t, s.BytesReceived, uint64(15))
require.Less(t, s.BytesReceived, uint64(25))
}
close(sessionClosed)
@@ -2325,7 +2327,7 @@ func TestServerPlayNoInterleavedIDs(t *testing.T) {
}
}
func TestServerPlayBytesSent(t *testing.T) {
func TestServerPlayStreamStats(t *testing.T) {
var stream *ServerStream
s := &Server{
@@ -2355,7 +2357,6 @@ func TestServerPlayBytesSent(t *testing.T) {
err := s.Start()
require.NoError(t, err)
defer s.Close()
stream = NewServerStream(s, &description.Session{Medias: []*description.Media{testH264Media}})
defer stream.Close()
@@ -2393,5 +2394,6 @@ func TestServerPlayBytesSent(t *testing.T) {
err = stream.WritePacketRTP(stream.Description().Medias[0], &testRTPPacket)
require.NoError(t, err)
require.Equal(t, uint64(16*2), stream.BytesSent())
st := stream.Stats()
require.Equal(t, uint64(16*2), st.BytesSent)
}

View File

@@ -545,10 +545,11 @@ func TestServerRecord(t *testing.T) {
close(nconnOpened)
},
onConnClose: func(ctx *ServerHandlerOnConnCloseCtx) {
require.Greater(t, ctx.Conn.BytesSent(), uint64(510))
require.Less(t, ctx.Conn.BytesSent(), uint64(560))
require.Greater(t, ctx.Conn.BytesReceived(), uint64(1000))
require.Less(t, ctx.Conn.BytesReceived(), uint64(1200))
s := ctx.Conn.Stats()
require.Greater(t, s.BytesSent, uint64(510))
require.Less(t, s.BytesSent, uint64(560))
require.Greater(t, s.BytesReceived, uint64(1000))
require.Less(t, s.BytesReceived, uint64(1200))
close(nconnClosed)
},
@@ -556,10 +557,11 @@ func TestServerRecord(t *testing.T) {
close(sessionOpened)
},
onSessionClose: func(ctx *ServerHandlerOnSessionCloseCtx) {
require.Greater(t, ctx.Session.BytesSent(), uint64(75))
require.Less(t, ctx.Session.BytesSent(), uint64(130))
require.Greater(t, ctx.Session.BytesReceived(), uint64(70))
require.Less(t, ctx.Session.BytesReceived(), uint64(80))
s := ctx.Session.Stats()
require.Greater(t, s.BytesSent, uint64(75))
require.Less(t, s.BytesSent, uint64(130))
require.Greater(t, s.BytesReceived, uint64(70))
require.Less(t, s.BytesReceived, uint64(80))
close(sessionClosed)
},

View File

@@ -14,6 +14,8 @@ import (
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/bluenviron/gortsplib/v4/internal/rtcpreceiver"
"github.com/bluenviron/gortsplib/v4/internal/rtcpsender"
"github.com/bluenviron/gortsplib/v4/pkg/base"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
@@ -186,7 +188,7 @@ func generateRTPInfo(
Scheme: u.Scheme,
Host: u.Host,
Path: setuppedPath + "/trackID=" +
strconv.FormatInt(int64(setuppedStream.streamMedias[sm.media].trackID), 10),
strconv.FormatInt(int64(setuppedStream.medias[sm.media].trackID), 10),
}).String()
ri = append(ri, entry)
}
@@ -235,8 +237,6 @@ type ServerSession struct {
secretID string // must not be shared, allows to take ownership of the session
ctx context.Context
ctxCancel func()
bytesReceived *uint64
bytesSent *uint64
userData interface{}
conns map[*ServerConn]struct{}
state ServerSessionState
@@ -272,11 +272,10 @@ func (ss *ServerSession) initialize() {
ss.secretID = secretID
ss.ctx = ctx
ss.ctxCancel = ctxCancel
ss.bytesReceived = new(uint64)
ss.bytesSent = new(uint64)
ss.conns = make(map[*ServerConn]struct{})
ss.lastRequestTime = ss.s.timeNow()
ss.udpCheckStreamTimer = emptyTimer()
ss.chHandleRequest = make(chan sessionRequestReq)
ss.chRemoveConn = make(chan *ServerConn)
ss.chStartWriter = make(chan struct{})
@@ -291,13 +290,25 @@ func (ss *ServerSession) Close() {
}
// BytesReceived returns the number of read bytes.
//
// Deprecated: replaced by Stats()
func (ss *ServerSession) BytesReceived() uint64 {
return atomic.LoadUint64(ss.bytesReceived)
v := uint64(0)
for _, sm := range ss.setuppedMedias {
v += atomic.LoadUint64(sm.bytesReceived)
}
return v
}
// BytesSent returns the number of written bytes.
//
// Deprecated: replaced by Stats()
func (ss *ServerSession) BytesSent() uint64 {
return atomic.LoadUint64(ss.bytesSent)
v := uint64(0)
for _, sm := range ss.setuppedMedias {
v += atomic.LoadUint64(sm.bytesSent)
}
return v
}
// State returns the state of the session.
@@ -349,25 +360,190 @@ func (ss *ServerSession) UserData() interface{} {
return ss.userData
}
func (ss *ServerSession) onPacketLost(err error) {
if h, ok := ss.s.Handler.(ServerHandlerOnPacketLost); ok {
h.OnPacketLost(&ServerHandlerOnPacketLostCtx{
Session: ss,
Error: err,
})
} else {
log.Println(err.Error())
}
}
// Stats returns server session statistics.
func (ss *ServerSession) Stats() *StatsSession {
return &StatsSession{
BytesReceived: func() uint64 {
v := uint64(0)
for _, sm := range ss.setuppedMedias {
v += atomic.LoadUint64(sm.bytesReceived)
}
return v
}(),
BytesSent: func() uint64 {
v := uint64(0)
for _, sm := range ss.setuppedMedias {
v += atomic.LoadUint64(sm.bytesSent)
}
return v
}(),
RTPPacketsReceived: func() uint64 {
v := uint64(0)
for _, sm := range ss.setuppedMedias {
for _, f := range sm.formats {
v += atomic.LoadUint64(f.rtpPacketsReceived)
}
}
return v
}(),
RTPPacketsSent: func() uint64 {
v := uint64(0)
for _, sm := range ss.setuppedMedias {
for _, f := range sm.formats {
v += atomic.LoadUint64(f.rtpPacketsSent)
}
}
return v
}(),
RTPPacketsLost: func() uint64 {
v := uint64(0)
for _, sm := range ss.setuppedMedias {
for _, f := range sm.formats {
v += atomic.LoadUint64(f.rtpPacketsLost)
}
}
return v
}(),
RTPPacketsInError: func() uint64 {
v := uint64(0)
for _, sm := range ss.setuppedMedias {
v += atomic.LoadUint64(sm.rtpPacketsInError)
}
return v
}(),
RTPJitter: func() float64 {
v := float64(0)
n := float64(0)
for _, sm := range ss.setuppedMedias {
for _, fo := range sm.formats {
if fo.rtcpReceiver != nil {
stats := fo.rtcpReceiver.Stats()
if stats != nil {
v += stats.Jitter
n++
}
}
}
}
return v / n
}(),
RTCPPacketsReceived: func() uint64 {
v := uint64(0)
for _, sm := range ss.setuppedMedias {
v += atomic.LoadUint64(sm.rtcpPacketsReceived)
}
return v
}(),
RTCPPacketsSent: func() uint64 {
v := uint64(0)
for _, sm := range ss.setuppedMedias {
v += atomic.LoadUint64(sm.rtcpPacketsSent)
}
return v
}(),
RTCPPacketsInError: func() uint64 {
v := uint64(0)
for _, sm := range ss.setuppedMedias {
v += atomic.LoadUint64(sm.rtcpPacketsInError)
}
return v
}(),
Medias: func() map[*description.Media]StatsSessionMedia { //nolint:dupl
ret := make(map[*description.Media]StatsSessionMedia, len(ss.setuppedMedias))
func (ss *ServerSession) onDecodeError(err error) {
if h, ok := ss.s.Handler.(ServerHandlerOnDecodeError); ok {
h.OnDecodeError(&ServerHandlerOnDecodeErrorCtx{
Session: ss,
Error: err,
})
} else {
log.Println(err.Error())
for med, sm := range ss.setuppedMedias {
ret[med] = StatsSessionMedia{
BytesReceived: atomic.LoadUint64(sm.bytesReceived),
BytesSent: atomic.LoadUint64(sm.bytesSent),
RTPPacketsInError: atomic.LoadUint64(sm.rtpPacketsInError),
RTCPPacketsReceived: atomic.LoadUint64(sm.rtcpPacketsReceived),
RTCPPacketsSent: atomic.LoadUint64(sm.rtcpPacketsSent),
RTCPPacketsInError: atomic.LoadUint64(sm.rtcpPacketsInError),
Formats: func() map[format.Format]StatsSessionFormat {
ret := make(map[format.Format]StatsSessionFormat, len(sm.formats))
for _, fo := range sm.formats {
recvStats := func() *rtcpreceiver.Stats {
if fo.rtcpReceiver != nil {
return fo.rtcpReceiver.Stats()
}
return nil
}()
rtcpSender := func() *rtcpsender.RTCPSender {
if ss.setuppedStream != nil {
return ss.setuppedStream.medias[med].formats[fo.format.PayloadType()].rtcpSender
}
return nil
}()
sentStats := func() *rtcpsender.Stats {
if rtcpSender != nil {
return rtcpSender.Stats()
}
return nil
}()
ret[fo.format] = StatsSessionFormat{ //nolint:dupl
RTPPacketsReceived: atomic.LoadUint64(fo.rtpPacketsReceived),
RTPPacketsSent: atomic.LoadUint64(fo.rtpPacketsSent),
RTPPacketsLost: atomic.LoadUint64(fo.rtpPacketsLost),
LocalSSRC: func() uint32 {
if fo.rtcpReceiver != nil {
return *fo.rtcpReceiver.LocalSSRC
}
if sentStats != nil {
return sentStats.LocalSSRC
}
return 0
}(),
RemoteSSRC: func() uint32 {
if recvStats != nil {
return recvStats.RemoteSSRC
}
return 0
}(),
RTPPacketsLastSequenceNumber: func() uint16 {
if recvStats != nil {
return recvStats.LastSequenceNumber
}
if sentStats != nil {
return sentStats.LastSequenceNumber
}
return 0
}(),
RTPPacketsLastRTP: func() uint32 {
if recvStats != nil {
return recvStats.LastRTP
}
if sentStats != nil {
return sentStats.LastRTP
}
return 0
}(),
RTPPacketsLastNTP: func() time.Time {
if recvStats != nil {
return recvStats.LastNTP
}
if sentStats != nil {
return sentStats.LastNTP
}
return time.Time{}
}(),
RTPJitter: func() float64 {
if recvStats != nil {
return recvStats.Jitter
}
return 0
}(),
}
}
return ret
}(),
}
}
return ret
}(),
}
}
@@ -848,7 +1024,7 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) (
th := headers.Transport{}
if ss.state == ServerSessionStatePrePlay {
ssrc, ok := stream.senderSSRC(medi)
ssrc, ok := stream.localSSRC(medi)
if ok {
th.SSRC = &ssrc
}
@@ -894,7 +1070,7 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) (
th.Delivery = &de
v := uint(127)
th.TTL = &v
d := stream.streamMedias[medi].multicastWriter.ip()
d := stream.medias[medi].multicastWriter.ip()
th.Destination = &d
th.Ports = &[2]int{ss.s.MulticastRTPPort, ss.s.MulticastRTCPPort}
@@ -1229,7 +1405,7 @@ func (ss *ServerSession) findFreeChannelPair() int {
}
}
// OnPacketRTPAny sets the callback that is called when a RTP packet is read from any setupped media.
// OnPacketRTPAny sets a callback that is called when a RTP packet is read from any setupped media.
func (ss *ServerSession) OnPacketRTPAny(cb OnPacketRTPAnyFunc) {
for _, sm := range ss.setuppedMedias {
cmedia := sm.media
@@ -1241,7 +1417,7 @@ func (ss *ServerSession) OnPacketRTPAny(cb OnPacketRTPAnyFunc) {
}
}
// OnPacketRTCPAny sets the callback that is called when a RTCP packet is read from any setupped media.
// OnPacketRTCPAny sets a callback that is called when a RTCP packet is read from any setupped media.
func (ss *ServerSession) OnPacketRTCPAny(cb OnPacketRTCPAnyFunc) {
for _, sm := range ss.setuppedMedias {
cmedia := sm.media
@@ -1251,24 +1427,25 @@ func (ss *ServerSession) OnPacketRTCPAny(cb OnPacketRTCPAnyFunc) {
}
}
// OnPacketRTP sets the callback that is called when a RTP packet is read.
// OnPacketRTP sets a callback that is called when a RTP packet is read.
func (ss *ServerSession) OnPacketRTP(medi *description.Media, forma format.Format, cb OnPacketRTPFunc) {
sm := ss.setuppedMedias[medi]
st := sm.formats[forma.PayloadType()]
st.onPacketRTP = cb
}
// OnPacketRTCP sets the callback that is called when a RTCP packet is read.
// OnPacketRTCP sets a callback that is called when a RTCP packet is read.
func (ss *ServerSession) OnPacketRTCP(medi *description.Media, cb OnPacketRTCPFunc) {
sm := ss.setuppedMedias[medi]
sm.onPacketRTCP = cb
}
func (ss *ServerSession) writePacketRTP(medi *description.Media, byts []byte) error {
func (ss *ServerSession) writePacketRTP(medi *description.Media, payloadType uint8, byts []byte) error {
sm := ss.setuppedMedias[medi]
sf := sm.formats[payloadType]
ok := sm.ss.writer.push(func() error {
return sm.writePacketRTPInQueue(byts)
ok := ss.writer.push(func() error {
return sf.writePacketRTPInQueue(byts)
})
if !ok {
return liberrors.ErrServerWriteQueueFull{}
@@ -1286,7 +1463,7 @@ func (ss *ServerSession) WritePacketRTP(medi *description.Media, pkt *rtp.Packet
}
byts = byts[:n]
return ss.writePacketRTP(medi, byts)
return ss.writePacketRTP(medi, pkt.PayloadType, byts)
}
func (ss *ServerSession) writePacketRTCP(medi *description.Media, byts []byte) error {

View File

@@ -1,6 +1,8 @@
package gortsplib
import (
"log"
"sync/atomic"
"time"
"github.com/pion/rtcp"
@@ -18,12 +20,30 @@ type serverSessionFormat struct {
format format.Format
onPacketRTP OnPacketRTPFunc
udpReorderer *rtpreorderer.Reorderer
tcpLossDetector *rtplossdetector.LossDetector
rtcpReceiver *rtcpreceiver.RTCPReceiver
udpReorderer *rtpreorderer.Reorderer
tcpLossDetector *rtplossdetector.LossDetector
rtcpReceiver *rtcpreceiver.RTCPReceiver
writePacketRTPInQueue func([]byte) error
rtpPacketsReceived *uint64
rtpPacketsSent *uint64
rtpPacketsLost *uint64
}
func (sf *serverSessionFormat) initialize() {
sf.rtpPacketsReceived = new(uint64)
sf.rtpPacketsSent = new(uint64)
sf.rtpPacketsLost = new(uint64)
}
func (sf *serverSessionFormat) start() {
switch *sf.sm.ss.setuppedTransport {
case TransportUDP, TransportUDPMulticast:
sf.writePacketRTPInQueue = sf.writePacketRTPInQueueUDP
default:
sf.writePacketRTPInQueue = sf.writePacketRTPInQueueTCP
}
if sf.sm.ss.state != ServerSessionStatePlay {
if *sf.sm.ss.setuppedTransport == TransportUDP || *sf.sm.ss.setuppedTransport == TransportUDPMulticast {
sf.udpReorderer = &rtpreorderer.Reorderer{}
@@ -56,38 +76,76 @@ func (sf *serverSessionFormat) stop() {
}
}
func (sf *serverSessionFormat) readRTPUDP(pkt *rtp.Packet, now time.Time) {
func (sf *serverSessionFormat) readPacketRTPUDP(pkt *rtp.Packet, now time.Time) {
packets, lost := sf.udpReorderer.Process(pkt)
if lost != 0 {
sf.sm.ss.onPacketLost(liberrors.ErrServerRTPPacketsLost{Lost: lost})
sf.onPacketRTPLost(lost)
// do not return
}
for _, pkt := range packets {
err := sf.rtcpReceiver.ProcessPacket(pkt, now, sf.format.PTSEqualsDTS(pkt))
if err != nil {
sf.sm.ss.onDecodeError(err)
continue
}
sf.onPacketRTP(pkt)
sf.handlePacketRTP(pkt, now)
}
}
func (sf *serverSessionFormat) readRTPTCP(pkt *rtp.Packet) {
func (sf *serverSessionFormat) readPacketRTPTCP(pkt *rtp.Packet) {
lost := sf.tcpLossDetector.Process(pkt)
if lost != 0 {
sf.sm.ss.onPacketLost(liberrors.ErrServerRTPPacketsLost{Lost: lost})
sf.onPacketRTPLost(lost)
// do not return
}
now := sf.sm.ss.s.timeNow()
err := sf.rtcpReceiver.ProcessPacket(pkt, now, sf.format.PTSEqualsDTS(pkt))
sf.handlePacketRTP(pkt, now)
}
func (sf *serverSessionFormat) handlePacketRTP(pkt *rtp.Packet, now time.Time) {
err := sf.rtcpReceiver.ProcessPacketRTP(pkt, now, sf.format.PTSEqualsDTS(pkt))
if err != nil {
sf.sm.ss.onDecodeError(err)
sf.sm.onPacketRTPDecodeError(err)
return
}
atomic.AddUint64(sf.rtpPacketsReceived, 1)
sf.onPacketRTP(pkt)
}
func (sf *serverSessionFormat) onPacketRTPLost(lost uint) {
atomic.AddUint64(sf.rtpPacketsLost, uint64(lost))
if h, ok := sf.sm.ss.s.Handler.(ServerHandlerOnPacketLost); ok {
h.OnPacketLost(&ServerHandlerOnPacketLostCtx{
Session: sf.sm.ss,
Error: liberrors.ErrServerRTPPacketsLost{Lost: lost},
})
} else {
log.Println(liberrors.ErrServerRTPPacketsLost{Lost: lost}.Error())
}
}
func (sf *serverSessionFormat) writePacketRTPInQueueUDP(payload []byte) error {
err := sf.sm.ss.s.udpRTPListener.write(payload, sf.sm.udpRTPWriteAddr)
if err != nil {
return err
}
atomic.AddUint64(sf.sm.bytesSent, uint64(len(payload)))
atomic.AddUint64(sf.rtpPacketsSent, 1)
return nil
}
func (sf *serverSessionFormat) writePacketRTPInQueueTCP(payload []byte) error {
sf.sm.ss.tcpFrame.Channel = sf.sm.tcpChannel
sf.sm.ss.tcpFrame.Payload = payload
sf.sm.ss.tcpConn.nconn.SetWriteDeadline(time.Now().Add(sf.sm.ss.s.WriteTimeout))
err := sf.sm.ss.tcpConn.conn.WriteInterleavedFrame(sf.sm.ss.tcpFrame, sf.sm.ss.tcpBuffer)
if err != nil {
return err
}
atomic.AddUint64(sf.sm.bytesSent, uint64(len(payload)))
atomic.AddUint64(sf.rtpPacketsSent, 1)
return nil
}

View File

@@ -1,6 +1,7 @@
package gortsplib
import (
"log"
"net"
"sync/atomic"
"time"
@@ -23,20 +24,33 @@ type serverSessionMedia struct {
udpRTCPReadPort int
udpRTCPWriteAddr *net.UDPAddr
formats map[uint8]*serverSessionFormat // record only
writePacketRTPInQueue func([]byte) error
writePacketRTCPInQueue func([]byte) error
bytesReceived *uint64
bytesSent *uint64
rtpPacketsInError *uint64
rtcpPacketsReceived *uint64
rtcpPacketsSent *uint64
rtcpPacketsInError *uint64
}
func (sm *serverSessionMedia) initialize() {
if sm.ss.state == ServerSessionStatePreRecord {
sm.formats = make(map[uint8]*serverSessionFormat)
for _, forma := range sm.media.Formats {
sm.formats[forma.PayloadType()] = &serverSessionFormat{
sm: sm,
format: forma,
onPacketRTP: func(*rtp.Packet) {},
}
sm.bytesReceived = new(uint64)
sm.bytesSent = new(uint64)
sm.rtpPacketsInError = new(uint64)
sm.rtcpPacketsReceived = new(uint64)
sm.rtcpPacketsSent = new(uint64)
sm.rtcpPacketsInError = new(uint64)
sm.formats = make(map[uint8]*serverSessionFormat)
for _, forma := range sm.media.Formats {
f := &serverSessionFormat{
sm: sm,
format: forma,
onPacketRTP: func(*rtp.Packet) {},
}
f.initialize()
sm.formats[forma.PayloadType()] = f
}
}
@@ -49,7 +63,6 @@ func (sm *serverSessionMedia) start() {
switch *sm.ss.setuppedTransport {
case TransportUDP, TransportUDPMulticast:
sm.writePacketRTPInQueue = sm.writePacketRTPInQueueUDP
sm.writePacketRTCPInQueue = sm.writePacketRTCPInQueueUDP
if *sm.ss.setuppedTransport == TransportUDP {
@@ -57,7 +70,7 @@ func (sm *serverSessionMedia) start() {
// firewall opening is performed with RTCP sender reports generated by ServerStream
// readers can send RTCP packets only
sm.ss.s.udpRTCPListener.addClient(sm.ss.author.ip(), sm.udpRTCPReadPort, sm.readRTCPUDPPlay)
sm.ss.s.udpRTCPListener.addClient(sm.ss.author.ip(), sm.udpRTCPReadPort, sm.readPacketRTCPUDPPlay)
} else {
// open the firewall by sending empty packets to the counterpart.
byts, _ := (&rtp.Packet{Header: rtp.Header{Version: 2}}).Marshal()
@@ -66,13 +79,12 @@ func (sm *serverSessionMedia) start() {
byts, _ = (&rtcp.ReceiverReport{}).Marshal()
sm.ss.s.udpRTCPListener.write(byts, sm.udpRTCPWriteAddr) //nolint:errcheck
sm.ss.s.udpRTPListener.addClient(sm.ss.author.ip(), sm.udpRTPReadPort, sm.readRTPUDPRecord)
sm.ss.s.udpRTCPListener.addClient(sm.ss.author.ip(), sm.udpRTCPReadPort, sm.readRTCPUDPRecord)
sm.ss.s.udpRTPListener.addClient(sm.ss.author.ip(), sm.udpRTPReadPort, sm.readPacketRTPUDPRecord)
sm.ss.s.udpRTCPListener.addClient(sm.ss.author.ip(), sm.udpRTCPReadPort, sm.readPacketRTCPUDPRecord)
}
}
case TransportTCP:
sm.writePacketRTPInQueue = sm.writePacketRTPInQueueTCP
sm.writePacketRTCPInQueue = sm.writePacketRTCPInQueueTCP
if sm.ss.tcpCallbackByChannel == nil {
@@ -80,11 +92,11 @@ func (sm *serverSessionMedia) start() {
}
if sm.ss.state == ServerSessionStatePlay {
sm.ss.tcpCallbackByChannel[sm.tcpChannel] = sm.readRTPTCPPlay
sm.ss.tcpCallbackByChannel[sm.tcpChannel+1] = sm.readRTCPTCPPlay
sm.ss.tcpCallbackByChannel[sm.tcpChannel] = sm.readPacketRTPTCPPlay
sm.ss.tcpCallbackByChannel[sm.tcpChannel+1] = sm.readPacketRTCPTCPPlay
} else {
sm.ss.tcpCallbackByChannel[sm.tcpChannel] = sm.readRTPTCPRecord
sm.ss.tcpCallbackByChannel[sm.tcpChannel+1] = sm.readRTCPTCPRecord
sm.ss.tcpCallbackByChannel[sm.tcpChannel] = sm.readPacketRTPTCPRecord
sm.ss.tcpCallbackByChannel[sm.tcpChannel+1] = sm.readPacketRTCPTCPRecord
}
}
}
@@ -102,59 +114,58 @@ func (sm *serverSessionMedia) stop() {
func (sm *serverSessionMedia) findFormatWithSSRC(ssrc uint32) *serverSessionFormat {
for _, format := range sm.formats {
tssrc, ok := format.rtcpReceiver.SenderSSRC()
if ok && tssrc == ssrc {
stats := format.rtcpReceiver.Stats()
if stats != nil && stats.RemoteSSRC == ssrc {
return format
}
}
return nil
}
func (sm *serverSessionMedia) writePacketRTPInQueueUDP(payload []byte) error {
atomic.AddUint64(sm.ss.bytesSent, uint64(len(payload)))
return sm.ss.s.udpRTPListener.write(payload, sm.udpRTPWriteAddr)
}
func (sm *serverSessionMedia) writePacketRTCPInQueueUDP(payload []byte) error {
atomic.AddUint64(sm.ss.bytesSent, uint64(len(payload)))
return sm.ss.s.udpRTCPListener.write(payload, sm.udpRTCPWriteAddr)
}
err := sm.ss.s.udpRTCPListener.write(payload, sm.udpRTCPWriteAddr)
if err != nil {
return err
}
func (sm *serverSessionMedia) writePacketRTPInQueueTCP(payload []byte) error {
atomic.AddUint64(sm.ss.bytesSent, uint64(len(payload)))
sm.ss.tcpFrame.Channel = sm.tcpChannel
sm.ss.tcpFrame.Payload = payload
sm.ss.tcpConn.nconn.SetWriteDeadline(time.Now().Add(sm.ss.s.WriteTimeout))
return sm.ss.tcpConn.conn.WriteInterleavedFrame(sm.ss.tcpFrame, sm.ss.tcpBuffer)
atomic.AddUint64(sm.bytesSent, uint64(len(payload)))
atomic.AddUint64(sm.rtcpPacketsSent, 1)
return nil
}
func (sm *serverSessionMedia) writePacketRTCPInQueueTCP(payload []byte) error {
atomic.AddUint64(sm.ss.bytesSent, uint64(len(payload)))
sm.ss.tcpFrame.Channel = sm.tcpChannel + 1
sm.ss.tcpFrame.Payload = payload
sm.ss.tcpConn.nconn.SetWriteDeadline(time.Now().Add(sm.ss.s.WriteTimeout))
return sm.ss.tcpConn.conn.WriteInterleavedFrame(sm.ss.tcpFrame, sm.ss.tcpBuffer)
err := sm.ss.tcpConn.conn.WriteInterleavedFrame(sm.ss.tcpFrame, sm.ss.tcpBuffer)
if err != nil {
return err
}
atomic.AddUint64(sm.bytesSent, uint64(len(payload)))
atomic.AddUint64(sm.rtcpPacketsSent, 1)
return nil
}
func (sm *serverSessionMedia) readRTCPUDPPlay(payload []byte) bool {
plen := len(payload)
func (sm *serverSessionMedia) readPacketRTCPUDPPlay(payload []byte) bool {
atomic.AddUint64(sm.bytesReceived, uint64(len(payload)))
atomic.AddUint64(sm.ss.bytesReceived, uint64(plen))
if plen == (udpMaxPayloadSize + 1) {
sm.ss.onDecodeError(liberrors.ErrServerRTCPPacketTooBigUDP{})
if len(payload) == (udpMaxPayloadSize + 1) {
sm.onPacketRTCPDecodeError(liberrors.ErrServerRTCPPacketTooBigUDP{})
return false
}
packets, err := rtcp.Unmarshal(payload)
if err != nil {
sm.ss.onDecodeError(err)
sm.onPacketRTCPDecodeError(err)
return false
}
now := sm.ss.s.timeNow()
atomic.StoreInt64(sm.ss.udpLastPacketTime, now.Unix())
atomic.AddUint64(sm.rtcpPacketsReceived, uint64(len(packets)))
for _, pkt := range packets {
sm.onPacketRTCP(pkt)
}
@@ -162,56 +173,54 @@ func (sm *serverSessionMedia) readRTCPUDPPlay(payload []byte) bool {
return true
}
func (sm *serverSessionMedia) readRTPUDPRecord(payload []byte) bool {
plen := len(payload)
func (sm *serverSessionMedia) readPacketRTPUDPRecord(payload []byte) bool {
atomic.AddUint64(sm.bytesReceived, uint64(len(payload)))
atomic.AddUint64(sm.ss.bytesReceived, uint64(plen))
if plen == (udpMaxPayloadSize + 1) {
sm.ss.onDecodeError(liberrors.ErrServerRTPPacketTooBigUDP{})
if len(payload) == (udpMaxPayloadSize + 1) {
sm.onPacketRTPDecodeError(liberrors.ErrServerRTPPacketTooBigUDP{})
return false
}
pkt := &rtp.Packet{}
err := pkt.Unmarshal(payload)
if err != nil {
sm.ss.onDecodeError(err)
sm.onPacketRTPDecodeError(err)
return false
}
forma, ok := sm.formats[pkt.PayloadType]
if !ok {
sm.ss.onDecodeError(liberrors.ErrServerRTPPacketUnknownPayloadType{PayloadType: pkt.PayloadType})
sm.onPacketRTPDecodeError(liberrors.ErrServerRTPPacketUnknownPayloadType{PayloadType: pkt.PayloadType})
return false
}
now := sm.ss.s.timeNow()
atomic.StoreInt64(sm.ss.udpLastPacketTime, now.Unix())
forma.readRTPUDP(pkt, now)
forma.readPacketRTPUDP(pkt, now)
return true
}
func (sm *serverSessionMedia) readRTCPUDPRecord(payload []byte) bool {
plen := len(payload)
func (sm *serverSessionMedia) readPacketRTCPUDPRecord(payload []byte) bool {
atomic.AddUint64(sm.bytesReceived, uint64(len(payload)))
atomic.AddUint64(sm.ss.bytesReceived, uint64(plen))
if plen == (udpMaxPayloadSize + 1) {
sm.ss.onDecodeError(liberrors.ErrServerRTCPPacketTooBigUDP{})
if len(payload) == (udpMaxPayloadSize + 1) {
sm.onPacketRTCPDecodeError(liberrors.ErrServerRTCPPacketTooBigUDP{})
return false
}
packets, err := rtcp.Unmarshal(payload)
if err != nil {
sm.ss.onDecodeError(err)
sm.onPacketRTCPDecodeError(err)
return false
}
now := sm.ss.s.timeNow()
atomic.StoreInt64(sm.ss.udpLastPacketTime, now.Unix())
atomic.AddUint64(sm.rtcpPacketsReceived, uint64(len(packets)))
for _, pkt := range packets {
if sr, ok := pkt.(*rtcp.SenderReport); ok {
format := sm.findFormatWithSSRC(sr.SSRC)
@@ -226,22 +235,26 @@ func (sm *serverSessionMedia) readRTCPUDPRecord(payload []byte) bool {
return true
}
func (sm *serverSessionMedia) readRTPTCPPlay(_ []byte) bool {
func (sm *serverSessionMedia) readPacketRTPTCPPlay(_ []byte) bool {
return false
}
func (sm *serverSessionMedia) readRTCPTCPPlay(payload []byte) bool {
func (sm *serverSessionMedia) readPacketRTCPTCPPlay(payload []byte) bool {
atomic.AddUint64(sm.bytesReceived, uint64(len(payload)))
if len(payload) > udpMaxPayloadSize {
sm.ss.onDecodeError(liberrors.ErrServerRTCPPacketTooBig{L: len(payload), Max: udpMaxPayloadSize})
sm.onPacketRTCPDecodeError(liberrors.ErrServerRTCPPacketTooBig{L: len(payload), Max: udpMaxPayloadSize})
return false
}
packets, err := rtcp.Unmarshal(payload)
if err != nil {
sm.ss.onDecodeError(err)
sm.onPacketRTCPDecodeError(err)
return false
}
atomic.AddUint64(sm.rtcpPacketsReceived, uint64(len(packets)))
for _, pkt := range packets {
sm.onPacketRTCP(pkt)
}
@@ -249,39 +262,45 @@ func (sm *serverSessionMedia) readRTCPTCPPlay(payload []byte) bool {
return true
}
func (sm *serverSessionMedia) readRTPTCPRecord(payload []byte) bool {
func (sm *serverSessionMedia) readPacketRTPTCPRecord(payload []byte) bool {
atomic.AddUint64(sm.bytesReceived, uint64(len(payload)))
pkt := &rtp.Packet{}
err := pkt.Unmarshal(payload)
if err != nil {
sm.ss.onDecodeError(err)
sm.onPacketRTPDecodeError(err)
return false
}
forma, ok := sm.formats[pkt.PayloadType]
if !ok {
sm.ss.onDecodeError(liberrors.ErrServerRTPPacketUnknownPayloadType{PayloadType: pkt.PayloadType})
sm.onPacketRTPDecodeError(liberrors.ErrServerRTPPacketUnknownPayloadType{PayloadType: pkt.PayloadType})
return false
}
forma.readRTPTCP(pkt)
forma.readPacketRTPTCP(pkt)
return true
}
func (sm *serverSessionMedia) readRTCPTCPRecord(payload []byte) bool {
func (sm *serverSessionMedia) readPacketRTCPTCPRecord(payload []byte) bool {
atomic.AddUint64(sm.bytesReceived, uint64(len(payload)))
if len(payload) > udpMaxPayloadSize {
sm.ss.onDecodeError(liberrors.ErrServerRTCPPacketTooBig{L: len(payload), Max: udpMaxPayloadSize})
sm.onPacketRTCPDecodeError(liberrors.ErrServerRTCPPacketTooBig{L: len(payload), Max: udpMaxPayloadSize})
return false
}
packets, err := rtcp.Unmarshal(payload)
if err != nil {
sm.ss.onDecodeError(err)
sm.onPacketRTCPDecodeError(err)
return false
}
now := sm.ss.s.timeNow()
atomic.AddUint64(sm.rtcpPacketsReceived, uint64(len(packets)))
for _, pkt := range packets {
if sr, ok := pkt.(*rtcp.SenderReport); ok {
format := sm.findFormatWithSSRC(sr.SSRC)
@@ -295,3 +314,29 @@ func (sm *serverSessionMedia) readRTCPTCPRecord(payload []byte) bool {
return true
}
func (sm *serverSessionMedia) onPacketRTPDecodeError(err error) {
atomic.AddUint64(sm.rtpPacketsInError, 1)
if h, ok := sm.ss.s.Handler.(ServerHandlerOnDecodeError); ok {
h.OnDecodeError(&ServerHandlerOnDecodeErrorCtx{
Session: sm.ss,
Error: err,
})
} else {
log.Println(err.Error())
}
}
func (sm *serverSessionMedia) onPacketRTCPDecodeError(err error) {
atomic.AddUint64(sm.rtcpPacketsInError, 1)
if h, ok := sm.ss.s.Handler.(ServerHandlerOnDecodeError); ok {
h.OnDecodeError(&ServerHandlerOnDecodeErrorCtx{
Session: sm.ss,
Error: err,
})
} else {
log.Println(err.Error())
}
}

View File

@@ -9,6 +9,7 @@ import (
"github.com/pion/rtp"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/gortsplib/v4/pkg/headers"
"github.com/bluenviron/gortsplib/v4/pkg/liberrors"
)
@@ -36,9 +37,8 @@ type ServerStream struct {
readers map[*ServerSession]struct{}
multicastReaderCount int
activeUnicastReaders map[*ServerSession]struct{}
streamMedias map[*description.Media]*serverStreamMedia
medias map[*description.Media]*serverStreamMedia
closed bool
bytesSent *uint64
}
// NewServerStream allocates a ServerStream.
@@ -48,10 +48,9 @@ func NewServerStream(s *Server, desc *description.Session) *ServerStream {
desc: desc,
readers: make(map[*ServerSession]struct{}),
activeUnicastReaders: make(map[*ServerSession]struct{}),
bytesSent: new(uint64),
}
st.streamMedias = make(map[*description.Media]*serverStreamMedia, len(desc.Medias))
st.medias = make(map[*description.Media]*serverStreamMedia, len(desc.Medias))
for i, medi := range desc.Medias {
sm := &serverStreamMedia{
st: st,
@@ -59,7 +58,7 @@ func NewServerStream(s *Server, desc *description.Session) *ServerStream {
trackID: i,
}
sm.initialize()
st.streamMedias[medi] = sm
st.medias[medi] = sm
}
return st
@@ -75,14 +74,20 @@ func (st *ServerStream) Close() {
ss.Close()
}
for _, sm := range st.streamMedias {
for _, sm := range st.medias {
sm.close()
}
}
// BytesSent returns the number of written bytes.
//
// Deprecated: replaced by Stats()
func (st *ServerStream) BytesSent() uint64 {
return atomic.LoadUint64(st.bytesSent)
v := uint64(0)
for _, me := range st.medias {
v += atomic.LoadUint64(me.bytesSent)
}
return v
}
// Description returns the description of the stream.
@@ -90,27 +95,84 @@ func (st *ServerStream) Description() *description.Session {
return st.desc
}
func (st *ServerStream) senderSSRC(medi *description.Media) (uint32, bool) {
// Stats returns stream statistics.
func (st *ServerStream) Stats() *ServerStreamStats {
return &ServerStreamStats{
BytesSent: func() uint64 {
v := uint64(0)
for _, me := range st.medias {
v += atomic.LoadUint64(me.bytesSent)
}
return v
}(),
RTPPacketsSent: func() uint64 {
v := uint64(0)
for _, me := range st.medias {
for _, f := range me.formats {
v += atomic.LoadUint64(f.rtpPacketsSent)
}
}
return v
}(),
RTCPPacketsSent: func() uint64 {
v := uint64(0)
for _, me := range st.medias {
v += atomic.LoadUint64(me.rtcpPacketsSent)
}
return v
}(),
Medias: func() map[*description.Media]ServerStreamStatsMedia {
ret := make(map[*description.Media]ServerStreamStatsMedia, len(st.medias))
for med, sm := range st.medias {
ret[med] = ServerStreamStatsMedia{
BytesSent: atomic.LoadUint64(sm.bytesSent),
RTCPPacketsSent: atomic.LoadUint64(sm.rtcpPacketsSent),
Formats: func() map[format.Format]ServerStreamStatsFormat {
ret := make(map[format.Format]ServerStreamStatsFormat)
for _, fo := range sm.formats {
ret[fo.format] = ServerStreamStatsFormat{
RTPPacketsSent: atomic.LoadUint64(fo.rtpPacketsSent),
}
}
return ret
}(),
}
}
return ret
}(),
}
}
func (st *ServerStream) localSSRC(medi *description.Media) (uint32, bool) {
st.mutex.Lock()
defer st.mutex.Unlock()
sm := st.streamMedias[medi]
sm := st.medias[medi]
// senderSSRC() is used to fill SSRC inside the Transport header.
// localSSRC() is used to fill SSRC inside the Transport header.
// if there are multiple formats inside a single media stream,
// do not return anything, since Transport headers don't support multiple SSRCs.
if len(sm.formats) > 1 {
return 0, false
}
return firstFormat(sm.formats).rtcpSender.SenderSSRC()
stats := firstFormat(sm.formats).rtcpSender.Stats()
if stats == nil {
return 0, false
}
return stats.LocalSSRC, true
}
func (st *ServerStream) rtpInfoEntry(medi *description.Media, now time.Time) *headers.RTPInfoEntry {
st.mutex.Lock()
defer st.mutex.Unlock()
sm := st.streamMedias[medi]
sm := st.medias[medi]
// if there are multiple formats inside a single media stream,
// do not generate a RTP-Info entry, since RTP-Info doesn't support
@@ -121,8 +183,8 @@ func (st *ServerStream) rtpInfoEntry(medi *description.Media, now time.Time) *he
format := firstFormat(sm.formats)
lastSeqNum, lastTimeRTP, lastTimeNTP, ok := format.rtcpSender.LastPacketData()
if !ok {
stats := format.rtcpSender.Stats()
if stats == nil {
return nil
}
@@ -132,13 +194,13 @@ func (st *ServerStream) rtpInfoEntry(medi *description.Media, now time.Time) *he
}
// sequence number of the first packet of the stream
seqNum := lastSeqNum + 1
seqNum := stats.LastSequenceNumber + 1
// RTP timestamp corresponding to the time value in
// the Range response header.
// remove a small quantity in order to avoid DTS > PTS
ts := uint32(uint64(lastTimeRTP) +
uint64(now.Sub(lastTimeNTP).Seconds()*float64(clockRate)) -
ts := uint32(uint64(stats.LastRTP) +
uint64(now.Sub(stats.LastNTP).Seconds()*float64(clockRate)) -
uint64(clockRate)/10)
return &headers.RTPInfoEntry{
@@ -175,7 +237,7 @@ func (st *ServerStream) readerAdd(
case TransportUDPMulticast:
if st.multicastReaderCount == 0 {
for _, media := range st.streamMedias {
for _, media := range st.medias {
mw := &serverMulticastWriter{
s: st.s,
}
@@ -207,7 +269,7 @@ func (st *ServerStream) readerRemove(ss *ServerSession) {
if *ss.setuppedTransport == TransportUDPMulticast {
st.multicastReaderCount--
if st.multicastReaderCount == 0 {
for _, media := range st.streamMedias {
for _, media := range st.medias {
media.multicastWriter.close()
media.multicastWriter = nil
}
@@ -225,9 +287,9 @@ func (st *ServerStream) readerSetActive(ss *ServerSession) {
if *ss.setuppedTransport == TransportUDPMulticast {
for medi, sm := range ss.setuppedMedias {
streamMedia := st.streamMedias[medi]
streamMedia := st.medias[medi]
streamMedia.multicastWriter.rtcpl.addClient(
ss.author.ip(), streamMedia.multicastWriter.rtcpl.port(), sm.readRTCPUDPPlay)
ss.author.ip(), streamMedia.multicastWriter.rtcpl.port(), sm.readPacketRTCPUDPPlay)
}
} else {
st.activeUnicastReaders[ss] = struct{}{}
@@ -244,7 +306,7 @@ func (st *ServerStream) readerSetInactive(ss *ServerSession) {
if *ss.setuppedTransport == TransportUDPMulticast {
for medi := range ss.setuppedMedias {
streamMedia := st.streamMedias[medi]
streamMedia := st.medias[medi]
streamMedia.multicastWriter.rtcpl.removeClient(ss.author.ip(), streamMedia.multicastWriter.rtcpl.port())
}
} else {
@@ -274,7 +336,7 @@ func (st *ServerStream) WritePacketRTPWithNTP(medi *description.Media, pkt *rtp.
return liberrors.ErrServerStreamClosed{}
}
sm := st.streamMedias[medi]
sm := st.medias[medi]
sf := sm.formats[pkt.PayloadType]
return sf.writePacketRTP(byts, pkt, ntp)
}
@@ -293,6 +355,6 @@ func (st *ServerStream) WritePacketRTCP(medi *description.Media, pkt rtcp.Packet
return liberrors.ErrServerStreamClosed{}
}
sm := st.streamMedias[medi]
sm := st.medias[medi]
return sm.writePacketRTCP(byts)
}

View File

@@ -15,10 +15,13 @@ type serverStreamFormat struct {
sm *serverStreamMedia
format format.Format
rtcpSender *rtcpsender.RTCPSender
rtcpSender *rtcpsender.RTCPSender
rtpPacketsSent *uint64
}
func (sf *serverStreamFormat) initialize() {
sf.rtpPacketsSent = new(uint64)
sf.rtcpSender = &rtcpsender.RTCPSender{
ClockRate: sf.format.ClockRate(),
Period: sf.sm.st.s.senderReportPeriod,
@@ -33,19 +36,21 @@ func (sf *serverStreamFormat) initialize() {
}
func (sf *serverStreamFormat) writePacketRTP(byts []byte, pkt *rtp.Packet, ntp time.Time) error {
sf.rtcpSender.ProcessPacket(pkt, ntp, sf.format.PTSEqualsDTS(pkt))
sf.rtcpSender.ProcessPacketRTP(pkt, ntp, sf.format.PTSEqualsDTS(pkt))
le := uint64(len(byts))
// send unicast
for r := range sf.sm.st.activeUnicastReaders {
if _, ok := r.setuppedMedias[sf.sm.media]; ok {
err := r.writePacketRTP(sf.sm.media, byts)
err := r.writePacketRTP(sf.sm.media, pkt.PayloadType, byts)
if err != nil {
r.onStreamWriteError(err)
} else {
atomic.AddUint64(sf.sm.st.bytesSent, le)
continue
}
atomic.AddUint64(sf.sm.bytesSent, le)
atomic.AddUint64(sf.rtpPacketsSent, 1)
}
}
@@ -55,7 +60,9 @@ func (sf *serverStreamFormat) writePacketRTP(byts []byte, pkt *rtp.Packet, ntp t
if err != nil {
return err
}
atomic.AddUint64(sf.sm.st.bytesSent, le)
atomic.AddUint64(sf.sm.bytesSent, le)
atomic.AddUint64(sf.rtpPacketsSent, 1)
}
return nil

View File

@@ -1,6 +1,8 @@
package gortsplib
import (
"sync/atomic"
"github.com/bluenviron/gortsplib/v4/pkg/description"
)
@@ -11,9 +13,14 @@ type serverStreamMedia struct {
formats map[uint8]*serverStreamFormat
multicastWriter *serverMulticastWriter
bytesSent *uint64
rtcpPacketsSent *uint64
}
func (sm *serverStreamMedia) initialize() {
sm.bytesSent = new(uint64)
sm.rtcpPacketsSent = new(uint64)
sm.formats = make(map[uint8]*serverStreamFormat)
for _, forma := range sm.media.Formats {
sf := &serverStreamFormat{
@@ -38,13 +45,19 @@ func (sm *serverStreamMedia) close() {
}
func (sm *serverStreamMedia) writePacketRTCP(byts []byte) error {
le := len(byts)
// send unicast
for r := range sm.st.activeUnicastReaders {
if _, ok := r.setuppedMedias[sm.media]; ok {
err := r.writePacketRTCP(sm.media, byts)
if err != nil {
r.onStreamWriteError(err)
continue
}
atomic.AddUint64(sm.bytesSent, uint64(le))
atomic.AddUint64(sm.rtcpPacketsSent, 1)
}
}
@@ -54,6 +67,9 @@ func (sm *serverStreamMedia) writePacketRTCP(byts []byte) error {
if err != nil {
return err
}
atomic.AddUint64(sm.bytesSent, uint64(le))
atomic.AddUint64(sm.rtcpPacketsSent, 1)
}
return nil

36
server_stream_stats.go Normal file
View File

@@ -0,0 +1,36 @@
package gortsplib
import (
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
)
// ServerStreamStatsFormat are stream format statistics.
type ServerStreamStatsFormat struct {
// number of sent RTP packets
RTPPacketsSent uint64
}
// ServerStreamStatsMedia are stream media statistics.
type ServerStreamStatsMedia struct {
// sent bytes
BytesSent uint64
// number of sent RTCP packets
RTCPPacketsSent uint64
// format statistics
Formats map[format.Format]ServerStreamStatsFormat
}
// ServerStreamStats are stream statistics.
type ServerStreamStats struct {
// sent bytes
BytesSent uint64
// number of sent RTP packets
RTPPacketsSent uint64
// number of sent RTCP packets
RTCPPacketsSent uint64
// media statistics
Medias map[*description.Media]ServerStreamStatsMedia
}

9
stats_conn.go Normal file
View File

@@ -0,0 +1,9 @@
package gortsplib
// StatsConn are connection statistics.
type StatsConn struct {
// received bytes
BytesReceived uint64
// sent bytes
BytesSent uint64
}

76
stats_session.go Normal file
View File

@@ -0,0 +1,76 @@
package gortsplib
import (
"time"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
)
// StatsSessionFormat are session format statistics.
type StatsSessionFormat struct {
// number of RTP packets correctly received and processed
RTPPacketsReceived uint64
// number of sent RTP packets
RTPPacketsSent uint64
// number of lost RTP packets
RTPPacketsLost uint64
// mean jitter of received RTP packets
RTPJitter float64
// local SSRC
LocalSSRC uint32
// remote SSRC
RemoteSSRC uint32
// last sequence number of incoming/outgoing RTP packets
RTPPacketsLastSequenceNumber uint16
// last RTP time of incoming/outgoing RTP packets
RTPPacketsLastRTP uint32
// last NTP time of incoming/outgoing NTP packets
RTPPacketsLastNTP time.Time
}
// StatsSessionMedia are session media statistics.
type StatsSessionMedia struct {
// received bytes
BytesReceived uint64
// sent bytes
BytesSent uint64
// number of RTP packets that could not be processed
RTPPacketsInError uint64
// number of RTCP packets correctly received and processed
RTCPPacketsReceived uint64
// number of sent RTCP packets
RTCPPacketsSent uint64
// number of RTCP packets that could not be processed
RTCPPacketsInError uint64
// format statistics
Formats map[format.Format]StatsSessionFormat
}
// StatsSession are session statistics.
type StatsSession struct {
// received bytes
BytesReceived uint64
// sent bytes
BytesSent uint64
// number of RTP packets correctly received and processed
RTPPacketsReceived uint64
// number of sent RTP packets
RTPPacketsSent uint64
// number of lost RTP packets
RTPPacketsLost uint64
// number of RTP packets that could not be processed
RTPPacketsInError uint64
// mean jitter of received RTP packets
RTPJitter float64
// number of RTCP packets correctly received and processed
RTCPPacketsReceived uint64
// number of sent RTCP packets
RTCPPacketsSent uint64
// number of RTCP packets that could not be processed
RTCPPacketsInError uint64
// media statistics
Medias map[*description.Media]StatsSessionMedia
}