preserve NTP when generating sender reports

This commit is contained in:
aler9
2023-08-14 23:23:14 +02:00
parent 939c065f6f
commit 4ad57d6a75
9 changed files with 93 additions and 81 deletions

View File

@@ -1651,12 +1651,13 @@ func (c *Client) OnPacketRTCP(medi *media.Media, cb OnPacketRTCPFunc) {
cm.onPacketRTCP = cb
}
// WritePacketRTP writes a RTP packet to the media stream.
// WritePacketRTP writes a RTP packet to the server.
func (c *Client) WritePacketRTP(medi *media.Media, pkt *rtp.Packet) error {
return c.WritePacketRTPWithNTP(medi, pkt, time.Now())
}
// WritePacketRTPWithNTP writes a RTP packet to the media stream.
// WritePacketRTPWithNTP writes a RTP packet to the server.
// ntp is the absolute time of the packet, and is sent with periodic RTCP sender reports.
func (c *Client) WritePacketRTPWithNTP(medi *media.Media, pkt *rtp.Packet, ntp time.Time) error {
byts := make([]byte, c.MaxPacketSize)
n, err := pkt.MarshalTo(byts)
@@ -1677,7 +1678,7 @@ func (c *Client) WritePacketRTPWithNTP(medi *media.Media, pkt *rtp.Packet, ntp t
return nil
}
// WritePacketRTCP writes a RTCP packet to the media stream.
// WritePacketRTCP writes a RTCP packet to the server.
func (c *Client) WritePacketRTCP(medi *media.Media, pkt rtcp.Packet) error {
byts, err := pkt.Marshal()
if err != nil {

View File

@@ -53,19 +53,15 @@ func (ct *clientFormat) start() {
} else {
ct.rtcpSender = rtcpsender.New(
ct.format.ClockRate(),
ct.cm.c.senderReportPeriod,
func(pkt rtcp.Packet) {
ct.cm.c.WritePacketRTCP(ct.cm.media, pkt) //nolint:errcheck
if !ct.cm.c.DisableRTCPSenderReports {
ct.cm.c.WritePacketRTCP(ct.cm.media, pkt) //nolint:errcheck
}
})
}
}
// start writing after write*() has been allocated in order to avoid a crash
func (ct *clientFormat) startWriting() {
if ct.cm.c.state != clientStatePlay && !ct.cm.c.DisableRTCPSenderReports {
ct.rtcpSender.Start(ct.cm.c.senderReportPeriod)
}
}
func (ct *clientFormat) stop() {
if ct.udpRTCPReceiver != nil {
ct.udpRTCPReceiver.Close()

View File

@@ -129,10 +129,6 @@ func (cm *clientMedia) start() {
cm.udpRTPListener.start()
cm.udpRTCPListener.start()
}
for _, ct := range cm.formats {
ct.startWriting()
}
}
func (cm *clientMedia) stop() {

View File

@@ -19,7 +19,7 @@ func randUint32() (uint32, error) {
return uint32(b[0])<<24 | uint32(b[1])<<16 | uint32(b[2])<<8 | uint32(b[3]), nil
}
var now = time.Now
var timeNow = time.Now
// RTCPReceiver is a utility to generate RTCP receiver reports.
type RTCPReceiver struct {
@@ -95,7 +95,7 @@ func (rr *RTCPReceiver) run() {
for {
select {
case <-t.C:
report := rr.report(now())
report := rr.report(timeNow())
if report != nil {
rr.writePacketRTCP(report)
}

View File

@@ -10,7 +10,7 @@ import (
)
func TestRTCPReceiverBase(t *testing.T) {
now = func() time.Time {
timeNow = func() time.Time {
return time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC)
}
done := make(chan struct{})
@@ -77,7 +77,7 @@ func TestRTCPReceiverBase(t *testing.T) {
func TestRTCPReceiverOverflow(t *testing.T) {
done := make(chan struct{})
now = func() time.Time {
timeNow = func() time.Time {
return time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC)
}
v := uint32(0x65f83afb)
@@ -144,7 +144,7 @@ func TestRTCPReceiverOverflow(t *testing.T) {
func TestRTCPReceiverPacketLost(t *testing.T) {
done := make(chan struct{})
now = func() time.Time {
timeNow = func() time.Time {
return time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC)
}
v := uint32(0x65f83afb)
@@ -214,7 +214,7 @@ func TestRTCPReceiverPacketLost(t *testing.T) {
func TestRTCPReceiverOverflowPacketLost(t *testing.T) {
done := make(chan struct{})
now = func() time.Time {
timeNow = func() time.Time {
return time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC)
}
v := uint32(0x65f83afb)
@@ -284,7 +284,7 @@ func TestRTCPReceiverOverflowPacketLost(t *testing.T) {
func TestRTCPReceiverJitter(t *testing.T) {
done := make(chan struct{})
now = func() time.Time {
timeNow = func() time.Time {
return time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC)
}
v := uint32(0x65f83afb)

View File

@@ -9,20 +9,27 @@ import (
"github.com/pion/rtp"
)
var now = time.Now
var timeNow = time.Now
// seconds since 1st January 1900
// higher 32 bits are the integer part, lower 32 bits are the fractional part
func ntpTimeGoToRTCP(v time.Time) uint64 {
s := uint64(v.UnixNano()) + 2208988800*1000000000
return (s/1000000000)<<32 | (s % 1000000000)
}
// RTCPSender is a utility to generate RTCP sender reports.
type RTCPSender struct {
clockRate float64
period time.Duration
writePacketRTCP func(rtcp.Packet)
mutex sync.Mutex
started bool
period time.Duration
// data from RTP packets
initialized bool
lastTimeRTP uint32
lastTimeNTP time.Time
lastTimeSystem time.Time
lastSSRC uint32
lastSequenceNumber uint16
packetCount uint32
@@ -35,31 +42,26 @@ type RTCPSender struct {
// New allocates a RTCPSender.
func New(
clockRate int,
period time.Duration,
writePacketRTCP func(rtcp.Packet),
) *RTCPSender {
rs := &RTCPSender{
clockRate: float64(clockRate),
period: period,
writePacketRTCP: writePacketRTCP,
terminate: make(chan struct{}),
done: make(chan struct{}),
}
go rs.run()
return rs
}
// Close closes the RTCPSender.
func (rs *RTCPSender) Close() {
if rs.started {
close(rs.terminate)
<-rs.done
}
}
// Start starts the periodic generation of RTCP sender reports.
func (rs *RTCPSender) Start(period time.Duration) {
rs.started = true
rs.period = period
go rs.run()
close(rs.terminate)
<-rs.done
}
func (rs *RTCPSender) run() {
@@ -71,7 +73,7 @@ func (rs *RTCPSender) run() {
for {
select {
case <-t.C:
report := rs.report(now())
report := rs.report()
if report != nil {
rs.writePacketRTCP(report)
}
@@ -82,29 +84,28 @@ func (rs *RTCPSender) run() {
}
}
func (rs *RTCPSender) report(ts time.Time) rtcp.Packet {
func (rs *RTCPSender) report() rtcp.Packet {
rs.mutex.Lock()
defer rs.mutex.Unlock()
if !rs.initialized || rs.clockRate == 0 {
if !rs.initialized {
return nil
}
systemTimeDiff := timeNow().Sub(rs.lastTimeSystem)
ntpTime := rs.lastTimeNTP.Add(systemTimeDiff)
rtpTime := rs.lastTimeRTP + uint32(systemTimeDiff.Seconds()*rs.clockRate)
return &rtcp.SenderReport{
SSRC: rs.lastSSRC,
NTPTime: func() uint64 {
// seconds since 1st January 1900
// higher 32 bits are the integer part, lower 32 bits are the fractional part
s := uint64(ts.UnixNano()) + 2208988800*1000000000
return (s/1000000000)<<32 | (s % 1000000000)
}(),
RTPTime: rs.lastTimeRTP + uint32((ts.Sub(rs.lastTimeNTP)).Seconds()*rs.clockRate),
SSRC: rs.lastSSRC,
NTPTime: ntpTimeGoToRTCP(ntpTime),
RTPTime: rtpTime,
PacketCount: rs.packetCount,
OctetCount: rs.octetCount,
}
}
// ProcessPacket extracts the needed data from RTP packets.
// ProcessPacket extracts data from RTP packets.
func (rs *RTCPSender) ProcessPacket(pkt *rtp.Packet, ntp time.Time, ptsEqualsDTS bool) {
rs.mutex.Lock()
defer rs.mutex.Unlock()
@@ -113,6 +114,7 @@ func (rs *RTCPSender) ProcessPacket(pkt *rtp.Packet, ntp time.Time, ptsEqualsDTS
rs.initialized = true
rs.lastTimeRTP = pkt.Timestamp
rs.lastTimeNTP = ntp
rs.lastTimeSystem = timeNow()
}
rs.lastSSRC = pkt.SSRC

View File

@@ -1,6 +1,7 @@
package rtcpsender
import (
"sync"
"testing"
"time"
@@ -10,26 +11,45 @@ import (
)
func TestRTCPSender(t *testing.T) {
now = func() time.Time {
return time.Date(2008, 5, 20, 22, 16, 20, 600000000, time.UTC)
}
done := make(chan struct{})
var curTime time.Time
var mutex sync.Mutex
rs := New(90000, func(pkt rtcp.Packet) {
require.Equal(t, &rtcp.SenderReport{
SSRC: 0xba9da416,
NTPTime: 14690122083862791680,
RTPTime: 0x4d185ae8,
PacketCount: 3,
OctetCount: 6,
}, pkt)
close(done)
})
setCurTime := func(v time.Time) {
mutex.Lock()
defer mutex.Unlock()
curTime = v
}
timeNow = func() time.Time {
mutex.Lock()
defer mutex.Unlock()
return curTime
}
sent := make(chan struct{})
rs := New(
90000,
100*time.Millisecond,
func(pkt rtcp.Packet) {
require.Equal(t, &rtcp.SenderReport{
SSRC: 0xba9da416,
NTPTime: func() uint64 {
// timeDiff = (24 - 22) = 2
// 21 + 2 = 23
d := time.Date(2008, 5, 20, 22, 15, 23, 0, time.UTC)
s := uint64(d.UnixNano()) + 2208988800*1000000000
return (s/1000000000)<<32 | (s % 1000000000)
}(),
RTPTime: 1287987768 + 2*90000,
PacketCount: 3,
OctetCount: 6,
}, pkt)
close(sent)
})
defer rs.Close()
rs.Start(250 * time.Millisecond)
time.Sleep(400 * time.Millisecond)
setCurTime(time.Date(2008, 5, 20, 22, 16, 20, 0, time.UTC))
rtpPkt := rtp.Packet{
Header: rtp.Header{
Version: 2,
@@ -44,18 +64,19 @@ func TestRTCPSender(t *testing.T) {
ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
rs.ProcessPacket(&rtpPkt, ts, true)
setCurTime(time.Date(2008, 5, 20, 22, 16, 22, 0, time.UTC))
rtpPkt = rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 947,
Timestamp: 1287987768 + 45000,
Timestamp: 1287987768,
SSRC: 0xba9da416,
},
Payload: []byte("\x00\x00"),
}
ts = time.Date(2008, 0o5, 20, 22, 15, 20, 500000000, time.UTC)
ts = time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC)
rs.ProcessPacket(&rtpPkt, ts, true)
rtpPkt = rtp.Packet{
@@ -64,13 +85,15 @@ func TestRTCPSender(t *testing.T) {
Marker: true,
PayloadType: 96,
SequenceNumber: 948,
Timestamp: 1287987768 + 90000,
Timestamp: 1287987768,
SSRC: 0xba9da416,
},
Payload: []byte("\x00\x00"),
}
ts = time.Date(2008, 0o5, 20, 22, 15, 20, 500000000, time.UTC)
ts = time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC)
rs.ProcessPacket(&rtpPkt, ts, false)
<-done
setCurTime(time.Date(2008, 5, 20, 22, 16, 24, 0, time.UTC))
<-sent
}

View File

@@ -43,14 +43,6 @@ func NewServerStream(s *Server, medias media.Medias) *ServerStream {
st.streamMedias[medi] = newServerStreamMedia(st, medi, i)
}
if !st.s.DisableRTCPSenderReports {
for _, ssm := range st.streamMedias {
for _, tr := range ssm.formats {
tr.rtcpSender.Start(st.s.senderReportPeriod)
}
}
}
return st
}
@@ -248,8 +240,7 @@ func (st *ServerStream) WritePacketRTP(medi *media.Media, pkt *rtp.Packet) error
}
// WritePacketRTPWithNTP writes a RTP packet to all the readers of the stream.
// ntp is the absolute time of the packet, and is needed to generate RTCP sender reports
// that allows the receiver to reconstruct the absolute time of the packet.
// ntp is the absolute time of the packet, and is sent with periodic RTCP sender reports.
func (st *ServerStream) WritePacketRTPWithNTP(medi *media.Media, pkt *rtp.Packet, ntp time.Time) error {
byts := make([]byte, st.s.MaxPacketSize)
n, err := pkt.MarshalTo(byts)

View File

@@ -34,8 +34,11 @@ func newServerStreamMedia(st *ServerStream, medi *media.Media, trackID int) *ser
cmedia := medi
tr.rtcpSender = rtcpsender.New(
forma.ClockRate(),
st.s.senderReportPeriod,
func(pkt rtcp.Packet) {
st.WritePacketRTCP(cmedia, pkt) //nolint:errcheck
if !st.s.DisableRTCPSenderReports {
st.WritePacketRTCP(cmedia, pkt) //nolint:errcheck
}
},
)