implement rtcp sender reports

This commit is contained in:
aler9
2020-11-21 23:54:32 +01:00
parent a21bced1dd
commit 2eebf48fca
8 changed files with 233 additions and 42 deletions

View File

@@ -22,12 +22,14 @@ import (
"github.com/aler9/gortsplib/pkg/headers" "github.com/aler9/gortsplib/pkg/headers"
"github.com/aler9/gortsplib/pkg/multibuffer" "github.com/aler9/gortsplib/pkg/multibuffer"
"github.com/aler9/gortsplib/pkg/rtcpreceiver" "github.com/aler9/gortsplib/pkg/rtcpreceiver"
"github.com/aler9/gortsplib/pkg/rtcpsender"
) )
const ( const (
clientReadBufferSize = 4096 clientReadBufferSize = 4096
clientWriteBufferSize = 4096 clientWriteBufferSize = 4096
clientReceiverReportPeriod = 10 * time.Second clientReceiverReportPeriod = 10 * time.Second
clientSenderReportPeriod = 10 * time.Second
clientUDPCheckStreamPeriod = 5 * time.Second clientUDPCheckStreamPeriod = 5 * time.Second
clientUDPKeepalivePeriod = 30 * time.Second clientUDPKeepalivePeriod = 30 * time.Second
clientTCPFrameReadBufferSize = 128 * 1024 clientTCPFrameReadBufferSize = 128 * 1024
@@ -73,19 +75,20 @@ type ConnClient struct {
streamUrl *base.URL streamUrl *base.URL
streamProtocol *StreamProtocol streamProtocol *StreamProtocol
tracks Tracks tracks Tracks
rtcpReceivers map[int]*rtcpreceiver.RtcpReceiver
udpLastFrameTimes map[int]*int64
udpRtpListeners map[int]*connClientUDPListener udpRtpListeners map[int]*connClientUDPListener
udpRtcpListeners map[int]*connClientUDPListener udpRtcpListeners map[int]*connClientUDPListener
tcpFrameBuffer *multibuffer.MultiBuffer
getParameterSupported bool getParameterSupported bool
// read only // read only
rtcpReceivers map[int]*rtcpreceiver.RtcpReceiver
udpLastFrameTimes map[int]*int64
tcpFrameBuffer *multibuffer.MultiBuffer
readCB func(int, StreamType, []byte) readCB func(int, StreamType, []byte)
// publish only // publish only
rtcpSenders map[int]*rtcpsender.RtcpSender
publishError error publishError error
publishMutex sync.RWMutex publishWriteMutex sync.RWMutex
publishOpen bool publishOpen bool
// in // in
@@ -512,6 +515,8 @@ func (c *ConnClient) Setup(u *base.URL, mode headers.TransportMode, proto base.S
v := time.Now().Unix() v := time.Now().Unix()
c.udpLastFrameTimes[track.Id] = &v c.udpLastFrameTimes[track.Id] = &v
} }
} else {
c.rtcpSenders[track.Id] = rtcpsender.New()
} }
if proto == StreamProtocolUDP { if proto == StreamProtocolUDP {

View File

@@ -78,8 +78,8 @@ func (c *ConnClient) backgroundRecordUDP() {
defer close(c.backgroundDone) defer close(c.backgroundDone)
defer func() { defer func() {
c.publishMutex.Lock() c.publishWriteMutex.Lock()
defer c.publishMutex.Unlock() defer c.publishWriteMutex.Unlock()
c.publishOpen = false c.publishOpen = false
}() }()
@@ -98,6 +98,9 @@ func (c *ConnClient) backgroundRecordUDP() {
} }
}() }()
reportTicker := time.NewTicker(clientSenderReportPeriod)
defer reportTicker.Stop()
select { select {
case <-c.backgroundTerminate: case <-c.backgroundTerminate:
c.nconn.SetReadDeadline(time.Now()) c.nconn.SetReadDeadline(time.Now())
@@ -105,6 +108,17 @@ func (c *ConnClient) backgroundRecordUDP() {
c.publishError = fmt.Errorf("terminated") c.publishError = fmt.Errorf("terminated")
return return
case <-reportTicker.C:
c.publishWriteMutex.Lock()
now := time.Now()
for trackId := range c.rtcpSenders {
report := c.rtcpSenders[trackId].Report(now)
if report != nil {
c.udpRtcpListeners[trackId].write(report)
}
}
c.publishWriteMutex.Unlock()
case err := <-readerDone: case err := <-readerDone:
c.publishError = err c.publishError = err
return return
@@ -115,24 +129,53 @@ func (c *ConnClient) backgroundRecordTCP() {
defer close(c.backgroundDone) defer close(c.backgroundDone)
defer func() { defer func() {
c.publishMutex.Lock() c.publishWriteMutex.Lock()
defer c.publishMutex.Unlock() defer c.publishWriteMutex.Unlock()
c.publishOpen = false c.publishOpen = false
}() }()
<-c.backgroundTerminate reportTicker := time.NewTicker(clientSenderReportPeriod)
defer reportTicker.Stop()
for {
select {
case <-c.backgroundTerminate:
return
case <-reportTicker.C:
c.publishWriteMutex.Lock()
now := time.Now()
for trackId := range c.rtcpSenders {
report := c.rtcpSenders[trackId].Report(now)
if report != nil {
c.nconn.SetWriteDeadline(time.Now().Add(c.d.WriteTimeout))
frame := base.InterleavedFrame{
TrackId: trackId,
StreamType: StreamTypeRtcp,
Content: report,
}
frame.Write(c.bw)
}
}
c.publishWriteMutex.Unlock()
}
}
} }
// WriteFrame writes a frame. // WriteFrame writes a frame.
// This can be called only after Record(). // This can be called only after Record().
func (c *ConnClient) WriteFrame(trackId int, streamType StreamType, content []byte) error { func (c *ConnClient) WriteFrame(trackId int, streamType StreamType, content []byte) error {
c.publishMutex.RLock() c.publishWriteMutex.RLock()
defer c.publishMutex.RUnlock() defer c.publishWriteMutex.RUnlock()
if !c.publishOpen { if !c.publishOpen {
return c.publishError return c.publishError
} }
now := time.Now()
c.rtcpSenders[trackId].OnFrame(now, streamType, content)
if *c.streamProtocol == StreamProtocolUDP { if *c.streamProtocol == StreamProtocolUDP {
if streamType == StreamTypeRtp { if streamType == StreamTypeRtp {
return c.udpRtpListeners[trackId].write(content) return c.udpRtpListeners[trackId].write(content)
@@ -140,7 +183,7 @@ func (c *ConnClient) WriteFrame(trackId int, streamType StreamType, content []by
return c.udpRtcpListeners[trackId].write(content) return c.udpRtcpListeners[trackId].write(content)
} }
c.nconn.SetWriteDeadline(time.Now().Add(c.d.WriteTimeout)) c.nconn.SetWriteDeadline(now.Add(c.d.WriteTimeout))
frame := base.InterleavedFrame{ frame := base.InterleavedFrame{
TrackId: trackId, TrackId: trackId,
StreamType: streamType, StreamType: streamType,

View File

@@ -11,6 +11,7 @@ import (
"github.com/aler9/gortsplib/pkg/headers" "github.com/aler9/gortsplib/pkg/headers"
"github.com/aler9/gortsplib/pkg/multibuffer" "github.com/aler9/gortsplib/pkg/multibuffer"
"github.com/aler9/gortsplib/pkg/rtcpreceiver" "github.com/aler9/gortsplib/pkg/rtcpreceiver"
"github.com/aler9/gortsplib/pkg/rtcpsender"
) )
// DefaultDialer is the default dialer, used by Dial, DialRead and DialPublish. // DefaultDialer is the default dialer, used by Dial, DialRead and DialPublish.
@@ -92,11 +93,12 @@ func (d Dialer) Dial(host string) (*ConnClient, error) {
nconn: nconn, nconn: nconn,
br: bufio.NewReaderSize(nconn, clientReadBufferSize), br: bufio.NewReaderSize(nconn, clientReadBufferSize),
bw: bufio.NewWriterSize(nconn, clientWriteBufferSize), bw: bufio.NewWriterSize(nconn, clientWriteBufferSize),
rtcpReceivers: make(map[int]*rtcpreceiver.RtcpReceiver),
udpLastFrameTimes: make(map[int]*int64),
udpRtpListeners: make(map[int]*connClientUDPListener), udpRtpListeners: make(map[int]*connClientUDPListener),
udpRtcpListeners: make(map[int]*connClientUDPListener), udpRtcpListeners: make(map[int]*connClientUDPListener),
rtcpReceivers: make(map[int]*rtcpreceiver.RtcpReceiver),
udpLastFrameTimes: make(map[int]*int64),
tcpFrameBuffer: multibuffer.New(d.ReadBufferCount, clientTCPFrameReadBufferSize), tcpFrameBuffer: multibuffer.New(d.ReadBufferCount, clientTCPFrameReadBufferSize),
rtcpSenders: make(map[int]*rtcpsender.RtcpSender),
publishError: fmt.Errorf("not running"), publishError: fmt.Errorf("not running"),
}, nil }, nil
} }

View File

@@ -13,6 +13,7 @@ import (
// RtcpReceiver allows to generate RTCP receiver reports. // RtcpReceiver allows to generate RTCP receiver reports.
type RtcpReceiver struct { type RtcpReceiver struct {
mutex sync.Mutex mutex sync.Mutex
firstRtpReceived bool
senderSSRC uint32 senderSSRC uint32
receiverSSRC uint32 receiverSSRC uint32
sequenceNumberCycles uint16 sequenceNumberCycles uint16
@@ -21,7 +22,6 @@ type RtcpReceiver struct {
totalLost uint32 totalLost uint32
totalLostSinceRR uint32 totalLostSinceRR uint32
totalSinceRR uint32 totalSinceRR uint32
firstRtpReceived bool
} }
// New allocates a RtcpReceiver. // New allocates a RtcpReceiver.
@@ -36,7 +36,7 @@ func New(receiverSSRC *uint32) *RtcpReceiver {
} }
} }
// OnFrame processes a RTP or RTCP frame and extract the data needed by RTCP receiver reports. // OnFrame processes a RTP or RTCP frame and extract the needed data.
func (rr *RtcpReceiver) OnFrame(streamType base.StreamType, buf []byte) { func (rr *RtcpReceiver) OnFrame(streamType base.StreamType, buf []byte) {
rr.mutex.Lock() rr.mutex.Lock()
defer rr.mutex.Unlock() defer rr.mutex.Unlock()
@@ -46,11 +46,13 @@ func (rr *RtcpReceiver) OnFrame(streamType base.StreamType, buf []byte) {
// extract the sequence number of the first frame // extract the sequence number of the first frame
sequenceNumber := uint16(buf[2])<<8 | uint16(buf[3]) sequenceNumber := uint16(buf[2])<<8 | uint16(buf[3])
// first frame
if !rr.firstRtpReceived { if !rr.firstRtpReceived {
rr.firstRtpReceived = true rr.firstRtpReceived = true
rr.totalSinceRR = 1 rr.totalSinceRR = 1
rr.lastSequenceNumber = sequenceNumber rr.lastSequenceNumber = sequenceNumber
// subsequent frames
} else { } else {
diff := (sequenceNumber - rr.lastSequenceNumber) diff := (sequenceNumber - rr.lastSequenceNumber)

View File

@@ -38,8 +38,6 @@ func TestRtcpReceiverBase(t *testing.T) {
byts, _ = rtpPkt.Marshal() byts, _ = rtpPkt.Marshal()
rr.OnFrame(base.StreamTypeRtp, byts) rr.OnFrame(base.StreamTypeRtp, byts)
res := rr.Report()
expectedPkt := rtcp.ReceiverReport{ expectedPkt := rtcp.ReceiverReport{
SSRC: 0x65f83afb, SSRC: 0x65f83afb,
Reports: []rtcp.ReceptionReport{ Reports: []rtcp.ReceptionReport{
@@ -51,7 +49,7 @@ func TestRtcpReceiverBase(t *testing.T) {
}, },
} }
expected, _ := expectedPkt.Marshal() expected, _ := expectedPkt.Marshal()
require.Equal(t, expected, res) require.Equal(t, expected, rr.Report())
} }
func TestRtcpReceiverSequenceOverflow(t *testing.T) { func TestRtcpReceiverSequenceOverflow(t *testing.T) {
@@ -96,8 +94,6 @@ func TestRtcpReceiverSequenceOverflow(t *testing.T) {
byts, _ = rtpPkt.Marshal() byts, _ = rtpPkt.Marshal()
rr.OnFrame(base.StreamTypeRtp, byts) rr.OnFrame(base.StreamTypeRtp, byts)
res := rr.Report()
expectedPkt := rtcp.ReceiverReport{ expectedPkt := rtcp.ReceiverReport{
SSRC: 0x65f83afb, SSRC: 0x65f83afb,
Reports: []rtcp.ReceptionReport{ Reports: []rtcp.ReceptionReport{
@@ -109,7 +105,7 @@ func TestRtcpReceiverSequenceOverflow(t *testing.T) {
}, },
} }
expected, _ := expectedPkt.Marshal() expected, _ := expectedPkt.Marshal()
require.Equal(t, expected, res) require.Equal(t, expected, rr.Report())
} }
func TestRtcpReceiverPacketLost(t *testing.T) { func TestRtcpReceiverPacketLost(t *testing.T) {
@@ -154,8 +150,6 @@ func TestRtcpReceiverPacketLost(t *testing.T) {
byts, _ = rtpPkt.Marshal() byts, _ = rtpPkt.Marshal()
rr.OnFrame(base.StreamTypeRtp, byts) rr.OnFrame(base.StreamTypeRtp, byts)
res := rr.Report()
expectedPkt := rtcp.ReceiverReport{ expectedPkt := rtcp.ReceiverReport{
SSRC: 0x65f83afb, SSRC: 0x65f83afb,
Reports: []rtcp.ReceptionReport{ Reports: []rtcp.ReceptionReport{
@@ -172,7 +166,7 @@ func TestRtcpReceiverPacketLost(t *testing.T) {
}, },
} }
expected, _ := expectedPkt.Marshal() expected, _ := expectedPkt.Marshal()
require.Equal(t, expected, res) require.Equal(t, expected, rr.Report())
} }
func TestRtcpReceiverSequenceOverflowPacketLost(t *testing.T) { func TestRtcpReceiverSequenceOverflowPacketLost(t *testing.T) {
@@ -217,8 +211,6 @@ func TestRtcpReceiverSequenceOverflowPacketLost(t *testing.T) {
byts, _ = rtpPkt.Marshal() byts, _ = rtpPkt.Marshal()
rr.OnFrame(base.StreamTypeRtp, byts) rr.OnFrame(base.StreamTypeRtp, byts)
res := rr.Report()
expectedPkt := rtcp.ReceiverReport{ expectedPkt := rtcp.ReceiverReport{
SSRC: 0x65f83afb, SSRC: 0x65f83afb,
Reports: []rtcp.ReceptionReport{ Reports: []rtcp.ReceptionReport{
@@ -235,5 +227,5 @@ func TestRtcpReceiverSequenceOverflowPacketLost(t *testing.T) {
}, },
} }
expected, _ := expectedPkt.Marshal() expected, _ := expectedPkt.Marshal()
require.Equal(t, expected, res) require.Equal(t, expected, rr.Report())
} }

View File

@@ -0,0 +1,90 @@
// Package rtcpsender implements a utility to generate RTCP sender reports.
package rtcpsender
import (
"sync"
"time"
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/aler9/gortsplib/pkg/base"
)
// RtcpSender allows to generate RTCP sender reports.
type RtcpSender struct {
mutex sync.Mutex
firstRtpReceived bool
secondRtpReceived bool
senderSSRC uint32
packetCount uint32
octetCount uint32
rtpTimeOffset uint32
rtpTimeTime time.Time
clock float64
}
// New allocates a RtcpSender.
func New() *RtcpSender {
return &RtcpSender{}
}
// OnFrame processes a RTP or RTCP frame and extract the needed data.
func (rs *RtcpSender) OnFrame(ts time.Time, streamType base.StreamType, buf []byte) {
rs.mutex.Lock()
defer rs.mutex.Unlock()
if streamType == base.StreamTypeRtp {
pkt := rtp.Packet{}
err := pkt.Unmarshal(buf)
if err == nil {
if !rs.firstRtpReceived {
rs.firstRtpReceived = true
rs.senderSSRC = pkt.SSRC
// save RTP time offset and correspondent time
rs.rtpTimeOffset = pkt.Timestamp
rs.rtpTimeTime = ts
} else if !rs.secondRtpReceived && pkt.Timestamp != rs.rtpTimeOffset {
rs.secondRtpReceived = true
// estimate clock
rs.clock = float64(pkt.Timestamp-rs.rtpTimeOffset) /
ts.Sub(rs.rtpTimeTime).Seconds()
}
rs.packetCount++
rs.octetCount += uint32(len(pkt.Payload))
}
}
}
// Report generates a RTCP sender report.
func (rs *RtcpSender) Report(ts time.Time) []byte {
rs.mutex.Lock()
defer rs.mutex.Unlock()
if !rs.firstRtpReceived || !rs.secondRtpReceived {
return nil
}
report := &rtcp.SenderReport{
SSRC: rs.senderSSRC,
NTPTime: func() uint64 {
// seconds since 1st January 1900
n := (float64(ts.UnixNano()) / 1000000000) + 2208988800
// higher 32 bits are the integer part, lower 32 bits are the fractional part
integerPart := uint32(n)
fractionalPart := uint32((n - float64(integerPart)) * 0xFFFFFFFF)
return uint64(integerPart)<<32 | uint64(fractionalPart)
}(),
RTPTime: rs.rtpTimeOffset + uint32((ts.Sub(rs.rtpTimeTime)).Seconds()*rs.clock),
PacketCount: rs.packetCount,
OctetCount: rs.octetCount,
}
byts, _ := report.Marshal()
return byts
}

View File

@@ -0,0 +1,57 @@
package rtcpsender
import (
"testing"
"time"
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
"github.com/aler9/gortsplib/pkg/base"
)
func TestRtcpSender(t *testing.T) {
rs := New()
rtpPkt := rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 946,
Timestamp: 1287987768,
SSRC: 0xba9da416,
},
Payload: []byte("\x00\x00"),
}
byts, _ := rtpPkt.Marshal()
ts := time.Date(2008, 05, 20, 22, 15, 20, 0, time.UTC)
rs.OnFrame(ts, base.StreamTypeRtp, byts)
rtpPkt = rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 947,
Timestamp: 1287987768 + 45000,
SSRC: 0xba9da416,
},
Payload: []byte("\x00\x00"),
}
byts, _ = rtpPkt.Marshal()
ts = time.Date(2008, 05, 20, 22, 15, 20, 500000000, time.UTC)
rs.OnFrame(ts, base.StreamTypeRtp, byts)
expectedPkt := rtcp.SenderReport{
SSRC: 0xba9da416,
NTPTime: 0xcbddcc34999997ff,
RTPTime: 0x4d185ae8,
PacketCount: 2,
OctetCount: 4,
}
expected, _ := expectedPkt.Marshal()
ts = time.Date(2008, 05, 20, 22, 16, 20, 600000000, time.UTC)
require.Equal(t, expected, rs.Report(ts))
}

View File

@@ -32,18 +32,18 @@ func NewEncoder(relativeType uint8) (*Encoder, error) {
} }
// Write encodes NALUs into RTP/H264 packets. // Write encodes NALUs into RTP/H264 packets.
func (e *Encoder) Write(nalus [][]byte, timestamp time.Duration) ([][]byte, error) { func (e *Encoder) Write(nalus [][]byte, ts time.Duration) ([][]byte, error) {
if e.started == 0 { if e.started == 0 {
e.started = timestamp e.started = ts
} }
// rtp/h264 uses a 90khz clock // rtp/h264 uses a 90khz clock
rtpTs := e.initialTs + uint32((timestamp-e.started).Seconds()*90000) rtpTime := e.initialTs + uint32((ts-e.started).Seconds()*90000)
var frames [][]byte var frames [][]byte
for i, nalu := range nalus { for i, nalu := range nalus {
naluFrames, err := e.writeNalu(nalu, rtpTs, (i == len(nalus)-1)) naluFrames, err := e.writeNALU(nalu, rtpTime, (i == len(nalus)-1))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -53,23 +53,23 @@ func (e *Encoder) Write(nalus [][]byte, timestamp time.Duration) ([][]byte, erro
return frames, nil return frames, nil
} }
func (e *Encoder) writeNalu(nalu []byte, rtpTs uint32, isFinal bool) ([][]byte, error) { func (e *Encoder) writeNALU(nalu []byte, rtpTime uint32, isFinal bool) ([][]byte, error) {
// if the NALU fits into a single RTP packet, use a single NALU payload // if the NALU fits into a single RTP packet, use a single NALU payload
if len(nalu) < rtpPayloadMaxSize { if len(nalu) < rtpPayloadMaxSize {
return e.writeSingle(nalu, rtpTs, isFinal) return e.writeSingle(nalu, rtpTime, isFinal)
} }
// otherwise, split the NALU into multiple fragmentation payloads // otherwise, split the NALU into multiple fragmentation payloads
return e.writeFragmented(nalu, rtpTs, isFinal) return e.writeFragmented(nalu, rtpTime, isFinal)
} }
func (e *Encoder) writeSingle(nalu []byte, rtpTs uint32, isFinal bool) ([][]byte, error) { func (e *Encoder) writeSingle(nalu []byte, rtpTime uint32, isFinal bool) ([][]byte, error) {
rpkt := rtp.Packet{ rpkt := rtp.Packet{
Header: rtp.Header{ Header: rtp.Header{
Version: rtpVersion, Version: rtpVersion,
PayloadType: e.payloadType, PayloadType: e.payloadType,
SequenceNumber: e.sequenceNumber, SequenceNumber: e.sequenceNumber,
Timestamp: rtpTs, Timestamp: rtpTime,
SSRC: e.ssrc, SSRC: e.ssrc,
}, },
Payload: nalu, Payload: nalu,
@@ -88,7 +88,7 @@ func (e *Encoder) writeSingle(nalu []byte, rtpTs uint32, isFinal bool) ([][]byte
return [][]byte{frame}, nil return [][]byte{frame}, nil
} }
func (e *Encoder) writeFragmented(nalu []byte, rtpTs uint32, isFinal bool) ([][]byte, error) { func (e *Encoder) writeFragmented(nalu []byte, rtpTime uint32, isFinal bool) ([][]byte, error) {
// use only FU-A, not FU-B, since we always use non-interleaved mode // use only FU-A, not FU-B, since we always use non-interleaved mode
// (packetization-mode=1) // (packetization-mode=1)
frameCount := (len(nalu) - 1) / (rtpPayloadMaxSize - 2) frameCount := (len(nalu) - 1) / (rtpPayloadMaxSize - 2)
@@ -125,7 +125,7 @@ func (e *Encoder) writeFragmented(nalu []byte, rtpTs uint32, isFinal bool) ([][]
Version: rtpVersion, Version: rtpVersion,
PayloadType: e.payloadType, PayloadType: e.payloadType,
SequenceNumber: e.sequenceNumber, SequenceNumber: e.sequenceNumber,
Timestamp: rtpTs, Timestamp: rtpTime,
SSRC: e.ssrc, SSRC: e.ssrc,
}, },
Payload: data, Payload: data,