From 8c4a3ca018936f8de114fedc730d3cc854099f7d Mon Sep 17 00:00:00 2001 From: Alessandro Ros Date: Tue, 24 Dec 2024 21:21:11 +0100 Subject: [PATCH] unexport RTCPSender, RTCPReceiver, LossDetector, Reorderer (#667) --- client_format.go | 43 +- internal/rtcpreceiver/rtcpreceiver.go | 252 +++++++++++ internal/rtcpreceiver/rtcpreceiver_test.go | 402 ++++++++++++++++++ internal/rtcpsender/rtcpsender.go | 132 ++++++ internal/rtcpsender/rtcpsender_test.go | 100 +++++ internal/rtplossdetector/lossdetector.go | 31 ++ internal/rtplossdetector/lossdetector_test.go | 33 ++ internal/rtpreorderer/reorderer.go | 133 ++++++ .../rtpreorderer/reorderer_test.go | 9 +- pkg/rtcpreceiver/rtcpreceiver.go | 241 +---------- pkg/rtcpsender/rtcpsender.go | 121 +----- pkg/rtplossdetector/lossdetector.go | 23 +- pkg/rtpreorderer/reorderer.go | 124 +----- scripts/test.mk | 5 +- server_session_format.go | 27 +- server_stream_format.go | 15 +- 16 files changed, 1186 insertions(+), 505 deletions(-) create mode 100644 internal/rtcpreceiver/rtcpreceiver.go create mode 100644 internal/rtcpreceiver/rtcpreceiver_test.go create mode 100644 internal/rtcpsender/rtcpsender.go create mode 100644 internal/rtcpsender/rtcpsender_test.go create mode 100644 internal/rtplossdetector/lossdetector.go create mode 100644 internal/rtplossdetector/lossdetector_test.go create mode 100644 internal/rtpreorderer/reorderer.go rename {pkg => internal}/rtpreorderer/reorderer_test.go (97%) diff --git a/client_format.go b/client_format.go index 8f63e5a3..26263af5 100644 --- a/client_format.go +++ b/client_format.go @@ -4,12 +4,12 @@ import ( "github.com/pion/rtcp" "github.com/pion/rtp" + "github.com/bluenviron/gortsplib/v4/internal/rtcpreceiver" + "github.com/bluenviron/gortsplib/v4/internal/rtcpsender" + "github.com/bluenviron/gortsplib/v4/internal/rtplossdetector" + "github.com/bluenviron/gortsplib/v4/internal/rtpreorderer" "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/gortsplib/v4/pkg/liberrors" - "github.com/bluenviron/gortsplib/v4/pkg/rtcpreceiver" - "github.com/bluenviron/gortsplib/v4/pkg/rtcpsender" - "github.com/bluenviron/gortsplib/v4/pkg/rtplossdetector" - "github.com/bluenviron/gortsplib/v4/pkg/rtpreorderer" ) type clientFormat struct { @@ -25,33 +25,36 @@ type clientFormat struct { func (cf *clientFormat) start() { if cf.cm.c.state == clientStateRecord || cf.cm.media.IsBackChannel { - cf.rtcpSender = rtcpsender.New( - cf.format.ClockRate(), - cf.cm.c.senderReportPeriod, - cf.cm.c.timeNow, - func(pkt rtcp.Packet) { + cf.rtcpSender = &rtcpsender.RTCPSender{ + ClockRate: cf.format.ClockRate(), + Period: cf.cm.c.senderReportPeriod, + TimeNow: cf.cm.c.timeNow, + WritePacketRTCP: func(pkt rtcp.Packet) { if !cf.cm.c.DisableRTCPSenderReports { cf.cm.c.WritePacketRTCP(cf.cm.media, pkt) //nolint:errcheck } - }) + }, + } + cf.rtcpSender.Initialize() } else { if cf.cm.udpRTPListener != nil { - cf.udpReorderer = rtpreorderer.New() + cf.udpReorderer = &rtpreorderer.Reorderer{} + cf.udpReorderer.Initialize() } else { - cf.tcpLossDetector = rtplossdetector.New() + cf.tcpLossDetector = &rtplossdetector.LossDetector{} } - var err error - cf.rtcpReceiver, err = rtcpreceiver.New( - cf.format.ClockRate(), - nil, - cf.cm.c.receiverReportPeriod, - cf.cm.c.timeNow, - func(pkt rtcp.Packet) { + cf.rtcpReceiver = &rtcpreceiver.RTCPReceiver{ + ClockRate: cf.format.ClockRate(), + Period: cf.cm.c.receiverReportPeriod, + TimeNow: cf.cm.c.timeNow, + WritePacketRTCP: func(pkt rtcp.Packet) { if cf.cm.udpRTPListener != nil { cf.cm.c.WritePacketRTCP(cf.cm.media, pkt) //nolint:errcheck } - }) + }, + } + err := cf.rtcpReceiver.Initialize() if err != nil { panic(err) } diff --git a/internal/rtcpreceiver/rtcpreceiver.go b/internal/rtcpreceiver/rtcpreceiver.go new file mode 100644 index 00000000..928a10e8 --- /dev/null +++ b/internal/rtcpreceiver/rtcpreceiver.go @@ -0,0 +1,252 @@ +// Package rtcpreceiver contains a utility to generate RTCP receiver reports. +package rtcpreceiver + +import ( + "crypto/rand" + "fmt" + "sync" + "time" + + "github.com/pion/rtcp" + "github.com/pion/rtp" +) + +// seconds since 1st January 1900 +// higher 32 bits are the integer part, lower 32 bits are the fractional part +func ntpTimeRTCPToGo(v uint64) time.Time { + nano := int64((v>>32)*1000000000+(v&0xFFFFFFFF)) - 2208988800*1000000000 + return time.Unix(0, nano) +} + +func randUint32() (uint32, error) { + var b [4]byte + _, err := rand.Read(b[:]) + if err != nil { + return 0, err + } + return uint32(b[0])<<24 | uint32(b[1])<<16 | uint32(b[2])<<8 | uint32(b[3]), nil +} + +// RTCPReceiver is a utility to generate RTCP receiver reports. +type RTCPReceiver struct { + ClockRate int + ReceiverSSRC *uint32 + Period time.Duration + TimeNow func() time.Time + WritePacketRTCP func(rtcp.Packet) + + mutex sync.RWMutex + + // data from RTP packets + firstRTPPacketReceived bool + timeInitialized bool + sequenceNumberCycles uint16 + lastSequenceNumber uint16 + senderSSRC uint32 + lastTimeRTP uint32 + lastTimeSystem time.Time + totalLost uint32 + totalLostSinceReport uint32 + totalSinceReport uint32 + jitter float64 + + // data from RTCP packets + firstSenderReportReceived bool + lastSenderReportTimeNTP uint64 + lastSenderReportTimeRTP uint32 + lastSenderReportTimeSystem time.Time + + terminate chan struct{} + done chan struct{} +} + +// Initialize initializes RTCPReceiver. +func (rr *RTCPReceiver) Initialize() error { + if rr.ReceiverSSRC == nil { + v, err := randUint32() + if err != nil { + return err + } + rr.ReceiverSSRC = &v + } + + if rr.TimeNow == nil { + rr.TimeNow = time.Now + } + + rr.terminate = make(chan struct{}) + rr.done = make(chan struct{}) + + go rr.run() + + return nil +} + +// Close closes the RTCPReceiver. +func (rr *RTCPReceiver) Close() { + close(rr.terminate) + <-rr.done +} + +func (rr *RTCPReceiver) run() { + defer close(rr.done) + + t := time.NewTicker(rr.Period) + defer t.Stop() + + for { + select { + case <-t.C: + report := rr.report() + if report != nil { + rr.WritePacketRTCP(report) + } + + case <-rr.terminate: + return + } + } +} + +func (rr *RTCPReceiver) report() rtcp.Packet { + rr.mutex.Lock() + defer rr.mutex.Unlock() + + if !rr.firstRTPPacketReceived { + return nil + } + + system := rr.TimeNow() + + report := &rtcp.ReceiverReport{ + SSRC: *rr.ReceiverSSRC, + Reports: []rtcp.ReceptionReport{ + { + SSRC: rr.senderSSRC, + LastSequenceNumber: uint32(rr.sequenceNumberCycles)<<16 | uint32(rr.lastSequenceNumber), + // equivalent to taking the integer part after multiplying the + // loss fraction by 256 + FractionLost: uint8(float64(rr.totalLostSinceReport*256) / float64(rr.totalSinceReport)), + TotalLost: rr.totalLost, + Jitter: uint32(rr.jitter), + }, + }, + } + + if rr.firstSenderReportReceived { + // middle 32 bits out of 64 in the NTP timestamp of last sender report + report.Reports[0].LastSenderReport = uint32(rr.lastSenderReportTimeNTP >> 16) + + // delay, expressed in units of 1/65536 seconds, between + // receiving the last SR packet from source SSRC_n and sending this + // reception report block + report.Reports[0].Delay = uint32(system.Sub(rr.lastSenderReportTimeSystem).Seconds() * 65536) + } + + rr.totalLostSinceReport = 0 + rr.totalSinceReport = 0 + + return report +} + +// ProcessPacket extracts the needed data from RTP packets. +func (rr *RTCPReceiver) ProcessPacket(pkt *rtp.Packet, system time.Time, ptsEqualsDTS bool) error { + rr.mutex.Lock() + defer rr.mutex.Unlock() + + // first packet + if !rr.firstRTPPacketReceived { + rr.firstRTPPacketReceived = true + rr.totalSinceReport = 1 + rr.lastSequenceNumber = pkt.SequenceNumber + rr.senderSSRC = pkt.SSRC + + if ptsEqualsDTS { + rr.timeInitialized = true + rr.lastTimeRTP = pkt.Timestamp + rr.lastTimeSystem = system + } + + // subsequent packets + } else { + if pkt.SSRC != rr.senderSSRC { + return fmt.Errorf("received packet with wrong SSRC %d, expected %d", pkt.SSRC, rr.senderSSRC) + } + + diff := int32(pkt.SequenceNumber) - int32(rr.lastSequenceNumber) + + // overflow + if diff < -0x0FFF { + rr.sequenceNumberCycles++ + } + + // detect lost packets + if pkt.SequenceNumber != (rr.lastSequenceNumber + 1) { + rr.totalLost += uint32(uint16(diff) - 1) + rr.totalLostSinceReport += uint32(uint16(diff) - 1) + + // allow up to 24 bits + if rr.totalLost > 0xFFFFFF { + rr.totalLost = 0xFFFFFF + } + if rr.totalLostSinceReport > 0xFFFFFF { + rr.totalLostSinceReport = 0xFFFFFF + } + } + + rr.totalSinceReport += uint32(uint16(diff)) + rr.lastSequenceNumber = pkt.SequenceNumber + + if ptsEqualsDTS { + if rr.timeInitialized { + // update jitter + // https://tools.ietf.org/html/rfc3550#page-39 + D := system.Sub(rr.lastTimeSystem).Seconds()*float64(rr.ClockRate) - + (float64(pkt.Timestamp) - float64(rr.lastTimeRTP)) + if D < 0 { + D = -D + } + rr.jitter += (D - rr.jitter) / 16 + } + + rr.timeInitialized = true + rr.lastTimeRTP = pkt.Timestamp + rr.lastTimeSystem = system + } + } + + return nil +} + +// ProcessSenderReport extracts the needed data from RTCP sender reports. +func (rr *RTCPReceiver) ProcessSenderReport(sr *rtcp.SenderReport, system time.Time) { + rr.mutex.Lock() + defer rr.mutex.Unlock() + + rr.firstSenderReportReceived = true + rr.lastSenderReportTimeNTP = sr.NTPTime + rr.lastSenderReportTimeRTP = sr.RTPTime + rr.lastSenderReportTimeSystem = system +} + +// PacketNTP returns the NTP timestamp of the packet. +func (rr *RTCPReceiver) PacketNTP(ts uint32) (time.Time, bool) { + rr.mutex.Lock() + defer rr.mutex.Unlock() + + if !rr.firstSenderReportReceived { + return time.Time{}, false + } + + timeDiff := int32(ts - rr.lastSenderReportTimeRTP) + timeDiffGo := (time.Duration(timeDiff) * time.Second) / time.Duration(rr.ClockRate) + + return ntpTimeRTCPToGo(rr.lastSenderReportTimeNTP).Add(timeDiffGo), true +} + +// SenderSSRC returns the SSRC of outgoing RTP packets. +func (rr *RTCPReceiver) SenderSSRC() (uint32, bool) { + rr.mutex.RLock() + defer rr.mutex.RUnlock() + return rr.senderSSRC, rr.firstRTPPacketReceived +} diff --git a/internal/rtcpreceiver/rtcpreceiver_test.go b/internal/rtcpreceiver/rtcpreceiver_test.go new file mode 100644 index 00000000..df821466 --- /dev/null +++ b/internal/rtcpreceiver/rtcpreceiver_test.go @@ -0,0 +1,402 @@ +package rtcpreceiver + +import ( + "testing" + "time" + + "github.com/pion/rtcp" + "github.com/pion/rtp" + "github.com/stretchr/testify/require" +) + +func uint32Ptr(v uint32) *uint32 { + return &v +} + +func TestRTCPReceiverBase(t *testing.T) { + done := make(chan struct{}) + + rr := &RTCPReceiver{ + ClockRate: 90000, + ReceiverSSRC: uint32Ptr(0x65f83afb), + Period: 500 * time.Millisecond, + TimeNow: func() time.Time { + return time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC) + }, + WritePacketRTCP: func(pkt rtcp.Packet) { + require.Equal(t, &rtcp.ReceiverReport{ + SSRC: 0x65f83afb, + Reports: []rtcp.ReceptionReport{ + { + SSRC: 0xba9da416, + LastSequenceNumber: 947, + LastSenderReport: 0x887a17ce, + Delay: 2 * 65536, + }, + }, + }, pkt) + close(done) + }, + } + err := rr.Initialize() + require.NoError(t, err) + defer rr.Close() + + srPkt := rtcp.SenderReport{ + SSRC: 0xba9da416, + NTPTime: 0xe363887a17ced916, + RTPTime: 0xafb45733, + PacketCount: 714, + OctetCount: 859127, + } + ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) + rr.ProcessSenderReport(&srPkt, ts) + + rtpPkt := rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 946, + Timestamp: 0xafb45733, + SSRC: 0xba9da416, + }, + Payload: []byte("\x00\x00"), + } + ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) + err = rr.ProcessPacket(&rtpPkt, ts, true) + require.NoError(t, err) + + rtpPkt = rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 947, + Timestamp: 0xafb45733 + 90000, + SSRC: 0xba9da416, + }, + Payload: []byte("\x00\x00"), + } + ts = time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC) + err = rr.ProcessPacket(&rtpPkt, ts, true) + require.NoError(t, err) + + <-done +} + +func TestRTCPReceiverOverflow(t *testing.T) { + done := make(chan struct{}) + + rr := &RTCPReceiver{ + ClockRate: 90000, + ReceiverSSRC: uint32Ptr(0x65f83afb), + Period: 250 * time.Millisecond, + TimeNow: func() time.Time { + return time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC) + }, + WritePacketRTCP: func(pkt rtcp.Packet) { + require.Equal(t, &rtcp.ReceiverReport{ + SSRC: 0x65f83afb, + Reports: []rtcp.ReceptionReport{ + { + SSRC: 0xba9da416, + LastSequenceNumber: 1 << 16, + LastSenderReport: 0x887a17ce, + Delay: 1 * 65536, + }, + }, + }, pkt) + close(done) + }, + } + err := rr.Initialize() + require.NoError(t, err) + defer rr.Close() + + time.Sleep(400 * time.Millisecond) + + srPkt := rtcp.SenderReport{ + SSRC: 0xba9da416, + NTPTime: 0xe363887a17ced916, + RTPTime: 1287981738, + PacketCount: 714, + OctetCount: 859127, + } + ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) + rr.ProcessSenderReport(&srPkt, ts) + + rtpPkt := rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 0xffff, + Timestamp: 0xafb45733, + SSRC: 0xba9da416, + }, + Payload: []byte("\x00\x00"), + } + ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) + err = rr.ProcessPacket(&rtpPkt, ts, true) + require.NoError(t, err) + + rtpPkt = rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 0x0000, + Timestamp: 0xafb45733, + SSRC: 0xba9da416, + }, + Payload: []byte("\x00\x00"), + } + ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) + err = rr.ProcessPacket(&rtpPkt, ts, true) + require.NoError(t, err) + + <-done +} + +func TestRTCPReceiverPacketLost(t *testing.T) { + done := make(chan struct{}) + + rr := &RTCPReceiver{ + ClockRate: 90000, + ReceiverSSRC: uint32Ptr(0x65f83afb), + Period: 500 * time.Millisecond, + TimeNow: func() time.Time { + return time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC) + }, + WritePacketRTCP: func(pkt rtcp.Packet) { + require.Equal(t, &rtcp.ReceiverReport{ + SSRC: 0x65f83afb, + Reports: []rtcp.ReceptionReport{ + { + SSRC: 0xba9da416, + LastSequenceNumber: 0x0122, + LastSenderReport: 0x887a17ce, + FractionLost: func() uint8 { + v := float64(1) / 3 + return uint8(v * 256) + }(), + TotalLost: 1, + Delay: 1 * 65536, + }, + }, + }, pkt) + close(done) + }, + } + err := rr.Initialize() + require.NoError(t, err) + defer rr.Close() + + srPkt := rtcp.SenderReport{ + SSRC: 0xba9da416, + NTPTime: 0xe363887a17ced916, + RTPTime: 1287981738, + PacketCount: 714, + OctetCount: 859127, + } + ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) + rr.ProcessSenderReport(&srPkt, ts) + + rtpPkt := rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 0x0120, + Timestamp: 0xafb45733, + SSRC: 0xba9da416, + }, + Payload: []byte("\x00\x00"), + } + ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) + err = rr.ProcessPacket(&rtpPkt, ts, true) + require.NoError(t, err) + + rtpPkt = rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 0x0122, + Timestamp: 0xafb45733, + SSRC: 0xba9da416, + }, + Payload: []byte("\x00\x00"), + } + ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) + err = rr.ProcessPacket(&rtpPkt, ts, true) + require.NoError(t, err) + + <-done +} + +func TestRTCPReceiverOverflowPacketLost(t *testing.T) { + done := make(chan struct{}) + + rr := &RTCPReceiver{ + ClockRate: 90000, + ReceiverSSRC: uint32Ptr(0x65f83afb), + Period: 500 * time.Millisecond, + TimeNow: func() time.Time { + return time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC) + }, + WritePacketRTCP: func(pkt rtcp.Packet) { + require.Equal(t, &rtcp.ReceiverReport{ + SSRC: 0x65f83afb, + Reports: []rtcp.ReceptionReport{ + { + SSRC: 0xba9da416, + LastSequenceNumber: 1<<16 | 0x0002, + LastSenderReport: 0x887a17ce, + FractionLost: func() uint8 { + v := float64(2) / 4 + return uint8(v * 256) + }(), + TotalLost: 2, + Delay: 1 * 65536, + }, + }, + }, pkt) + close(done) + }, + } + err := rr.Initialize() + require.NoError(t, err) + defer rr.Close() + + srPkt := rtcp.SenderReport{ + SSRC: 0xba9da416, + NTPTime: 0xe363887a17ced916, + RTPTime: 1287981738, + PacketCount: 714, + OctetCount: 859127, + } + ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) + rr.ProcessSenderReport(&srPkt, ts) + + rtpPkt := rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 0xffff, + Timestamp: 0xafb45733, + SSRC: 0xba9da416, + }, + Payload: []byte("\x00\x00"), + } + ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) + err = rr.ProcessPacket(&rtpPkt, ts, true) + require.NoError(t, err) + + rtpPkt = rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 0x0002, + Timestamp: 0xafb45733, + SSRC: 0xba9da416, + }, + Payload: []byte("\x00\x00"), + } + ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) + err = rr.ProcessPacket(&rtpPkt, ts, true) + require.NoError(t, err) + + <-done +} + +func TestRTCPReceiverJitter(t *testing.T) { + done := make(chan struct{}) + + rr := &RTCPReceiver{ + ClockRate: 90000, + ReceiverSSRC: uint32Ptr(0x65f83afb), + Period: 500 * time.Millisecond, + TimeNow: func() time.Time { + return time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC) + }, + WritePacketRTCP: func(pkt rtcp.Packet) { + require.Equal(t, &rtcp.ReceiverReport{ + SSRC: 0x65f83afb, + Reports: []rtcp.ReceptionReport{ + { + SSRC: 0xba9da416, + LastSequenceNumber: 948, + LastSenderReport: 0x887a17ce, + Delay: 2 * 65536, + Jitter: 45000 / 16, + }, + }, + }, pkt) + close(done) + }, + } + err := rr.Initialize() + require.NoError(t, err) + defer rr.Close() + + srPkt := rtcp.SenderReport{ + SSRC: 0xba9da416, + NTPTime: 0xe363887a17ced916, + RTPTime: 0xafb45733, + PacketCount: 714, + OctetCount: 859127, + } + ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) + rr.ProcessSenderReport(&srPkt, ts) + + rtpPkt := rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 946, + Timestamp: 0xafb45733, + SSRC: 0xba9da416, + }, + Payload: []byte("\x00\x00"), + } + ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) + err = rr.ProcessPacket(&rtpPkt, ts, true) + require.NoError(t, err) + + rtpPkt = rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 947, + Timestamp: 0xafb45733 + 45000, + SSRC: 0xba9da416, + }, + Payload: []byte("\x00\x00"), + } + ts = time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC) + err = rr.ProcessPacket(&rtpPkt, ts, true) + require.NoError(t, err) + + rtpPkt = rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 948, + Timestamp: 0xafb45733, + SSRC: 0xba9da416, + }, + Payload: []byte("\x00\x00"), + } + ts = time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC) + err = rr.ProcessPacket(&rtpPkt, ts, false) + require.NoError(t, err) + + <-done +} diff --git a/internal/rtcpsender/rtcpsender.go b/internal/rtcpsender/rtcpsender.go new file mode 100644 index 00000000..413a37c1 --- /dev/null +++ b/internal/rtcpsender/rtcpsender.go @@ -0,0 +1,132 @@ +// Package rtcpsender contains a utility to generate RTCP sender reports. +package rtcpsender + +import ( + "sync" + "time" + + "github.com/pion/rtcp" + "github.com/pion/rtp" +) + +// seconds since 1st January 1900 +// higher 32 bits are the integer part, lower 32 bits are the fractional part +func ntpTimeGoToRTCP(v time.Time) uint64 { + s := uint64(v.UnixNano()) + 2208988800*1000000000 + return (s/1000000000)<<32 | (s % 1000000000) +} + +// RTCPSender is a utility to generate RTCP sender reports. +type RTCPSender struct { + ClockRate int + Period time.Duration + TimeNow func() time.Time + WritePacketRTCP func(rtcp.Packet) + + mutex sync.RWMutex + + // data from RTP packets + initialized bool + lastTimeRTP uint32 + lastTimeNTP time.Time + lastTimeSystem time.Time + senderSSRC uint32 + lastSequenceNumber uint16 + packetCount uint32 + octetCount uint32 + + terminate chan struct{} + done chan struct{} +} + +// Initialize initializes a RTCPSender. +func (rs *RTCPSender) Initialize() { + if rs.TimeNow == nil { + rs.TimeNow = time.Now + } + + rs.terminate = make(chan struct{}) + rs.done = make(chan struct{}) + + go rs.run() +} + +// Close closes the RTCPSender. +func (rs *RTCPSender) Close() { + close(rs.terminate) + <-rs.done +} + +func (rs *RTCPSender) run() { + defer close(rs.done) + + t := time.NewTicker(rs.Period) + defer t.Stop() + + for { + select { + case <-t.C: + report := rs.report() + if report != nil { + rs.WritePacketRTCP(report) + } + + case <-rs.terminate: + return + } + } +} + +func (rs *RTCPSender) report() rtcp.Packet { + rs.mutex.Lock() + defer rs.mutex.Unlock() + + if !rs.initialized { + return nil + } + + systemTimeDiff := rs.TimeNow().Sub(rs.lastTimeSystem) + ntpTime := rs.lastTimeNTP.Add(systemTimeDiff) + rtpTime := rs.lastTimeRTP + uint32(systemTimeDiff.Seconds()*float64(rs.ClockRate)) + + return &rtcp.SenderReport{ + SSRC: rs.senderSSRC, + NTPTime: ntpTimeGoToRTCP(ntpTime), + RTPTime: rtpTime, + PacketCount: rs.packetCount, + OctetCount: rs.octetCount, + } +} + +// ProcessPacket extracts data from RTP packets. +func (rs *RTCPSender) ProcessPacket(pkt *rtp.Packet, ntp time.Time, ptsEqualsDTS bool) { + rs.mutex.Lock() + defer rs.mutex.Unlock() + + if ptsEqualsDTS { + rs.initialized = true + rs.lastTimeRTP = pkt.Timestamp + rs.lastTimeNTP = ntp + rs.lastTimeSystem = rs.TimeNow() + rs.senderSSRC = pkt.SSRC + } + + rs.lastSequenceNumber = pkt.SequenceNumber + + rs.packetCount++ + rs.octetCount += uint32(len(pkt.Payload)) +} + +// SenderSSRC returns the SSRC of outgoing RTP packets. +func (rs *RTCPSender) SenderSSRC() (uint32, bool) { + rs.mutex.RLock() + defer rs.mutex.RUnlock() + return rs.senderSSRC, rs.initialized +} + +// LastPacketData returns metadata of the last RTP packet. +func (rs *RTCPSender) LastPacketData() (uint16, uint32, time.Time, bool) { + rs.mutex.RLock() + defer rs.mutex.RUnlock() + return rs.lastSequenceNumber, rs.lastTimeRTP, rs.lastTimeNTP, rs.initialized +} diff --git a/internal/rtcpsender/rtcpsender_test.go b/internal/rtcpsender/rtcpsender_test.go new file mode 100644 index 00000000..faab96e4 --- /dev/null +++ b/internal/rtcpsender/rtcpsender_test.go @@ -0,0 +1,100 @@ +package rtcpsender + +import ( + "sync" + "testing" + "time" + + "github.com/pion/rtcp" + "github.com/pion/rtp" + "github.com/stretchr/testify/require" +) + +func TestRTCPSender(t *testing.T) { + var curTime time.Time + var mutex sync.Mutex + + setCurTime := func(v time.Time) { + mutex.Lock() + defer mutex.Unlock() + curTime = v + } + + sent := make(chan struct{}) + + rs := &RTCPSender{ + ClockRate: 90000, + Period: 100 * time.Millisecond, + TimeNow: func() time.Time { + mutex.Lock() + defer mutex.Unlock() + return curTime + }, + WritePacketRTCP: func(pkt rtcp.Packet) { + require.Equal(t, &rtcp.SenderReport{ + SSRC: 0xba9da416, + NTPTime: func() uint64 { + // timeDiff = (24 - 22) = 2 + // 21 + 2 = 23 + d := time.Date(2008, 5, 20, 22, 15, 23, 0, time.UTC) + s := uint64(d.UnixNano()) + 2208988800*1000000000 + return (s/1000000000)<<32 | (s % 1000000000) + }(), + RTPTime: 1287987768 + 2*90000, + PacketCount: 3, + OctetCount: 6, + }, pkt) + close(sent) + }, + } + rs.Initialize() + defer rs.Close() + + setCurTime(time.Date(2008, 5, 20, 22, 16, 20, 0, time.UTC)) + rtpPkt := rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 946, + Timestamp: 1287987768, + SSRC: 0xba9da416, + }, + Payload: []byte("\x00\x00"), + } + ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) + rs.ProcessPacket(&rtpPkt, ts, true) + + setCurTime(time.Date(2008, 5, 20, 22, 16, 22, 0, time.UTC)) + rtpPkt = rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 947, + Timestamp: 1287987768, + SSRC: 0xba9da416, + }, + Payload: []byte("\x00\x00"), + } + ts = time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC) + rs.ProcessPacket(&rtpPkt, ts, true) + + rtpPkt = rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 948, + Timestamp: 1287987768, + SSRC: 0xba9da416, + }, + Payload: []byte("\x00\x00"), + } + ts = time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC) + rs.ProcessPacket(&rtpPkt, ts, false) + + setCurTime(time.Date(2008, 5, 20, 22, 16, 24, 0, time.UTC)) + + <-sent +} diff --git a/internal/rtplossdetector/lossdetector.go b/internal/rtplossdetector/lossdetector.go new file mode 100644 index 00000000..c7abe6ec --- /dev/null +++ b/internal/rtplossdetector/lossdetector.go @@ -0,0 +1,31 @@ +// Package rtplossdetector implements an algorithm that detects lost packets. +package rtplossdetector + +import ( + "github.com/pion/rtp" +) + +// LossDetector detects lost packets. +type LossDetector struct { + initialized bool + expectedSeqNum uint16 +} + +// Process processes a RTP packet. +// It returns the number of lost packets. +func (r *LossDetector) Process(pkt *rtp.Packet) int { + if !r.initialized { + r.initialized = true + r.expectedSeqNum = pkt.SequenceNumber + 1 + return 0 + } + + if pkt.SequenceNumber != r.expectedSeqNum { + diff := pkt.SequenceNumber - r.expectedSeqNum + r.expectedSeqNum = pkt.SequenceNumber + 1 + return int(diff) + } + + r.expectedSeqNum = pkt.SequenceNumber + 1 + return 0 +} diff --git a/internal/rtplossdetector/lossdetector_test.go b/internal/rtplossdetector/lossdetector_test.go new file mode 100644 index 00000000..dd40de70 --- /dev/null +++ b/internal/rtplossdetector/lossdetector_test.go @@ -0,0 +1,33 @@ +package rtplossdetector + +import ( + "testing" + + "github.com/pion/rtp" + "github.com/stretchr/testify/require" +) + +func TestLossDetector(t *testing.T) { + d := &LossDetector{} + + c := d.Process(&rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 65530, + }, + }) + require.Equal(t, 0, c) + + c = d.Process(&rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 65531, + }, + }) + require.Equal(t, 0, c) + + c = d.Process(&rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 65535, + }, + }) + require.Equal(t, 3, c) +} diff --git a/internal/rtpreorderer/reorderer.go b/internal/rtpreorderer/reorderer.go new file mode 100644 index 00000000..d09312c1 --- /dev/null +++ b/internal/rtpreorderer/reorderer.go @@ -0,0 +1,133 @@ +// Package rtpreorderer implements a filter to reorder incoming RTP packets. +package rtpreorderer + +import ( + "github.com/pion/rtp" +) + +const ( + bufferSize = 64 +) + +// Reorderer filters incoming RTP packets, in order to +// - order packets +// - remove duplicate packets +type Reorderer struct { + initialized bool + expectedSeqNum uint16 + buffer []*rtp.Packet + absPos uint16 + negativeCount int +} + +// Initialize initializes a Reorderer. +func (r *Reorderer) Initialize() { + r.buffer = make([]*rtp.Packet, bufferSize) +} + +// Process processes a RTP packet. +// It returns a sequence of ordered packets and the number of lost packets. +func (r *Reorderer) Process(pkt *rtp.Packet) ([]*rtp.Packet, int) { + if !r.initialized { + r.initialized = true + r.expectedSeqNum = pkt.SequenceNumber + 1 + return []*rtp.Packet{pkt}, 0 + } + + relPos := int16(pkt.SequenceNumber - r.expectedSeqNum) + + // packet is a duplicate or has been sent + // before the first packet processed by Reorderer. + // discard. + if relPos < 0 { + r.negativeCount++ + + // stream has been resetted, therefore reset reorderer too + if r.negativeCount > bufferSize { + r.negativeCount = 0 + + // clear buffer + for i := uint16(0); i < bufferSize; i++ { + p := (r.absPos + i) & (bufferSize - 1) + r.buffer[p] = nil + } + + // reset position + r.expectedSeqNum = pkt.SequenceNumber + 1 + return []*rtp.Packet{pkt}, 0 + } + + return nil, 0 + } + r.negativeCount = 0 + + // there's a missing packet and buffer is full. + // return entire buffer and clear it. + if relPos >= bufferSize { + n := 1 + for i := uint16(0); i < bufferSize; i++ { + p := (r.absPos + i) & (bufferSize - 1) + if r.buffer[p] != nil { + n++ + } + } + + ret := make([]*rtp.Packet, n) + pos := 0 + + for i := uint16(0); i < bufferSize; i++ { + p := (r.absPos + i) & (bufferSize - 1) + if r.buffer[p] != nil { + ret[pos], r.buffer[p] = r.buffer[p], nil + pos++ + } + } + + ret[pos] = pkt + + r.expectedSeqNum = pkt.SequenceNumber + 1 + return ret, int(relPos) - n + 1 + } + + // there's a missing packet + if relPos != 0 { + p := (r.absPos + uint16(relPos)) & (bufferSize - 1) + + // current packet is a duplicate. discard + if r.buffer[p] != nil { + return nil, 0 + } + + // put current packet in buffer + r.buffer[p] = pkt + return nil, 0 + } + + // all packets have been received correctly. + // return them + + n := uint16(1) + for { + p := (r.absPos + n) & (bufferSize - 1) + if r.buffer[p] == nil { + break + } + n++ + } + + ret := make([]*rtp.Packet, n) + + ret[0] = pkt + r.absPos++ + r.absPos &= (bufferSize - 1) + + for i := uint16(1); i < n; i++ { + ret[i], r.buffer[r.absPos] = r.buffer[r.absPos], nil + r.absPos++ + r.absPos &= (bufferSize - 1) + } + + r.expectedSeqNum = pkt.SequenceNumber + n + + return ret, 0 +} diff --git a/pkg/rtpreorderer/reorderer_test.go b/internal/rtpreorderer/reorderer_test.go similarity index 97% rename from pkg/rtpreorderer/reorderer_test.go rename to internal/rtpreorderer/reorderer_test.go index 6f888c48..9e391de5 100644 --- a/pkg/rtpreorderer/reorderer_test.go +++ b/internal/rtpreorderer/reorderer_test.go @@ -157,7 +157,8 @@ func TestReorder(t *testing.T) { }, } - r := New() + r := &Reorderer{} + r.Initialize() r.absPos = 40 for _, entry := range sequence { @@ -168,7 +169,8 @@ func TestReorder(t *testing.T) { } func TestBufferIsFull(t *testing.T) { - r := New() + r := &Reorderer{} + r.Initialize() r.absPos = 25 sn := uint16(1564) toMiss := 34 @@ -222,7 +224,8 @@ func TestBufferIsFull(t *testing.T) { } func TestReset(t *testing.T) { - r := New() + r := &Reorderer{} + r.Initialize() sn := uint16(1234) r.Process(&rtp.Packet{ diff --git a/pkg/rtcpreceiver/rtcpreceiver.go b/pkg/rtcpreceiver/rtcpreceiver.go index 53234457..8f3a6afd 100644 --- a/pkg/rtcpreceiver/rtcpreceiver.go +++ b/pkg/rtcpreceiver/rtcpreceiver.go @@ -2,62 +2,17 @@ package rtcpreceiver import ( - "crypto/rand" - "fmt" - "sync" "time" + "github.com/bluenviron/gortsplib/v4/internal/rtcpreceiver" "github.com/pion/rtcp" "github.com/pion/rtp" ) -// seconds since 1st January 1900 -// higher 32 bits are the integer part, lower 32 bits are the fractional part -func ntpTimeRTCPToGo(v uint64) time.Time { - nano := int64((v>>32)*1000000000+(v&0xFFFFFFFF)) - 2208988800*1000000000 - return time.Unix(0, nano) -} - -func randUint32() (uint32, error) { - var b [4]byte - _, err := rand.Read(b[:]) - if err != nil { - return 0, err - } - return uint32(b[0])<<24 | uint32(b[1])<<16 | uint32(b[2])<<8 | uint32(b[3]), nil -} - // RTCPReceiver is a utility to generate RTCP receiver reports. -type RTCPReceiver struct { - clockRate float64 - receiverSSRC uint32 - period time.Duration - timeNow func() time.Time - writePacketRTCP func(rtcp.Packet) - mutex sync.RWMutex - - // data from RTP packets - firstRTPPacketReceived bool - timeInitialized bool - sequenceNumberCycles uint16 - lastSequenceNumber uint16 - senderSSRC uint32 - lastTimeRTP uint32 - lastTimeSystem time.Time - totalLost uint32 - totalLostSinceReport uint32 - totalSinceReport uint32 - jitter float64 - - // data from RTCP packets - firstSenderReportReceived bool - lastSenderReportTimeNTP uint64 - lastSenderReportTimeRTP uint32 - lastSenderReportTimeSystem time.Time - - terminate chan struct{} - done chan struct{} -} +// +// Deprecated: will be removed in the next version. +type RTCPReceiver rtcpreceiver.RTCPReceiver // New allocates a RTCPReceiver. func New( @@ -67,198 +22,42 @@ func New( timeNow func() time.Time, writePacketRTCP func(rtcp.Packet), ) (*RTCPReceiver, error) { - if receiverSSRC == nil { - v, err := randUint32() - if err != nil { - return nil, err - } - receiverSSRC = &v + rr := &rtcpreceiver.RTCPReceiver{ + ClockRate: clockRate, + ReceiverSSRC: receiverSSRC, + Period: period, + TimeNow: timeNow, + WritePacketRTCP: writePacketRTCP, + } + err := rr.Initialize() + if err != nil { + return nil, err } - if timeNow == nil { - timeNow = time.Now - } - - rr := &RTCPReceiver{ - clockRate: float64(clockRate), - receiverSSRC: *receiverSSRC, - period: period, - timeNow: timeNow, - writePacketRTCP: writePacketRTCP, - terminate: make(chan struct{}), - done: make(chan struct{}), - } - - go rr.run() - - return rr, nil + return (*RTCPReceiver)(rr), nil } // Close closes the RTCPReceiver. func (rr *RTCPReceiver) Close() { - close(rr.terminate) - <-rr.done -} - -func (rr *RTCPReceiver) run() { - defer close(rr.done) - - t := time.NewTicker(rr.period) - defer t.Stop() - - for { - select { - case <-t.C: - report := rr.report() - if report != nil { - rr.writePacketRTCP(report) - } - - case <-rr.terminate: - return - } - } -} - -func (rr *RTCPReceiver) report() rtcp.Packet { - rr.mutex.Lock() - defer rr.mutex.Unlock() - - if !rr.firstRTPPacketReceived { - return nil - } - - system := rr.timeNow() - - report := &rtcp.ReceiverReport{ - SSRC: rr.receiverSSRC, - Reports: []rtcp.ReceptionReport{ - { - SSRC: rr.senderSSRC, - LastSequenceNumber: uint32(rr.sequenceNumberCycles)<<16 | uint32(rr.lastSequenceNumber), - // equivalent to taking the integer part after multiplying the - // loss fraction by 256 - FractionLost: uint8(float64(rr.totalLostSinceReport*256) / float64(rr.totalSinceReport)), - TotalLost: rr.totalLost, - Jitter: uint32(rr.jitter), - }, - }, - } - - if rr.firstSenderReportReceived { - // middle 32 bits out of 64 in the NTP timestamp of last sender report - report.Reports[0].LastSenderReport = uint32(rr.lastSenderReportTimeNTP >> 16) - - // delay, expressed in units of 1/65536 seconds, between - // receiving the last SR packet from source SSRC_n and sending this - // reception report block - report.Reports[0].Delay = uint32(system.Sub(rr.lastSenderReportTimeSystem).Seconds() * 65536) - } - - rr.totalLostSinceReport = 0 - rr.totalSinceReport = 0 - - return report + (*rtcpreceiver.RTCPReceiver)(rr).Close() } // ProcessPacket extracts the needed data from RTP packets. func (rr *RTCPReceiver) ProcessPacket(pkt *rtp.Packet, system time.Time, ptsEqualsDTS bool) error { - rr.mutex.Lock() - defer rr.mutex.Unlock() - - // first packet - if !rr.firstRTPPacketReceived { - rr.firstRTPPacketReceived = true - rr.totalSinceReport = 1 - rr.lastSequenceNumber = pkt.SequenceNumber - rr.senderSSRC = pkt.SSRC - - if ptsEqualsDTS { - rr.timeInitialized = true - rr.lastTimeRTP = pkt.Timestamp - rr.lastTimeSystem = system - } - - // subsequent packets - } else { - if pkt.SSRC != rr.senderSSRC { - return fmt.Errorf("received packet with wrong SSRC %d, expected %d", pkt.SSRC, rr.senderSSRC) - } - - diff := int32(pkt.SequenceNumber) - int32(rr.lastSequenceNumber) - - // overflow - if diff < -0x0FFF { - rr.sequenceNumberCycles++ - } - - // detect lost packets - if pkt.SequenceNumber != (rr.lastSequenceNumber + 1) { - rr.totalLost += uint32(uint16(diff) - 1) - rr.totalLostSinceReport += uint32(uint16(diff) - 1) - - // allow up to 24 bits - if rr.totalLost > 0xFFFFFF { - rr.totalLost = 0xFFFFFF - } - if rr.totalLostSinceReport > 0xFFFFFF { - rr.totalLostSinceReport = 0xFFFFFF - } - } - - rr.totalSinceReport += uint32(uint16(diff)) - rr.lastSequenceNumber = pkt.SequenceNumber - - if ptsEqualsDTS { - if rr.timeInitialized { - // update jitter - // https://tools.ietf.org/html/rfc3550#page-39 - D := system.Sub(rr.lastTimeSystem).Seconds()*rr.clockRate - - (float64(pkt.Timestamp) - float64(rr.lastTimeRTP)) - if D < 0 { - D = -D - } - rr.jitter += (D - rr.jitter) / 16 - } - - rr.timeInitialized = true - rr.lastTimeRTP = pkt.Timestamp - rr.lastTimeSystem = system - } - } - - return nil + return (*rtcpreceiver.RTCPReceiver)(rr).ProcessPacket(pkt, system, ptsEqualsDTS) } // ProcessSenderReport extracts the needed data from RTCP sender reports. func (rr *RTCPReceiver) ProcessSenderReport(sr *rtcp.SenderReport, system time.Time) { - rr.mutex.Lock() - defer rr.mutex.Unlock() - - rr.firstSenderReportReceived = true - rr.lastSenderReportTimeNTP = sr.NTPTime - rr.lastSenderReportTimeRTP = sr.RTPTime - rr.lastSenderReportTimeSystem = system + (*rtcpreceiver.RTCPReceiver)(rr).ProcessSenderReport(sr, system) } // PacketNTP returns the NTP timestamp of the packet. func (rr *RTCPReceiver) PacketNTP(ts uint32) (time.Time, bool) { - rr.mutex.Lock() - defer rr.mutex.Unlock() - - if !rr.firstSenderReportReceived { - return time.Time{}, false - } - - timeDiff := int32(ts - rr.lastSenderReportTimeRTP) - timeDiffGo := (time.Duration(timeDiff) * time.Second) / time.Duration(rr.clockRate) - - return ntpTimeRTCPToGo(rr.lastSenderReportTimeNTP).Add(timeDiffGo), true + return (*rtcpreceiver.RTCPReceiver)(rr).PacketNTP(ts) } // SenderSSRC returns the SSRC of outgoing RTP packets. func (rr *RTCPReceiver) SenderSSRC() (uint32, bool) { - rr.mutex.RLock() - defer rr.mutex.RUnlock() - return rr.senderSSRC, rr.firstRTPPacketReceived + return (*rtcpreceiver.RTCPReceiver)(rr).SenderSSRC() } diff --git a/pkg/rtcpsender/rtcpsender.go b/pkg/rtcpsender/rtcpsender.go index 9af5f4e3..cd2307d2 100644 --- a/pkg/rtcpsender/rtcpsender.go +++ b/pkg/rtcpsender/rtcpsender.go @@ -2,41 +2,17 @@ package rtcpsender import ( - "sync" "time" + "github.com/bluenviron/gortsplib/v4/internal/rtcpsender" "github.com/pion/rtcp" "github.com/pion/rtp" ) -// seconds since 1st January 1900 -// higher 32 bits are the integer part, lower 32 bits are the fractional part -func ntpTimeGoToRTCP(v time.Time) uint64 { - s := uint64(v.UnixNano()) + 2208988800*1000000000 - return (s/1000000000)<<32 | (s % 1000000000) -} - // RTCPSender is a utility to generate RTCP sender reports. -type RTCPSender struct { - clockRate float64 - period time.Duration - timeNow func() time.Time - writePacketRTCP func(rtcp.Packet) - mutex sync.RWMutex - - // data from RTP packets - initialized bool - lastTimeRTP uint32 - lastTimeNTP time.Time - lastTimeSystem time.Time - senderSSRC uint32 - lastSequenceNumber uint16 - packetCount uint32 - octetCount uint32 - - terminate chan struct{} - done chan struct{} -} +// +// Deprecated: will be removed in the next version. +type RTCPSender rtcpsender.RTCPSender // New allocates a RTCPSender. func New( @@ -45,100 +21,33 @@ func New( timeNow func() time.Time, writePacketRTCP func(rtcp.Packet), ) *RTCPSender { - if timeNow == nil { - timeNow = time.Now + rs := &rtcpsender.RTCPSender{ + ClockRate: clockRate, + Period: period, + TimeNow: timeNow, + WritePacketRTCP: writePacketRTCP, } + rs.Initialize() - rs := &RTCPSender{ - clockRate: float64(clockRate), - period: period, - timeNow: timeNow, - writePacketRTCP: writePacketRTCP, - terminate: make(chan struct{}), - done: make(chan struct{}), - } - - go rs.run() - - return rs + return (*RTCPSender)(rs) } // Close closes the RTCPSender. func (rs *RTCPSender) Close() { - close(rs.terminate) - <-rs.done -} - -func (rs *RTCPSender) run() { - defer close(rs.done) - - t := time.NewTicker(rs.period) - defer t.Stop() - - for { - select { - case <-t.C: - report := rs.report() - if report != nil { - rs.writePacketRTCP(report) - } - - case <-rs.terminate: - return - } - } -} - -func (rs *RTCPSender) report() rtcp.Packet { - rs.mutex.Lock() - defer rs.mutex.Unlock() - - if !rs.initialized { - return nil - } - - systemTimeDiff := rs.timeNow().Sub(rs.lastTimeSystem) - ntpTime := rs.lastTimeNTP.Add(systemTimeDiff) - rtpTime := rs.lastTimeRTP + uint32(systemTimeDiff.Seconds()*rs.clockRate) - - return &rtcp.SenderReport{ - SSRC: rs.senderSSRC, - NTPTime: ntpTimeGoToRTCP(ntpTime), - RTPTime: rtpTime, - PacketCount: rs.packetCount, - OctetCount: rs.octetCount, - } + (*rtcpsender.RTCPSender)(rs).Close() } // ProcessPacket extracts data from RTP packets. func (rs *RTCPSender) ProcessPacket(pkt *rtp.Packet, ntp time.Time, ptsEqualsDTS bool) { - rs.mutex.Lock() - defer rs.mutex.Unlock() - - if ptsEqualsDTS { - rs.initialized = true - rs.lastTimeRTP = pkt.Timestamp - rs.lastTimeNTP = ntp - rs.lastTimeSystem = rs.timeNow() - rs.senderSSRC = pkt.SSRC - } - - rs.lastSequenceNumber = pkt.SequenceNumber - - rs.packetCount++ - rs.octetCount += uint32(len(pkt.Payload)) + (*rtcpsender.RTCPSender)(rs).ProcessPacket(pkt, ntp, ptsEqualsDTS) } // SenderSSRC returns the SSRC of outgoing RTP packets. func (rs *RTCPSender) SenderSSRC() (uint32, bool) { - rs.mutex.RLock() - defer rs.mutex.RUnlock() - return rs.senderSSRC, rs.initialized + return (*rtcpsender.RTCPSender)(rs).SenderSSRC() } // LastPacketData returns metadata of the last RTP packet. func (rs *RTCPSender) LastPacketData() (uint16, uint32, time.Time, bool) { - rs.mutex.RLock() - defer rs.mutex.RUnlock() - return rs.lastSequenceNumber, rs.lastTimeRTP, rs.lastTimeNTP, rs.initialized + return (*rtcpsender.RTCPSender)(rs).LastPacketData() } diff --git a/pkg/rtplossdetector/lossdetector.go b/pkg/rtplossdetector/lossdetector.go index bb4e4bfb..9b37238b 100644 --- a/pkg/rtplossdetector/lossdetector.go +++ b/pkg/rtplossdetector/lossdetector.go @@ -2,14 +2,14 @@ package rtplossdetector import ( + "github.com/bluenviron/gortsplib/v4/internal/rtplossdetector" "github.com/pion/rtp" ) // LossDetector detects lost packets. -type LossDetector struct { - initialized bool - expectedSeqNum uint16 -} +// +// Deprecated: will be removed in the next version. +type LossDetector rtplossdetector.LossDetector // New allocates a LossDetector. func New() *LossDetector { @@ -19,18 +19,5 @@ func New() *LossDetector { // Process processes a RTP packet. // It returns the number of lost packets. func (r *LossDetector) Process(pkt *rtp.Packet) int { - if !r.initialized { - r.initialized = true - r.expectedSeqNum = pkt.SequenceNumber + 1 - return 0 - } - - if pkt.SequenceNumber != r.expectedSeqNum { - diff := pkt.SequenceNumber - r.expectedSeqNum - r.expectedSeqNum = pkt.SequenceNumber + 1 - return int(diff) - } - - r.expectedSeqNum = pkt.SequenceNumber + 1 - return 0 + return (*rtplossdetector.LossDetector)(r).Process(pkt) } diff --git a/pkg/rtpreorderer/reorderer.go b/pkg/rtpreorderer/reorderer.go index a36bb8c5..7c5b4fde 100644 --- a/pkg/rtpreorderer/reorderer.go +++ b/pkg/rtpreorderer/reorderer.go @@ -2,134 +2,26 @@ package rtpreorderer import ( + "github.com/bluenviron/gortsplib/v4/internal/rtpreorderer" "github.com/pion/rtp" ) -const ( - bufferSize = 64 -) - // Reorderer filters incoming RTP packets, in order to // - order packets // - remove duplicate packets -type Reorderer struct { - initialized bool - expectedSeqNum uint16 - buffer []*rtp.Packet - absPos uint16 - negativeCount int -} +// +// Deprecated: will be removed in the next version. +type Reorderer rtpreorderer.Reorderer // New allocates a Reorderer. func New() *Reorderer { - return &Reorderer{ - buffer: make([]*rtp.Packet, bufferSize), - } + r := &rtpreorderer.Reorderer{} + r.Initialize() + return (*Reorderer)(r) } // Process processes a RTP packet. // It returns a sequence of ordered packets and the number of lost packets. func (r *Reorderer) Process(pkt *rtp.Packet) ([]*rtp.Packet, int) { - if !r.initialized { - r.initialized = true - r.expectedSeqNum = pkt.SequenceNumber + 1 - return []*rtp.Packet{pkt}, 0 - } - - relPos := int16(pkt.SequenceNumber - r.expectedSeqNum) - - // packet is a duplicate or has been sent - // before the first packet processed by Reorderer. - // discard. - if relPos < 0 { - r.negativeCount++ - - // stream has been resetted, therefore reset reorderer too - if r.negativeCount > bufferSize { - r.negativeCount = 0 - - // clear buffer - for i := uint16(0); i < bufferSize; i++ { - p := (r.absPos + i) & (bufferSize - 1) - r.buffer[p] = nil - } - - // reset position - r.expectedSeqNum = pkt.SequenceNumber + 1 - return []*rtp.Packet{pkt}, 0 - } - - return nil, 0 - } - r.negativeCount = 0 - - // there's a missing packet and buffer is full. - // return entire buffer and clear it. - if relPos >= bufferSize { - n := 1 - for i := uint16(0); i < bufferSize; i++ { - p := (r.absPos + i) & (bufferSize - 1) - if r.buffer[p] != nil { - n++ - } - } - - ret := make([]*rtp.Packet, n) - pos := 0 - - for i := uint16(0); i < bufferSize; i++ { - p := (r.absPos + i) & (bufferSize - 1) - if r.buffer[p] != nil { - ret[pos], r.buffer[p] = r.buffer[p], nil - pos++ - } - } - - ret[pos] = pkt - - r.expectedSeqNum = pkt.SequenceNumber + 1 - return ret, int(relPos) - n + 1 - } - - // there's a missing packet - if relPos != 0 { - p := (r.absPos + uint16(relPos)) & (bufferSize - 1) - - // current packet is a duplicate. discard - if r.buffer[p] != nil { - return nil, 0 - } - - // put current packet in buffer - r.buffer[p] = pkt - return nil, 0 - } - - // all packets have been received correctly. - // return them - - n := uint16(1) - for { - p := (r.absPos + n) & (bufferSize - 1) - if r.buffer[p] == nil { - break - } - n++ - } - - ret := make([]*rtp.Packet, n) - - ret[0] = pkt - r.absPos++ - r.absPos &= (bufferSize - 1) - - for i := uint16(1); i < n; i++ { - ret[i], r.buffer[r.absPos] = r.buffer[r.absPos], nil - r.absPos++ - r.absPos &= (bufferSize - 1) - } - - r.expectedSeqNum = pkt.SequenceNumber + n - - return ret, 0 + return (*rtpreorderer.Reorderer)(r).Process(pkt) } diff --git a/scripts/test.mk b/scripts/test.mk index 5325f074..7489a774 100644 --- a/scripts/test.mk +++ b/scripts/test.mk @@ -6,13 +6,16 @@ endif test-examples: go build -o /dev/null ./examples/... +test-internal: + go test -v $(RACE) -coverprofile=coverage-internal.txt ./internal/... + test-pkg: go test -v $(RACE) -coverprofile=coverage-pkg.txt ./pkg/... test-root: go test -v $(RACE) -coverprofile=coverage-root.txt . -test-nodocker: test-examples test-pkg test-root +test-nodocker: test-examples test-internal test-pkg test-root define DOCKERFILE_TEST ARG ARCH diff --git a/server_session_format.go b/server_session_format.go index 7988dc80..30c721e3 100644 --- a/server_session_format.go +++ b/server_session_format.go @@ -6,11 +6,11 @@ import ( "github.com/pion/rtcp" "github.com/pion/rtp" + "github.com/bluenviron/gortsplib/v4/internal/rtcpreceiver" + "github.com/bluenviron/gortsplib/v4/internal/rtplossdetector" + "github.com/bluenviron/gortsplib/v4/internal/rtpreorderer" "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/gortsplib/v4/pkg/liberrors" - "github.com/bluenviron/gortsplib/v4/pkg/rtcpreceiver" - "github.com/bluenviron/gortsplib/v4/pkg/rtplossdetector" - "github.com/bluenviron/gortsplib/v4/pkg/rtpreorderer" ) type serverSessionFormat struct { @@ -26,22 +26,23 @@ type serverSessionFormat struct { func (sf *serverSessionFormat) start() { if sf.sm.ss.state != ServerSessionStatePlay { if *sf.sm.ss.setuppedTransport == TransportUDP || *sf.sm.ss.setuppedTransport == TransportUDPMulticast { - sf.udpReorderer = rtpreorderer.New() + sf.udpReorderer = &rtpreorderer.Reorderer{} + sf.udpReorderer.Initialize() } else { - sf.tcpLossDetector = rtplossdetector.New() + sf.tcpLossDetector = &rtplossdetector.LossDetector{} } - var err error - sf.rtcpReceiver, err = rtcpreceiver.New( - sf.format.ClockRate(), - nil, - sf.sm.ss.s.receiverReportPeriod, - sf.sm.ss.s.timeNow, - func(pkt rtcp.Packet) { + sf.rtcpReceiver = &rtcpreceiver.RTCPReceiver{ + ClockRate: sf.format.ClockRate(), + Period: sf.sm.ss.s.receiverReportPeriod, + TimeNow: sf.sm.ss.s.timeNow, + WritePacketRTCP: func(pkt rtcp.Packet) { if *sf.sm.ss.setuppedTransport == TransportUDP || *sf.sm.ss.setuppedTransport == TransportUDPMulticast { sf.sm.ss.WritePacketRTCP(sf.sm.media, pkt) //nolint:errcheck } - }) + }, + } + err := sf.rtcpReceiver.Initialize() if err != nil { panic(err) } diff --git a/server_stream_format.go b/server_stream_format.go index 85232558..c4fbefc7 100644 --- a/server_stream_format.go +++ b/server_stream_format.go @@ -7,8 +7,8 @@ import ( "github.com/pion/rtcp" "github.com/pion/rtp" + "github.com/bluenviron/gortsplib/v4/internal/rtcpsender" "github.com/bluenviron/gortsplib/v4/pkg/format" - "github.com/bluenviron/gortsplib/v4/pkg/rtcpsender" ) type serverStreamFormat struct { @@ -19,16 +19,17 @@ type serverStreamFormat struct { } func (sf *serverStreamFormat) initialize() { - sf.rtcpSender = rtcpsender.New( - sf.format.ClockRate(), - sf.sm.st.s.senderReportPeriod, - sf.sm.st.s.timeNow, - func(pkt rtcp.Packet) { + sf.rtcpSender = &rtcpsender.RTCPSender{ + ClockRate: sf.format.ClockRate(), + Period: sf.sm.st.s.senderReportPeriod, + TimeNow: sf.sm.st.s.timeNow, + WritePacketRTCP: func(pkt rtcp.Packet) { if !sf.sm.st.s.DisableRTCPSenderReports { sf.sm.st.WritePacketRTCP(sf.sm.media, pkt) //nolint:errcheck } }, - ) + } + sf.rtcpSender.Initialize() } func (sf *serverStreamFormat) writePacketRTP(byts []byte, pkt *rtp.Packet, ntp time.Time) error {