rtcp sender/report: do not send reports until all needed data is available

This commit is contained in:
aler9
2022-04-08 16:30:51 +02:00
parent f4efe9ceb5
commit 5b37e9d5bb
2 changed files with 34 additions and 31 deletions

View File

@@ -29,8 +29,8 @@ type RTCPReceiver struct {
// data from RTP packets // data from RTP packets
firstRTPReceived bool firstRTPReceived bool
sequenceNumberCycles uint16 sequenceNumberCycles uint16
lastSequenceNumber uint16 lastSequenceNumber *uint16
lastRTPTimeRTP uint32 lastRTPTimeRTP *uint32
lastRTPTimeTime time.Time lastRTPTimeTime time.Time
totalLost uint32 totalLost uint32
totalLostSinceReport uint32 totalLostSinceReport uint32
@@ -39,7 +39,7 @@ type RTCPReceiver struct {
// data from rtcp packets // data from rtcp packets
senderSSRC uint32 senderSSRC uint32
lastSenderReport uint32 lastSenderReportRTP *uint32
lastSenderReportTime time.Time lastSenderReportTime time.Time
terminate chan struct{} terminate chan struct{}
@@ -99,8 +99,7 @@ func (rr *RTCPReceiver) report(ts time.Time) rtcp.Packet {
rr.mutex.Lock() rr.mutex.Lock()
defer rr.mutex.Unlock() defer rr.mutex.Unlock()
var zero time.Time if rr.lastSenderReportRTP == nil || rr.lastSequenceNumber == nil {
if rr.lastSenderReportTime == zero {
return nil return nil
} }
@@ -109,8 +108,8 @@ func (rr *RTCPReceiver) report(ts time.Time) rtcp.Packet {
Reports: []rtcp.ReceptionReport{ Reports: []rtcp.ReceptionReport{
{ {
SSRC: rr.senderSSRC, SSRC: rr.senderSSRC,
LastSequenceNumber: uint32(rr.sequenceNumberCycles)<<16 | uint32(rr.lastSequenceNumber), LastSequenceNumber: uint32(rr.sequenceNumberCycles)<<16 | uint32(*rr.lastSequenceNumber),
LastSenderReport: rr.lastSenderReport, LastSenderReport: *rr.lastSenderReportRTP,
// equivalent to taking the integer part after multiplying the // equivalent to taking the integer part after multiplying the
// loss fraction by 256 // loss fraction by 256
FractionLost: uint8(float64(rr.totalLostSinceReport*256) / float64(rr.totalSinceReport)), FractionLost: uint8(float64(rr.totalLostSinceReport*256) / float64(rr.totalSinceReport)),
@@ -136,19 +135,21 @@ func (rr *RTCPReceiver) ProcessPacketRTP(ts time.Time, pkt *rtp.Packet, ptsEqual
defer rr.mutex.Unlock() defer rr.mutex.Unlock()
// first packet // first packet
if !rr.firstRTPReceived { if rr.lastSequenceNumber == nil {
rr.firstRTPReceived = true rr.firstRTPReceived = true
rr.totalSinceReport = 1 rr.totalSinceReport = 1
rr.lastSequenceNumber = pkt.Header.SequenceNumber v := pkt.Header.SequenceNumber
rr.lastSequenceNumber = &v
if ptsEqualsDTS { if ptsEqualsDTS {
rr.lastRTPTimeRTP = pkt.Header.Timestamp v := pkt.Header.Timestamp
rr.lastRTPTimeRTP = &v
rr.lastRTPTimeTime = ts rr.lastRTPTimeTime = ts
} }
// subsequent packets // subsequent packets
} else { } else {
diff := int32(pkt.Header.SequenceNumber) - int32(rr.lastSequenceNumber) diff := int32(pkt.Header.SequenceNumber) - int32(*rr.lastSequenceNumber)
// following packet or following packet after an overflow // following packet or following packet after an overflow
if diff > 0 || diff < -0x0FFF { if diff > 0 || diff < -0x0FFF {
@@ -158,7 +159,7 @@ func (rr *RTCPReceiver) ProcessPacketRTP(ts time.Time, pkt *rtp.Packet, ptsEqual
} }
// detect lost packets // detect lost packets
if pkt.Header.SequenceNumber != (rr.lastSequenceNumber + 1) { if pkt.Header.SequenceNumber != (*rr.lastSequenceNumber + 1) {
rr.totalLost += uint32(uint16(diff) - 1) rr.totalLost += uint32(uint16(diff) - 1)
rr.totalLostSinceReport += uint32(uint16(diff) - 1) rr.totalLostSinceReport += uint32(uint16(diff) - 1)
@@ -172,22 +173,23 @@ func (rr *RTCPReceiver) ProcessPacketRTP(ts time.Time, pkt *rtp.Packet, ptsEqual
} }
rr.totalSinceReport += uint32(uint16(diff)) rr.totalSinceReport += uint32(uint16(diff))
rr.lastSequenceNumber = pkt.Header.SequenceNumber v := pkt.Header.SequenceNumber
rr.lastSequenceNumber = &v
if ptsEqualsDTS { if ptsEqualsDTS {
var zero time.Time if rr.lastRTPTimeRTP != nil {
if rr.lastRTPTimeTime != zero {
// update jitter // update jitter
// https://tools.ietf.org/html/rfc3550#page-39 // https://tools.ietf.org/html/rfc3550#page-39
D := ts.Sub(rr.lastRTPTimeTime).Seconds()*rr.clockRate - D := ts.Sub(rr.lastRTPTimeTime).Seconds()*rr.clockRate -
(float64(pkt.Header.Timestamp) - float64(rr.lastRTPTimeRTP)) (float64(pkt.Header.Timestamp) - float64(*rr.lastRTPTimeRTP))
if D < 0 { if D < 0 {
D = -D D = -D
} }
rr.jitter += (D - rr.jitter) / 16 rr.jitter += (D - rr.jitter) / 16
} }
rr.lastRTPTimeRTP = pkt.Header.Timestamp v := pkt.Header.Timestamp
rr.lastRTPTimeRTP = &v
rr.lastRTPTimeTime = ts rr.lastRTPTimeTime = ts
} }
} }
@@ -202,7 +204,8 @@ func (rr *RTCPReceiver) ProcessPacketRTCP(ts time.Time, pkt rtcp.Packet) {
defer rr.mutex.Unlock() defer rr.mutex.Unlock()
rr.senderSSRC = sr.SSRC rr.senderSSRC = sr.SSRC
rr.lastSenderReport = uint32(sr.NTPTime >> 16) v := uint32(sr.NTPTime >> 16)
rr.lastSenderReportRTP = &v
rr.lastSenderReportTime = ts rr.lastSenderReportTime = ts
} }
} }

View File

@@ -19,12 +19,11 @@ type RTCPSender struct {
mutex sync.Mutex mutex sync.Mutex
// data from RTP packets // data from RTP packets
firstRTPReceived bool senderSSRC *uint32
senderSSRC uint32 lastRTPTimeRTP *uint32
lastRTPTimeRTP uint32 lastRTPTimeTime time.Time
lastRTPTimeTime time.Time packetCount uint32
packetCount uint32 octetCount uint32
octetCount uint32
terminate chan struct{} terminate chan struct{}
done chan struct{} done chan struct{}
@@ -77,12 +76,12 @@ func (rs *RTCPSender) report(ts time.Time) rtcp.Packet {
rs.mutex.Lock() rs.mutex.Lock()
defer rs.mutex.Unlock() defer rs.mutex.Unlock()
if !rs.firstRTPReceived { if rs.senderSSRC == nil || rs.lastRTPTimeRTP == nil {
return nil return nil
} }
return &rtcp.SenderReport{ return &rtcp.SenderReport{
SSRC: rs.senderSSRC, SSRC: *rs.senderSSRC,
NTPTime: func() uint64 { NTPTime: func() uint64 {
// seconds since 1st January 1900 // seconds since 1st January 1900
s := (float64(ts.UnixNano()) / 1000000000) + 2208988800 s := (float64(ts.UnixNano()) / 1000000000) + 2208988800
@@ -92,7 +91,7 @@ func (rs *RTCPSender) report(ts time.Time) rtcp.Packet {
fractionalPart := uint32((s - float64(integerPart)) * 0xFFFFFFFF) fractionalPart := uint32((s - float64(integerPart)) * 0xFFFFFFFF)
return uint64(integerPart)<<32 | uint64(fractionalPart) return uint64(integerPart)<<32 | uint64(fractionalPart)
}(), }(),
RTPTime: rs.lastRTPTimeRTP + uint32((ts.Sub(rs.lastRTPTimeTime)).Seconds()*rs.clockRate), RTPTime: *rs.lastRTPTimeRTP + uint32((ts.Sub(rs.lastRTPTimeTime)).Seconds()*rs.clockRate),
PacketCount: rs.packetCount, PacketCount: rs.packetCount,
OctetCount: rs.octetCount, OctetCount: rs.octetCount,
} }
@@ -103,13 +102,14 @@ func (rs *RTCPSender) ProcessPacketRTP(ts time.Time, pkt *rtp.Packet, ptsEqualsD
rs.mutex.Lock() rs.mutex.Lock()
defer rs.mutex.Unlock() defer rs.mutex.Unlock()
if !rs.firstRTPReceived { if rs.senderSSRC == nil {
rs.firstRTPReceived = true v := pkt.SSRC
rs.senderSSRC = pkt.SSRC rs.senderSSRC = &v
} }
if ptsEqualsDTS { if ptsEqualsDTS {
rs.lastRTPTimeRTP = pkt.Timestamp v := pkt.Timestamp
rs.lastRTPTimeRTP = &v
rs.lastRTPTimeTime = ts rs.lastRTPTimeTime = ts
} }