add delay to rtcp receiver reports (#15)

This commit is contained in:
aler9
2020-11-28 12:14:02 +01:00
parent f82fe2309d
commit 3b5901ba01
5 changed files with 63 additions and 36 deletions

View File

@@ -94,8 +94,9 @@ func (c *ConnClient) backgroundPlayUDP(onFrameDone chan error) {
return return
case <-reportTicker.C: case <-reportTicker.C:
now := time.Now()
for trackId := range c.rtcpReceivers { for trackId := range c.rtcpReceivers {
report := c.rtcpReceivers[trackId].Report() report := c.rtcpReceivers[trackId].Report(now)
c.udpRtcpListeners[trackId].write(report) c.udpRtcpListeners[trackId].write(report)
} }
@@ -161,7 +162,7 @@ func (c *ConnClient) backgroundPlayTCP(onFrameDone chan error) {
return return
} }
c.rtcpReceivers[frame.TrackId].OnFrame(frame.StreamType, frame.Content) c.rtcpReceivers[frame.TrackId].OnFrame(time.Now(), frame.StreamType, frame.Content)
c.readCB(frame.TrackId, frame.StreamType, frame.Content) c.readCB(frame.TrackId, frame.StreamType, frame.Content)
} }
@@ -188,8 +189,9 @@ func (c *ConnClient) backgroundPlayTCP(onFrameDone chan error) {
return return
case <-reportTicker.C: case <-reportTicker.C:
now := time.Now()
for trackId := range c.rtcpReceivers { for trackId := range c.rtcpReceivers {
report := c.rtcpReceivers[trackId].Report() report := c.rtcpReceivers[trackId].Report(now)
c.nconn.SetWriteDeadline(time.Now().Add(c.d.WriteTimeout)) c.nconn.SetWriteDeadline(time.Now().Add(c.d.WriteTimeout))
frame := base.InterleavedFrame{ frame := base.InterleavedFrame{
TrackId: trackId, TrackId: trackId,

View File

@@ -83,9 +83,9 @@ func (l *connClientUDPListener) run() {
continue continue
} }
atomic.StoreInt64(l.c.udpLastFrameTimes[l.trackId], time.Now().Unix()) now := time.Now()
atomic.StoreInt64(l.c.udpLastFrameTimes[l.trackId], now.Unix())
l.c.rtcpReceivers[l.trackId].OnFrame(l.streamType, buf[:n]) l.c.rtcpReceivers[l.trackId].OnFrame(now, l.streamType, buf[:n])
l.c.readCB(l.trackId, l.streamType, buf[:n]) l.c.readCB(l.trackId, l.streamType, buf[:n])
} }

View File

@@ -4,6 +4,7 @@ package rtcpreceiver
import ( import (
"math/rand" "math/rand"
"sync" "sync"
"time"
"github.com/pion/rtcp" "github.com/pion/rtcp"
@@ -18,7 +19,8 @@ type RtcpReceiver struct {
senderSSRC uint32 senderSSRC uint32
sequenceNumberCycles uint16 sequenceNumberCycles uint16
lastSequenceNumber uint16 lastSequenceNumber uint16
lastSenderReportTime uint32 lastSenderReport uint32
lastSenderReportTime time.Time
totalLost uint32 totalLost uint32
totalLostSinceRR uint32 totalLostSinceRR uint32
totalSinceRR uint32 totalSinceRR uint32
@@ -37,7 +39,7 @@ func New(receiverSSRC *uint32) *RtcpReceiver {
} }
// OnFrame processes a RTP or RTCP frame and extract the needed data. // OnFrame processes a RTP or RTCP frame and extract the needed data.
func (rr *RtcpReceiver) OnFrame(streamType base.StreamType, buf []byte) { func (rr *RtcpReceiver) OnFrame(ts time.Time, streamType base.StreamType, buf []byte) {
rr.mutex.Lock() rr.mutex.Lock()
defer rr.mutex.Unlock() defer rr.mutex.Unlock()
@@ -78,7 +80,8 @@ func (rr *RtcpReceiver) OnFrame(streamType base.StreamType, buf []byte) {
for _, frame := range frames { for _, frame := range frames {
if sr, ok := (frame).(*rtcp.SenderReport); ok { if sr, ok := (frame).(*rtcp.SenderReport); ok {
rr.senderSSRC = sr.SSRC rr.senderSSRC = sr.SSRC
rr.lastSenderReportTime = uint32(sr.NTPTime >> 16) rr.lastSenderReport = uint32(sr.NTPTime >> 16)
rr.lastSenderReportTime = ts
} }
} }
} }
@@ -86,7 +89,7 @@ func (rr *RtcpReceiver) OnFrame(streamType base.StreamType, buf []byte) {
} }
// Report generates a RTCP receiver report. // Report generates a RTCP receiver report.
func (rr *RtcpReceiver) Report() []byte { func (rr *RtcpReceiver) Report(ts time.Time) []byte {
rr.mutex.Lock() rr.mutex.Lock()
defer rr.mutex.Unlock() defer rr.mutex.Unlock()
@@ -96,13 +99,15 @@ func (rr *RtcpReceiver) Report() []byte {
{ {
SSRC: rr.senderSSRC, SSRC: rr.senderSSRC,
LastSequenceNumber: uint32(rr.sequenceNumberCycles)<<16 | uint32(rr.lastSequenceNumber), LastSequenceNumber: uint32(rr.sequenceNumberCycles)<<16 | uint32(rr.lastSequenceNumber),
LastSenderReport: rr.lastSenderReportTime, LastSenderReport: rr.lastSenderReport,
FractionLost: func() uint8 { // equivalent to taking the integer part after multiplying the
// equivalent to taking the integer part after multiplying the // loss fraction by 256
// loss fraction by 256 FractionLost: uint8(float64(rr.totalLostSinceRR*256) / float64(rr.totalSinceRR)),
return uint8(float64(rr.totalLostSinceRR*256) / float64(rr.totalSinceRR)) TotalLost: rr.totalLost,
}(), // delay, expressed in units of 1/65536 seconds, between
TotalLost: rr.totalLost, // receiving the last SR packet from source SSRC_n and sending this
// reception report block
Delay: uint32(ts.Sub(rr.lastSenderReportTime).Seconds() * 65536),
}, },
}, },
} }

View File

@@ -2,6 +2,7 @@ package rtcpreceiver
import ( import (
"testing" "testing"
"time"
"github.com/pion/rtcp" "github.com/pion/rtcp"
"github.com/pion/rtp" "github.com/pion/rtp"
@@ -22,7 +23,8 @@ func TestRtcpReceiverBase(t *testing.T) {
OctetCount: 859127, OctetCount: 859127,
} }
byts, _ := srPkt.Marshal() byts, _ := srPkt.Marshal()
rr.OnFrame(base.StreamTypeRtcp, byts) ts := time.Date(2008, 05, 20, 22, 15, 20, 0, time.UTC)
rr.OnFrame(ts, base.StreamTypeRtcp, byts)
rtpPkt := rtp.Packet{ rtpPkt := rtp.Packet{
Header: rtp.Header{ Header: rtp.Header{
@@ -36,7 +38,8 @@ func TestRtcpReceiverBase(t *testing.T) {
Payload: []byte("\x00\x00"), Payload: []byte("\x00\x00"),
} }
byts, _ = rtpPkt.Marshal() byts, _ = rtpPkt.Marshal()
rr.OnFrame(base.StreamTypeRtp, byts) ts = time.Date(2008, 05, 20, 22, 15, 20, 0, time.UTC)
rr.OnFrame(ts, base.StreamTypeRtp, byts)
expectedPkt := rtcp.ReceiverReport{ expectedPkt := rtcp.ReceiverReport{
SSRC: 0x65f83afb, SSRC: 0x65f83afb,
@@ -45,11 +48,13 @@ func TestRtcpReceiverBase(t *testing.T) {
SSRC: 0xba9da416, SSRC: 0xba9da416,
LastSequenceNumber: 946, LastSequenceNumber: 946,
LastSenderReport: 0x887a17ce, LastSenderReport: 0x887a17ce,
Delay: 1 * 65536,
}, },
}, },
} }
expected, _ := expectedPkt.Marshal() expected, _ := expectedPkt.Marshal()
require.Equal(t, expected, rr.Report()) ts = time.Date(2008, 05, 20, 22, 15, 21, 0, time.UTC)
require.Equal(t, expected, rr.Report(ts))
} }
func TestRtcpReceiverSequenceOverflow(t *testing.T) { func TestRtcpReceiverSequenceOverflow(t *testing.T) {
@@ -64,7 +69,8 @@ func TestRtcpReceiverSequenceOverflow(t *testing.T) {
OctetCount: 859127, OctetCount: 859127,
} }
byts, _ := srPkt.Marshal() byts, _ := srPkt.Marshal()
rr.OnFrame(base.StreamTypeRtcp, byts) ts := time.Date(2008, 05, 20, 22, 15, 20, 0, time.UTC)
rr.OnFrame(ts, base.StreamTypeRtcp, byts)
rtpPkt := rtp.Packet{ rtpPkt := rtp.Packet{
Header: rtp.Header{ Header: rtp.Header{
@@ -78,7 +84,8 @@ func TestRtcpReceiverSequenceOverflow(t *testing.T) {
Payload: []byte("\x00\x00"), Payload: []byte("\x00\x00"),
} }
byts, _ = rtpPkt.Marshal() byts, _ = rtpPkt.Marshal()
rr.OnFrame(base.StreamTypeRtp, byts) ts = time.Date(2008, 05, 20, 22, 15, 20, 0, time.UTC)
rr.OnFrame(ts, base.StreamTypeRtp, byts)
rtpPkt = rtp.Packet{ rtpPkt = rtp.Packet{
Header: rtp.Header{ Header: rtp.Header{
@@ -92,7 +99,8 @@ func TestRtcpReceiverSequenceOverflow(t *testing.T) {
Payload: []byte("\x00\x00"), Payload: []byte("\x00\x00"),
} }
byts, _ = rtpPkt.Marshal() byts, _ = rtpPkt.Marshal()
rr.OnFrame(base.StreamTypeRtp, byts) ts = time.Date(2008, 05, 20, 22, 15, 20, 0, time.UTC)
rr.OnFrame(ts, base.StreamTypeRtp, byts)
expectedPkt := rtcp.ReceiverReport{ expectedPkt := rtcp.ReceiverReport{
SSRC: 0x65f83afb, SSRC: 0x65f83afb,
@@ -101,11 +109,13 @@ func TestRtcpReceiverSequenceOverflow(t *testing.T) {
SSRC: 0xba9da416, SSRC: 0xba9da416,
LastSequenceNumber: 1<<16 | 0x0000, LastSequenceNumber: 1<<16 | 0x0000,
LastSenderReport: 0x887a17ce, LastSenderReport: 0x887a17ce,
Delay: 1 * 65536,
}, },
}, },
} }
expected, _ := expectedPkt.Marshal() expected, _ := expectedPkt.Marshal()
require.Equal(t, expected, rr.Report()) ts = time.Date(2008, 05, 20, 22, 15, 21, 0, time.UTC)
require.Equal(t, expected, rr.Report(ts))
} }
func TestRtcpReceiverPacketLost(t *testing.T) { func TestRtcpReceiverPacketLost(t *testing.T) {
@@ -120,7 +130,8 @@ func TestRtcpReceiverPacketLost(t *testing.T) {
OctetCount: 859127, OctetCount: 859127,
} }
byts, _ := srPkt.Marshal() byts, _ := srPkt.Marshal()
rr.OnFrame(base.StreamTypeRtcp, byts) ts := time.Date(2008, 05, 20, 22, 15, 20, 0, time.UTC)
rr.OnFrame(ts, base.StreamTypeRtcp, byts)
rtpPkt := rtp.Packet{ rtpPkt := rtp.Packet{
Header: rtp.Header{ Header: rtp.Header{
@@ -134,7 +145,8 @@ func TestRtcpReceiverPacketLost(t *testing.T) {
Payload: []byte("\x00\x00"), Payload: []byte("\x00\x00"),
} }
byts, _ = rtpPkt.Marshal() byts, _ = rtpPkt.Marshal()
rr.OnFrame(base.StreamTypeRtp, byts) ts = time.Date(2008, 05, 20, 22, 15, 20, 0, time.UTC)
rr.OnFrame(ts, base.StreamTypeRtp, byts)
rtpPkt = rtp.Packet{ rtpPkt = rtp.Packet{
Header: rtp.Header{ Header: rtp.Header{
@@ -148,7 +160,8 @@ func TestRtcpReceiverPacketLost(t *testing.T) {
Payload: []byte("\x00\x00"), Payload: []byte("\x00\x00"),
} }
byts, _ = rtpPkt.Marshal() byts, _ = rtpPkt.Marshal()
rr.OnFrame(base.StreamTypeRtp, byts) ts = time.Date(2008, 05, 20, 22, 15, 20, 0, time.UTC)
rr.OnFrame(ts, base.StreamTypeRtp, byts)
expectedPkt := rtcp.ReceiverReport{ expectedPkt := rtcp.ReceiverReport{
SSRC: 0x65f83afb, SSRC: 0x65f83afb,
@@ -162,11 +175,13 @@ func TestRtcpReceiverPacketLost(t *testing.T) {
return uint8(v * 256) return uint8(v * 256)
}(), }(),
TotalLost: 1, TotalLost: 1,
Delay: 1 * 65536,
}, },
}, },
} }
expected, _ := expectedPkt.Marshal() expected, _ := expectedPkt.Marshal()
require.Equal(t, expected, rr.Report()) ts = time.Date(2008, 05, 20, 22, 15, 21, 0, time.UTC)
require.Equal(t, expected, rr.Report(ts))
} }
func TestRtcpReceiverSequenceOverflowPacketLost(t *testing.T) { func TestRtcpReceiverSequenceOverflowPacketLost(t *testing.T) {
@@ -181,7 +196,8 @@ func TestRtcpReceiverSequenceOverflowPacketLost(t *testing.T) {
OctetCount: 859127, OctetCount: 859127,
} }
byts, _ := srPkt.Marshal() byts, _ := srPkt.Marshal()
rr.OnFrame(base.StreamTypeRtcp, byts) ts := time.Date(2008, 05, 20, 22, 15, 20, 0, time.UTC)
rr.OnFrame(ts, base.StreamTypeRtcp, byts)
rtpPkt := rtp.Packet{ rtpPkt := rtp.Packet{
Header: rtp.Header{ Header: rtp.Header{
@@ -195,7 +211,8 @@ func TestRtcpReceiverSequenceOverflowPacketLost(t *testing.T) {
Payload: []byte("\x00\x00"), Payload: []byte("\x00\x00"),
} }
byts, _ = rtpPkt.Marshal() byts, _ = rtpPkt.Marshal()
rr.OnFrame(base.StreamTypeRtp, byts) ts = time.Date(2008, 05, 20, 22, 15, 20, 0, time.UTC)
rr.OnFrame(ts, base.StreamTypeRtp, byts)
rtpPkt = rtp.Packet{ rtpPkt = rtp.Packet{
Header: rtp.Header{ Header: rtp.Header{
@@ -209,7 +226,8 @@ func TestRtcpReceiverSequenceOverflowPacketLost(t *testing.T) {
Payload: []byte("\x00\x00"), Payload: []byte("\x00\x00"),
} }
byts, _ = rtpPkt.Marshal() byts, _ = rtpPkt.Marshal()
rr.OnFrame(base.StreamTypeRtp, byts) ts = time.Date(2008, 05, 20, 22, 15, 20, 0, time.UTC)
rr.OnFrame(ts, base.StreamTypeRtp, byts)
expectedPkt := rtcp.ReceiverReport{ expectedPkt := rtcp.ReceiverReport{
SSRC: 0x65f83afb, SSRC: 0x65f83afb,
@@ -223,9 +241,11 @@ func TestRtcpReceiverSequenceOverflowPacketLost(t *testing.T) {
return uint8(v * 256) return uint8(v * 256)
}(), }(),
TotalLost: 2, TotalLost: 2,
Delay: 1 * 65536,
}, },
}, },
} }
expected, _ := expectedPkt.Marshal() expected, _ := expectedPkt.Marshal()
require.Equal(t, expected, rr.Report()) ts = time.Date(2008, 05, 20, 22, 15, 21, 0, time.UTC)
require.Equal(t, expected, rr.Report(ts))
} }

View File

@@ -42,12 +42,12 @@ func (rs *RtcpSender) OnFrame(ts time.Time, streamType base.StreamType, buf []by
if !rs.firstRtpReceived { if !rs.firstRtpReceived {
rs.firstRtpReceived = true rs.firstRtpReceived = true
rs.senderSSRC = pkt.SSRC rs.senderSSRC = pkt.SSRC
// save RTP time offset and correspondent time
rs.rtpTimeOffset = pkt.Timestamp
rs.rtpTimeTime = ts
} }
// always update time to minimize errors
rs.rtpTimeOffset = pkt.Timestamp
rs.rtpTimeTime = ts
rs.packetCount++ rs.packetCount++
rs.octetCount += uint32(len(pkt.Payload)) rs.octetCount += uint32(len(pkt.Payload))
} }