From 1f543a0331000d358ac717517abe2981cfab65e3 Mon Sep 17 00:00:00 2001 From: Alessandro Ros Date: Thu, 4 Sep 2025 19:06:56 +0200 Subject: [PATCH] rename rtcpreceiver into rtpreceiver, rtcpsender into rtpsender (#874) These utilities were originally meant to handle RTCP only, now they are in charge of every QoS mechanism. --- client.go | 18 +- client_format.go | 34 +- client_media.go | 4 +- pkg/rtcpreceiver/rtcpreceiver.go | 463 +---------------- pkg/rtcpsender/rtcpsender.go | 182 +------ pkg/rtpreceiver/receiver.go | 464 ++++++++++++++++++ .../receiver_test.go} | 28 +- pkg/rtpsender/sender.go | 185 +++++++ .../sender_test.go} | 10 +- server_session.go | 20 +- server_session_format.go | 20 +- server_session_media.go | 6 +- server_stream_format.go | 14 +- 13 files changed, 733 insertions(+), 715 deletions(-) create mode 100644 pkg/rtpreceiver/receiver.go rename pkg/{rtcpreceiver/rtcpreceiver_test.go => rtpreceiver/receiver_test.go} (98%) create mode 100644 pkg/rtpsender/sender.go rename pkg/{rtcpsender/rtcpsender_test.go => rtpsender/sender_test.go} (96%) diff --git a/client.go b/client.go index 76f473d8..a9252b33 100644 --- a/client.go +++ b/client.go @@ -31,8 +31,8 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/headers" "github.com/bluenviron/gortsplib/v4/pkg/liberrors" "github.com/bluenviron/gortsplib/v4/pkg/mikey" - "github.com/bluenviron/gortsplib/v4/pkg/rtcpreceiver" - "github.com/bluenviron/gortsplib/v4/pkg/rtcpsender" + "github.com/bluenviron/gortsplib/v4/pkg/rtpreceiver" + "github.com/bluenviron/gortsplib/v4/pkg/rtpsender" "github.com/bluenviron/gortsplib/v4/pkg/rtptime" "github.com/bluenviron/gortsplib/v4/pkg/sdp" ) @@ -2369,7 +2369,7 @@ func (c *Client) PacketPTS2(medi *description.Media, pkt *rtp.Packet) (int64, bo func (c *Client) PacketNTP(medi *description.Media, pkt *rtp.Packet) (time.Time, bool) { cm := c.setuppedMedias[medi] ct := cm.formats[pkt.PayloadType] - return ct.rtcpReceiver.PacketNTP(pkt.Timestamp) + return ct.rtpReceiver.PacketNTP(pkt.Timestamp) } // Stats returns client statistics. @@ -2389,15 +2389,15 @@ func (c *Client) Stats() *ClientStats { ret := make(map[format.Format]StatsSessionFormat, len(sm.formats)) for _, fo := range sm.formats { - recvStats := func() *rtcpreceiver.Stats { - if fo.rtcpReceiver != nil { - return fo.rtcpReceiver.Stats() + recvStats := func() *rtpreceiver.Stats { + if fo.rtpReceiver != nil { + return fo.rtpReceiver.Stats() } return nil }() - sentStats := func() *rtcpsender.Stats { - if fo.rtcpSender != nil { - return fo.rtcpSender.Stats() + sentStats := func() *rtpsender.Stats { + if fo.rtpSender != nil { + return fo.rtpSender.Stats() } return nil }() diff --git a/client_format.go b/client_format.go index 9a2d34ce..befd91a3 100644 --- a/client_format.go +++ b/client_format.go @@ -10,8 +10,8 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/gortsplib/v4/pkg/liberrors" - "github.com/bluenviron/gortsplib/v4/pkg/rtcpreceiver" - "github.com/bluenviron/gortsplib/v4/pkg/rtcpsender" + "github.com/bluenviron/gortsplib/v4/pkg/rtpreceiver" + "github.com/bluenviron/gortsplib/v4/pkg/rtpsender" ) func clientPickLocalSSRC(cf *clientFormat) (uint32, error) { @@ -45,8 +45,8 @@ type clientFormat struct { onPacketRTP OnPacketRTPFunc localSSRC uint32 - rtcpReceiver *rtcpreceiver.RTCPReceiver // play - rtcpSender *rtcpsender.RTCPSender // record or back channel + rtpReceiver *rtpreceiver.Receiver // play + rtpSender *rtpsender.Sender // record or back channel writePacketRTPInQueue func([]byte) error rtpPacketsReceived *uint64 rtpPacketsSent *uint64 @@ -79,7 +79,7 @@ func (cf *clientFormat) start() { } if cf.cm.c.state == clientStateRecord || cf.cm.media.IsBackChannel { - cf.rtcpSender = &rtcpsender.RTCPSender{ + cf.rtpSender = &rtpsender.Sender{ ClockRate: cf.format.ClockRate(), Period: cf.cm.c.senderReportPeriod, TimeNow: cf.cm.c.timeNow, @@ -89,9 +89,9 @@ func (cf *clientFormat) start() { } }, } - cf.rtcpSender.Initialize() + cf.rtpSender.Initialize() } else { - cf.rtcpReceiver = &rtcpreceiver.RTCPReceiver{ + cf.rtpReceiver = &rtpreceiver.Receiver{ ClockRate: cf.format.ClockRate(), LocalSSRC: &cf.localSSRC, UnrealiableTransport: (cf.cm.udpRTPListener != nil), @@ -103,7 +103,7 @@ func (cf *clientFormat) start() { } }, } - err := cf.rtcpReceiver.Initialize() + err := cf.rtpReceiver.Initialize() if err != nil { panic(err) } @@ -111,19 +111,19 @@ func (cf *clientFormat) start() { } func (cf *clientFormat) stop() { - if cf.rtcpReceiver != nil { - cf.rtcpReceiver.Close() - cf.rtcpReceiver = nil + if cf.rtpReceiver != nil { + cf.rtpReceiver.Close() + cf.rtpReceiver = nil } - if cf.rtcpSender != nil { - cf.rtcpSender.Close() + if cf.rtpSender != nil { + cf.rtpSender.Close() } } func (cf *clientFormat) remoteSSRC() (uint32, bool) { - if cf.rtcpReceiver != nil { - stats := cf.rtcpReceiver.Stats() + if cf.rtpReceiver != nil { + stats := cf.rtpReceiver.Stats() if stats != nil { return stats.RemoteSSRC, true } @@ -134,7 +134,7 @@ func (cf *clientFormat) remoteSSRC() (uint32, bool) { func (cf *clientFormat) readPacketRTP(pkt *rtp.Packet) { now := cf.cm.c.timeNow() - pkts, lost, err := cf.rtcpReceiver.ProcessPacket2(pkt, now, cf.format.PTSEqualsDTS(pkt)) + pkts, lost, err := cf.rtpReceiver.ProcessPacket2(pkt, now, cf.format.PTSEqualsDTS(pkt)) if err != nil { cf.cm.onPacketRTPDecodeError(err) return @@ -155,7 +155,7 @@ func (cf *clientFormat) readPacketRTP(pkt *rtp.Packet) { func (cf *clientFormat) writePacketRTP(pkt *rtp.Packet, ntp time.Time) error { pkt.SSRC = cf.localSSRC - cf.rtcpSender.ProcessPacket(pkt, ntp, cf.format.PTSEqualsDTS(pkt)) + cf.rtpSender.ProcessPacket(pkt, ntp, cf.format.PTSEqualsDTS(pkt)) maxPlainPacketSize := cf.cm.c.MaxPacketSize if cf.cm.srtpOutCtx != nil { diff --git a/client_media.go b/client_media.go index 3c273f62..033146d1 100644 --- a/client_media.go +++ b/client_media.go @@ -305,7 +305,7 @@ func (cm *clientMedia) readPacketRTCPTCPPlay(payload []byte) bool { if sr, ok := pkt.(*rtcp.SenderReport); ok { format := cm.findFormatByRemoteSSRC(sr.SSRC) if format != nil { - format.rtcpReceiver.ProcessSenderReport(sr, now) + format.rtpReceiver.ProcessSenderReport(sr, now) } } @@ -389,7 +389,7 @@ func (cm *clientMedia) readPacketRTCPUDPPlay(payload []byte) bool { if sr, ok := pkt.(*rtcp.SenderReport); ok { format := cm.findFormatByRemoteSSRC(sr.SSRC) if format != nil { - format.rtcpReceiver.ProcessSenderReport(sr, now) + format.rtpReceiver.ProcessSenderReport(sr, now) } } diff --git a/pkg/rtcpreceiver/rtcpreceiver.go b/pkg/rtcpreceiver/rtcpreceiver.go index 9c9ec15f..3e04a3f8 100644 --- a/pkg/rtcpreceiver/rtcpreceiver.go +++ b/pkg/rtcpreceiver/rtcpreceiver.go @@ -1,464 +1,9 @@ // Package rtcpreceiver contains a utility to generate RTCP receiver reports. package rtcpreceiver -import ( - "crypto/rand" - "fmt" - "sync" - "time" +import "github.com/bluenviron/gortsplib/v4/pkg/rtpreceiver" - "github.com/bluenviron/gortsplib/v4/pkg/ntp" - "github.com/pion/rtcp" - "github.com/pion/rtp" -) - -func randUint32() (uint32, error) { - var b [4]byte - _, err := rand.Read(b[:]) - if err != nil { - return 0, err - } - return uint32(b[0])<<24 | uint32(b[1])<<16 | uint32(b[2])<<8 | uint32(b[3]), nil -} - -// RTCPReceiver is a utility to receive RTP packets. It is in charge of: -// - removing packets with wrong SSRC -// - removing duplicate packets (when transport is unreliable) -// - reordering packets (when transport is unrealiable) -// - counting lost packets -// - generating RTCP receiver reports -type RTCPReceiver struct { - // Track clock rate. - ClockRate int - - // Local SSRC - LocalSSRC *uint32 - - // Whether the transport is unrealiable. - // This enables removing duplicate packets and reordering packets. - UnrealiableTransport bool - - // size of the buffer for reordering packets. - // It defaults to 64. - BufferSize int - - // Period of RTCP receiver reports. - Period time.Duration - - // time.Now function. - TimeNow func() time.Time - - // Called when a RTCP receiver report is ready to be written. - WritePacketRTCP func(rtcp.Packet) - - mutex sync.RWMutex - - // data from RTP packets - firstRTPPacketReceived bool - timeInitialized bool - buffer []*rtp.Packet - absPos uint16 - negativeCount int - sequenceNumberCycles uint16 - lastValidSeqNum uint16 - remoteSSRC uint32 - lastTimeRTP uint32 - lastTimeSystem time.Time - totalLost uint32 - totalLostSinceReport uint32 - totalSinceReport uint32 - jitter float64 - - // data from RTCP packets - firstSenderReportReceived bool - lastSenderReportTimeNTP uint64 - lastSenderReportTimeRTP uint32 - lastSenderReportTimeSystem time.Time - - terminate chan struct{} - done chan struct{} -} - -// New allocates a RTCPReceiver. +// RTCPReceiver is a utility to receive RTP packets. // -// Deprecated: replaced by Initialize(). -func New( - clockRate int, - receiverSSRC *uint32, - period time.Duration, - timeNow func() time.Time, - writePacketRTCP func(rtcp.Packet), -) (*RTCPReceiver, error) { - rr := &RTCPReceiver{ - ClockRate: clockRate, - LocalSSRC: receiverSSRC, - Period: period, - TimeNow: timeNow, - WritePacketRTCP: writePacketRTCP, - } - err := rr.Initialize() - if err != nil { - return nil, err - } - - return rr, nil -} - -// Initialize initializes RTCPReceiver. -func (rr *RTCPReceiver) Initialize() error { - // Deprecated: passing a nil LocalSSRC will be deprecated from next version. - // Please use a fixed LocalSSRC. - if rr.LocalSSRC == nil { - v, err := randUint32() - if err != nil { - return err - } - rr.LocalSSRC = &v - } - - if rr.BufferSize == 0 { - rr.BufferSize = 64 - } - - if rr.Period == 0 { - return fmt.Errorf("invalid Period") - } - - if rr.TimeNow == nil { - rr.TimeNow = time.Now - } - - if rr.UnrealiableTransport { - rr.buffer = make([]*rtp.Packet, rr.BufferSize) - } - - rr.terminate = make(chan struct{}) - rr.done = make(chan struct{}) - - go rr.run() - - return nil -} - -// Close closes the RTCPReceiver. -func (rr *RTCPReceiver) Close() { - close(rr.terminate) - <-rr.done -} - -func (rr *RTCPReceiver) run() { - defer close(rr.done) - - t := time.NewTicker(rr.Period) - defer t.Stop() - - for { - select { - case <-t.C: - report := rr.report() - if report != nil { - rr.WritePacketRTCP(report) - } - - case <-rr.terminate: - return - } - } -} - -func (rr *RTCPReceiver) report() rtcp.Packet { - rr.mutex.Lock() - defer rr.mutex.Unlock() - - if !rr.firstRTPPacketReceived || rr.ClockRate == 0 { - return nil - } - - system := rr.TimeNow() - - report := &rtcp.ReceiverReport{ - SSRC: *rr.LocalSSRC, - Reports: []rtcp.ReceptionReport{ - { - SSRC: rr.remoteSSRC, - LastSequenceNumber: uint32(rr.sequenceNumberCycles)<<16 | uint32(rr.lastValidSeqNum), - // equivalent to taking the integer part after multiplying the - // loss fraction by 256 - FractionLost: uint8(float64(rr.totalLostSinceReport*256) / float64(rr.totalSinceReport)), - TotalLost: rr.totalLost, - Jitter: uint32(rr.jitter), - }, - }, - } - - if rr.firstSenderReportReceived { - // middle 32 bits out of 64 in the NTP of last sender report - report.Reports[0].LastSenderReport = uint32(rr.lastSenderReportTimeNTP >> 16) - - // delay, expressed in units of 1/65536 seconds, between - // receiving the last SR packet from source SSRC_n and sending this - // reception report block - report.Reports[0].Delay = uint32(system.Sub(rr.lastSenderReportTimeSystem).Seconds() * 65536) - } - - rr.totalLostSinceReport = 0 - rr.totalSinceReport = 0 - - return report -} - -// ProcessPacket extracts the needed data from RTP packets. -// -// Deprecated: replaced by ProcessPacket2. -func (rr *RTCPReceiver) ProcessPacket(pkt *rtp.Packet, system time.Time, ptsEqualsDTS bool) error { - _, _, err := rr.ProcessPacket2(pkt, system, ptsEqualsDTS) - return err -} - -// ProcessPacket2 processes an incoming RTP packet. -// It returns reordered packets and number of lost packets. -func (rr *RTCPReceiver) ProcessPacket2( - pkt *rtp.Packet, - system time.Time, - ptsEqualsDTS bool, -) ([]*rtp.Packet, uint64, error) { - rr.mutex.Lock() - defer rr.mutex.Unlock() - - // first packet - if !rr.firstRTPPacketReceived { - rr.firstRTPPacketReceived = true - rr.totalSinceReport = 1 - rr.lastValidSeqNum = pkt.SequenceNumber - rr.remoteSSRC = pkt.SSRC - - if ptsEqualsDTS { - rr.timeInitialized = true - rr.lastTimeRTP = pkt.Timestamp - rr.lastTimeSystem = system - } - - return []*rtp.Packet{pkt}, 0, nil - } - - if pkt.SSRC != rr.remoteSSRC { - return nil, 0, fmt.Errorf("received packet with wrong SSRC %d, expected %d", pkt.SSRC, rr.remoteSSRC) - } - - var pkts []*rtp.Packet - var lost uint64 - - if rr.UnrealiableTransport { - pkts, lost = rr.reorder(pkt) - } else { - pkts = []*rtp.Packet{pkt} - lost = uint64(pkt.SequenceNumber - rr.lastValidSeqNum - 1) - } - - rr.totalLost += uint32(lost) - rr.totalLostSinceReport += uint32(lost) - - // allow up to 24 bits - if rr.totalLost > 0xFFFFFF { - rr.totalLost = 0xFFFFFF - } - if rr.totalLostSinceReport > 0xFFFFFF { - rr.totalLostSinceReport = 0xFFFFFF - } - - for _, pkt := range pkts { - diff := int32(pkt.SequenceNumber) - int32(rr.lastValidSeqNum) - - // overflow - if diff < -0x0FFF { - rr.sequenceNumberCycles++ - } - - rr.totalSinceReport += uint32(uint16(diff)) - rr.lastValidSeqNum = pkt.SequenceNumber - - if ptsEqualsDTS { - if rr.timeInitialized && rr.ClockRate != 0 { - // update jitter - // https://tools.ietf.org/html/rfc3550#page-39 - D := system.Sub(rr.lastTimeSystem).Seconds()*float64(rr.ClockRate) - - (float64(pkt.Timestamp) - float64(rr.lastTimeRTP)) - if D < 0 { - D = -D - } - rr.jitter += (D - rr.jitter) / 16 - } - - rr.timeInitialized = true - rr.lastTimeRTP = pkt.Timestamp - rr.lastTimeSystem = system - } - } - - return pkts, lost, nil -} - -func (rr *RTCPReceiver) reorder(pkt *rtp.Packet) ([]*rtp.Packet, uint64) { - relPos := int16(pkt.SequenceNumber - rr.lastValidSeqNum - 1) // rr.expectedSeqNum) - - // packet is a duplicate or has been sent - // before the first packet processed by Reorderer. - // discard. - if relPos < 0 { - rr.negativeCount++ - - // stream has been resetted, therefore reset reorderer too - if rr.negativeCount > len(rr.buffer) { - rr.negativeCount = 0 - - // clear buffer - for i := uint16(0); i < uint16(len(rr.buffer)); i++ { - p := (rr.absPos + i) & (uint16(len(rr.buffer)) - 1) - rr.buffer[p] = nil - } - - // reset position. - return []*rtp.Packet{pkt}, 0 - } - - return nil, 0 - } - - rr.negativeCount = 0 - - // there's a missing packet and buffer is full. - // return entire buffer and clear it. - if relPos >= int16(len(rr.buffer)) { - n := 1 - for i := uint16(0); i < uint16(len(rr.buffer)); i++ { - p := (rr.absPos + i) & (uint16(len(rr.buffer)) - 1) - if rr.buffer[p] != nil { - n++ - } - } - - ret := make([]*rtp.Packet, n) - pos := 0 - - for i := uint16(0); i < uint16(len(rr.buffer)); i++ { - p := (rr.absPos + i) & (uint16(len(rr.buffer)) - 1) - if rr.buffer[p] != nil { - ret[pos], rr.buffer[p] = rr.buffer[p], nil - pos++ - } - } - - ret[pos] = pkt - - return ret, uint64(int(relPos) - n + 1) - } - - // there's a missing packet - if relPos != 0 { - p := (rr.absPos + uint16(relPos)) & (uint16(len(rr.buffer)) - 1) - - // current packet is a duplicate. discard - if rr.buffer[p] != nil { - return nil, 0 - } - - // put current packet in buffer - rr.buffer[p] = pkt - return nil, 0 - } - - // all packets have been received correctly. - // return them. - - n := uint16(1) - for { - p := (rr.absPos + n) & (uint16(len(rr.buffer)) - 1) - if rr.buffer[p] == nil { - break - } - n++ - } - - ret := make([]*rtp.Packet, n) - ret[0] = pkt - rr.absPos++ - rr.absPos &= (uint16(len(rr.buffer)) - 1) - - for i := uint16(1); i < n; i++ { - ret[i], rr.buffer[rr.absPos] = rr.buffer[rr.absPos], nil - rr.absPos++ - rr.absPos &= (uint16(len(rr.buffer)) - 1) - } - - return ret, 0 -} - -// ProcessSenderReport processes an incoming RTCP sender report. -func (rr *RTCPReceiver) ProcessSenderReport(sr *rtcp.SenderReport, system time.Time) { - rr.mutex.Lock() - defer rr.mutex.Unlock() - - rr.firstSenderReportReceived = true - rr.lastSenderReportTimeNTP = sr.NTPTime - rr.lastSenderReportTimeRTP = sr.RTPTime - rr.lastSenderReportTimeSystem = system -} - -func (rr *RTCPReceiver) packetNTPUnsafe(ts uint32) (time.Time, bool) { - if !rr.firstSenderReportReceived || rr.ClockRate == 0 { - return time.Time{}, false - } - - timeDiff := int32(ts - rr.lastSenderReportTimeRTP) - timeDiffGo := (time.Duration(timeDiff) * time.Second) / time.Duration(rr.ClockRate) - - return ntp.Decode(rr.lastSenderReportTimeNTP).Add(timeDiffGo), true -} - -// PacketNTP returns the NTP (absolute timestamp) of the packet. -func (rr *RTCPReceiver) PacketNTP(ts uint32) (time.Time, bool) { - rr.mutex.RLock() - defer rr.mutex.RUnlock() - - return rr.packetNTPUnsafe(ts) -} - -// SenderSSRC returns the SSRC of incoming RTP packets. -// -// Deprecated: replaced by Stats(). -func (rr *RTCPReceiver) SenderSSRC() (uint32, bool) { - stats := rr.Stats() - if stats == nil { - return 0, false - } - return stats.RemoteSSRC, true -} - -// Stats are statistics. -type Stats struct { - RemoteSSRC uint32 - LastSequenceNumber uint16 - LastRTP uint32 - LastNTP time.Time - Jitter float64 -} - -// Stats returns statistics. -func (rr *RTCPReceiver) Stats() *Stats { - rr.mutex.RLock() - defer rr.mutex.RUnlock() - - if !rr.firstRTPPacketReceived { - return nil - } - - ntp, _ := rr.packetNTPUnsafe(rr.lastTimeRTP) - - return &Stats{ - RemoteSSRC: rr.remoteSSRC, - LastSequenceNumber: rr.lastValidSeqNum, - LastRTP: rr.lastTimeRTP, - LastNTP: ntp, - Jitter: rr.jitter, - } -} +// Deprecated: replaced by rtpreceiver.Receiver +type RTCPReceiver = rtpreceiver.Receiver diff --git a/pkg/rtcpsender/rtcpsender.go b/pkg/rtcpsender/rtcpsender.go index 4f785f3d..9f7d8cb5 100644 --- a/pkg/rtcpsender/rtcpsender.go +++ b/pkg/rtcpsender/rtcpsender.go @@ -1,185 +1,9 @@ // Package rtcpsender contains a utility to generate RTCP sender reports. package rtcpsender -import ( - "sync" - "time" - - "github.com/bluenviron/gortsplib/v4/pkg/ntp" - "github.com/pion/rtcp" - "github.com/pion/rtp" -) +import "github.com/bluenviron/gortsplib/v4/pkg/rtpsender" // RTCPSender is a utility to send RTP packets. -// It is in charge of generating RTCP sender reports. -type RTCPSender struct { - ClockRate int - Period time.Duration - TimeNow func() time.Time - WritePacketRTCP func(rtcp.Packet) - - mutex sync.RWMutex - - // data from RTP packets - firstRTPPacketSent bool - lastTimeRTP uint32 - lastTimeNTP time.Time - lastTimeSystem time.Time - localSSRC uint32 - lastSequenceNumber uint16 - packetCount uint32 - octetCount uint32 - - terminate chan struct{} - done chan struct{} -} - -// New allocates a RTCPSender. // -// Deprecated: replaced by Initialize(). -func New( - clockRate int, - period time.Duration, - timeNow func() time.Time, - writePacketRTCP func(rtcp.Packet), -) *RTCPSender { - rs := &RTCPSender{ - ClockRate: clockRate, - Period: period, - TimeNow: timeNow, - WritePacketRTCP: writePacketRTCP, - } - rs.Initialize() - - return rs -} - -// Initialize initializes a RTCPSender. -func (rs *RTCPSender) Initialize() { - if rs.TimeNow == nil { - rs.TimeNow = time.Now - } - - rs.terminate = make(chan struct{}) - rs.done = make(chan struct{}) - - go rs.run() -} - -// Close closes the RTCPSender. -func (rs *RTCPSender) Close() { - close(rs.terminate) - <-rs.done -} - -func (rs *RTCPSender) run() { - defer close(rs.done) - - t := time.NewTicker(rs.Period) - defer t.Stop() - - for { - select { - case <-t.C: - report := rs.report() - if report != nil { - rs.WritePacketRTCP(report) - } - - case <-rs.terminate: - return - } - } -} - -func (rs *RTCPSender) report() rtcp.Packet { - rs.mutex.RLock() - defer rs.mutex.RUnlock() - - if !rs.firstRTPPacketSent || rs.ClockRate == 0 { - return nil - } - - systemTimeDiff := rs.TimeNow().Sub(rs.lastTimeSystem) - ntpTime := rs.lastTimeNTP.Add(systemTimeDiff) - rtpTime := rs.lastTimeRTP + uint32(systemTimeDiff.Seconds()*float64(rs.ClockRate)) - - return &rtcp.SenderReport{ - SSRC: rs.localSSRC, - NTPTime: ntp.Encode(ntpTime), - RTPTime: rtpTime, - PacketCount: rs.packetCount, - OctetCount: rs.octetCount, - } -} - -// ProcessPacket extracts data from RTP packets. -func (rs *RTCPSender) ProcessPacket(pkt *rtp.Packet, ntp time.Time, ptsEqualsDTS bool) { - rs.mutex.Lock() - defer rs.mutex.Unlock() - - if ptsEqualsDTS { - rs.firstRTPPacketSent = true - rs.lastTimeRTP = pkt.Timestamp - rs.lastTimeNTP = ntp - rs.lastTimeSystem = rs.TimeNow() - rs.localSSRC = pkt.SSRC - } - - rs.lastSequenceNumber = pkt.SequenceNumber - - rs.packetCount++ - rs.octetCount += uint32(len(pkt.Payload)) -} - -// SenderSSRC returns the SSRC of outgoing RTP packets. -// -// Deprecated: replaced by Stats(). -func (rs *RTCPSender) SenderSSRC() (uint32, bool) { - stats := rs.Stats() - if stats == nil { - return 0, false - } - - return stats.LocalSSRC, true -} - -// LastPacketData returns metadata of the last RTP packet. -// -// Deprecated: replaced by Stats(). -func (rs *RTCPSender) LastPacketData() (uint16, uint32, time.Time, bool) { - stats := rs.Stats() - if stats == nil { - return 0, 0, time.Time{}, false - } - - return stats.LastSequenceNumber, stats.LastRTP, stats.LastNTP, true -} - -// Stats are statistics. -type Stats struct { - // Deprecated: this is not a statistics anymore but a fixed parameter. - // it will be removed in next version. - LocalSSRC uint32 - - LastSequenceNumber uint16 - LastRTP uint32 - LastNTP time.Time -} - -// Stats returns statistics. -func (rs *RTCPSender) Stats() *Stats { - rs.mutex.RLock() - defer rs.mutex.RUnlock() - - if !rs.firstRTPPacketSent { - return nil - } - - return &Stats{ - LocalSSRC: rs.localSSRC, - LastSequenceNumber: rs.lastSequenceNumber, - LastRTP: rs.lastTimeRTP, - LastNTP: rs.lastTimeNTP, - } -} +// Deprecated: replaced by rtpsender.Sender +type RTCPSender = rtpsender.Sender diff --git a/pkg/rtpreceiver/receiver.go b/pkg/rtpreceiver/receiver.go new file mode 100644 index 00000000..c40ca47b --- /dev/null +++ b/pkg/rtpreceiver/receiver.go @@ -0,0 +1,464 @@ +// Package rtpreceiver contains a utility to receive RTP packets. +package rtpreceiver + +import ( + "crypto/rand" + "fmt" + "sync" + "time" + + "github.com/bluenviron/gortsplib/v4/pkg/ntp" + "github.com/pion/rtcp" + "github.com/pion/rtp" +) + +func randUint32() (uint32, error) { + var b [4]byte + _, err := rand.Read(b[:]) + if err != nil { + return 0, err + } + return uint32(b[0])<<24 | uint32(b[1])<<16 | uint32(b[2])<<8 | uint32(b[3]), nil +} + +// Receiver is a utility to receive RTP packets. It is in charge of: +// - removing packets with wrong SSRC +// - removing duplicate packets (when transport is unreliable) +// - reordering packets (when transport is unrealiable) +// - counting lost packets +// - generating RTCP receiver reports +type Receiver struct { + // Track clock rate. + ClockRate int + + // Local SSRC + LocalSSRC *uint32 + + // Whether the transport is unrealiable. + // This enables removing duplicate packets and reordering packets. + UnrealiableTransport bool + + // size of the buffer for reordering packets. + // It defaults to 64. + BufferSize int + + // Period of RTCP receiver reports. + Period time.Duration + + // time.Now function. + TimeNow func() time.Time + + // Called when a RTCP receiver report is ready to be written. + WritePacketRTCP func(rtcp.Packet) + + mutex sync.RWMutex + + // data from RTP packets + firstRTPPacketReceived bool + timeInitialized bool + buffer []*rtp.Packet + absPos uint16 + negativeCount int + sequenceNumberCycles uint16 + lastValidSeqNum uint16 + remoteSSRC uint32 + lastTimeRTP uint32 + lastTimeSystem time.Time + totalLost uint32 + totalLostSinceReport uint32 + totalSinceReport uint32 + jitter float64 + + // data from RTCP packets + firstSenderReportReceived bool + lastSenderReportTimeNTP uint64 + lastSenderReportTimeRTP uint32 + lastSenderReportTimeSystem time.Time + + terminate chan struct{} + done chan struct{} +} + +// New allocates a Receiver. +// +// Deprecated: replaced by Initialize(). +func New( + clockRate int, + receiverSSRC *uint32, + period time.Duration, + timeNow func() time.Time, + writePacketRTCP func(rtcp.Packet), +) (*Receiver, error) { + rr := &Receiver{ + ClockRate: clockRate, + LocalSSRC: receiverSSRC, + Period: period, + TimeNow: timeNow, + WritePacketRTCP: writePacketRTCP, + } + err := rr.Initialize() + if err != nil { + return nil, err + } + + return rr, nil +} + +// Initialize initializes Receiver. +func (rr *Receiver) Initialize() error { + // Deprecated: passing a nil LocalSSRC will be deprecated from next version. + // Please use a fixed LocalSSRC. + if rr.LocalSSRC == nil { + v, err := randUint32() + if err != nil { + return err + } + rr.LocalSSRC = &v + } + + if rr.BufferSize == 0 { + rr.BufferSize = 64 + } + + if rr.Period == 0 { + return fmt.Errorf("invalid Period") + } + + if rr.TimeNow == nil { + rr.TimeNow = time.Now + } + + if rr.UnrealiableTransport { + rr.buffer = make([]*rtp.Packet, rr.BufferSize) + } + + rr.terminate = make(chan struct{}) + rr.done = make(chan struct{}) + + go rr.run() + + return nil +} + +// Close closes the Receiver. +func (rr *Receiver) Close() { + close(rr.terminate) + <-rr.done +} + +func (rr *Receiver) run() { + defer close(rr.done) + + t := time.NewTicker(rr.Period) + defer t.Stop() + + for { + select { + case <-t.C: + report := rr.report() + if report != nil { + rr.WritePacketRTCP(report) + } + + case <-rr.terminate: + return + } + } +} + +func (rr *Receiver) report() rtcp.Packet { + rr.mutex.Lock() + defer rr.mutex.Unlock() + + if !rr.firstRTPPacketReceived || rr.ClockRate == 0 { + return nil + } + + system := rr.TimeNow() + + report := &rtcp.ReceiverReport{ + SSRC: *rr.LocalSSRC, + Reports: []rtcp.ReceptionReport{ + { + SSRC: rr.remoteSSRC, + LastSequenceNumber: uint32(rr.sequenceNumberCycles)<<16 | uint32(rr.lastValidSeqNum), + // equivalent to taking the integer part after multiplying the + // loss fraction by 256 + FractionLost: uint8(float64(rr.totalLostSinceReport*256) / float64(rr.totalSinceReport)), + TotalLost: rr.totalLost, + Jitter: uint32(rr.jitter), + }, + }, + } + + if rr.firstSenderReportReceived { + // middle 32 bits out of 64 in the NTP of last sender report + report.Reports[0].LastSenderReport = uint32(rr.lastSenderReportTimeNTP >> 16) + + // delay, expressed in units of 1/65536 seconds, between + // receiving the last SR packet from source SSRC_n and sending this + // reception report block + report.Reports[0].Delay = uint32(system.Sub(rr.lastSenderReportTimeSystem).Seconds() * 65536) + } + + rr.totalLostSinceReport = 0 + rr.totalSinceReport = 0 + + return report +} + +// ProcessPacket extracts the needed data from RTP packets. +// +// Deprecated: replaced by ProcessPacket2. +func (rr *Receiver) ProcessPacket(pkt *rtp.Packet, system time.Time, ptsEqualsDTS bool) error { + _, _, err := rr.ProcessPacket2(pkt, system, ptsEqualsDTS) + return err +} + +// ProcessPacket2 processes an incoming RTP packet. +// It returns reordered packets and number of lost packets. +func (rr *Receiver) ProcessPacket2( + pkt *rtp.Packet, + system time.Time, + ptsEqualsDTS bool, +) ([]*rtp.Packet, uint64, error) { + rr.mutex.Lock() + defer rr.mutex.Unlock() + + // first packet + if !rr.firstRTPPacketReceived { + rr.firstRTPPacketReceived = true + rr.totalSinceReport = 1 + rr.lastValidSeqNum = pkt.SequenceNumber + rr.remoteSSRC = pkt.SSRC + + if ptsEqualsDTS { + rr.timeInitialized = true + rr.lastTimeRTP = pkt.Timestamp + rr.lastTimeSystem = system + } + + return []*rtp.Packet{pkt}, 0, nil + } + + if pkt.SSRC != rr.remoteSSRC { + return nil, 0, fmt.Errorf("received packet with wrong SSRC %d, expected %d", pkt.SSRC, rr.remoteSSRC) + } + + var pkts []*rtp.Packet + var lost uint64 + + if rr.UnrealiableTransport { + pkts, lost = rr.reorder(pkt) + } else { + pkts = []*rtp.Packet{pkt} + lost = uint64(pkt.SequenceNumber - rr.lastValidSeqNum - 1) + } + + rr.totalLost += uint32(lost) + rr.totalLostSinceReport += uint32(lost) + + // allow up to 24 bits + if rr.totalLost > 0xFFFFFF { + rr.totalLost = 0xFFFFFF + } + if rr.totalLostSinceReport > 0xFFFFFF { + rr.totalLostSinceReport = 0xFFFFFF + } + + for _, pkt := range pkts { + diff := int32(pkt.SequenceNumber) - int32(rr.lastValidSeqNum) + + // overflow + if diff < -0x0FFF { + rr.sequenceNumberCycles++ + } + + rr.totalSinceReport += uint32(uint16(diff)) + rr.lastValidSeqNum = pkt.SequenceNumber + + if ptsEqualsDTS { + if rr.timeInitialized && rr.ClockRate != 0 { + // update jitter + // https://tools.ietf.org/html/rfc3550#page-39 + D := system.Sub(rr.lastTimeSystem).Seconds()*float64(rr.ClockRate) - + (float64(pkt.Timestamp) - float64(rr.lastTimeRTP)) + if D < 0 { + D = -D + } + rr.jitter += (D - rr.jitter) / 16 + } + + rr.timeInitialized = true + rr.lastTimeRTP = pkt.Timestamp + rr.lastTimeSystem = system + } + } + + return pkts, lost, nil +} + +func (rr *Receiver) reorder(pkt *rtp.Packet) ([]*rtp.Packet, uint64) { + relPos := int16(pkt.SequenceNumber - rr.lastValidSeqNum - 1) // rr.expectedSeqNum) + + // packet is a duplicate or has been sent + // before the first packet processed by Reorderer. + // discard. + if relPos < 0 { + rr.negativeCount++ + + // stream has been resetted, therefore reset reorderer too + if rr.negativeCount > len(rr.buffer) { + rr.negativeCount = 0 + + // clear buffer + for i := uint16(0); i < uint16(len(rr.buffer)); i++ { + p := (rr.absPos + i) & (uint16(len(rr.buffer)) - 1) + rr.buffer[p] = nil + } + + // reset position. + return []*rtp.Packet{pkt}, 0 + } + + return nil, 0 + } + + rr.negativeCount = 0 + + // there's a missing packet and buffer is full. + // return entire buffer and clear it. + if relPos >= int16(len(rr.buffer)) { + n := 1 + for i := uint16(0); i < uint16(len(rr.buffer)); i++ { + p := (rr.absPos + i) & (uint16(len(rr.buffer)) - 1) + if rr.buffer[p] != nil { + n++ + } + } + + ret := make([]*rtp.Packet, n) + pos := 0 + + for i := uint16(0); i < uint16(len(rr.buffer)); i++ { + p := (rr.absPos + i) & (uint16(len(rr.buffer)) - 1) + if rr.buffer[p] != nil { + ret[pos], rr.buffer[p] = rr.buffer[p], nil + pos++ + } + } + + ret[pos] = pkt + + return ret, uint64(int(relPos) - n + 1) + } + + // there's a missing packet + if relPos != 0 { + p := (rr.absPos + uint16(relPos)) & (uint16(len(rr.buffer)) - 1) + + // current packet is a duplicate. discard + if rr.buffer[p] != nil { + return nil, 0 + } + + // put current packet in buffer + rr.buffer[p] = pkt + return nil, 0 + } + + // all packets have been received correctly. + // return them. + + n := uint16(1) + for { + p := (rr.absPos + n) & (uint16(len(rr.buffer)) - 1) + if rr.buffer[p] == nil { + break + } + n++ + } + + ret := make([]*rtp.Packet, n) + ret[0] = pkt + rr.absPos++ + rr.absPos &= (uint16(len(rr.buffer)) - 1) + + for i := uint16(1); i < n; i++ { + ret[i], rr.buffer[rr.absPos] = rr.buffer[rr.absPos], nil + rr.absPos++ + rr.absPos &= (uint16(len(rr.buffer)) - 1) + } + + return ret, 0 +} + +// ProcessSenderReport processes an incoming RTCP sender report. +func (rr *Receiver) ProcessSenderReport(sr *rtcp.SenderReport, system time.Time) { + rr.mutex.Lock() + defer rr.mutex.Unlock() + + rr.firstSenderReportReceived = true + rr.lastSenderReportTimeNTP = sr.NTPTime + rr.lastSenderReportTimeRTP = sr.RTPTime + rr.lastSenderReportTimeSystem = system +} + +func (rr *Receiver) packetNTPUnsafe(ts uint32) (time.Time, bool) { + if !rr.firstSenderReportReceived || rr.ClockRate == 0 { + return time.Time{}, false + } + + timeDiff := int32(ts - rr.lastSenderReportTimeRTP) + timeDiffGo := (time.Duration(timeDiff) * time.Second) / time.Duration(rr.ClockRate) + + return ntp.Decode(rr.lastSenderReportTimeNTP).Add(timeDiffGo), true +} + +// PacketNTP returns the NTP (absolute timestamp) of the packet. +func (rr *Receiver) PacketNTP(ts uint32) (time.Time, bool) { + rr.mutex.RLock() + defer rr.mutex.RUnlock() + + return rr.packetNTPUnsafe(ts) +} + +// SenderSSRC returns the SSRC of incoming RTP packets. +// +// Deprecated: replaced by Stats(). +func (rr *Receiver) SenderSSRC() (uint32, bool) { + stats := rr.Stats() + if stats == nil { + return 0, false + } + return stats.RemoteSSRC, true +} + +// Stats are statistics. +type Stats struct { + RemoteSSRC uint32 + LastSequenceNumber uint16 + LastRTP uint32 + LastNTP time.Time + Jitter float64 +} + +// Stats returns statistics. +func (rr *Receiver) Stats() *Stats { + rr.mutex.RLock() + defer rr.mutex.RUnlock() + + if !rr.firstRTPPacketReceived { + return nil + } + + ntp, _ := rr.packetNTPUnsafe(rr.lastTimeRTP) + + return &Stats{ + RemoteSSRC: rr.remoteSSRC, + LastSequenceNumber: rr.lastValidSeqNum, + LastRTP: rr.lastTimeRTP, + LastNTP: ntp, + Jitter: rr.jitter, + } +} diff --git a/pkg/rtcpreceiver/rtcpreceiver_test.go b/pkg/rtpreceiver/receiver_test.go similarity index 98% rename from pkg/rtcpreceiver/rtcpreceiver_test.go rename to pkg/rtpreceiver/receiver_test.go index ea65f97a..563d7348 100644 --- a/pkg/rtcpreceiver/rtcpreceiver_test.go +++ b/pkg/rtpreceiver/receiver_test.go @@ -1,4 +1,4 @@ -package rtcpreceiver +package rtpreceiver import ( "testing" @@ -14,7 +14,7 @@ func uint32Ptr(v uint32) *uint32 { } func TestErrorInvalidPeriod(t *testing.T) { - rr := &RTCPReceiver{ + rr := &Receiver{ ClockRate: 90000, LocalSSRC: uint32Ptr(0x65f83afb), } @@ -23,7 +23,7 @@ func TestErrorInvalidPeriod(t *testing.T) { } func TestErrorDifferentSSRC(t *testing.T) { - rr := &RTCPReceiver{ + rr := &Receiver{ ClockRate: 90000, LocalSSRC: uint32Ptr(0x65f83afb), Period: 500 * time.Millisecond, @@ -64,7 +64,7 @@ func TestErrorDifferentSSRC(t *testing.T) { } func TestStatsBeforeData(t *testing.T) { - rr := &RTCPReceiver{ + rr := &Receiver{ ClockRate: 90000, LocalSSRC: uint32Ptr(0x65f83afb), Period: 500 * time.Millisecond, @@ -85,7 +85,7 @@ func TestStandard(t *testing.T) { t.Run(ca, func(t *testing.T) { pktGenerated := make(chan rtcp.Packet) - rr := &RTCPReceiver{ + rr := &Receiver{ ClockRate: 90000, LocalSSRC: uint32Ptr(0x65f83afb), UnrealiableTransport: ca == "unrealiable", @@ -202,7 +202,7 @@ func TestStandard(t *testing.T) { func TestZeroClockRate(t *testing.T) { pktGenerated := make(chan rtcp.Packet) - rr := &RTCPReceiver{ + rr := &Receiver{ ClockRate: 0, LocalSSRC: uint32Ptr(0x65f83afb), Period: 500 * time.Millisecond, @@ -306,7 +306,7 @@ func TestZeroClockRate(t *testing.T) { func TestSequenceNumberOverflow(t *testing.T) { rtcpGenerated := make(chan struct{}) - rr := &RTCPReceiver{ + rr := &Receiver{ ClockRate: 90000, LocalSSRC: uint32Ptr(0x65f83afb), Period: 250 * time.Millisecond, @@ -380,7 +380,7 @@ func TestSequenceNumberOverflow(t *testing.T) { func TestJitter(t *testing.T) { rtcpGenerated := make(chan struct{}) - rr := &RTCPReceiver{ + rr := &Receiver{ ClockRate: 90000, LocalSSRC: uint32Ptr(0x65f83afb), Period: 500 * time.Millisecond, @@ -468,7 +468,7 @@ func TestJitter(t *testing.T) { func TestReliablePacketsLost(t *testing.T) { done := make(chan struct{}) - rr := &RTCPReceiver{ + rr := &Receiver{ ClockRate: 90000, LocalSSRC: uint32Ptr(0x65f83afb), Period: 500 * time.Millisecond, @@ -545,7 +545,7 @@ func TestReliablePacketsLost(t *testing.T) { func TestReliableOverflowAndPacketsLost(t *testing.T) { done := make(chan struct{}) - rr := &RTCPReceiver{ + rr := &Receiver{ ClockRate: 90000, LocalSSRC: uint32Ptr(0x65f83afb), Period: 500 * time.Millisecond, @@ -769,7 +769,7 @@ func TestUnrealiableReorder(t *testing.T) { }, } - rr := &RTCPReceiver{ + rr := &Receiver{ ClockRate: 90000, LocalSSRC: uint32Ptr(0x65f83afb), UnrealiableTransport: true, @@ -794,7 +794,7 @@ func TestUnrealiableReorder(t *testing.T) { func TestUnrealiableBufferFull(t *testing.T) { rtcpReceived := make(chan struct{}) - rr := &RTCPReceiver{ + rr := &Receiver{ ClockRate: 90000, LocalSSRC: uint32Ptr(0x65f83afb), UnrealiableTransport: true, @@ -878,7 +878,7 @@ func TestUnrealiableBufferFull(t *testing.T) { func TestUnrealiableReset(t *testing.T) { rtcpGenerated := make(chan struct{}) - rr := &RTCPReceiver{ + rr := &Receiver{ ClockRate: 90000, LocalSSRC: uint32Ptr(0x65f83afb), UnrealiableTransport: true, @@ -940,7 +940,7 @@ func TestUnrealiableReset(t *testing.T) { func TestUnrealiableCustomBufferSize(t *testing.T) { customSize := 128 - rr := &RTCPReceiver{ + rr := &Receiver{ ClockRate: 90000, LocalSSRC: uint32Ptr(0x65f83afb), UnrealiableTransport: true, diff --git a/pkg/rtpsender/sender.go b/pkg/rtpsender/sender.go new file mode 100644 index 00000000..67f1409a --- /dev/null +++ b/pkg/rtpsender/sender.go @@ -0,0 +1,185 @@ +// Package rtpsender contains a utility to send RTP packets. +package rtpsender + +import ( + "sync" + "time" + + "github.com/bluenviron/gortsplib/v4/pkg/ntp" + "github.com/pion/rtcp" + "github.com/pion/rtp" +) + +// Sender is a utility to send RTP packets. +// It is in charge of generating RTCP sender reports. +type Sender struct { + ClockRate int + Period time.Duration + TimeNow func() time.Time + WritePacketRTCP func(rtcp.Packet) + + mutex sync.RWMutex + + // data from RTP packets + firstRTPPacketSent bool + lastTimeRTP uint32 + lastTimeNTP time.Time + lastTimeSystem time.Time + localSSRC uint32 + lastSequenceNumber uint16 + packetCount uint32 + octetCount uint32 + + terminate chan struct{} + done chan struct{} +} + +// New allocates a Sender. +// +// Deprecated: replaced by Initialize(). +func New( + clockRate int, + period time.Duration, + timeNow func() time.Time, + writePacketRTCP func(rtcp.Packet), +) *Sender { + rs := &Sender{ + ClockRate: clockRate, + Period: period, + TimeNow: timeNow, + WritePacketRTCP: writePacketRTCP, + } + rs.Initialize() + + return rs +} + +// Initialize initializes a Sender. +func (rs *Sender) Initialize() { + if rs.TimeNow == nil { + rs.TimeNow = time.Now + } + + rs.terminate = make(chan struct{}) + rs.done = make(chan struct{}) + + go rs.run() +} + +// Close closes the Sender. +func (rs *Sender) Close() { + close(rs.terminate) + <-rs.done +} + +func (rs *Sender) run() { + defer close(rs.done) + + t := time.NewTicker(rs.Period) + defer t.Stop() + + for { + select { + case <-t.C: + report := rs.report() + if report != nil { + rs.WritePacketRTCP(report) + } + + case <-rs.terminate: + return + } + } +} + +func (rs *Sender) report() rtcp.Packet { + rs.mutex.RLock() + defer rs.mutex.RUnlock() + + if !rs.firstRTPPacketSent || rs.ClockRate == 0 { + return nil + } + + systemTimeDiff := rs.TimeNow().Sub(rs.lastTimeSystem) + ntpTime := rs.lastTimeNTP.Add(systemTimeDiff) + rtpTime := rs.lastTimeRTP + uint32(systemTimeDiff.Seconds()*float64(rs.ClockRate)) + + return &rtcp.SenderReport{ + SSRC: rs.localSSRC, + NTPTime: ntp.Encode(ntpTime), + RTPTime: rtpTime, + PacketCount: rs.packetCount, + OctetCount: rs.octetCount, + } +} + +// ProcessPacket extracts data from RTP packets. +func (rs *Sender) ProcessPacket(pkt *rtp.Packet, ntp time.Time, ptsEqualsDTS bool) { + rs.mutex.Lock() + defer rs.mutex.Unlock() + + if ptsEqualsDTS { + rs.firstRTPPacketSent = true + rs.lastTimeRTP = pkt.Timestamp + rs.lastTimeNTP = ntp + rs.lastTimeSystem = rs.TimeNow() + rs.localSSRC = pkt.SSRC + } + + rs.lastSequenceNumber = pkt.SequenceNumber + + rs.packetCount++ + rs.octetCount += uint32(len(pkt.Payload)) +} + +// SenderSSRC returns the SSRC of outgoing RTP packets. +// +// Deprecated: replaced by Stats(). +func (rs *Sender) SenderSSRC() (uint32, bool) { + stats := rs.Stats() + if stats == nil { + return 0, false + } + + return stats.LocalSSRC, true +} + +// LastPacketData returns metadata of the last RTP packet. +// +// Deprecated: replaced by Stats(). +func (rs *Sender) LastPacketData() (uint16, uint32, time.Time, bool) { + stats := rs.Stats() + if stats == nil { + return 0, 0, time.Time{}, false + } + + return stats.LastSequenceNumber, stats.LastRTP, stats.LastNTP, true +} + +// Stats are statistics. +type Stats struct { + // Deprecated: this is not a statistics anymore but a fixed parameter. + // it will be removed in next version. + LocalSSRC uint32 + + LastSequenceNumber uint16 + LastRTP uint32 + LastNTP time.Time +} + +// Stats returns statistics. +func (rs *Sender) Stats() *Stats { + rs.mutex.RLock() + defer rs.mutex.RUnlock() + + if !rs.firstRTPPacketSent { + return nil + } + + return &Stats{ + LocalSSRC: rs.localSSRC, + LastSequenceNumber: rs.lastSequenceNumber, + LastRTP: rs.lastTimeRTP, + LastNTP: rs.lastTimeNTP, + } +} diff --git a/pkg/rtcpsender/rtcpsender_test.go b/pkg/rtpsender/sender_test.go similarity index 96% rename from pkg/rtcpsender/rtcpsender_test.go rename to pkg/rtpsender/sender_test.go index 6583c294..b9c3beb3 100644 --- a/pkg/rtcpsender/rtcpsender_test.go +++ b/pkg/rtpsender/sender_test.go @@ -1,4 +1,4 @@ -package rtcpsender +package rtpsender import ( "sync" @@ -10,7 +10,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestRTCPSender(t *testing.T) { +func TestSender(t *testing.T) { var curTime time.Time var mutex sync.Mutex @@ -22,7 +22,7 @@ func TestRTCPSender(t *testing.T) { pktGenerated := make(chan rtcp.Packet) - rs := &RTCPSender{ + rs := &Sender{ ClockRate: 90000, Period: 100 * time.Millisecond, TimeNow: func() time.Time { @@ -108,7 +108,7 @@ func TestRTCPSender(t *testing.T) { }, stats) } -func TestRTCPSenderZeroClockRate(t *testing.T) { +func TestSenderZeroClockRate(t *testing.T) { var curTime time.Time var mutex sync.Mutex @@ -120,7 +120,7 @@ func TestRTCPSenderZeroClockRate(t *testing.T) { pktGenerated := make(chan rtcp.Packet) - rs := &RTCPSender{ + rs := &Sender{ ClockRate: 0, Period: 100 * time.Millisecond, TimeNow: func() time.Time { diff --git a/server_session.go b/server_session.go index 65d29f96..dc3c2f30 100644 --- a/server_session.go +++ b/server_session.go @@ -23,8 +23,8 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/liberrors" "github.com/bluenviron/gortsplib/v4/pkg/mikey" "github.com/bluenviron/gortsplib/v4/pkg/ntp" - "github.com/bluenviron/gortsplib/v4/pkg/rtcpreceiver" - "github.com/bluenviron/gortsplib/v4/pkg/rtcpsender" + "github.com/bluenviron/gortsplib/v4/pkg/rtpreceiver" + "github.com/bluenviron/gortsplib/v4/pkg/rtpsender" "github.com/bluenviron/gortsplib/v4/pkg/rtptime" "github.com/bluenviron/gortsplib/v4/pkg/sdp" ) @@ -321,7 +321,7 @@ func generateRTPInfoEntry(ssm *serverStreamMedia, now time.Time) *headers.RTPInf format := ssm.formats[ssm.media.Formats[0].PayloadType()] - stats := format.rtcpSender.Stats() + stats := format.rtpSender.Stats() if stats == nil { return nil } @@ -566,19 +566,19 @@ func (ss *ServerSession) Stats() *StatsSession { ret := make(map[format.Format]StatsSessionFormat, len(sm.formats)) for _, fo := range sm.formats { - recvStats := func() *rtcpreceiver.Stats { - if fo.rtcpReceiver != nil { - return fo.rtcpReceiver.Stats() + recvStats := func() *rtpreceiver.Stats { + if fo.rtpReceiver != nil { + return fo.rtpReceiver.Stats() } return nil }() - rtcpSender := func() *rtcpsender.RTCPSender { + rtcpSender := func() *rtpsender.Sender { if ss.setuppedStream != nil { - return ss.setuppedStream.medias[med].formats[fo.format.PayloadType()].rtcpSender + return ss.setuppedStream.medias[med].formats[fo.format.PayloadType()].rtpSender } return nil }() - sentStats := func() *rtcpsender.Stats { + sentStats := func() *rtpsender.Stats { if rtcpSender != nil { return rtcpSender.Stats() } @@ -1772,7 +1772,7 @@ func (ss *ServerSession) PacketPTS2(medi *description.Media, pkt *rtp.Packet) (i func (ss *ServerSession) PacketNTP(medi *description.Media, pkt *rtp.Packet) (time.Time, bool) { sm := ss.setuppedMedias[medi] sf := sm.formats[pkt.PayloadType] - return sf.rtcpReceiver.PacketNTP(pkt.Timestamp) + return sf.rtpReceiver.PacketNTP(pkt.Timestamp) } func (ss *ServerSession) handleRequest(req sessionRequestReq) (*base.Response, *ServerSession, error) { diff --git a/server_session_format.go b/server_session_format.go index a3339f68..dcaf8b18 100644 --- a/server_session_format.go +++ b/server_session_format.go @@ -11,7 +11,7 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/gortsplib/v4/pkg/liberrors" - "github.com/bluenviron/gortsplib/v4/pkg/rtcpreceiver" + "github.com/bluenviron/gortsplib/v4/pkg/rtpreceiver" ) func serverSessionPickLocalSSRC(sf *serverSessionFormat) (uint32, error) { @@ -45,7 +45,7 @@ type serverSessionFormat struct { onPacketRTP OnPacketRTPFunc localSSRC uint32 - rtcpReceiver *rtcpreceiver.RTCPReceiver + rtpReceiver *rtpreceiver.Receiver writePacketRTPInQueue func([]byte) error rtpPacketsReceived *uint64 rtpPacketsSent *uint64 @@ -80,7 +80,7 @@ func (sf *serverSessionFormat) start() { } if sf.sm.ss.state == ServerSessionStateRecord || sf.sm.media.IsBackChannel { - sf.rtcpReceiver = &rtcpreceiver.RTCPReceiver{ + sf.rtpReceiver = &rtpreceiver.Receiver{ ClockRate: sf.format.ClockRate(), LocalSSRC: &sf.localSSRC, UnrealiableTransport: udp, @@ -92,7 +92,7 @@ func (sf *serverSessionFormat) start() { } }, } - err := sf.rtcpReceiver.Initialize() + err := sf.rtpReceiver.Initialize() if err != nil { panic(err) } @@ -100,15 +100,15 @@ func (sf *serverSessionFormat) start() { } func (sf *serverSessionFormat) stop() { - if sf.rtcpReceiver != nil { - sf.rtcpReceiver.Close() - sf.rtcpReceiver = nil + if sf.rtpReceiver != nil { + sf.rtpReceiver.Close() + sf.rtpReceiver = nil } } func (sf *serverSessionFormat) remoteSSRC() (uint32, bool) { - if sf.rtcpReceiver != nil { - stats := sf.rtcpReceiver.Stats() + if sf.rtpReceiver != nil { + stats := sf.rtpReceiver.Stats() if stats != nil { return stats.RemoteSSRC, true } @@ -117,7 +117,7 @@ func (sf *serverSessionFormat) remoteSSRC() (uint32, bool) { } func (sf *serverSessionFormat) readPacketRTP(pkt *rtp.Packet, now time.Time) { - pkts, lost, err := sf.rtcpReceiver.ProcessPacket2(pkt, now, sf.format.PTSEqualsDTS(pkt)) + pkts, lost, err := sf.rtpReceiver.ProcessPacket2(pkt, now, sf.format.PTSEqualsDTS(pkt)) if err != nil { sf.sm.onPacketRTPDecodeError(err) return diff --git a/server_session_media.go b/server_session_media.go index ecd82880..7b78f814 100644 --- a/server_session_media.go +++ b/server_session_media.go @@ -176,7 +176,7 @@ func (sm *serverSessionMedia) stop() { func (sm *serverSessionMedia) findFormatByRemoteSSRC(ssrc uint32) *serverSessionFormat { for _, format := range sm.formats { - stats := format.rtcpReceiver.Stats() + stats := format.rtpReceiver.Stats() if stats != nil && stats.RemoteSSRC == ssrc { return format } @@ -317,7 +317,7 @@ func (sm *serverSessionMedia) readPacketRTCPUDPRecord(payload []byte) bool { if sr, ok := pkt.(*rtcp.SenderReport); ok { format := sm.findFormatByRemoteSSRC(sr.SSRC) if format != nil { - format.rtcpReceiver.ProcessSenderReport(sr, now) + format.rtpReceiver.ProcessSenderReport(sr, now) } } @@ -416,7 +416,7 @@ func (sm *serverSessionMedia) readPacketRTCPTCPRecord(payload []byte) bool { if sr, ok := pkt.(*rtcp.SenderReport); ok { format := sm.findFormatByRemoteSSRC(sr.SSRC) if format != nil { - format.rtcpReceiver.ProcessSenderReport(sr, now) + format.rtpReceiver.ProcessSenderReport(sr, now) } } diff --git a/server_stream_format.go b/server_stream_format.go index 607e6a43..194bc9b7 100644 --- a/server_stream_format.go +++ b/server_stream_format.go @@ -10,7 +10,7 @@ import ( "github.com/pion/rtp" "github.com/bluenviron/gortsplib/v4/pkg/format" - "github.com/bluenviron/gortsplib/v4/pkg/rtcpsender" + "github.com/bluenviron/gortsplib/v4/pkg/rtpsender" ) func randUint32() (uint32, error) { @@ -52,7 +52,7 @@ type serverStreamFormat struct { format format.Format localSSRC uint32 - rtcpSender *rtcpsender.RTCPSender + rtpSender *rtpsender.Sender rtpPacketsSent *uint64 } @@ -65,7 +65,7 @@ func (sf *serverStreamFormat) initialize() error { sf.rtpPacketsSent = new(uint64) - sf.rtcpSender = &rtcpsender.RTCPSender{ + sf.rtpSender = &rtpsender.Sender{ ClockRate: sf.format.ClockRate(), Period: sf.sm.st.Server.senderReportPeriod, TimeNow: sf.sm.st.Server.timeNow, @@ -75,21 +75,21 @@ func (sf *serverStreamFormat) initialize() error { } }, } - sf.rtcpSender.Initialize() + sf.rtpSender.Initialize() return nil } func (sf *serverStreamFormat) close() { - if sf.rtcpSender != nil { - sf.rtcpSender.Close() + if sf.rtpSender != nil { + sf.rtpSender.Close() } } func (sf *serverStreamFormat) writePacketRTP(pkt *rtp.Packet, ntp time.Time) error { pkt.SSRC = sf.localSSRC - sf.rtcpSender.ProcessPacket(pkt, ntp, sf.format.PTSEqualsDTS(pkt)) + sf.rtpSender.ProcessPacket(pkt, ntp, sf.format.PTSEqualsDTS(pkt)) maxPlainPacketSize := sf.sm.st.Server.MaxPacketSize if sf.sm.srtpOutCtx != nil {