From 4ad57d6a757f615d60f11b86d918394093f696d4 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Mon, 14 Aug 2023 23:23:14 +0200 Subject: [PATCH] preserve NTP when generating sender reports --- client.go | 7 +-- client_format.go | 12 ++--- client_media.go | 4 -- pkg/rtcpreceiver/rtcpreceiver.go | 4 +- pkg/rtcpreceiver/rtcpreceiver_test.go | 10 ++-- pkg/rtcpsender/rtcpsender.go | 54 ++++++++++----------- pkg/rtcpsender/rtcpsender_test.go | 67 ++++++++++++++++++--------- server_stream.go | 11 +---- server_stream_media.go | 5 +- 9 files changed, 93 insertions(+), 81 deletions(-) diff --git a/client.go b/client.go index 68d4845a..4c6f79f5 100644 --- a/client.go +++ b/client.go @@ -1651,12 +1651,13 @@ func (c *Client) OnPacketRTCP(medi *media.Media, cb OnPacketRTCPFunc) { cm.onPacketRTCP = cb } -// WritePacketRTP writes a RTP packet to the media stream. +// WritePacketRTP writes a RTP packet to the server. func (c *Client) WritePacketRTP(medi *media.Media, pkt *rtp.Packet) error { return c.WritePacketRTPWithNTP(medi, pkt, time.Now()) } -// WritePacketRTPWithNTP writes a RTP packet to the media stream. +// WritePacketRTPWithNTP writes a RTP packet to the server. +// ntp is the absolute time of the packet, and is sent with periodic RTCP sender reports. func (c *Client) WritePacketRTPWithNTP(medi *media.Media, pkt *rtp.Packet, ntp time.Time) error { byts := make([]byte, c.MaxPacketSize) n, err := pkt.MarshalTo(byts) @@ -1677,7 +1678,7 @@ func (c *Client) WritePacketRTPWithNTP(medi *media.Media, pkt *rtp.Packet, ntp t return nil } -// WritePacketRTCP writes a RTCP packet to the media stream. +// WritePacketRTCP writes a RTCP packet to the server. func (c *Client) WritePacketRTCP(medi *media.Media, pkt rtcp.Packet) error { byts, err := pkt.Marshal() if err != nil { diff --git a/client_format.go b/client_format.go index 8b261e03..f7094c0e 100644 --- a/client_format.go +++ b/client_format.go @@ -53,19 +53,15 @@ func (ct *clientFormat) start() { } else { ct.rtcpSender = rtcpsender.New( ct.format.ClockRate(), + ct.cm.c.senderReportPeriod, func(pkt rtcp.Packet) { - ct.cm.c.WritePacketRTCP(ct.cm.media, pkt) //nolint:errcheck + if !ct.cm.c.DisableRTCPSenderReports { + ct.cm.c.WritePacketRTCP(ct.cm.media, pkt) //nolint:errcheck + } }) } } -// start writing after write*() has been allocated in order to avoid a crash -func (ct *clientFormat) startWriting() { - if ct.cm.c.state != clientStatePlay && !ct.cm.c.DisableRTCPSenderReports { - ct.rtcpSender.Start(ct.cm.c.senderReportPeriod) - } -} - func (ct *clientFormat) stop() { if ct.udpRTCPReceiver != nil { ct.udpRTCPReceiver.Close() diff --git a/client_media.go b/client_media.go index 7e89f1fa..fe094e6f 100644 --- a/client_media.go +++ b/client_media.go @@ -129,10 +129,6 @@ func (cm *clientMedia) start() { cm.udpRTPListener.start() cm.udpRTCPListener.start() } - - for _, ct := range cm.formats { - ct.startWriting() - } } func (cm *clientMedia) stop() { diff --git a/pkg/rtcpreceiver/rtcpreceiver.go b/pkg/rtcpreceiver/rtcpreceiver.go index fcde0d6d..1cbdbada 100644 --- a/pkg/rtcpreceiver/rtcpreceiver.go +++ b/pkg/rtcpreceiver/rtcpreceiver.go @@ -19,7 +19,7 @@ func randUint32() (uint32, error) { return uint32(b[0])<<24 | uint32(b[1])<<16 | uint32(b[2])<<8 | uint32(b[3]), nil } -var now = time.Now +var timeNow = time.Now // RTCPReceiver is a utility to generate RTCP receiver reports. type RTCPReceiver struct { @@ -95,7 +95,7 @@ func (rr *RTCPReceiver) run() { for { select { case <-t.C: - report := rr.report(now()) + report := rr.report(timeNow()) if report != nil { rr.writePacketRTCP(report) } diff --git a/pkg/rtcpreceiver/rtcpreceiver_test.go b/pkg/rtcpreceiver/rtcpreceiver_test.go index f80b256d..2a2333a8 100644 --- a/pkg/rtcpreceiver/rtcpreceiver_test.go +++ b/pkg/rtcpreceiver/rtcpreceiver_test.go @@ -10,7 +10,7 @@ import ( ) func TestRTCPReceiverBase(t *testing.T) { - now = func() time.Time { + timeNow = func() time.Time { return time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC) } done := make(chan struct{}) @@ -77,7 +77,7 @@ func TestRTCPReceiverBase(t *testing.T) { func TestRTCPReceiverOverflow(t *testing.T) { done := make(chan struct{}) - now = func() time.Time { + timeNow = func() time.Time { return time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC) } v := uint32(0x65f83afb) @@ -144,7 +144,7 @@ func TestRTCPReceiverOverflow(t *testing.T) { func TestRTCPReceiverPacketLost(t *testing.T) { done := make(chan struct{}) - now = func() time.Time { + timeNow = func() time.Time { return time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC) } v := uint32(0x65f83afb) @@ -214,7 +214,7 @@ func TestRTCPReceiverPacketLost(t *testing.T) { func TestRTCPReceiverOverflowPacketLost(t *testing.T) { done := make(chan struct{}) - now = func() time.Time { + timeNow = func() time.Time { return time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC) } v := uint32(0x65f83afb) @@ -284,7 +284,7 @@ func TestRTCPReceiverOverflowPacketLost(t *testing.T) { func TestRTCPReceiverJitter(t *testing.T) { done := make(chan struct{}) - now = func() time.Time { + timeNow = func() time.Time { return time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC) } v := uint32(0x65f83afb) diff --git a/pkg/rtcpsender/rtcpsender.go b/pkg/rtcpsender/rtcpsender.go index cab08b2c..d341e6ed 100644 --- a/pkg/rtcpsender/rtcpsender.go +++ b/pkg/rtcpsender/rtcpsender.go @@ -9,20 +9,27 @@ import ( "github.com/pion/rtp" ) -var now = time.Now +var timeNow = time.Now + +// seconds since 1st January 1900 +// higher 32 bits are the integer part, lower 32 bits are the fractional part +func ntpTimeGoToRTCP(v time.Time) uint64 { + s := uint64(v.UnixNano()) + 2208988800*1000000000 + return (s/1000000000)<<32 | (s % 1000000000) +} // RTCPSender is a utility to generate RTCP sender reports. type RTCPSender struct { clockRate float64 + period time.Duration writePacketRTCP func(rtcp.Packet) mutex sync.Mutex - started bool - period time.Duration // data from RTP packets initialized bool lastTimeRTP uint32 lastTimeNTP time.Time + lastTimeSystem time.Time lastSSRC uint32 lastSequenceNumber uint16 packetCount uint32 @@ -35,31 +42,26 @@ type RTCPSender struct { // New allocates a RTCPSender. func New( clockRate int, + period time.Duration, writePacketRTCP func(rtcp.Packet), ) *RTCPSender { rs := &RTCPSender{ clockRate: float64(clockRate), + period: period, writePacketRTCP: writePacketRTCP, terminate: make(chan struct{}), done: make(chan struct{}), } + go rs.run() + return rs } // Close closes the RTCPSender. func (rs *RTCPSender) Close() { - if rs.started { - close(rs.terminate) - <-rs.done - } -} - -// Start starts the periodic generation of RTCP sender reports. -func (rs *RTCPSender) Start(period time.Duration) { - rs.started = true - rs.period = period - go rs.run() + close(rs.terminate) + <-rs.done } func (rs *RTCPSender) run() { @@ -71,7 +73,7 @@ func (rs *RTCPSender) run() { for { select { case <-t.C: - report := rs.report(now()) + report := rs.report() if report != nil { rs.writePacketRTCP(report) } @@ -82,29 +84,28 @@ func (rs *RTCPSender) run() { } } -func (rs *RTCPSender) report(ts time.Time) rtcp.Packet { +func (rs *RTCPSender) report() rtcp.Packet { rs.mutex.Lock() defer rs.mutex.Unlock() - if !rs.initialized || rs.clockRate == 0 { + if !rs.initialized { return nil } + systemTimeDiff := timeNow().Sub(rs.lastTimeSystem) + ntpTime := rs.lastTimeNTP.Add(systemTimeDiff) + rtpTime := rs.lastTimeRTP + uint32(systemTimeDiff.Seconds()*rs.clockRate) + return &rtcp.SenderReport{ - SSRC: rs.lastSSRC, - NTPTime: func() uint64 { - // seconds since 1st January 1900 - // higher 32 bits are the integer part, lower 32 bits are the fractional part - s := uint64(ts.UnixNano()) + 2208988800*1000000000 - return (s/1000000000)<<32 | (s % 1000000000) - }(), - RTPTime: rs.lastTimeRTP + uint32((ts.Sub(rs.lastTimeNTP)).Seconds()*rs.clockRate), + SSRC: rs.lastSSRC, + NTPTime: ntpTimeGoToRTCP(ntpTime), + RTPTime: rtpTime, PacketCount: rs.packetCount, OctetCount: rs.octetCount, } } -// ProcessPacket extracts the needed data from RTP packets. +// ProcessPacket extracts data from RTP packets. func (rs *RTCPSender) ProcessPacket(pkt *rtp.Packet, ntp time.Time, ptsEqualsDTS bool) { rs.mutex.Lock() defer rs.mutex.Unlock() @@ -113,6 +114,7 @@ func (rs *RTCPSender) ProcessPacket(pkt *rtp.Packet, ntp time.Time, ptsEqualsDTS rs.initialized = true rs.lastTimeRTP = pkt.Timestamp rs.lastTimeNTP = ntp + rs.lastTimeSystem = timeNow() } rs.lastSSRC = pkt.SSRC diff --git a/pkg/rtcpsender/rtcpsender_test.go b/pkg/rtcpsender/rtcpsender_test.go index d9304a3b..3569b253 100644 --- a/pkg/rtcpsender/rtcpsender_test.go +++ b/pkg/rtcpsender/rtcpsender_test.go @@ -1,6 +1,7 @@ package rtcpsender import ( + "sync" "testing" "time" @@ -10,26 +11,45 @@ import ( ) func TestRTCPSender(t *testing.T) { - now = func() time.Time { - return time.Date(2008, 5, 20, 22, 16, 20, 600000000, time.UTC) - } - done := make(chan struct{}) + var curTime time.Time + var mutex sync.Mutex - rs := New(90000, func(pkt rtcp.Packet) { - require.Equal(t, &rtcp.SenderReport{ - SSRC: 0xba9da416, - NTPTime: 14690122083862791680, - RTPTime: 0x4d185ae8, - PacketCount: 3, - OctetCount: 6, - }, pkt) - close(done) - }) + setCurTime := func(v time.Time) { + mutex.Lock() + defer mutex.Unlock() + curTime = v + } + + timeNow = func() time.Time { + mutex.Lock() + defer mutex.Unlock() + return curTime + } + + sent := make(chan struct{}) + + rs := New( + 90000, + 100*time.Millisecond, + func(pkt rtcp.Packet) { + require.Equal(t, &rtcp.SenderReport{ + SSRC: 0xba9da416, + NTPTime: func() uint64 { + // timeDiff = (24 - 22) = 2 + // 21 + 2 = 23 + d := time.Date(2008, 5, 20, 22, 15, 23, 0, time.UTC) + s := uint64(d.UnixNano()) + 2208988800*1000000000 + return (s/1000000000)<<32 | (s % 1000000000) + }(), + RTPTime: 1287987768 + 2*90000, + PacketCount: 3, + OctetCount: 6, + }, pkt) + close(sent) + }) defer rs.Close() - rs.Start(250 * time.Millisecond) - time.Sleep(400 * time.Millisecond) - + setCurTime(time.Date(2008, 5, 20, 22, 16, 20, 0, time.UTC)) rtpPkt := rtp.Packet{ Header: rtp.Header{ Version: 2, @@ -44,18 +64,19 @@ func TestRTCPSender(t *testing.T) { ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) rs.ProcessPacket(&rtpPkt, ts, true) + setCurTime(time.Date(2008, 5, 20, 22, 16, 22, 0, time.UTC)) rtpPkt = rtp.Packet{ Header: rtp.Header{ Version: 2, Marker: true, PayloadType: 96, SequenceNumber: 947, - Timestamp: 1287987768 + 45000, + Timestamp: 1287987768, SSRC: 0xba9da416, }, Payload: []byte("\x00\x00"), } - ts = time.Date(2008, 0o5, 20, 22, 15, 20, 500000000, time.UTC) + ts = time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC) rs.ProcessPacket(&rtpPkt, ts, true) rtpPkt = rtp.Packet{ @@ -64,13 +85,15 @@ func TestRTCPSender(t *testing.T) { Marker: true, PayloadType: 96, SequenceNumber: 948, - Timestamp: 1287987768 + 90000, + Timestamp: 1287987768, SSRC: 0xba9da416, }, Payload: []byte("\x00\x00"), } - ts = time.Date(2008, 0o5, 20, 22, 15, 20, 500000000, time.UTC) + ts = time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC) rs.ProcessPacket(&rtpPkt, ts, false) - <-done + setCurTime(time.Date(2008, 5, 20, 22, 16, 24, 0, time.UTC)) + + <-sent } diff --git a/server_stream.go b/server_stream.go index 3a488374..da8ffed0 100644 --- a/server_stream.go +++ b/server_stream.go @@ -43,14 +43,6 @@ func NewServerStream(s *Server, medias media.Medias) *ServerStream { st.streamMedias[medi] = newServerStreamMedia(st, medi, i) } - if !st.s.DisableRTCPSenderReports { - for _, ssm := range st.streamMedias { - for _, tr := range ssm.formats { - tr.rtcpSender.Start(st.s.senderReportPeriod) - } - } - } - return st } @@ -248,8 +240,7 @@ func (st *ServerStream) WritePacketRTP(medi *media.Media, pkt *rtp.Packet) error } // WritePacketRTPWithNTP writes a RTP packet to all the readers of the stream. -// ntp is the absolute time of the packet, and is needed to generate RTCP sender reports -// that allows the receiver to reconstruct the absolute time of the packet. +// ntp is the absolute time of the packet, and is sent with periodic RTCP sender reports. func (st *ServerStream) WritePacketRTPWithNTP(medi *media.Media, pkt *rtp.Packet, ntp time.Time) error { byts := make([]byte, st.s.MaxPacketSize) n, err := pkt.MarshalTo(byts) diff --git a/server_stream_media.go b/server_stream_media.go index 00c63f06..0cbc40bc 100644 --- a/server_stream_media.go +++ b/server_stream_media.go @@ -34,8 +34,11 @@ func newServerStreamMedia(st *ServerStream, medi *media.Media, trackID int) *ser cmedia := medi tr.rtcpSender = rtcpsender.New( forma.ClockRate(), + st.s.senderReportPeriod, func(pkt rtcp.Packet) { - st.WritePacketRTCP(cmedia, pkt) //nolint:errcheck + if !st.s.DisableRTCPSenderReports { + st.WritePacketRTCP(cmedia, pkt) //nolint:errcheck + } }, )