diff --git a/client.go b/client.go index c6893394..7e2b1c4a 100644 --- a/client.go +++ b/client.go @@ -20,8 +20,6 @@ import ( "github.com/pion/rtcp" "github.com/pion/rtp" - "github.com/bluenviron/gortsplib/v4/internal/rtcpreceiver" - "github.com/bluenviron/gortsplib/v4/internal/rtcpsender" "github.com/bluenviron/gortsplib/v4/pkg/auth" "github.com/bluenviron/gortsplib/v4/pkg/base" "github.com/bluenviron/gortsplib/v4/pkg/bytecounter" @@ -30,6 +28,8 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/gortsplib/v4/pkg/headers" "github.com/bluenviron/gortsplib/v4/pkg/liberrors" + "github.com/bluenviron/gortsplib/v4/pkg/rtcpreceiver" + "github.com/bluenviron/gortsplib/v4/pkg/rtcpsender" "github.com/bluenviron/gortsplib/v4/pkg/rtptime" "github.com/bluenviron/gortsplib/v4/pkg/sdp" ) @@ -1989,7 +1989,7 @@ func (c *Client) WritePacketRTPWithNTP(medi *description.Media, pkt *rtp.Packet, cm := c.setuppedMedias[medi] cf := cm.formats[pkt.PayloadType] - cf.rtcpSender.ProcessPacketRTP(pkt, ntp, cf.format.PTSEqualsDTS(pkt)) + cf.rtcpSender.ProcessPacket(pkt, ntp, cf.format.PTSEqualsDTS(pkt)) ok := c.writer.push(func() error { return cf.writePacketRTPInQueue(byts) diff --git a/client_format.go b/client_format.go index 450ac38b..2f903fd5 100644 --- a/client_format.go +++ b/client_format.go @@ -7,11 +7,11 @@ import ( "github.com/pion/rtcp" "github.com/pion/rtp" - "github.com/bluenviron/gortsplib/v4/internal/rtcpreceiver" - "github.com/bluenviron/gortsplib/v4/internal/rtcpsender" - "github.com/bluenviron/gortsplib/v4/internal/rtplossdetector" - "github.com/bluenviron/gortsplib/v4/internal/rtpreorderer" "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/gortsplib/v4/pkg/rtcpreceiver" + "github.com/bluenviron/gortsplib/v4/pkg/rtcpsender" + "github.com/bluenviron/gortsplib/v4/pkg/rtplossdetector" + "github.com/bluenviron/gortsplib/v4/pkg/rtpreorderer" ) type clientFormat struct { @@ -93,7 +93,7 @@ func (cf *clientFormat) stop() { func (cf *clientFormat) readPacketRTPUDP(pkt *rtp.Packet) { packets, lost := cf.udpReorderer.Process(pkt) if lost != 0 { - cf.handlePacketsLost(lost) + cf.handlePacketsLost(uint64(lost)) // do not return } @@ -107,7 +107,7 @@ func (cf *clientFormat) readPacketRTPUDP(pkt *rtp.Packet) { func (cf *clientFormat) readPacketRTPTCP(pkt *rtp.Packet) { lost := cf.tcpLossDetector.Process(pkt) if lost != 0 { - cf.handlePacketsLost(lost) + cf.handlePacketsLost(uint64(lost)) // do not return } @@ -117,7 +117,7 @@ func (cf *clientFormat) readPacketRTPTCP(pkt *rtp.Packet) { } func (cf *clientFormat) handlePacketRTP(pkt *rtp.Packet, now time.Time) { - err := cf.rtcpReceiver.ProcessPacketRTP(pkt, now, cf.format.PTSEqualsDTS(pkt)) + err := cf.rtcpReceiver.ProcessPacket(pkt, now, cf.format.PTSEqualsDTS(pkt)) if err != nil { cf.cm.onPacketRTPDecodeError(err) return diff --git a/internal/rtcpreceiver/rtcpreceiver.go b/internal/rtcpreceiver/rtcpreceiver.go deleted file mode 100644 index fb4d3a5a..00000000 --- a/internal/rtcpreceiver/rtcpreceiver.go +++ /dev/null @@ -1,278 +0,0 @@ -// Package rtcpreceiver contains a utility to generate RTCP receiver reports. -package rtcpreceiver - -import ( - "crypto/rand" - "fmt" - "sync" - "time" - - "github.com/pion/rtcp" - "github.com/pion/rtp" -) - -// seconds since 1st January 1900 -// higher 32 bits are the integer part, lower 32 bits are the fractional part -func ntpTimeRTCPToGo(v uint64) time.Time { - nano := int64((v>>32)*1000000000+(v&0xFFFFFFFF)) - 2208988800*1000000000 - return time.Unix(0, nano) -} - -func randUint32() (uint32, error) { - var b [4]byte - _, err := rand.Read(b[:]) - if err != nil { - return 0, err - } - return uint32(b[0])<<24 | uint32(b[1])<<16 | uint32(b[2])<<8 | uint32(b[3]), nil -} - -// RTCPReceiver is a utility to generate RTCP receiver reports. -type RTCPReceiver struct { - ClockRate int - LocalSSRC *uint32 - Period time.Duration - TimeNow func() time.Time - WritePacketRTCP func(rtcp.Packet) - - mutex sync.RWMutex - - // data from RTP packets - firstRTPPacketReceived bool - timeInitialized bool - sequenceNumberCycles uint16 - lastSequenceNumber uint16 - remoteSSRC uint32 - lastTimeRTP uint32 - lastTimeSystem time.Time - totalLost uint32 - totalLostSinceReport uint32 - totalSinceReport uint32 - jitter float64 - - // data from RTCP packets - firstSenderReportReceived bool - lastSenderReportTimeNTP uint64 - lastSenderReportTimeRTP uint32 - lastSenderReportTimeSystem time.Time - - terminate chan struct{} - done chan struct{} -} - -// Initialize initializes RTCPReceiver. -func (rr *RTCPReceiver) Initialize() error { - if rr.LocalSSRC == nil { - v, err := randUint32() - if err != nil { - return err - } - rr.LocalSSRC = &v - } - - if rr.TimeNow == nil { - rr.TimeNow = time.Now - } - - rr.terminate = make(chan struct{}) - rr.done = make(chan struct{}) - - go rr.run() - - return nil -} - -// Close closes the RTCPReceiver. -func (rr *RTCPReceiver) Close() { - close(rr.terminate) - <-rr.done -} - -func (rr *RTCPReceiver) run() { - defer close(rr.done) - - t := time.NewTicker(rr.Period) - defer t.Stop() - - for { - select { - case <-t.C: - report := rr.report() - if report != nil { - rr.WritePacketRTCP(report) - } - - case <-rr.terminate: - return - } - } -} - -func (rr *RTCPReceiver) report() rtcp.Packet { - rr.mutex.Lock() - defer rr.mutex.Unlock() - - if !rr.firstRTPPacketReceived { - return nil - } - - system := rr.TimeNow() - - report := &rtcp.ReceiverReport{ - SSRC: *rr.LocalSSRC, - Reports: []rtcp.ReceptionReport{ - { - SSRC: rr.remoteSSRC, - LastSequenceNumber: uint32(rr.sequenceNumberCycles)<<16 | uint32(rr.lastSequenceNumber), - // equivalent to taking the integer part after multiplying the - // loss fraction by 256 - FractionLost: uint8(float64(rr.totalLostSinceReport*256) / float64(rr.totalSinceReport)), - TotalLost: rr.totalLost, - Jitter: uint32(rr.jitter), - }, - }, - } - - if rr.firstSenderReportReceived { - // middle 32 bits out of 64 in the NTP timestamp of last sender report - report.Reports[0].LastSenderReport = uint32(rr.lastSenderReportTimeNTP >> 16) - - // delay, expressed in units of 1/65536 seconds, between - // receiving the last SR packet from source SSRC_n and sending this - // reception report block - report.Reports[0].Delay = uint32(system.Sub(rr.lastSenderReportTimeSystem).Seconds() * 65536) - } - - rr.totalLostSinceReport = 0 - rr.totalSinceReport = 0 - - return report -} - -// ProcessPacketRTP extracts the needed data from RTP packets. -func (rr *RTCPReceiver) ProcessPacketRTP(pkt *rtp.Packet, system time.Time, ptsEqualsDTS bool) error { - rr.mutex.Lock() - defer rr.mutex.Unlock() - - // first packet - if !rr.firstRTPPacketReceived { - rr.firstRTPPacketReceived = true - rr.totalSinceReport = 1 - rr.lastSequenceNumber = pkt.SequenceNumber - rr.remoteSSRC = pkt.SSRC - - if ptsEqualsDTS { - rr.timeInitialized = true - rr.lastTimeRTP = pkt.Timestamp - rr.lastTimeSystem = system - } - - // subsequent packets - } else { - if pkt.SSRC != rr.remoteSSRC { - return fmt.Errorf("received packet with wrong SSRC %d, expected %d", pkt.SSRC, rr.remoteSSRC) - } - - diff := int32(pkt.SequenceNumber) - int32(rr.lastSequenceNumber) - - // overflow - if diff < -0x0FFF { - rr.sequenceNumberCycles++ - } - - // detect lost packets - if pkt.SequenceNumber != (rr.lastSequenceNumber + 1) { - rr.totalLost += uint32(uint16(diff) - 1) - rr.totalLostSinceReport += uint32(uint16(diff) - 1) - - // allow up to 24 bits - if rr.totalLost > 0xFFFFFF { - rr.totalLost = 0xFFFFFF - } - if rr.totalLostSinceReport > 0xFFFFFF { - rr.totalLostSinceReport = 0xFFFFFF - } - } - - rr.totalSinceReport += uint32(uint16(diff)) - rr.lastSequenceNumber = pkt.SequenceNumber - - if ptsEqualsDTS { - if rr.timeInitialized { - // update jitter - // https://tools.ietf.org/html/rfc3550#page-39 - D := system.Sub(rr.lastTimeSystem).Seconds()*float64(rr.ClockRate) - - (float64(pkt.Timestamp) - float64(rr.lastTimeRTP)) - if D < 0 { - D = -D - } - rr.jitter += (D - rr.jitter) / 16 - } - - rr.timeInitialized = true - rr.lastTimeRTP = pkt.Timestamp - rr.lastTimeSystem = system - } - } - - return nil -} - -// ProcessSenderReport extracts the needed data from RTCP sender reports. -func (rr *RTCPReceiver) ProcessSenderReport(sr *rtcp.SenderReport, system time.Time) { - rr.mutex.Lock() - defer rr.mutex.Unlock() - - rr.firstSenderReportReceived = true - rr.lastSenderReportTimeNTP = sr.NTPTime - rr.lastSenderReportTimeRTP = sr.RTPTime - rr.lastSenderReportTimeSystem = system -} - -func (rr *RTCPReceiver) packetNTPUnsafe(ts uint32) (time.Time, bool) { - if !rr.firstSenderReportReceived { - return time.Time{}, false - } - - timeDiff := int32(ts - rr.lastSenderReportTimeRTP) - timeDiffGo := (time.Duration(timeDiff) * time.Second) / time.Duration(rr.ClockRate) - - return ntpTimeRTCPToGo(rr.lastSenderReportTimeNTP).Add(timeDiffGo), true -} - -// PacketNTP returns the NTP timestamp of the packet. -func (rr *RTCPReceiver) PacketNTP(ts uint32) (time.Time, bool) { - rr.mutex.Lock() - defer rr.mutex.Unlock() - - return rr.packetNTPUnsafe(ts) -} - -// Stats are statistics. -type Stats struct { - RemoteSSRC uint32 - LastSequenceNumber uint16 - LastRTP uint32 - LastNTP time.Time - Jitter float64 -} - -// Stats returns statistics. -func (rr *RTCPReceiver) Stats() *Stats { - rr.mutex.RLock() - defer rr.mutex.RUnlock() - - if !rr.firstRTPPacketReceived { - return nil - } - - ntp, _ := rr.packetNTPUnsafe(rr.lastTimeRTP) - - return &Stats{ - RemoteSSRC: rr.remoteSSRC, - LastSequenceNumber: rr.lastSequenceNumber, - LastRTP: rr.lastTimeRTP, - LastNTP: ntp, - Jitter: rr.jitter, - } -} diff --git a/internal/rtcpreceiver/rtcpreceiver_test.go b/internal/rtcpreceiver/rtcpreceiver_test.go deleted file mode 100644 index 0a351a39..00000000 --- a/internal/rtcpreceiver/rtcpreceiver_test.go +++ /dev/null @@ -1,402 +0,0 @@ -package rtcpreceiver - -import ( - "testing" - "time" - - "github.com/pion/rtcp" - "github.com/pion/rtp" - "github.com/stretchr/testify/require" -) - -func uint32Ptr(v uint32) *uint32 { - return &v -} - -func TestRTCPReceiverBase(t *testing.T) { - done := make(chan struct{}) - - rr := &RTCPReceiver{ - ClockRate: 90000, - LocalSSRC: uint32Ptr(0x65f83afb), - Period: 500 * time.Millisecond, - TimeNow: func() time.Time { - return time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC) - }, - WritePacketRTCP: func(pkt rtcp.Packet) { - require.Equal(t, &rtcp.ReceiverReport{ - SSRC: 0x65f83afb, - Reports: []rtcp.ReceptionReport{ - { - SSRC: 0xba9da416, - LastSequenceNumber: 947, - LastSenderReport: 0x887a17ce, - Delay: 2 * 65536, - }, - }, - }, pkt) - close(done) - }, - } - err := rr.Initialize() - require.NoError(t, err) - defer rr.Close() - - srPkt := rtcp.SenderReport{ - SSRC: 0xba9da416, - NTPTime: 0xe363887a17ced916, - RTPTime: 0xafb45733, - PacketCount: 714, - OctetCount: 859127, - } - ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) - rr.ProcessSenderReport(&srPkt, ts) - - rtpPkt := rtp.Packet{ - Header: rtp.Header{ - Version: 2, - Marker: true, - PayloadType: 96, - SequenceNumber: 946, - Timestamp: 0xafb45733, - SSRC: 0xba9da416, - }, - Payload: []byte("\x00\x00"), - } - ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) - err = rr.ProcessPacketRTP(&rtpPkt, ts, true) - require.NoError(t, err) - - rtpPkt = rtp.Packet{ - Header: rtp.Header{ - Version: 2, - Marker: true, - PayloadType: 96, - SequenceNumber: 947, - Timestamp: 0xafb45733 + 90000, - SSRC: 0xba9da416, - }, - Payload: []byte("\x00\x00"), - } - ts = time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC) - err = rr.ProcessPacketRTP(&rtpPkt, ts, true) - require.NoError(t, err) - - <-done -} - -func TestRTCPReceiverOverflow(t *testing.T) { - done := make(chan struct{}) - - rr := &RTCPReceiver{ - ClockRate: 90000, - LocalSSRC: uint32Ptr(0x65f83afb), - Period: 250 * time.Millisecond, - TimeNow: func() time.Time { - return time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC) - }, - WritePacketRTCP: func(pkt rtcp.Packet) { - require.Equal(t, &rtcp.ReceiverReport{ - SSRC: 0x65f83afb, - Reports: []rtcp.ReceptionReport{ - { - SSRC: 0xba9da416, - LastSequenceNumber: 1 << 16, - LastSenderReport: 0x887a17ce, - Delay: 1 * 65536, - }, - }, - }, pkt) - close(done) - }, - } - err := rr.Initialize() - require.NoError(t, err) - defer rr.Close() - - time.Sleep(400 * time.Millisecond) - - srPkt := rtcp.SenderReport{ - SSRC: 0xba9da416, - NTPTime: 0xe363887a17ced916, - RTPTime: 1287981738, - PacketCount: 714, - OctetCount: 859127, - } - ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) - rr.ProcessSenderReport(&srPkt, ts) - - rtpPkt := rtp.Packet{ - Header: rtp.Header{ - Version: 2, - Marker: true, - PayloadType: 96, - SequenceNumber: 0xffff, - Timestamp: 0xafb45733, - SSRC: 0xba9da416, - }, - Payload: []byte("\x00\x00"), - } - ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) - err = rr.ProcessPacketRTP(&rtpPkt, ts, true) - require.NoError(t, err) - - rtpPkt = rtp.Packet{ - Header: rtp.Header{ - Version: 2, - Marker: true, - PayloadType: 96, - SequenceNumber: 0x0000, - Timestamp: 0xafb45733, - SSRC: 0xba9da416, - }, - Payload: []byte("\x00\x00"), - } - ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) - err = rr.ProcessPacketRTP(&rtpPkt, ts, true) - require.NoError(t, err) - - <-done -} - -func TestRTCPReceiverPacketsLost(t *testing.T) { - done := make(chan struct{}) - - rr := &RTCPReceiver{ - ClockRate: 90000, - LocalSSRC: uint32Ptr(0x65f83afb), - Period: 500 * time.Millisecond, - TimeNow: func() time.Time { - return time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC) - }, - WritePacketRTCP: func(pkt rtcp.Packet) { - require.Equal(t, &rtcp.ReceiverReport{ - SSRC: 0x65f83afb, - Reports: []rtcp.ReceptionReport{ - { - SSRC: 0xba9da416, - LastSequenceNumber: 0x0122, - LastSenderReport: 0x887a17ce, - FractionLost: func() uint8 { - v := float64(1) / 3 - return uint8(v * 256) - }(), - TotalLost: 1, - Delay: 1 * 65536, - }, - }, - }, pkt) - close(done) - }, - } - err := rr.Initialize() - require.NoError(t, err) - defer rr.Close() - - srPkt := rtcp.SenderReport{ - SSRC: 0xba9da416, - NTPTime: 0xe363887a17ced916, - RTPTime: 1287981738, - PacketCount: 714, - OctetCount: 859127, - } - ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) - rr.ProcessSenderReport(&srPkt, ts) - - rtpPkt := rtp.Packet{ - Header: rtp.Header{ - Version: 2, - Marker: true, - PayloadType: 96, - SequenceNumber: 0x0120, - Timestamp: 0xafb45733, - SSRC: 0xba9da416, - }, - Payload: []byte("\x00\x00"), - } - ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) - err = rr.ProcessPacketRTP(&rtpPkt, ts, true) - require.NoError(t, err) - - rtpPkt = rtp.Packet{ - Header: rtp.Header{ - Version: 2, - Marker: true, - PayloadType: 96, - SequenceNumber: 0x0122, - Timestamp: 0xafb45733, - SSRC: 0xba9da416, - }, - Payload: []byte("\x00\x00"), - } - ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) - err = rr.ProcessPacketRTP(&rtpPkt, ts, true) - require.NoError(t, err) - - <-done -} - -func TestRTCPReceiverOverflowPacketsLost(t *testing.T) { - done := make(chan struct{}) - - rr := &RTCPReceiver{ - ClockRate: 90000, - LocalSSRC: uint32Ptr(0x65f83afb), - Period: 500 * time.Millisecond, - TimeNow: func() time.Time { - return time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC) - }, - WritePacketRTCP: func(pkt rtcp.Packet) { - require.Equal(t, &rtcp.ReceiverReport{ - SSRC: 0x65f83afb, - Reports: []rtcp.ReceptionReport{ - { - SSRC: 0xba9da416, - LastSequenceNumber: 1<<16 | 0x0002, - LastSenderReport: 0x887a17ce, - FractionLost: func() uint8 { - v := float64(2) / 4 - return uint8(v * 256) - }(), - TotalLost: 2, - Delay: 1 * 65536, - }, - }, - }, pkt) - close(done) - }, - } - err := rr.Initialize() - require.NoError(t, err) - defer rr.Close() - - srPkt := rtcp.SenderReport{ - SSRC: 0xba9da416, - NTPTime: 0xe363887a17ced916, - RTPTime: 1287981738, - PacketCount: 714, - OctetCount: 859127, - } - ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) - rr.ProcessSenderReport(&srPkt, ts) - - rtpPkt := rtp.Packet{ - Header: rtp.Header{ - Version: 2, - Marker: true, - PayloadType: 96, - SequenceNumber: 0xffff, - Timestamp: 0xafb45733, - SSRC: 0xba9da416, - }, - Payload: []byte("\x00\x00"), - } - ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) - err = rr.ProcessPacketRTP(&rtpPkt, ts, true) - require.NoError(t, err) - - rtpPkt = rtp.Packet{ - Header: rtp.Header{ - Version: 2, - Marker: true, - PayloadType: 96, - SequenceNumber: 0x0002, - Timestamp: 0xafb45733, - SSRC: 0xba9da416, - }, - Payload: []byte("\x00\x00"), - } - ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) - err = rr.ProcessPacketRTP(&rtpPkt, ts, true) - require.NoError(t, err) - - <-done -} - -func TestRTCPReceiverJitter(t *testing.T) { - done := make(chan struct{}) - - rr := &RTCPReceiver{ - ClockRate: 90000, - LocalSSRC: uint32Ptr(0x65f83afb), - Period: 500 * time.Millisecond, - TimeNow: func() time.Time { - return time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC) - }, - WritePacketRTCP: func(pkt rtcp.Packet) { - require.Equal(t, &rtcp.ReceiverReport{ - SSRC: 0x65f83afb, - Reports: []rtcp.ReceptionReport{ - { - SSRC: 0xba9da416, - LastSequenceNumber: 948, - LastSenderReport: 0x887a17ce, - Delay: 2 * 65536, - Jitter: 45000 / 16, - }, - }, - }, pkt) - close(done) - }, - } - err := rr.Initialize() - require.NoError(t, err) - defer rr.Close() - - srPkt := rtcp.SenderReport{ - SSRC: 0xba9da416, - NTPTime: 0xe363887a17ced916, - RTPTime: 0xafb45733, - PacketCount: 714, - OctetCount: 859127, - } - ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) - rr.ProcessSenderReport(&srPkt, ts) - - rtpPkt := rtp.Packet{ - Header: rtp.Header{ - Version: 2, - Marker: true, - PayloadType: 96, - SequenceNumber: 946, - Timestamp: 0xafb45733, - SSRC: 0xba9da416, - }, - Payload: []byte("\x00\x00"), - } - ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) - err = rr.ProcessPacketRTP(&rtpPkt, ts, true) - require.NoError(t, err) - - rtpPkt = rtp.Packet{ - Header: rtp.Header{ - Version: 2, - Marker: true, - PayloadType: 96, - SequenceNumber: 947, - Timestamp: 0xafb45733 + 45000, - SSRC: 0xba9da416, - }, - Payload: []byte("\x00\x00"), - } - ts = time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC) - err = rr.ProcessPacketRTP(&rtpPkt, ts, true) - require.NoError(t, err) - - rtpPkt = rtp.Packet{ - Header: rtp.Header{ - Version: 2, - Marker: true, - PayloadType: 96, - SequenceNumber: 948, - Timestamp: 0xafb45733, - SSRC: 0xba9da416, - }, - Payload: []byte("\x00\x00"), - } - ts = time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC) - err = rr.ProcessPacketRTP(&rtpPkt, ts, false) - require.NoError(t, err) - - <-done -} diff --git a/internal/rtcpsender/rtcpsender.go b/internal/rtcpsender/rtcpsender.go deleted file mode 100644 index 9e35d794..00000000 --- a/internal/rtcpsender/rtcpsender.go +++ /dev/null @@ -1,143 +0,0 @@ -// Package rtcpsender contains a utility to generate RTCP sender reports. -package rtcpsender - -import ( - "sync" - "time" - - "github.com/pion/rtcp" - "github.com/pion/rtp" -) - -// seconds since 1st January 1900 -// higher 32 bits are the integer part, lower 32 bits are the fractional part -func ntpTimeGoToRTCP(v time.Time) uint64 { - s := uint64(v.UnixNano()) + 2208988800*1000000000 - return (s/1000000000)<<32 | (s % 1000000000) -} - -// RTCPSender is a utility to generate RTCP sender reports. -type RTCPSender struct { - ClockRate int - Period time.Duration - TimeNow func() time.Time - WritePacketRTCP func(rtcp.Packet) - - mutex sync.RWMutex - - // data from RTP packets - firstRTPPacketSent bool - lastTimeRTP uint32 - lastTimeNTP time.Time - lastTimeSystem time.Time - localSSRC uint32 - lastSequenceNumber uint16 - packetCount uint32 - octetCount uint32 - - terminate chan struct{} - done chan struct{} -} - -// Initialize initializes a RTCPSender. -func (rs *RTCPSender) Initialize() { - if rs.TimeNow == nil { - rs.TimeNow = time.Now - } - - rs.terminate = make(chan struct{}) - rs.done = make(chan struct{}) - - go rs.run() -} - -// Close closes the RTCPSender. -func (rs *RTCPSender) Close() { - close(rs.terminate) - <-rs.done -} - -func (rs *RTCPSender) run() { - defer close(rs.done) - - t := time.NewTicker(rs.Period) - defer t.Stop() - - for { - select { - case <-t.C: - report := rs.report() - if report != nil { - rs.WritePacketRTCP(report) - } - - case <-rs.terminate: - return - } - } -} - -func (rs *RTCPSender) report() rtcp.Packet { - rs.mutex.Lock() - defer rs.mutex.Unlock() - - if !rs.firstRTPPacketSent { - return nil - } - - systemTimeDiff := rs.TimeNow().Sub(rs.lastTimeSystem) - ntpTime := rs.lastTimeNTP.Add(systemTimeDiff) - rtpTime := rs.lastTimeRTP + uint32(systemTimeDiff.Seconds()*float64(rs.ClockRate)) - - return &rtcp.SenderReport{ - SSRC: rs.localSSRC, - NTPTime: ntpTimeGoToRTCP(ntpTime), - RTPTime: rtpTime, - PacketCount: rs.packetCount, - OctetCount: rs.octetCount, - } -} - -// ProcessPacketRTP extracts data from RTP packets. -func (rs *RTCPSender) ProcessPacketRTP(pkt *rtp.Packet, ntp time.Time, ptsEqualsDTS bool) { - rs.mutex.Lock() - defer rs.mutex.Unlock() - - if ptsEqualsDTS { - rs.firstRTPPacketSent = true - rs.lastTimeRTP = pkt.Timestamp - rs.lastTimeNTP = ntp - rs.lastTimeSystem = rs.TimeNow() - rs.localSSRC = pkt.SSRC - } - - rs.lastSequenceNumber = pkt.SequenceNumber - - rs.packetCount++ - rs.octetCount += uint32(len(pkt.Payload)) -} - -// Stats are statistics. -type Stats struct { - LocalSSRC uint32 - LastSequenceNumber uint16 - LastRTP uint32 - LastNTP time.Time -} - -// Stats returns statistics. -func (rs *RTCPSender) Stats() *Stats { - rs.mutex.RLock() - defer rs.mutex.RUnlock() - - if !rs.firstRTPPacketSent { - return nil - } - - return &Stats{ - LocalSSRC: rs.localSSRC, - LastSequenceNumber: rs.lastSequenceNumber, - LastRTP: rs.lastTimeRTP, - LastNTP: rs.lastTimeNTP, - } -} diff --git a/internal/rtcpsender/rtcpsender_test.go b/internal/rtcpsender/rtcpsender_test.go deleted file mode 100644 index 17e88da8..00000000 --- a/internal/rtcpsender/rtcpsender_test.go +++ /dev/null @@ -1,100 +0,0 @@ -package rtcpsender - -import ( - "sync" - "testing" - "time" - - "github.com/pion/rtcp" - "github.com/pion/rtp" - "github.com/stretchr/testify/require" -) - -func TestRTCPSender(t *testing.T) { - var curTime time.Time - var mutex sync.Mutex - - setCurTime := func(v time.Time) { - mutex.Lock() - defer mutex.Unlock() - curTime = v - } - - sent := make(chan struct{}) - - rs := &RTCPSender{ - ClockRate: 90000, - Period: 100 * time.Millisecond, - TimeNow: func() time.Time { - mutex.Lock() - defer mutex.Unlock() - return curTime - }, - WritePacketRTCP: func(pkt rtcp.Packet) { - require.Equal(t, &rtcp.SenderReport{ - SSRC: 0xba9da416, - NTPTime: func() uint64 { - // timeDiff = (24 - 22) = 2 - // 21 + 2 = 23 - d := time.Date(2008, 5, 20, 22, 15, 23, 0, time.UTC) - s := uint64(d.UnixNano()) + 2208988800*1000000000 - return (s/1000000000)<<32 | (s % 1000000000) - }(), - RTPTime: 1287987768 + 2*90000, - PacketCount: 3, - OctetCount: 6, - }, pkt) - close(sent) - }, - } - rs.Initialize() - defer rs.Close() - - setCurTime(time.Date(2008, 5, 20, 22, 16, 20, 0, time.UTC)) - rtpPkt := rtp.Packet{ - Header: rtp.Header{ - Version: 2, - Marker: true, - PayloadType: 96, - SequenceNumber: 946, - Timestamp: 1287987768, - SSRC: 0xba9da416, - }, - Payload: []byte("\x00\x00"), - } - ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) - rs.ProcessPacketRTP(&rtpPkt, ts, true) - - setCurTime(time.Date(2008, 5, 20, 22, 16, 22, 0, time.UTC)) - rtpPkt = rtp.Packet{ - Header: rtp.Header{ - Version: 2, - Marker: true, - PayloadType: 96, - SequenceNumber: 947, - Timestamp: 1287987768, - SSRC: 0xba9da416, - }, - Payload: []byte("\x00\x00"), - } - ts = time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC) - rs.ProcessPacketRTP(&rtpPkt, ts, true) - - rtpPkt = rtp.Packet{ - Header: rtp.Header{ - Version: 2, - Marker: true, - PayloadType: 96, - SequenceNumber: 948, - Timestamp: 1287987768, - SSRC: 0xba9da416, - }, - Payload: []byte("\x00\x00"), - } - ts = time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC) - rs.ProcessPacketRTP(&rtpPkt, ts, false) - - setCurTime(time.Date(2008, 5, 20, 22, 16, 24, 0, time.UTC)) - - <-sent -} diff --git a/internal/rtplossdetector/lossdetector.go b/internal/rtplossdetector/lossdetector.go deleted file mode 100644 index a8f28bf3..00000000 --- a/internal/rtplossdetector/lossdetector.go +++ /dev/null @@ -1,31 +0,0 @@ -// Package rtplossdetector implements an algorithm that detects lost packets. -package rtplossdetector - -import ( - "github.com/pion/rtp" -) - -// LossDetector detects lost packets. -type LossDetector struct { - initialized bool - expectedSeqNum uint16 -} - -// Process processes a RTP packet. -// It returns the number of lost packets. -func (r *LossDetector) Process(pkt *rtp.Packet) uint64 { - if !r.initialized { - r.initialized = true - r.expectedSeqNum = pkt.SequenceNumber + 1 - return 0 - } - - if pkt.SequenceNumber != r.expectedSeqNum { - diff := pkt.SequenceNumber - r.expectedSeqNum - r.expectedSeqNum = pkt.SequenceNumber + 1 - return uint64(diff) - } - - r.expectedSeqNum = pkt.SequenceNumber + 1 - return 0 -} diff --git a/internal/rtplossdetector/lossdetector_test.go b/internal/rtplossdetector/lossdetector_test.go deleted file mode 100644 index 4e489440..00000000 --- a/internal/rtplossdetector/lossdetector_test.go +++ /dev/null @@ -1,33 +0,0 @@ -package rtplossdetector - -import ( - "testing" - - "github.com/pion/rtp" - "github.com/stretchr/testify/require" -) - -func TestLossDetector(t *testing.T) { - d := &LossDetector{} - - c := d.Process(&rtp.Packet{ - Header: rtp.Header{ - SequenceNumber: 65530, - }, - }) - require.Equal(t, uint64(0), c) - - c = d.Process(&rtp.Packet{ - Header: rtp.Header{ - SequenceNumber: 65531, - }, - }) - require.Equal(t, uint64(0), c) - - c = d.Process(&rtp.Packet{ - Header: rtp.Header{ - SequenceNumber: 65535, - }, - }) - require.Equal(t, uint64(3), c) -} diff --git a/internal/rtpreorderer/reorderer.go b/internal/rtpreorderer/reorderer.go deleted file mode 100644 index 7624d1d3..00000000 --- a/internal/rtpreorderer/reorderer.go +++ /dev/null @@ -1,133 +0,0 @@ -// Package rtpreorderer implements a filter to reorder incoming RTP packets. -package rtpreorderer - -import ( - "github.com/pion/rtp" -) - -const ( - bufferSize = 64 -) - -// Reorderer filters incoming RTP packets, in order to -// - order packets -// - remove duplicate packets -type Reorderer struct { - initialized bool - expectedSeqNum uint16 - buffer []*rtp.Packet - absPos uint16 - negativeCount int -} - -// Initialize initializes a Reorderer. -func (r *Reorderer) Initialize() { - r.buffer = make([]*rtp.Packet, bufferSize) -} - -// Process processes a RTP packet. -// It returns a sequence of ordered packets and the number of lost packets. -func (r *Reorderer) Process(pkt *rtp.Packet) ([]*rtp.Packet, uint64) { - if !r.initialized { - r.initialized = true - r.expectedSeqNum = pkt.SequenceNumber + 1 - return []*rtp.Packet{pkt}, 0 - } - - relPos := int16(pkt.SequenceNumber - r.expectedSeqNum) - - // packet is a duplicate or has been sent - // before the first packet processed by Reorderer. - // discard. - if relPos < 0 { - r.negativeCount++ - - // stream has been resetted, therefore reset reorderer too - if r.negativeCount > bufferSize { - r.negativeCount = 0 - - // clear buffer - for i := uint16(0); i < bufferSize; i++ { - p := (r.absPos + i) & (bufferSize - 1) - r.buffer[p] = nil - } - - // reset position - r.expectedSeqNum = pkt.SequenceNumber + 1 - return []*rtp.Packet{pkt}, 0 - } - - return nil, 0 - } - r.negativeCount = 0 - - // there's a missing packet and buffer is full. - // return entire buffer and clear it. - if relPos >= bufferSize { - n := 1 - for i := uint16(0); i < bufferSize; i++ { - p := (r.absPos + i) & (bufferSize - 1) - if r.buffer[p] != nil { - n++ - } - } - - ret := make([]*rtp.Packet, n) - pos := 0 - - for i := uint16(0); i < bufferSize; i++ { - p := (r.absPos + i) & (bufferSize - 1) - if r.buffer[p] != nil { - ret[pos], r.buffer[p] = r.buffer[p], nil - pos++ - } - } - - ret[pos] = pkt - - r.expectedSeqNum = pkt.SequenceNumber + 1 - return ret, uint64(int(relPos) - n + 1) - } - - // there's a missing packet - if relPos != 0 { - p := (r.absPos + uint16(relPos)) & (bufferSize - 1) - - // current packet is a duplicate. discard - if r.buffer[p] != nil { - return nil, 0 - } - - // put current packet in buffer - r.buffer[p] = pkt - return nil, 0 - } - - // all packets have been received correctly. - // return them - - n := uint16(1) - for { - p := (r.absPos + n) & (bufferSize - 1) - if r.buffer[p] == nil { - break - } - n++ - } - - ret := make([]*rtp.Packet, n) - - ret[0] = pkt - r.absPos++ - r.absPos &= (bufferSize - 1) - - for i := uint16(1); i < n; i++ { - ret[i], r.buffer[r.absPos] = r.buffer[r.absPos], nil - r.absPos++ - r.absPos &= (bufferSize - 1) - } - - r.expectedSeqNum = pkt.SequenceNumber + n - - return ret, 0 -} diff --git a/pkg/rtcpreceiver/rtcpreceiver.go b/pkg/rtcpreceiver/rtcpreceiver.go index 73be0653..931bc671 100644 --- a/pkg/rtcpreceiver/rtcpreceiver.go +++ b/pkg/rtcpreceiver/rtcpreceiver.go @@ -2,19 +2,67 @@ package rtcpreceiver import ( + "crypto/rand" + "fmt" + "sync" "time" - "github.com/bluenviron/gortsplib/v4/internal/rtcpreceiver" "github.com/pion/rtcp" "github.com/pion/rtp" ) +// seconds since 1st January 1900 +// higher 32 bits are the integer part, lower 32 bits are the fractional part +func ntpTimeRTCPToGo(v uint64) time.Time { + nano := int64((v>>32)*1000000000+(v&0xFFFFFFFF)) - 2208988800*1000000000 + return time.Unix(0, nano) +} + +func randUint32() (uint32, error) { + var b [4]byte + _, err := rand.Read(b[:]) + if err != nil { + return 0, err + } + return uint32(b[0])<<24 | uint32(b[1])<<16 | uint32(b[2])<<8 | uint32(b[3]), nil +} + // RTCPReceiver is a utility to generate RTCP receiver reports. -// -// Deprecated: will be removed in the next version. -type RTCPReceiver rtcpreceiver.RTCPReceiver +type RTCPReceiver struct { + ClockRate int + LocalSSRC *uint32 + Period time.Duration + TimeNow func() time.Time + WritePacketRTCP func(rtcp.Packet) + + mutex sync.RWMutex + + // data from RTP packets + firstRTPPacketReceived bool + timeInitialized bool + sequenceNumberCycles uint16 + lastSequenceNumber uint16 + remoteSSRC uint32 + lastTimeRTP uint32 + lastTimeSystem time.Time + totalLost uint32 + totalLostSinceReport uint32 + totalSinceReport uint32 + jitter float64 + + // data from RTCP packets + firstSenderReportReceived bool + lastSenderReportTimeNTP uint64 + lastSenderReportTimeRTP uint32 + lastSenderReportTimeSystem time.Time + + terminate chan struct{} + done chan struct{} +} // New allocates a RTCPReceiver. +// +// Deprecated: replaced by Initialize(). func New( clockRate int, receiverSSRC *uint32, @@ -22,7 +70,7 @@ func New( timeNow func() time.Time, writePacketRTCP func(rtcp.Packet), ) (*RTCPReceiver, error) { - rr := &rtcpreceiver.RTCPReceiver{ + rr := &RTCPReceiver{ ClockRate: clockRate, LocalSSRC: receiverSSRC, Period: period, @@ -34,34 +82,233 @@ func New( return nil, err } - return (*RTCPReceiver)(rr), nil + return rr, nil +} + +// Initialize initializes RTCPReceiver. +func (rr *RTCPReceiver) Initialize() error { + if rr.LocalSSRC == nil { + v, err := randUint32() + if err != nil { + return err + } + rr.LocalSSRC = &v + } + + if rr.TimeNow == nil { + rr.TimeNow = time.Now + } + + rr.terminate = make(chan struct{}) + rr.done = make(chan struct{}) + + go rr.run() + + return nil +} + +func (rr *RTCPReceiver) run() { + defer close(rr.done) + + t := time.NewTicker(rr.Period) + defer t.Stop() + + for { + select { + case <-t.C: + report := rr.report() + if report != nil { + rr.WritePacketRTCP(report) + } + + case <-rr.terminate: + return + } + } +} + +func (rr *RTCPReceiver) report() rtcp.Packet { + rr.mutex.Lock() + defer rr.mutex.Unlock() + + if !rr.firstRTPPacketReceived { + return nil + } + + system := rr.TimeNow() + + report := &rtcp.ReceiverReport{ + SSRC: *rr.LocalSSRC, + Reports: []rtcp.ReceptionReport{ + { + SSRC: rr.remoteSSRC, + LastSequenceNumber: uint32(rr.sequenceNumberCycles)<<16 | uint32(rr.lastSequenceNumber), + // equivalent to taking the integer part after multiplying the + // loss fraction by 256 + FractionLost: uint8(float64(rr.totalLostSinceReport*256) / float64(rr.totalSinceReport)), + TotalLost: rr.totalLost, + Jitter: uint32(rr.jitter), + }, + }, + } + + if rr.firstSenderReportReceived { + // middle 32 bits out of 64 in the NTP timestamp of last sender report + report.Reports[0].LastSenderReport = uint32(rr.lastSenderReportTimeNTP >> 16) + + // delay, expressed in units of 1/65536 seconds, between + // receiving the last SR packet from source SSRC_n and sending this + // reception report block + report.Reports[0].Delay = uint32(system.Sub(rr.lastSenderReportTimeSystem).Seconds() * 65536) + } + + rr.totalLostSinceReport = 0 + rr.totalSinceReport = 0 + + return report } // Close closes the RTCPReceiver. func (rr *RTCPReceiver) Close() { - (*rtcpreceiver.RTCPReceiver)(rr).Close() + close(rr.terminate) + <-rr.done } // ProcessPacket extracts the needed data from RTP packets. func (rr *RTCPReceiver) ProcessPacket(pkt *rtp.Packet, system time.Time, ptsEqualsDTS bool) error { - return (*rtcpreceiver.RTCPReceiver)(rr).ProcessPacketRTP(pkt, system, ptsEqualsDTS) + rr.mutex.Lock() + defer rr.mutex.Unlock() + + // first packet + if !rr.firstRTPPacketReceived { + rr.firstRTPPacketReceived = true + rr.totalSinceReport = 1 + rr.lastSequenceNumber = pkt.SequenceNumber + rr.remoteSSRC = pkt.SSRC + + if ptsEqualsDTS { + rr.timeInitialized = true + rr.lastTimeRTP = pkt.Timestamp + rr.lastTimeSystem = system + } + + // subsequent packets + } else { + if pkt.SSRC != rr.remoteSSRC { + return fmt.Errorf("received packet with wrong SSRC %d, expected %d", pkt.SSRC, rr.remoteSSRC) + } + + diff := int32(pkt.SequenceNumber) - int32(rr.lastSequenceNumber) + + // overflow + if diff < -0x0FFF { + rr.sequenceNumberCycles++ + } + + // detect lost packets + if pkt.SequenceNumber != (rr.lastSequenceNumber + 1) { + rr.totalLost += uint32(uint16(diff) - 1) + rr.totalLostSinceReport += uint32(uint16(diff) - 1) + + // allow up to 24 bits + if rr.totalLost > 0xFFFFFF { + rr.totalLost = 0xFFFFFF + } + if rr.totalLostSinceReport > 0xFFFFFF { + rr.totalLostSinceReport = 0xFFFFFF + } + } + + rr.totalSinceReport += uint32(uint16(diff)) + rr.lastSequenceNumber = pkt.SequenceNumber + + if ptsEqualsDTS { + if rr.timeInitialized { + // update jitter + // https://tools.ietf.org/html/rfc3550#page-39 + D := system.Sub(rr.lastTimeSystem).Seconds()*float64(rr.ClockRate) - + (float64(pkt.Timestamp) - float64(rr.lastTimeRTP)) + if D < 0 { + D = -D + } + rr.jitter += (D - rr.jitter) / 16 + } + + rr.timeInitialized = true + rr.lastTimeRTP = pkt.Timestamp + rr.lastTimeSystem = system + } + } + + return nil } // ProcessSenderReport extracts the needed data from RTCP sender reports. func (rr *RTCPReceiver) ProcessSenderReport(sr *rtcp.SenderReport, system time.Time) { - (*rtcpreceiver.RTCPReceiver)(rr).ProcessSenderReport(sr, system) + rr.mutex.Lock() + defer rr.mutex.Unlock() + + rr.firstSenderReportReceived = true + rr.lastSenderReportTimeNTP = sr.NTPTime + rr.lastSenderReportTimeRTP = sr.RTPTime + rr.lastSenderReportTimeSystem = system +} + +func (rr *RTCPReceiver) packetNTPUnsafe(ts uint32) (time.Time, bool) { + if !rr.firstSenderReportReceived { + return time.Time{}, false + } + + timeDiff := int32(ts - rr.lastSenderReportTimeRTP) + timeDiffGo := (time.Duration(timeDiff) * time.Second) / time.Duration(rr.ClockRate) + + return ntpTimeRTCPToGo(rr.lastSenderReportTimeNTP).Add(timeDiffGo), true } // PacketNTP returns the NTP timestamp of the packet. func (rr *RTCPReceiver) PacketNTP(ts uint32) (time.Time, bool) { - return (*rtcpreceiver.RTCPReceiver)(rr).PacketNTP(ts) + rr.mutex.Lock() + defer rr.mutex.Unlock() + + return rr.packetNTPUnsafe(ts) } // SenderSSRC returns the SSRC of outgoing RTP packets. +// +// Deprecated: replaced by Stats(). func (rr *RTCPReceiver) SenderSSRC() (uint32, bool) { - stats := (*rtcpreceiver.RTCPReceiver)(rr).Stats() + stats := rr.Stats() if stats == nil { return 0, false } return stats.RemoteSSRC, true } + +// Stats are statistics. +type Stats struct { + RemoteSSRC uint32 + LastSequenceNumber uint16 + LastRTP uint32 + LastNTP time.Time + Jitter float64 +} + +// Stats returns statistics. +func (rr *RTCPReceiver) Stats() *Stats { + rr.mutex.RLock() + defer rr.mutex.RUnlock() + + if !rr.firstRTPPacketReceived { + return nil + } + + ntp, _ := rr.packetNTPUnsafe(rr.lastTimeRTP) + + return &Stats{ + RemoteSSRC: rr.remoteSSRC, + LastSequenceNumber: rr.lastSequenceNumber, + LastRTP: rr.lastTimeRTP, + LastNTP: ntp, + Jitter: rr.jitter, + } +} diff --git a/pkg/rtcpreceiver/rtcpreceiver_test.go b/pkg/rtcpreceiver/rtcpreceiver_test.go index 37ac1375..773c9064 100644 --- a/pkg/rtcpreceiver/rtcpreceiver_test.go +++ b/pkg/rtcpreceiver/rtcpreceiver_test.go @@ -16,14 +16,14 @@ func uint32Ptr(v uint32) *uint32 { func TestRTCPReceiverBase(t *testing.T) { done := make(chan struct{}) - rr, err := New( - 90000, - uint32Ptr(0x65f83afb), - 500*time.Millisecond, - func() time.Time { + rr := &RTCPReceiver{ + ClockRate: 90000, + LocalSSRC: uint32Ptr(0x65f83afb), + Period: 500 * time.Millisecond, + TimeNow: func() time.Time { return time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC) }, - func(pkt rtcp.Packet) { + WritePacketRTCP: func(pkt rtcp.Packet) { require.Equal(t, &rtcp.ReceiverReport{ SSRC: 0x65f83afb, Reports: []rtcp.ReceptionReport{ @@ -36,7 +36,9 @@ func TestRTCPReceiverBase(t *testing.T) { }, }, pkt) close(done) - }) + }, + } + err := rr.Initialize() require.NoError(t, err) defer rr.Close() @@ -86,14 +88,14 @@ func TestRTCPReceiverBase(t *testing.T) { func TestRTCPReceiverOverflow(t *testing.T) { done := make(chan struct{}) - rr, err := New( - 90000, - uint32Ptr(0x65f83afb), - 250*time.Millisecond, - func() time.Time { + rr := &RTCPReceiver{ + ClockRate: 90000, + LocalSSRC: uint32Ptr(0x65f83afb), + Period: 250 * time.Millisecond, + TimeNow: func() time.Time { return time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC) }, - func(pkt rtcp.Packet) { + WritePacketRTCP: func(pkt rtcp.Packet) { require.Equal(t, &rtcp.ReceiverReport{ SSRC: 0x65f83afb, Reports: []rtcp.ReceptionReport{ @@ -106,7 +108,9 @@ func TestRTCPReceiverOverflow(t *testing.T) { }, }, pkt) close(done) - }) + }, + } + err := rr.Initialize() require.NoError(t, err) defer rr.Close() @@ -158,14 +162,14 @@ func TestRTCPReceiverOverflow(t *testing.T) { func TestRTCPReceiverPacketsLost(t *testing.T) { done := make(chan struct{}) - rr, err := New( - 90000, - uint32Ptr(0x65f83afb), - 500*time.Millisecond, - func() time.Time { + rr := &RTCPReceiver{ + ClockRate: 90000, + LocalSSRC: uint32Ptr(0x65f83afb), + Period: 500 * time.Millisecond, + TimeNow: func() time.Time { return time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC) }, - func(pkt rtcp.Packet) { + WritePacketRTCP: func(pkt rtcp.Packet) { require.Equal(t, &rtcp.ReceiverReport{ SSRC: 0x65f83afb, Reports: []rtcp.ReceptionReport{ @@ -183,7 +187,9 @@ func TestRTCPReceiverPacketsLost(t *testing.T) { }, }, pkt) close(done) - }) + }, + } + err := rr.Initialize() require.NoError(t, err) defer rr.Close() @@ -233,14 +239,14 @@ func TestRTCPReceiverPacketsLost(t *testing.T) { func TestRTCPReceiverOverflowPacketsLost(t *testing.T) { done := make(chan struct{}) - rr, err := New( - 90000, - uint32Ptr(0x65f83afb), - 500*time.Millisecond, - func() time.Time { + rr := &RTCPReceiver{ + ClockRate: 90000, + LocalSSRC: uint32Ptr(0x65f83afb), + Period: 500 * time.Millisecond, + TimeNow: func() time.Time { return time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC) }, - func(pkt rtcp.Packet) { + WritePacketRTCP: func(pkt rtcp.Packet) { require.Equal(t, &rtcp.ReceiverReport{ SSRC: 0x65f83afb, Reports: []rtcp.ReceptionReport{ @@ -258,7 +264,9 @@ func TestRTCPReceiverOverflowPacketsLost(t *testing.T) { }, }, pkt) close(done) - }) + }, + } + err := rr.Initialize() require.NoError(t, err) defer rr.Close() @@ -308,14 +316,14 @@ func TestRTCPReceiverOverflowPacketsLost(t *testing.T) { func TestRTCPReceiverJitter(t *testing.T) { done := make(chan struct{}) - rr, err := New( - 90000, - uint32Ptr(0x65f83afb), - 500*time.Millisecond, - func() time.Time { + rr := &RTCPReceiver{ + ClockRate: 90000, + LocalSSRC: uint32Ptr(0x65f83afb), + Period: 500 * time.Millisecond, + TimeNow: func() time.Time { return time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC) }, - func(pkt rtcp.Packet) { + WritePacketRTCP: func(pkt rtcp.Packet) { require.Equal(t, &rtcp.ReceiverReport{ SSRC: 0x65f83afb, Reports: []rtcp.ReceptionReport{ @@ -329,7 +337,9 @@ func TestRTCPReceiverJitter(t *testing.T) { }, }, pkt) close(done) - }) + }, + } + err := rr.Initialize() require.NoError(t, err) defer rr.Close() diff --git a/pkg/rtcpsender/rtcpsender.go b/pkg/rtcpsender/rtcpsender.go index 135268c8..929d7a6a 100644 --- a/pkg/rtcpsender/rtcpsender.go +++ b/pkg/rtcpsender/rtcpsender.go @@ -2,26 +2,53 @@ package rtcpsender import ( + "sync" "time" - "github.com/bluenviron/gortsplib/v4/internal/rtcpsender" "github.com/pion/rtcp" "github.com/pion/rtp" ) +// seconds since 1st January 1900 +// higher 32 bits are the integer part, lower 32 bits are the fractional part +func ntpTimeGoToRTCP(v time.Time) uint64 { + s := uint64(v.UnixNano()) + 2208988800*1000000000 + return (s/1000000000)<<32 | (s % 1000000000) +} + // RTCPSender is a utility to generate RTCP sender reports. -// -// Deprecated: will be removed in the next version. -type RTCPSender rtcpsender.RTCPSender +type RTCPSender struct { + ClockRate int + Period time.Duration + TimeNow func() time.Time + WritePacketRTCP func(rtcp.Packet) + + mutex sync.RWMutex + + // data from RTP packets + firstRTPPacketSent bool + lastTimeRTP uint32 + lastTimeNTP time.Time + lastTimeSystem time.Time + localSSRC uint32 + lastSequenceNumber uint16 + packetCount uint32 + octetCount uint32 + + terminate chan struct{} + done chan struct{} +} // New allocates a RTCPSender. +// +// Deprecated: replaced by Initialize(). func New( clockRate int, period time.Duration, timeNow func() time.Time, writePacketRTCP func(rtcp.Packet), ) *RTCPSender { - rs := &rtcpsender.RTCPSender{ + rs := &RTCPSender{ ClockRate: clockRate, Period: period, TimeNow: timeNow, @@ -29,22 +56,92 @@ func New( } rs.Initialize() - return (*RTCPSender)(rs) + return rs +} + +// Initialize initializes a RTCPSender. +func (rs *RTCPSender) Initialize() { + if rs.TimeNow == nil { + rs.TimeNow = time.Now + } + + rs.terminate = make(chan struct{}) + rs.done = make(chan struct{}) + + go rs.run() +} + +func (rs *RTCPSender) run() { + defer close(rs.done) + + t := time.NewTicker(rs.Period) + defer t.Stop() + + for { + select { + case <-t.C: + report := rs.report() + if report != nil { + rs.WritePacketRTCP(report) + } + + case <-rs.terminate: + return + } + } +} + +func (rs *RTCPSender) report() rtcp.Packet { + rs.mutex.Lock() + defer rs.mutex.Unlock() + + if !rs.firstRTPPacketSent { + return nil + } + + systemTimeDiff := rs.TimeNow().Sub(rs.lastTimeSystem) + ntpTime := rs.lastTimeNTP.Add(systemTimeDiff) + rtpTime := rs.lastTimeRTP + uint32(systemTimeDiff.Seconds()*float64(rs.ClockRate)) + + return &rtcp.SenderReport{ + SSRC: rs.localSSRC, + NTPTime: ntpTimeGoToRTCP(ntpTime), + RTPTime: rtpTime, + PacketCount: rs.packetCount, + OctetCount: rs.octetCount, + } } // Close closes the RTCPSender. func (rs *RTCPSender) Close() { - (*rtcpsender.RTCPSender)(rs).Close() + close(rs.terminate) + <-rs.done } // ProcessPacket extracts data from RTP packets. func (rs *RTCPSender) ProcessPacket(pkt *rtp.Packet, ntp time.Time, ptsEqualsDTS bool) { - (*rtcpsender.RTCPSender)(rs).ProcessPacketRTP(pkt, ntp, ptsEqualsDTS) + rs.mutex.Lock() + defer rs.mutex.Unlock() + + if ptsEqualsDTS { + rs.firstRTPPacketSent = true + rs.lastTimeRTP = pkt.Timestamp + rs.lastTimeNTP = ntp + rs.lastTimeSystem = rs.TimeNow() + rs.localSSRC = pkt.SSRC + } + + rs.lastSequenceNumber = pkt.SequenceNumber + + rs.packetCount++ + rs.octetCount += uint32(len(pkt.Payload)) } // SenderSSRC returns the SSRC of outgoing RTP packets. +// +// Deprecated: replaced by Stats(). func (rs *RTCPSender) SenderSSRC() (uint32, bool) { - stats := (*rtcpsender.RTCPSender)(rs).Stats() + stats := rs.Stats() if stats == nil { return 0, false } @@ -53,11 +150,38 @@ func (rs *RTCPSender) SenderSSRC() (uint32, bool) { } // LastPacketData returns metadata of the last RTP packet. +// +// Deprecated: replaced by Stats(). func (rs *RTCPSender) LastPacketData() (uint16, uint32, time.Time, bool) { - stats := (*rtcpsender.RTCPSender)(rs).Stats() + stats := rs.Stats() if stats == nil { return 0, 0, time.Time{}, false } return stats.LastSequenceNumber, stats.LastRTP, stats.LastNTP, true } + +// Stats are statistics. +type Stats struct { + LocalSSRC uint32 + LastSequenceNumber uint16 + LastRTP uint32 + LastNTP time.Time +} + +// Stats returns statistics. +func (rs *RTCPSender) Stats() *Stats { + rs.mutex.RLock() + defer rs.mutex.RUnlock() + + if !rs.firstRTPPacketSent { + return nil + } + + return &Stats{ + LocalSSRC: rs.localSSRC, + LastSequenceNumber: rs.lastSequenceNumber, + LastRTP: rs.lastTimeRTP, + LastNTP: rs.lastTimeNTP, + } +} diff --git a/pkg/rtcpsender/rtcpsender_test.go b/pkg/rtcpsender/rtcpsender_test.go index ac3dc7ab..faab96e4 100644 --- a/pkg/rtcpsender/rtcpsender_test.go +++ b/pkg/rtcpsender/rtcpsender_test.go @@ -22,15 +22,15 @@ func TestRTCPSender(t *testing.T) { sent := make(chan struct{}) - rs := New( - 90000, - 100*time.Millisecond, - func() time.Time { + rs := &RTCPSender{ + ClockRate: 90000, + Period: 100 * time.Millisecond, + TimeNow: func() time.Time { mutex.Lock() defer mutex.Unlock() return curTime }, - func(pkt rtcp.Packet) { + WritePacketRTCP: func(pkt rtcp.Packet) { require.Equal(t, &rtcp.SenderReport{ SSRC: 0xba9da416, NTPTime: func() uint64 { @@ -45,7 +45,9 @@ func TestRTCPSender(t *testing.T) { OctetCount: 6, }, pkt) close(sent) - }) + }, + } + rs.Initialize() defer rs.Close() setCurTime(time.Date(2008, 5, 20, 22, 16, 20, 0, time.UTC)) diff --git a/pkg/rtplossdetector/lossdetector.go b/pkg/rtplossdetector/lossdetector.go index d0f9aabe..219af570 100644 --- a/pkg/rtplossdetector/lossdetector.go +++ b/pkg/rtplossdetector/lossdetector.go @@ -2,16 +2,18 @@ package rtplossdetector import ( - "github.com/bluenviron/gortsplib/v4/internal/rtplossdetector" "github.com/pion/rtp" ) // LossDetector detects lost packets. -// -// Deprecated: will be removed in the next version. -type LossDetector rtplossdetector.LossDetector +type LossDetector struct { + initialized bool + expectedSeqNum uint16 +} // New allocates a LossDetector. +// +// Deprecated: Useless. func New() *LossDetector { return &LossDetector{} } @@ -19,5 +21,18 @@ func New() *LossDetector { // Process processes a RTP packet. // It returns the number of lost packets. func (r *LossDetector) Process(pkt *rtp.Packet) uint { - return uint((*rtplossdetector.LossDetector)(r).Process(pkt)) + if !r.initialized { + r.initialized = true + r.expectedSeqNum = pkt.SequenceNumber + 1 + return 0 + } + + if pkt.SequenceNumber != r.expectedSeqNum { + diff := pkt.SequenceNumber - r.expectedSeqNum + r.expectedSeqNum = pkt.SequenceNumber + 1 + return uint(diff) + } + + r.expectedSeqNum = pkt.SequenceNumber + 1 + return 0 } diff --git a/pkg/rtpreorderer/reorderer.go b/pkg/rtpreorderer/reorderer.go index ba63dda1..00043b0f 100644 --- a/pkg/rtpreorderer/reorderer.go +++ b/pkg/rtpreorderer/reorderer.go @@ -2,27 +2,141 @@ package rtpreorderer import ( - "github.com/bluenviron/gortsplib/v4/internal/rtpreorderer" "github.com/pion/rtp" ) +const ( + bufferSize = 64 +) + // Reorderer filters incoming RTP packets, in order to // - order packets // - remove duplicate packets -// -// Deprecated: will be removed in the next version. -type Reorderer rtpreorderer.Reorderer +type Reorderer struct { + initialized bool + expectedSeqNum uint16 + buffer []*rtp.Packet + absPos uint16 + negativeCount int +} // New allocates a Reorderer. +// +// Deprecated: replaced by Initialize(). func New() *Reorderer { - r := &rtpreorderer.Reorderer{} + r := &Reorderer{} r.Initialize() - return (*Reorderer)(r) + return r +} + +// Initialize initializes a Reorderer. +func (r *Reorderer) Initialize() { + r.buffer = make([]*rtp.Packet, bufferSize) } // Process processes a RTP packet. // It returns a sequence of ordered packets and the number of lost packets. func (r *Reorderer) Process(pkt *rtp.Packet) ([]*rtp.Packet, uint) { - v1, v2 := (*rtpreorderer.Reorderer)(r).Process(pkt) - return v1, uint(v2) + if !r.initialized { + r.initialized = true + r.expectedSeqNum = pkt.SequenceNumber + 1 + return []*rtp.Packet{pkt}, 0 + } + + relPos := int16(pkt.SequenceNumber - r.expectedSeqNum) + + // packet is a duplicate or has been sent + // before the first packet processed by Reorderer. + // discard. + if relPos < 0 { + r.negativeCount++ + + // stream has been resetted, therefore reset reorderer too + if r.negativeCount > bufferSize { + r.negativeCount = 0 + + // clear buffer + for i := uint16(0); i < bufferSize; i++ { + p := (r.absPos + i) & (bufferSize - 1) + r.buffer[p] = nil + } + + // reset position + r.expectedSeqNum = pkt.SequenceNumber + 1 + return []*rtp.Packet{pkt}, 0 + } + + return nil, 0 + } + r.negativeCount = 0 + + // there's a missing packet and buffer is full. + // return entire buffer and clear it. + if relPos >= bufferSize { + n := 1 + for i := uint16(0); i < bufferSize; i++ { + p := (r.absPos + i) & (bufferSize - 1) + if r.buffer[p] != nil { + n++ + } + } + + ret := make([]*rtp.Packet, n) + pos := 0 + + for i := uint16(0); i < bufferSize; i++ { + p := (r.absPos + i) & (bufferSize - 1) + if r.buffer[p] != nil { + ret[pos], r.buffer[p] = r.buffer[p], nil + pos++ + } + } + + ret[pos] = pkt + + r.expectedSeqNum = pkt.SequenceNumber + 1 + return ret, uint(int(relPos) - n + 1) + } + + // there's a missing packet + if relPos != 0 { + p := (r.absPos + uint16(relPos)) & (bufferSize - 1) + + // current packet is a duplicate. discard + if r.buffer[p] != nil { + return nil, 0 + } + + // put current packet in buffer + r.buffer[p] = pkt + return nil, 0 + } + + // all packets have been received correctly. + // return them + + n := uint16(1) + for { + p := (r.absPos + n) & (bufferSize - 1) + if r.buffer[p] == nil { + break + } + n++ + } + + ret := make([]*rtp.Packet, n) + + ret[0] = pkt + r.absPos++ + r.absPos &= (bufferSize - 1) + + for i := uint16(1); i < n; i++ { + ret[i], r.buffer[r.absPos] = r.buffer[r.absPos], nil + r.absPos++ + r.absPos &= (bufferSize - 1) + } + + r.expectedSeqNum = pkt.SequenceNumber + n + + return ret, 0 } diff --git a/internal/rtpreorderer/reorderer_test.go b/pkg/rtpreorderer/reorderer_test.go similarity index 93% rename from internal/rtpreorderer/reorderer_test.go rename to pkg/rtpreorderer/reorderer_test.go index 6e1eb318..c80af535 100644 --- a/internal/rtpreorderer/reorderer_test.go +++ b/pkg/rtpreorderer/reorderer_test.go @@ -164,7 +164,7 @@ func TestReorder(t *testing.T) { for _, entry := range sequence { out, missing := r.Process(entry.in) require.Equal(t, entry.out, out) - require.Equal(t, uint64(0), missing) + require.Equal(t, uint(0), missing) } } @@ -173,7 +173,7 @@ func TestBufferIsFull(t *testing.T) { r.Initialize() r.absPos = 25 sn := uint16(1564) - toMiss := uint64(34) + toMiss := uint(34) out, missing := r.Process(&rtp.Packet{ Header: rtp.Header{ @@ -185,19 +185,19 @@ func TestBufferIsFull(t *testing.T) { SequenceNumber: sn, }, }}, out) - require.Equal(t, uint64(0), missing) + require.Equal(t, uint(0), missing) sn++ var expected []*rtp.Packet - for i := uint64(0); i < 64-toMiss; i++ { + for i := uint(0); i < 64-toMiss; i++ { out, missing = r.Process(&rtp.Packet{ Header: rtp.Header{ SequenceNumber: sn + uint16(toMiss), }, }) require.Equal(t, []*rtp.Packet(nil), out) - require.Equal(t, uint64(0), missing) + require.Equal(t, uint(0), missing) expected = append(expected, &rtp.Packet{ Header: rtp.Header{ @@ -242,7 +242,7 @@ func TestReset(t *testing.T) { }, }) require.Equal(t, []*rtp.Packet(nil), out) - require.Equal(t, uint64(0), missing) + require.Equal(t, uint(0), missing) sn++ } @@ -256,5 +256,5 @@ func TestReset(t *testing.T) { SequenceNumber: sn, }, }}, out) - require.Equal(t, uint64(0), missing) + require.Equal(t, uint(0), missing) } diff --git a/scripts/test.mk b/scripts/test.mk index 7489a774..5325f074 100644 --- a/scripts/test.mk +++ b/scripts/test.mk @@ -6,16 +6,13 @@ endif test-examples: go build -o /dev/null ./examples/... -test-internal: - go test -v $(RACE) -coverprofile=coverage-internal.txt ./internal/... - test-pkg: go test -v $(RACE) -coverprofile=coverage-pkg.txt ./pkg/... test-root: go test -v $(RACE) -coverprofile=coverage-root.txt . -test-nodocker: test-examples test-internal test-pkg test-root +test-nodocker: test-examples test-pkg test-root define DOCKERFILE_TEST ARG ARCH diff --git a/server_session.go b/server_session.go index 7cfa1d97..beca57a8 100644 --- a/server_session.go +++ b/server_session.go @@ -15,13 +15,13 @@ import ( "github.com/pion/rtcp" "github.com/pion/rtp" - "github.com/bluenviron/gortsplib/v4/internal/rtcpreceiver" - "github.com/bluenviron/gortsplib/v4/internal/rtcpsender" "github.com/bluenviron/gortsplib/v4/pkg/base" "github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/gortsplib/v4/pkg/headers" "github.com/bluenviron/gortsplib/v4/pkg/liberrors" + "github.com/bluenviron/gortsplib/v4/pkg/rtcpreceiver" + "github.com/bluenviron/gortsplib/v4/pkg/rtcpsender" "github.com/bluenviron/gortsplib/v4/pkg/rtptime" "github.com/bluenviron/gortsplib/v4/pkg/sdp" ) diff --git a/server_session_format.go b/server_session_format.go index ad0e530a..865a81cd 100644 --- a/server_session_format.go +++ b/server_session_format.go @@ -8,11 +8,11 @@ import ( "github.com/pion/rtcp" "github.com/pion/rtp" - "github.com/bluenviron/gortsplib/v4/internal/rtcpreceiver" - "github.com/bluenviron/gortsplib/v4/internal/rtplossdetector" - "github.com/bluenviron/gortsplib/v4/internal/rtpreorderer" "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/gortsplib/v4/pkg/liberrors" + "github.com/bluenviron/gortsplib/v4/pkg/rtcpreceiver" + "github.com/bluenviron/gortsplib/v4/pkg/rtplossdetector" + "github.com/bluenviron/gortsplib/v4/pkg/rtpreorderer" ) type serverSessionFormat struct { @@ -79,7 +79,7 @@ func (sf *serverSessionFormat) stop() { func (sf *serverSessionFormat) readPacketRTPUDP(pkt *rtp.Packet, now time.Time) { packets, lost := sf.udpReorderer.Process(pkt) if lost != 0 { - sf.onPacketRTPLost(lost) + sf.onPacketRTPLost(uint64(lost)) // do not return } @@ -91,7 +91,7 @@ func (sf *serverSessionFormat) readPacketRTPUDP(pkt *rtp.Packet, now time.Time) func (sf *serverSessionFormat) readPacketRTPTCP(pkt *rtp.Packet) { lost := sf.tcpLossDetector.Process(pkt) if lost != 0 { - sf.onPacketRTPLost(lost) + sf.onPacketRTPLost(uint64(lost)) // do not return } @@ -101,7 +101,7 @@ func (sf *serverSessionFormat) readPacketRTPTCP(pkt *rtp.Packet) { } func (sf *serverSessionFormat) handlePacketRTP(pkt *rtp.Packet, now time.Time) { - err := sf.rtcpReceiver.ProcessPacketRTP(pkt, now, sf.format.PTSEqualsDTS(pkt)) + err := sf.rtcpReceiver.ProcessPacket(pkt, now, sf.format.PTSEqualsDTS(pkt)) if err != nil { sf.sm.onPacketRTPDecodeError(err) return diff --git a/server_stream_format.go b/server_stream_format.go index 26be0a5e..0498ae50 100644 --- a/server_stream_format.go +++ b/server_stream_format.go @@ -7,8 +7,8 @@ import ( "github.com/pion/rtcp" "github.com/pion/rtp" - "github.com/bluenviron/gortsplib/v4/internal/rtcpsender" "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/gortsplib/v4/pkg/rtcpsender" ) type serverStreamFormat struct { @@ -36,7 +36,7 @@ func (sf *serverStreamFormat) initialize() { } func (sf *serverStreamFormat) writePacketRTP(byts []byte, pkt *rtp.Packet, ntp time.Time) error { - sf.rtcpSender.ProcessPacketRTP(pkt, ntp, sf.format.PTSEqualsDTS(pkt)) + sf.rtcpSender.ProcessPacket(pkt, ntp, sf.format.PTSEqualsDTS(pkt)) le := uint64(len(byts))