add jitter to rtcp receiver reports; fix #15

This commit is contained in:
aler9
2020-11-28 22:56:04 +01:00
parent 3b5901ba01
commit b2054747b8
6 changed files with 269 additions and 74 deletions

View File

@@ -502,18 +502,23 @@ func (c *ConnClient) Setup(u *base.URL, mode headers.TransportMode, proto base.S
}
}
clockRate, err := track.ClockRate()
if err != nil {
if proto == StreamProtocolUDP {
rtpListener.close()
rtcpListener.close()
}
return nil, fmt.Errorf("unable to get the track clock rate: %s", err)
}
if mode == headers.TransportModePlay {
c.rtcpReceivers[track.Id] = rtcpreceiver.New(nil)
c.rtcpReceivers[track.Id] = rtcpreceiver.New(nil, clockRate)
if proto == StreamProtocolUDP {
v := time.Now().Unix()
c.udpLastFrameTimes[track.Id] = &v
}
} else {
clockRate, err := track.ClockRate()
if err != nil {
return nil, fmt.Errorf("unable to get track clock rate: %s", err)
}
c.rtcpSenders[track.Id] = rtcpsender.New(clockRate)
}

View File

@@ -113,9 +113,9 @@ func (c *ConnClient) backgroundRecordUDP() {
c.publishWriteMutex.Lock()
now := time.Now()
for trackId := range c.rtcpSenders {
report := c.rtcpSenders[trackId].Report(now)
if report != nil {
c.udpRtcpListeners[trackId].write(report)
r := c.rtcpSenders[trackId].Report(now)
if r != nil {
c.udpRtcpListeners[trackId].write(r)
}
}
c.publishWriteMutex.Unlock()
@@ -148,13 +148,13 @@ func (c *ConnClient) backgroundRecordTCP() {
c.publishWriteMutex.Lock()
now := time.Now()
for trackId := range c.rtcpSenders {
report := c.rtcpSenders[trackId].Report(now)
if report != nil {
r := c.rtcpSenders[trackId].Report(now)
if r != nil {
c.nconn.SetWriteDeadline(time.Now().Add(c.d.WriteTimeout))
frame := base.InterleavedFrame{
TrackId: trackId,
StreamType: StreamTypeRtcp,
Content: report,
Content: r,
}
frame.Write(c.bw)
}

View File

@@ -96,8 +96,10 @@ func (c *ConnClient) backgroundPlayUDP(onFrameDone chan error) {
case <-reportTicker.C:
now := time.Now()
for trackId := range c.rtcpReceivers {
report := c.rtcpReceivers[trackId].Report(now)
c.udpRtcpListeners[trackId].write(report)
r := c.rtcpReceivers[trackId].Report(now)
if r != nil {
c.udpRtcpListeners[trackId].write(r)
}
}
case <-keepaliveTicker.C:
@@ -191,15 +193,17 @@ func (c *ConnClient) backgroundPlayTCP(onFrameDone chan error) {
case <-reportTicker.C:
now := time.Now()
for trackId := range c.rtcpReceivers {
report := c.rtcpReceivers[trackId].Report(now)
r := c.rtcpReceivers[trackId].Report(now)
if r != nil {
c.nconn.SetWriteDeadline(time.Now().Add(c.d.WriteTimeout))
frame := base.InterleavedFrame{
TrackId: trackId,
StreamType: StreamTypeRtcp,
Content: report,
Content: r,
}
frame.Write(c.bw)
}
}
case err := <-readerDone:
returnError = err

View File

@@ -14,20 +14,28 @@ import (
// RtcpReceiver is a utility to generate RTCP receiver reports.
type RtcpReceiver struct {
receiverSSRC uint32
clockRate float64
mutex sync.Mutex
// data from rtp packets
firstRtpReceived bool
senderSSRC uint32
sequenceNumberCycles uint16
lastSequenceNumber uint16
lastRtpTimeRtp uint32
lastRtpTimeTime time.Time
totalLost uint32
totalLostSinceReport uint32
totalSinceReport uint32
jitter float64
// data from rtcp packets
senderSSRC uint32
lastSenderReport uint32
lastSenderReportTime time.Time
totalLost uint32
totalLostSinceRR uint32
totalSinceRR uint32
}
// New allocates a RtcpReceiver.
func New(receiverSSRC *uint32) *RtcpReceiver {
func New(receiverSSRC *uint32, clockRate int) *RtcpReceiver {
return &RtcpReceiver{
receiverSSRC: func() uint32 {
if receiverSSRC == nil {
@@ -35,41 +43,70 @@ func New(receiverSSRC *uint32) *RtcpReceiver {
}
return *receiverSSRC
}(),
clockRate: float64(clockRate),
}
}
// OnFrame processes a RTP or RTCP frame and extract the needed data.
// OnFrame extracts the needed data from RTP or RTCP frames.
func (rr *RtcpReceiver) OnFrame(ts time.Time, streamType base.StreamType, buf []byte) {
rr.mutex.Lock()
defer rr.mutex.Unlock()
if streamType == base.StreamTypeRtp {
if len(buf) >= 3 {
// extract the sequence number of the first frame
// do not parse the entire packet, extract only the fields we need
if len(buf) >= 8 {
sequenceNumber := uint16(buf[2])<<8 | uint16(buf[3])
rtpTime := uint32(buf[4])<<24 | uint32(buf[5])<<16 | uint32(buf[6])<<8 | uint32(buf[7])
// first frame
if !rr.firstRtpReceived {
rr.firstRtpReceived = true
rr.totalSinceRR = 1
rr.totalSinceReport = 1
rr.lastSequenceNumber = sequenceNumber
rr.lastRtpTimeRtp = rtpTime
rr.lastRtpTimeTime = ts
// subsequent frames
} else {
diff := (sequenceNumber - rr.lastSequenceNumber)
diff := int32(sequenceNumber) - int32(rr.lastSequenceNumber)
if sequenceNumber != (rr.lastSequenceNumber + 1) {
rr.totalLost += uint32(diff) - 1
rr.totalLostSinceRR += uint32(diff) - 1
}
if sequenceNumber < rr.lastSequenceNumber {
// following frame or following frame after an overflow
if diff > 0 || diff < -0x0FFF {
// overflow
if diff < -0x0FFF {
rr.sequenceNumberCycles += 1
}
rr.totalSinceRR += uint32(diff)
// detect lost frames
if sequenceNumber != (rr.lastSequenceNumber + 1) {
rr.totalLost += uint32(uint16(diff) - 1)
rr.totalLostSinceReport += uint32(uint16(diff) - 1)
// allow up to 24 bits
if rr.totalLost > 0xFFFFFF {
rr.totalLost = 0xFFFFFF
}
if rr.totalLostSinceReport > 0xFFFFFF {
rr.totalLostSinceReport = 0xFFFFFF
}
}
// compute jitter
// https://tools.ietf.org/html/rfc3550#page-39
D := ts.Sub(rr.lastRtpTimeTime).Seconds()*rr.clockRate -
(float64(rtpTime) - float64(rr.lastRtpTimeRtp))
if D < 0 {
D = -D
}
rr.jitter += (D - rr.jitter) / 16
rr.totalSinceReport += uint32(uint16(diff))
rr.lastSequenceNumber = sequenceNumber
rr.lastRtpTimeRtp = rtpTime
rr.lastRtpTimeTime = ts
}
// ignore invalid frames (diff = 0) or reordered frames (diff < 0)
}
}
} else {
@@ -102,19 +139,24 @@ func (rr *RtcpReceiver) Report(ts time.Time) []byte {
LastSenderReport: rr.lastSenderReport,
// equivalent to taking the integer part after multiplying the
// loss fraction by 256
FractionLost: uint8(float64(rr.totalLostSinceRR*256) / float64(rr.totalSinceRR)),
FractionLost: uint8(float64(rr.totalLostSinceReport*256) / float64(rr.totalSinceReport)),
TotalLost: rr.totalLost,
// delay, expressed in units of 1/65536 seconds, between
// receiving the last SR packet from source SSRC_n and sending this
// reception report block
Delay: uint32(ts.Sub(rr.lastSenderReportTime).Seconds() * 65536),
Jitter: uint32(rr.jitter),
},
},
}
rr.totalLostSinceRR = 0
rr.totalSinceRR = 0
rr.totalLostSinceReport = 0
rr.totalSinceReport = 0
byts, err := report.Marshal()
if err != nil {
panic(err)
}
byts, _ := report.Marshal()
return byts
}

View File

@@ -13,12 +13,12 @@ import (
func TestRtcpReceiverBase(t *testing.T) {
v := uint32(0x65f83afb)
rr := New(&v)
rr := New(&v, 90000)
srPkt := rtcp.SenderReport{
SSRC: 0xba9da416,
NTPTime: 0xe363887a17ced916,
RTPTime: 1287981738,
RTPTime: 0xafb45733,
PacketCount: 714,
OctetCount: 859127,
}
@@ -32,7 +32,7 @@ func TestRtcpReceiverBase(t *testing.T) {
Marker: true,
PayloadType: 96,
SequenceNumber: 946,
Timestamp: 1287987768,
Timestamp: 0xafb45733,
SSRC: 0xba9da416,
},
Payload: []byte("\x00\x00"),
@@ -41,25 +41,40 @@ func TestRtcpReceiverBase(t *testing.T) {
ts = time.Date(2008, 05, 20, 22, 15, 20, 0, time.UTC)
rr.OnFrame(ts, base.StreamTypeRtp, byts)
rtpPkt = rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 947,
Timestamp: 0xafb45733 + 90000,
SSRC: 0xba9da416,
},
Payload: []byte("\x00\x00"),
}
byts, _ = rtpPkt.Marshal()
ts = time.Date(2008, 05, 20, 22, 15, 21, 0, time.UTC)
rr.OnFrame(ts, base.StreamTypeRtp, byts)
expectedPkt := rtcp.ReceiverReport{
SSRC: 0x65f83afb,
Reports: []rtcp.ReceptionReport{
{
SSRC: 0xba9da416,
LastSequenceNumber: 946,
LastSequenceNumber: 947,
LastSenderReport: 0x887a17ce,
Delay: 1 * 65536,
Delay: 2 * 65536,
},
},
}
expected, _ := expectedPkt.Marshal()
ts = time.Date(2008, 05, 20, 22, 15, 21, 0, time.UTC)
ts = time.Date(2008, 05, 20, 22, 15, 22, 0, time.UTC)
require.Equal(t, expected, rr.Report(ts))
}
func TestRtcpReceiverSequenceOverflow(t *testing.T) {
func TestRtcpReceiverOverflow(t *testing.T) {
v := uint32(0x65f83afb)
rr := New(&v)
rr := New(&v, 90000)
srPkt := rtcp.SenderReport{
SSRC: 0xba9da416,
@@ -78,7 +93,7 @@ func TestRtcpReceiverSequenceOverflow(t *testing.T) {
Marker: true,
PayloadType: 96,
SequenceNumber: 0xffff,
Timestamp: 1287987768,
Timestamp: 0xafb45733,
SSRC: 0xba9da416,
},
Payload: []byte("\x00\x00"),
@@ -93,7 +108,7 @@ func TestRtcpReceiverSequenceOverflow(t *testing.T) {
Marker: true,
PayloadType: 96,
SequenceNumber: 0x0000,
Timestamp: 1287987768,
Timestamp: 0xafb45733,
SSRC: 0xba9da416,
},
Payload: []byte("\x00\x00"),
@@ -120,7 +135,7 @@ func TestRtcpReceiverSequenceOverflow(t *testing.T) {
func TestRtcpReceiverPacketLost(t *testing.T) {
v := uint32(0x65f83afb)
rr := New(&v)
rr := New(&v, 90000)
srPkt := rtcp.SenderReport{
SSRC: 0xba9da416,
@@ -139,7 +154,7 @@ func TestRtcpReceiverPacketLost(t *testing.T) {
Marker: true,
PayloadType: 96,
SequenceNumber: 0x0120,
Timestamp: 1287987768,
Timestamp: 0xafb45733,
SSRC: 0xba9da416,
},
Payload: []byte("\x00\x00"),
@@ -154,7 +169,7 @@ func TestRtcpReceiverPacketLost(t *testing.T) {
Marker: true,
PayloadType: 96,
SequenceNumber: 0x0122,
Timestamp: 1287987768,
Timestamp: 0xafb45733,
SSRC: 0xba9da416,
},
Payload: []byte("\x00\x00"),
@@ -184,9 +199,9 @@ func TestRtcpReceiverPacketLost(t *testing.T) {
require.Equal(t, expected, rr.Report(ts))
}
func TestRtcpReceiverSequenceOverflowPacketLost(t *testing.T) {
func TestRtcpReceiverOverflowPacketLost(t *testing.T) {
v := uint32(0x65f83afb)
rr := New(&v)
rr := New(&v, 90000)
srPkt := rtcp.SenderReport{
SSRC: 0xba9da416,
@@ -205,7 +220,7 @@ func TestRtcpReceiverSequenceOverflowPacketLost(t *testing.T) {
Marker: true,
PayloadType: 96,
SequenceNumber: 0xffff,
Timestamp: 1287987768,
Timestamp: 0xafb45733,
SSRC: 0xba9da416,
},
Payload: []byte("\x00\x00"),
@@ -220,7 +235,7 @@ func TestRtcpReceiverSequenceOverflowPacketLost(t *testing.T) {
Marker: true,
PayloadType: 96,
SequenceNumber: 0x0002,
Timestamp: 1287987768,
Timestamp: 0xafb45733,
SSRC: 0xba9da416,
},
Payload: []byte("\x00\x00"),
@@ -249,3 +264,126 @@ func TestRtcpReceiverSequenceOverflowPacketLost(t *testing.T) {
ts = time.Date(2008, 05, 20, 22, 15, 21, 0, time.UTC)
require.Equal(t, expected, rr.Report(ts))
}
func TestRtcpReceiverReorderedPackets(t *testing.T) {
v := uint32(0x65f83afb)
rr := New(&v, 90000)
srPkt := rtcp.SenderReport{
SSRC: 0xba9da416,
NTPTime: 0xe363887a17ced916,
RTPTime: 1287981738,
PacketCount: 714,
OctetCount: 859127,
}
byts, _ := srPkt.Marshal()
ts := time.Date(2008, 05, 20, 22, 15, 20, 0, time.UTC)
rr.OnFrame(ts, base.StreamTypeRtcp, byts)
rtpPkt := rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 0x43a7,
Timestamp: 0xafb45733,
SSRC: 0xba9da416,
},
Payload: []byte("\x00\x00"),
}
byts, _ = rtpPkt.Marshal()
ts = time.Date(2008, 05, 20, 22, 15, 20, 0, time.UTC)
rr.OnFrame(ts, base.StreamTypeRtp, byts)
rtpPkt = rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 0x43a6,
Timestamp: 0xafb45733,
SSRC: 0xba9da416,
},
Payload: []byte("\x00\x00"),
}
byts, _ = rtpPkt.Marshal()
ts = time.Date(2008, 05, 20, 22, 15, 20, 0, time.UTC)
rr.OnFrame(ts, base.StreamTypeRtp, byts)
expectedPkt := rtcp.ReceiverReport{
SSRC: 0x65f83afb,
Reports: []rtcp.ReceptionReport{
{
SSRC: 0xba9da416,
LastSequenceNumber: 0x43a7,
LastSenderReport: 0x887a17ce,
Delay: 1 * 65536,
},
},
}
expected, _ := expectedPkt.Marshal()
ts = time.Date(2008, 05, 20, 22, 15, 21, 0, time.UTC)
require.Equal(t, expected, rr.Report(ts))
}
func TestRtcpReceiverJitter(t *testing.T) {
v := uint32(0x65f83afb)
rr := New(&v, 90000)
srPkt := rtcp.SenderReport{
SSRC: 0xba9da416,
NTPTime: 0xe363887a17ced916,
RTPTime: 0xafb45733,
PacketCount: 714,
OctetCount: 859127,
}
byts, _ := srPkt.Marshal()
ts := time.Date(2008, 05, 20, 22, 15, 20, 0, time.UTC)
rr.OnFrame(ts, base.StreamTypeRtcp, byts)
rtpPkt := rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 946,
Timestamp: 0xafb45733,
SSRC: 0xba9da416,
},
Payload: []byte("\x00\x00"),
}
byts, _ = rtpPkt.Marshal()
ts = time.Date(2008, 05, 20, 22, 15, 20, 0, time.UTC)
rr.OnFrame(ts, base.StreamTypeRtp, byts)
rtpPkt = rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 947,
Timestamp: 0xafb45733 + 45000,
SSRC: 0xba9da416,
},
Payload: []byte("\x00\x00"),
}
byts, _ = rtpPkt.Marshal()
ts = time.Date(2008, 05, 20, 22, 15, 21, 0, time.UTC)
rr.OnFrame(ts, base.StreamTypeRtp, byts)
expectedPkt := rtcp.ReceiverReport{
SSRC: 0x65f83afb,
Reports: []rtcp.ReceptionReport{
{
SSRC: 0xba9da416,
LastSequenceNumber: 947,
LastSenderReport: 0x887a17ce,
Delay: 2 * 65536,
Jitter: 45000 / 16,
},
},
}
expected, _ := expectedPkt.Marshal()
ts = time.Date(2008, 05, 20, 22, 15, 22, 0, time.UTC)
require.Equal(t, expected, rr.Report(ts))
}

View File

@@ -15,10 +15,12 @@ import (
type RtcpSender struct {
clockRate float64
mutex sync.Mutex
// data from rtp packets
firstRtpReceived bool
senderSSRC uint32
rtpTimeOffset uint32
rtpTimeTime time.Time
lastRtpTimeRtp uint32
lastRtpTimeTime time.Time
packetCount uint32
octetCount uint32
}
@@ -30,7 +32,7 @@ func New(clockRate int) *RtcpSender {
}
}
// OnFrame processes a RTP or RTCP frame and extract the needed data.
// OnFrame extracts the needed data from RTP or RTCP frames.
func (rs *RtcpSender) OnFrame(ts time.Time, streamType base.StreamType, buf []byte) {
rs.mutex.Lock()
defer rs.mutex.Unlock()
@@ -45,8 +47,8 @@ func (rs *RtcpSender) OnFrame(ts time.Time, streamType base.StreamType, buf []by
}
// always update time to minimize errors
rs.rtpTimeOffset = pkt.Timestamp
rs.rtpTimeTime = ts
rs.lastRtpTimeRtp = pkt.Timestamp
rs.lastRtpTimeTime = ts
rs.packetCount++
rs.octetCount += uint32(len(pkt.Payload))
@@ -74,11 +76,15 @@ func (rs *RtcpSender) Report(ts time.Time) []byte {
fractionalPart := uint32((s - float64(integerPart)) * 0xFFFFFFFF)
return uint64(integerPart)<<32 | uint64(fractionalPart)
}(),
RTPTime: rs.rtpTimeOffset + uint32((ts.Sub(rs.rtpTimeTime)).Seconds()*float64(rs.clockRate)),
RTPTime: rs.lastRtpTimeRtp + uint32((ts.Sub(rs.lastRtpTimeTime)).Seconds()*float64(rs.clockRate)),
PacketCount: rs.packetCount,
OctetCount: rs.octetCount,
}
byts, _ := report.Marshal()
byts, err := report.Marshal()
if err != nil {
panic(err)
}
return byts
}