diff --git a/connclientread.go b/connclientread.go index d6ba48d7..2cac73f4 100644 --- a/connclientread.go +++ b/connclientread.go @@ -94,8 +94,9 @@ func (c *ConnClient) backgroundPlayUDP(onFrameDone chan error) { return case <-reportTicker.C: + now := time.Now() for trackId := range c.rtcpReceivers { - report := c.rtcpReceivers[trackId].Report() + report := c.rtcpReceivers[trackId].Report(now) c.udpRtcpListeners[trackId].write(report) } @@ -161,7 +162,7 @@ func (c *ConnClient) backgroundPlayTCP(onFrameDone chan error) { return } - c.rtcpReceivers[frame.TrackId].OnFrame(frame.StreamType, frame.Content) + c.rtcpReceivers[frame.TrackId].OnFrame(time.Now(), frame.StreamType, frame.Content) c.readCB(frame.TrackId, frame.StreamType, frame.Content) } @@ -188,8 +189,9 @@ func (c *ConnClient) backgroundPlayTCP(onFrameDone chan error) { return case <-reportTicker.C: + now := time.Now() for trackId := range c.rtcpReceivers { - report := c.rtcpReceivers[trackId].Report() + report := c.rtcpReceivers[trackId].Report(now) c.nconn.SetWriteDeadline(time.Now().Add(c.d.WriteTimeout)) frame := base.InterleavedFrame{ TrackId: trackId, diff --git a/connclientudpl.go b/connclientudpl.go index c78ee677..5514e9f9 100644 --- a/connclientudpl.go +++ b/connclientudpl.go @@ -83,9 +83,9 @@ func (l *connClientUDPListener) run() { continue } - atomic.StoreInt64(l.c.udpLastFrameTimes[l.trackId], time.Now().Unix()) - - l.c.rtcpReceivers[l.trackId].OnFrame(l.streamType, buf[:n]) + now := time.Now() + atomic.StoreInt64(l.c.udpLastFrameTimes[l.trackId], now.Unix()) + l.c.rtcpReceivers[l.trackId].OnFrame(now, l.streamType, buf[:n]) l.c.readCB(l.trackId, l.streamType, buf[:n]) } diff --git a/pkg/rtcpreceiver/rtcpreceiver.go b/pkg/rtcpreceiver/rtcpreceiver.go index caee1546..6a928374 100644 --- a/pkg/rtcpreceiver/rtcpreceiver.go +++ b/pkg/rtcpreceiver/rtcpreceiver.go @@ -4,6 +4,7 @@ package rtcpreceiver import ( "math/rand" "sync" + "time" "github.com/pion/rtcp" @@ -18,7 +19,8 @@ type RtcpReceiver struct { senderSSRC uint32 sequenceNumberCycles uint16 lastSequenceNumber uint16 - lastSenderReportTime uint32 + lastSenderReport uint32 + lastSenderReportTime time.Time totalLost uint32 totalLostSinceRR uint32 totalSinceRR uint32 @@ -37,7 +39,7 @@ func New(receiverSSRC *uint32) *RtcpReceiver { } // OnFrame processes a RTP or RTCP frame and extract the needed data. -func (rr *RtcpReceiver) OnFrame(streamType base.StreamType, buf []byte) { +func (rr *RtcpReceiver) OnFrame(ts time.Time, streamType base.StreamType, buf []byte) { rr.mutex.Lock() defer rr.mutex.Unlock() @@ -78,7 +80,8 @@ func (rr *RtcpReceiver) OnFrame(streamType base.StreamType, buf []byte) { for _, frame := range frames { if sr, ok := (frame).(*rtcp.SenderReport); ok { rr.senderSSRC = sr.SSRC - rr.lastSenderReportTime = uint32(sr.NTPTime >> 16) + rr.lastSenderReport = uint32(sr.NTPTime >> 16) + rr.lastSenderReportTime = ts } } } @@ -86,7 +89,7 @@ func (rr *RtcpReceiver) OnFrame(streamType base.StreamType, buf []byte) { } // Report generates a RTCP receiver report. -func (rr *RtcpReceiver) Report() []byte { +func (rr *RtcpReceiver) Report(ts time.Time) []byte { rr.mutex.Lock() defer rr.mutex.Unlock() @@ -96,13 +99,15 @@ func (rr *RtcpReceiver) Report() []byte { { SSRC: rr.senderSSRC, LastSequenceNumber: uint32(rr.sequenceNumberCycles)<<16 | uint32(rr.lastSequenceNumber), - LastSenderReport: rr.lastSenderReportTime, - FractionLost: func() uint8 { - // equivalent to taking the integer part after multiplying the - // loss fraction by 256 - return uint8(float64(rr.totalLostSinceRR*256) / float64(rr.totalSinceRR)) - }(), - TotalLost: rr.totalLost, + LastSenderReport: rr.lastSenderReport, + // equivalent to taking the integer part after multiplying the + // loss fraction by 256 + FractionLost: uint8(float64(rr.totalLostSinceRR*256) / float64(rr.totalSinceRR)), + TotalLost: rr.totalLost, + // delay, expressed in units of 1/65536 seconds, between + // receiving the last SR packet from source SSRC_n and sending this + // reception report block + Delay: uint32(ts.Sub(rr.lastSenderReportTime).Seconds() * 65536), }, }, } diff --git a/pkg/rtcpreceiver/rtcpreceiver_test.go b/pkg/rtcpreceiver/rtcpreceiver_test.go index da60c4b8..d1da149e 100644 --- a/pkg/rtcpreceiver/rtcpreceiver_test.go +++ b/pkg/rtcpreceiver/rtcpreceiver_test.go @@ -2,6 +2,7 @@ package rtcpreceiver import ( "testing" + "time" "github.com/pion/rtcp" "github.com/pion/rtp" @@ -22,7 +23,8 @@ func TestRtcpReceiverBase(t *testing.T) { OctetCount: 859127, } byts, _ := srPkt.Marshal() - rr.OnFrame(base.StreamTypeRtcp, byts) + ts := time.Date(2008, 05, 20, 22, 15, 20, 0, time.UTC) + rr.OnFrame(ts, base.StreamTypeRtcp, byts) rtpPkt := rtp.Packet{ Header: rtp.Header{ @@ -36,7 +38,8 @@ func TestRtcpReceiverBase(t *testing.T) { Payload: []byte("\x00\x00"), } byts, _ = rtpPkt.Marshal() - rr.OnFrame(base.StreamTypeRtp, byts) + ts = time.Date(2008, 05, 20, 22, 15, 20, 0, time.UTC) + rr.OnFrame(ts, base.StreamTypeRtp, byts) expectedPkt := rtcp.ReceiverReport{ SSRC: 0x65f83afb, @@ -45,11 +48,13 @@ func TestRtcpReceiverBase(t *testing.T) { SSRC: 0xba9da416, LastSequenceNumber: 946, LastSenderReport: 0x887a17ce, + Delay: 1 * 65536, }, }, } expected, _ := expectedPkt.Marshal() - require.Equal(t, expected, rr.Report()) + ts = time.Date(2008, 05, 20, 22, 15, 21, 0, time.UTC) + require.Equal(t, expected, rr.Report(ts)) } func TestRtcpReceiverSequenceOverflow(t *testing.T) { @@ -64,7 +69,8 @@ func TestRtcpReceiverSequenceOverflow(t *testing.T) { OctetCount: 859127, } byts, _ := srPkt.Marshal() - rr.OnFrame(base.StreamTypeRtcp, byts) + ts := time.Date(2008, 05, 20, 22, 15, 20, 0, time.UTC) + rr.OnFrame(ts, base.StreamTypeRtcp, byts) rtpPkt := rtp.Packet{ Header: rtp.Header{ @@ -78,7 +84,8 @@ func TestRtcpReceiverSequenceOverflow(t *testing.T) { Payload: []byte("\x00\x00"), } byts, _ = rtpPkt.Marshal() - rr.OnFrame(base.StreamTypeRtp, byts) + ts = time.Date(2008, 05, 20, 22, 15, 20, 0, time.UTC) + rr.OnFrame(ts, base.StreamTypeRtp, byts) rtpPkt = rtp.Packet{ Header: rtp.Header{ @@ -92,7 +99,8 @@ func TestRtcpReceiverSequenceOverflow(t *testing.T) { Payload: []byte("\x00\x00"), } byts, _ = rtpPkt.Marshal() - rr.OnFrame(base.StreamTypeRtp, byts) + ts = time.Date(2008, 05, 20, 22, 15, 20, 0, time.UTC) + rr.OnFrame(ts, base.StreamTypeRtp, byts) expectedPkt := rtcp.ReceiverReport{ SSRC: 0x65f83afb, @@ -101,11 +109,13 @@ func TestRtcpReceiverSequenceOverflow(t *testing.T) { SSRC: 0xba9da416, LastSequenceNumber: 1<<16 | 0x0000, LastSenderReport: 0x887a17ce, + Delay: 1 * 65536, }, }, } expected, _ := expectedPkt.Marshal() - require.Equal(t, expected, rr.Report()) + ts = time.Date(2008, 05, 20, 22, 15, 21, 0, time.UTC) + require.Equal(t, expected, rr.Report(ts)) } func TestRtcpReceiverPacketLost(t *testing.T) { @@ -120,7 +130,8 @@ func TestRtcpReceiverPacketLost(t *testing.T) { OctetCount: 859127, } byts, _ := srPkt.Marshal() - rr.OnFrame(base.StreamTypeRtcp, byts) + ts := time.Date(2008, 05, 20, 22, 15, 20, 0, time.UTC) + rr.OnFrame(ts, base.StreamTypeRtcp, byts) rtpPkt := rtp.Packet{ Header: rtp.Header{ @@ -134,7 +145,8 @@ func TestRtcpReceiverPacketLost(t *testing.T) { Payload: []byte("\x00\x00"), } byts, _ = rtpPkt.Marshal() - rr.OnFrame(base.StreamTypeRtp, byts) + ts = time.Date(2008, 05, 20, 22, 15, 20, 0, time.UTC) + rr.OnFrame(ts, base.StreamTypeRtp, byts) rtpPkt = rtp.Packet{ Header: rtp.Header{ @@ -148,7 +160,8 @@ func TestRtcpReceiverPacketLost(t *testing.T) { Payload: []byte("\x00\x00"), } byts, _ = rtpPkt.Marshal() - rr.OnFrame(base.StreamTypeRtp, byts) + ts = time.Date(2008, 05, 20, 22, 15, 20, 0, time.UTC) + rr.OnFrame(ts, base.StreamTypeRtp, byts) expectedPkt := rtcp.ReceiverReport{ SSRC: 0x65f83afb, @@ -162,11 +175,13 @@ func TestRtcpReceiverPacketLost(t *testing.T) { return uint8(v * 256) }(), TotalLost: 1, + Delay: 1 * 65536, }, }, } expected, _ := expectedPkt.Marshal() - require.Equal(t, expected, rr.Report()) + ts = time.Date(2008, 05, 20, 22, 15, 21, 0, time.UTC) + require.Equal(t, expected, rr.Report(ts)) } func TestRtcpReceiverSequenceOverflowPacketLost(t *testing.T) { @@ -181,7 +196,8 @@ func TestRtcpReceiverSequenceOverflowPacketLost(t *testing.T) { OctetCount: 859127, } byts, _ := srPkt.Marshal() - rr.OnFrame(base.StreamTypeRtcp, byts) + ts := time.Date(2008, 05, 20, 22, 15, 20, 0, time.UTC) + rr.OnFrame(ts, base.StreamTypeRtcp, byts) rtpPkt := rtp.Packet{ Header: rtp.Header{ @@ -195,7 +211,8 @@ func TestRtcpReceiverSequenceOverflowPacketLost(t *testing.T) { Payload: []byte("\x00\x00"), } byts, _ = rtpPkt.Marshal() - rr.OnFrame(base.StreamTypeRtp, byts) + ts = time.Date(2008, 05, 20, 22, 15, 20, 0, time.UTC) + rr.OnFrame(ts, base.StreamTypeRtp, byts) rtpPkt = rtp.Packet{ Header: rtp.Header{ @@ -209,7 +226,8 @@ func TestRtcpReceiverSequenceOverflowPacketLost(t *testing.T) { Payload: []byte("\x00\x00"), } byts, _ = rtpPkt.Marshal() - rr.OnFrame(base.StreamTypeRtp, byts) + ts = time.Date(2008, 05, 20, 22, 15, 20, 0, time.UTC) + rr.OnFrame(ts, base.StreamTypeRtp, byts) expectedPkt := rtcp.ReceiverReport{ SSRC: 0x65f83afb, @@ -223,9 +241,11 @@ func TestRtcpReceiverSequenceOverflowPacketLost(t *testing.T) { return uint8(v * 256) }(), TotalLost: 2, + Delay: 1 * 65536, }, }, } expected, _ := expectedPkt.Marshal() - require.Equal(t, expected, rr.Report()) + ts = time.Date(2008, 05, 20, 22, 15, 21, 0, time.UTC) + require.Equal(t, expected, rr.Report(ts)) } diff --git a/pkg/rtcpsender/rtcpsender.go b/pkg/rtcpsender/rtcpsender.go index e450ac43..6db1cc7a 100644 --- a/pkg/rtcpsender/rtcpsender.go +++ b/pkg/rtcpsender/rtcpsender.go @@ -42,12 +42,12 @@ func (rs *RtcpSender) OnFrame(ts time.Time, streamType base.StreamType, buf []by if !rs.firstRtpReceived { rs.firstRtpReceived = true rs.senderSSRC = pkt.SSRC - - // save RTP time offset and correspondent time - rs.rtpTimeOffset = pkt.Timestamp - rs.rtpTimeTime = ts } + // always update time to minimize errors + rs.rtpTimeOffset = pkt.Timestamp + rs.rtpTimeTime = ts + rs.packetCount++ rs.octetCount += uint32(len(pkt.Payload)) }