From 2eebf48fca880d7ff1bc3038cc72f4f799e081ca Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Sat, 21 Nov 2020 23:54:32 +0100 Subject: [PATCH] implement rtcp sender reports --- connclient.go | 19 +++--- connclientpublish.go | 59 +++++++++++++++--- dialer.go | 6 +- pkg/rtcpreceiver/rtcpreceiver.go | 6 +- pkg/rtcpreceiver/rtcpreceiver_test.go | 16 ++--- pkg/rtcpsender/rtcpsender.go | 90 +++++++++++++++++++++++++++ pkg/rtcpsender/rtcpsender_test.go | 57 +++++++++++++++++ pkg/rtph264/encoder.go | 22 +++---- 8 files changed, 233 insertions(+), 42 deletions(-) create mode 100644 pkg/rtcpsender/rtcpsender.go create mode 100644 pkg/rtcpsender/rtcpsender_test.go diff --git a/connclient.go b/connclient.go index 13529998..fea445a5 100644 --- a/connclient.go +++ b/connclient.go @@ -22,12 +22,14 @@ import ( "github.com/aler9/gortsplib/pkg/headers" "github.com/aler9/gortsplib/pkg/multibuffer" "github.com/aler9/gortsplib/pkg/rtcpreceiver" + "github.com/aler9/gortsplib/pkg/rtcpsender" ) const ( clientReadBufferSize = 4096 clientWriteBufferSize = 4096 clientReceiverReportPeriod = 10 * time.Second + clientSenderReportPeriod = 10 * time.Second clientUDPCheckStreamPeriod = 5 * time.Second clientUDPKeepalivePeriod = 30 * time.Second clientTCPFrameReadBufferSize = 128 * 1024 @@ -73,20 +75,21 @@ type ConnClient struct { streamUrl *base.URL streamProtocol *StreamProtocol tracks Tracks - rtcpReceivers map[int]*rtcpreceiver.RtcpReceiver - udpLastFrameTimes map[int]*int64 udpRtpListeners map[int]*connClientUDPListener udpRtcpListeners map[int]*connClientUDPListener - tcpFrameBuffer *multibuffer.MultiBuffer getParameterSupported bool // read only - readCB func(int, StreamType, []byte) + rtcpReceivers map[int]*rtcpreceiver.RtcpReceiver + udpLastFrameTimes map[int]*int64 + tcpFrameBuffer *multibuffer.MultiBuffer + readCB func(int, StreamType, []byte) // publish only - publishError error - publishMutex sync.RWMutex - publishOpen bool + rtcpSenders map[int]*rtcpsender.RtcpSender + publishError error + publishWriteMutex sync.RWMutex + publishOpen bool // in backgroundTerminate chan struct{} @@ -512,6 +515,8 @@ func (c *ConnClient) Setup(u *base.URL, mode headers.TransportMode, proto base.S v := time.Now().Unix() c.udpLastFrameTimes[track.Id] = &v } + } else { + c.rtcpSenders[track.Id] = rtcpsender.New() } if proto == StreamProtocolUDP { diff --git a/connclientpublish.go b/connclientpublish.go index 7380e5cd..6d23a09a 100644 --- a/connclientpublish.go +++ b/connclientpublish.go @@ -78,8 +78,8 @@ func (c *ConnClient) backgroundRecordUDP() { defer close(c.backgroundDone) defer func() { - c.publishMutex.Lock() - defer c.publishMutex.Unlock() + c.publishWriteMutex.Lock() + defer c.publishWriteMutex.Unlock() c.publishOpen = false }() @@ -98,6 +98,9 @@ func (c *ConnClient) backgroundRecordUDP() { } }() + reportTicker := time.NewTicker(clientSenderReportPeriod) + defer reportTicker.Stop() + select { case <-c.backgroundTerminate: c.nconn.SetReadDeadline(time.Now()) @@ -105,6 +108,17 @@ func (c *ConnClient) backgroundRecordUDP() { c.publishError = fmt.Errorf("terminated") return + case <-reportTicker.C: + c.publishWriteMutex.Lock() + now := time.Now() + for trackId := range c.rtcpSenders { + report := c.rtcpSenders[trackId].Report(now) + if report != nil { + c.udpRtcpListeners[trackId].write(report) + } + } + c.publishWriteMutex.Unlock() + case err := <-readerDone: c.publishError = err return @@ -115,24 +129,53 @@ func (c *ConnClient) backgroundRecordTCP() { defer close(c.backgroundDone) defer func() { - c.publishMutex.Lock() - defer c.publishMutex.Unlock() + c.publishWriteMutex.Lock() + defer c.publishWriteMutex.Unlock() c.publishOpen = false }() - <-c.backgroundTerminate + reportTicker := time.NewTicker(clientSenderReportPeriod) + defer reportTicker.Stop() + + for { + select { + case <-c.backgroundTerminate: + return + + case <-reportTicker.C: + c.publishWriteMutex.Lock() + now := time.Now() + for trackId := range c.rtcpSenders { + report := c.rtcpSenders[trackId].Report(now) + if report != nil { + c.nconn.SetWriteDeadline(time.Now().Add(c.d.WriteTimeout)) + frame := base.InterleavedFrame{ + TrackId: trackId, + StreamType: StreamTypeRtcp, + Content: report, + } + frame.Write(c.bw) + } + } + c.publishWriteMutex.Unlock() + } + } } // WriteFrame writes a frame. // This can be called only after Record(). func (c *ConnClient) WriteFrame(trackId int, streamType StreamType, content []byte) error { - c.publishMutex.RLock() - defer c.publishMutex.RUnlock() + c.publishWriteMutex.RLock() + defer c.publishWriteMutex.RUnlock() if !c.publishOpen { return c.publishError } + now := time.Now() + + c.rtcpSenders[trackId].OnFrame(now, streamType, content) + if *c.streamProtocol == StreamProtocolUDP { if streamType == StreamTypeRtp { return c.udpRtpListeners[trackId].write(content) @@ -140,7 +183,7 @@ func (c *ConnClient) WriteFrame(trackId int, streamType StreamType, content []by return c.udpRtcpListeners[trackId].write(content) } - c.nconn.SetWriteDeadline(time.Now().Add(c.d.WriteTimeout)) + c.nconn.SetWriteDeadline(now.Add(c.d.WriteTimeout)) frame := base.InterleavedFrame{ TrackId: trackId, StreamType: streamType, diff --git a/dialer.go b/dialer.go index 2f42b68e..2e9f3fcf 100644 --- a/dialer.go +++ b/dialer.go @@ -11,6 +11,7 @@ import ( "github.com/aler9/gortsplib/pkg/headers" "github.com/aler9/gortsplib/pkg/multibuffer" "github.com/aler9/gortsplib/pkg/rtcpreceiver" + "github.com/aler9/gortsplib/pkg/rtcpsender" ) // DefaultDialer is the default dialer, used by Dial, DialRead and DialPublish. @@ -92,11 +93,12 @@ func (d Dialer) Dial(host string) (*ConnClient, error) { nconn: nconn, br: bufio.NewReaderSize(nconn, clientReadBufferSize), bw: bufio.NewWriterSize(nconn, clientWriteBufferSize), - rtcpReceivers: make(map[int]*rtcpreceiver.RtcpReceiver), - udpLastFrameTimes: make(map[int]*int64), udpRtpListeners: make(map[int]*connClientUDPListener), udpRtcpListeners: make(map[int]*connClientUDPListener), + rtcpReceivers: make(map[int]*rtcpreceiver.RtcpReceiver), + udpLastFrameTimes: make(map[int]*int64), tcpFrameBuffer: multibuffer.New(d.ReadBufferCount, clientTCPFrameReadBufferSize), + rtcpSenders: make(map[int]*rtcpsender.RtcpSender), publishError: fmt.Errorf("not running"), }, nil } diff --git a/pkg/rtcpreceiver/rtcpreceiver.go b/pkg/rtcpreceiver/rtcpreceiver.go index caf5b81f..f5508735 100644 --- a/pkg/rtcpreceiver/rtcpreceiver.go +++ b/pkg/rtcpreceiver/rtcpreceiver.go @@ -13,6 +13,7 @@ import ( // RtcpReceiver allows to generate RTCP receiver reports. type RtcpReceiver struct { mutex sync.Mutex + firstRtpReceived bool senderSSRC uint32 receiverSSRC uint32 sequenceNumberCycles uint16 @@ -21,7 +22,6 @@ type RtcpReceiver struct { totalLost uint32 totalLostSinceRR uint32 totalSinceRR uint32 - firstRtpReceived bool } // New allocates a RtcpReceiver. @@ -36,7 +36,7 @@ func New(receiverSSRC *uint32) *RtcpReceiver { } } -// OnFrame processes a RTP or RTCP frame and extract the data needed by RTCP receiver reports. +// OnFrame processes a RTP or RTCP frame and extract the needed data. func (rr *RtcpReceiver) OnFrame(streamType base.StreamType, buf []byte) { rr.mutex.Lock() defer rr.mutex.Unlock() @@ -46,11 +46,13 @@ func (rr *RtcpReceiver) OnFrame(streamType base.StreamType, buf []byte) { // extract the sequence number of the first frame sequenceNumber := uint16(buf[2])<<8 | uint16(buf[3]) + // first frame if !rr.firstRtpReceived { rr.firstRtpReceived = true rr.totalSinceRR = 1 rr.lastSequenceNumber = sequenceNumber + // subsequent frames } else { diff := (sequenceNumber - rr.lastSequenceNumber) diff --git a/pkg/rtcpreceiver/rtcpreceiver_test.go b/pkg/rtcpreceiver/rtcpreceiver_test.go index 7874755a..da60c4b8 100644 --- a/pkg/rtcpreceiver/rtcpreceiver_test.go +++ b/pkg/rtcpreceiver/rtcpreceiver_test.go @@ -38,8 +38,6 @@ func TestRtcpReceiverBase(t *testing.T) { byts, _ = rtpPkt.Marshal() rr.OnFrame(base.StreamTypeRtp, byts) - res := rr.Report() - expectedPkt := rtcp.ReceiverReport{ SSRC: 0x65f83afb, Reports: []rtcp.ReceptionReport{ @@ -51,7 +49,7 @@ func TestRtcpReceiverBase(t *testing.T) { }, } expected, _ := expectedPkt.Marshal() - require.Equal(t, expected, res) + require.Equal(t, expected, rr.Report()) } func TestRtcpReceiverSequenceOverflow(t *testing.T) { @@ -96,8 +94,6 @@ func TestRtcpReceiverSequenceOverflow(t *testing.T) { byts, _ = rtpPkt.Marshal() rr.OnFrame(base.StreamTypeRtp, byts) - res := rr.Report() - expectedPkt := rtcp.ReceiverReport{ SSRC: 0x65f83afb, Reports: []rtcp.ReceptionReport{ @@ -109,7 +105,7 @@ func TestRtcpReceiverSequenceOverflow(t *testing.T) { }, } expected, _ := expectedPkt.Marshal() - require.Equal(t, expected, res) + require.Equal(t, expected, rr.Report()) } func TestRtcpReceiverPacketLost(t *testing.T) { @@ -154,8 +150,6 @@ func TestRtcpReceiverPacketLost(t *testing.T) { byts, _ = rtpPkt.Marshal() rr.OnFrame(base.StreamTypeRtp, byts) - res := rr.Report() - expectedPkt := rtcp.ReceiverReport{ SSRC: 0x65f83afb, Reports: []rtcp.ReceptionReport{ @@ -172,7 +166,7 @@ func TestRtcpReceiverPacketLost(t *testing.T) { }, } expected, _ := expectedPkt.Marshal() - require.Equal(t, expected, res) + require.Equal(t, expected, rr.Report()) } func TestRtcpReceiverSequenceOverflowPacketLost(t *testing.T) { @@ -217,8 +211,6 @@ func TestRtcpReceiverSequenceOverflowPacketLost(t *testing.T) { byts, _ = rtpPkt.Marshal() rr.OnFrame(base.StreamTypeRtp, byts) - res := rr.Report() - expectedPkt := rtcp.ReceiverReport{ SSRC: 0x65f83afb, Reports: []rtcp.ReceptionReport{ @@ -235,5 +227,5 @@ func TestRtcpReceiverSequenceOverflowPacketLost(t *testing.T) { }, } expected, _ := expectedPkt.Marshal() - require.Equal(t, expected, res) + require.Equal(t, expected, rr.Report()) } diff --git a/pkg/rtcpsender/rtcpsender.go b/pkg/rtcpsender/rtcpsender.go new file mode 100644 index 00000000..e2794a66 --- /dev/null +++ b/pkg/rtcpsender/rtcpsender.go @@ -0,0 +1,90 @@ +// Package rtcpsender implements a utility to generate RTCP sender reports. +package rtcpsender + +import ( + "sync" + "time" + + "github.com/pion/rtcp" + "github.com/pion/rtp" + + "github.com/aler9/gortsplib/pkg/base" +) + +// RtcpSender allows to generate RTCP sender reports. +type RtcpSender struct { + mutex sync.Mutex + firstRtpReceived bool + secondRtpReceived bool + senderSSRC uint32 + packetCount uint32 + octetCount uint32 + rtpTimeOffset uint32 + rtpTimeTime time.Time + clock float64 +} + +// New allocates a RtcpSender. +func New() *RtcpSender { + return &RtcpSender{} +} + +// OnFrame processes a RTP or RTCP frame and extract the needed data. +func (rs *RtcpSender) OnFrame(ts time.Time, streamType base.StreamType, buf []byte) { + rs.mutex.Lock() + defer rs.mutex.Unlock() + + if streamType == base.StreamTypeRtp { + pkt := rtp.Packet{} + err := pkt.Unmarshal(buf) + if err == nil { + if !rs.firstRtpReceived { + rs.firstRtpReceived = true + rs.senderSSRC = pkt.SSRC + + // save RTP time offset and correspondent time + rs.rtpTimeOffset = pkt.Timestamp + rs.rtpTimeTime = ts + + } else if !rs.secondRtpReceived && pkt.Timestamp != rs.rtpTimeOffset { + rs.secondRtpReceived = true + + // estimate clock + rs.clock = float64(pkt.Timestamp-rs.rtpTimeOffset) / + ts.Sub(rs.rtpTimeTime).Seconds() + } + + rs.packetCount++ + rs.octetCount += uint32(len(pkt.Payload)) + } + } +} + +// Report generates a RTCP sender report. +func (rs *RtcpSender) Report(ts time.Time) []byte { + rs.mutex.Lock() + defer rs.mutex.Unlock() + + if !rs.firstRtpReceived || !rs.secondRtpReceived { + return nil + } + + report := &rtcp.SenderReport{ + SSRC: rs.senderSSRC, + NTPTime: func() uint64 { + // seconds since 1st January 1900 + n := (float64(ts.UnixNano()) / 1000000000) + 2208988800 + + // higher 32 bits are the integer part, lower 32 bits are the fractional part + integerPart := uint32(n) + fractionalPart := uint32((n - float64(integerPart)) * 0xFFFFFFFF) + return uint64(integerPart)<<32 | uint64(fractionalPart) + }(), + RTPTime: rs.rtpTimeOffset + uint32((ts.Sub(rs.rtpTimeTime)).Seconds()*rs.clock), + PacketCount: rs.packetCount, + OctetCount: rs.octetCount, + } + + byts, _ := report.Marshal() + return byts +} diff --git a/pkg/rtcpsender/rtcpsender_test.go b/pkg/rtcpsender/rtcpsender_test.go new file mode 100644 index 00000000..d6520557 --- /dev/null +++ b/pkg/rtcpsender/rtcpsender_test.go @@ -0,0 +1,57 @@ +package rtcpsender + +import ( + "testing" + "time" + + "github.com/pion/rtcp" + "github.com/pion/rtp" + "github.com/stretchr/testify/require" + + "github.com/aler9/gortsplib/pkg/base" +) + +func TestRtcpSender(t *testing.T) { + rs := New() + + rtpPkt := rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 946, + Timestamp: 1287987768, + SSRC: 0xba9da416, + }, + Payload: []byte("\x00\x00"), + } + byts, _ := rtpPkt.Marshal() + ts := time.Date(2008, 05, 20, 22, 15, 20, 0, time.UTC) + rs.OnFrame(ts, base.StreamTypeRtp, byts) + + rtpPkt = rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 947, + Timestamp: 1287987768 + 45000, + SSRC: 0xba9da416, + }, + Payload: []byte("\x00\x00"), + } + byts, _ = rtpPkt.Marshal() + ts = time.Date(2008, 05, 20, 22, 15, 20, 500000000, time.UTC) + rs.OnFrame(ts, base.StreamTypeRtp, byts) + + expectedPkt := rtcp.SenderReport{ + SSRC: 0xba9da416, + NTPTime: 0xcbddcc34999997ff, + RTPTime: 0x4d185ae8, + PacketCount: 2, + OctetCount: 4, + } + expected, _ := expectedPkt.Marshal() + ts = time.Date(2008, 05, 20, 22, 16, 20, 600000000, time.UTC) + require.Equal(t, expected, rs.Report(ts)) +} diff --git a/pkg/rtph264/encoder.go b/pkg/rtph264/encoder.go index 5e933a11..23781c9b 100644 --- a/pkg/rtph264/encoder.go +++ b/pkg/rtph264/encoder.go @@ -32,18 +32,18 @@ func NewEncoder(relativeType uint8) (*Encoder, error) { } // Write encodes NALUs into RTP/H264 packets. -func (e *Encoder) Write(nalus [][]byte, timestamp time.Duration) ([][]byte, error) { +func (e *Encoder) Write(nalus [][]byte, ts time.Duration) ([][]byte, error) { if e.started == 0 { - e.started = timestamp + e.started = ts } // rtp/h264 uses a 90khz clock - rtpTs := e.initialTs + uint32((timestamp-e.started).Seconds()*90000) + rtpTime := e.initialTs + uint32((ts-e.started).Seconds()*90000) var frames [][]byte for i, nalu := range nalus { - naluFrames, err := e.writeNalu(nalu, rtpTs, (i == len(nalus)-1)) + naluFrames, err := e.writeNALU(nalu, rtpTime, (i == len(nalus)-1)) if err != nil { return nil, err } @@ -53,23 +53,23 @@ func (e *Encoder) Write(nalus [][]byte, timestamp time.Duration) ([][]byte, erro return frames, nil } -func (e *Encoder) writeNalu(nalu []byte, rtpTs uint32, isFinal bool) ([][]byte, error) { +func (e *Encoder) writeNALU(nalu []byte, rtpTime uint32, isFinal bool) ([][]byte, error) { // if the NALU fits into a single RTP packet, use a single NALU payload if len(nalu) < rtpPayloadMaxSize { - return e.writeSingle(nalu, rtpTs, isFinal) + return e.writeSingle(nalu, rtpTime, isFinal) } // otherwise, split the NALU into multiple fragmentation payloads - return e.writeFragmented(nalu, rtpTs, isFinal) + return e.writeFragmented(nalu, rtpTime, isFinal) } -func (e *Encoder) writeSingle(nalu []byte, rtpTs uint32, isFinal bool) ([][]byte, error) { +func (e *Encoder) writeSingle(nalu []byte, rtpTime uint32, isFinal bool) ([][]byte, error) { rpkt := rtp.Packet{ Header: rtp.Header{ Version: rtpVersion, PayloadType: e.payloadType, SequenceNumber: e.sequenceNumber, - Timestamp: rtpTs, + Timestamp: rtpTime, SSRC: e.ssrc, }, Payload: nalu, @@ -88,7 +88,7 @@ func (e *Encoder) writeSingle(nalu []byte, rtpTs uint32, isFinal bool) ([][]byte return [][]byte{frame}, nil } -func (e *Encoder) writeFragmented(nalu []byte, rtpTs uint32, isFinal bool) ([][]byte, error) { +func (e *Encoder) writeFragmented(nalu []byte, rtpTime uint32, isFinal bool) ([][]byte, error) { // use only FU-A, not FU-B, since we always use non-interleaved mode // (packetization-mode=1) frameCount := (len(nalu) - 1) / (rtpPayloadMaxSize - 2) @@ -125,7 +125,7 @@ func (e *Encoder) writeFragmented(nalu []byte, rtpTs uint32, isFinal bool) ([][] Version: rtpVersion, PayloadType: e.payloadType, SequenceNumber: e.sequenceNumber, - Timestamp: rtpTs, + Timestamp: rtpTime, SSRC: e.ssrc, }, Payload: data,