rtcp*: split ProcessFrame into ProcessPacketRTP and ProcessPacketRTCP

This commit is contained in:
aler9
2021-10-30 16:36:58 +02:00
committed by Alessandro Ros
parent 1f5dec4a02
commit 6d340cdf39
10 changed files with 163 additions and 139 deletions

View File

@@ -836,7 +836,7 @@ func TestClientPublishRTCPReport(t *testing.T) {
err = f.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, 0, f.Channel)
rr.ProcessFrame(time.Now(), StreamTypeRTP, f.Payload)
rr.ProcessPacketRTP(time.Now(), f.Payload)
f.Payload = make([]byte, 2048)
err = f.Read(bconn.Reader)
@@ -853,7 +853,7 @@ func TestClientPublishRTCPReport(t *testing.T) {
PacketCount: 1,
OctetCount: 4,
}, sr)
rr.ProcessFrame(time.Now(), StreamTypeRTCP, f.Payload)
rr.ProcessPacketRTCP(time.Now(), f.Payload)
err = base.InterleavedFrame{
Channel: 1,

View File

@@ -1872,7 +1872,7 @@ func TestClientReadRTCPReport(t *testing.T) {
Payload: byts,
}.Write(bconn.Writer)
require.NoError(t, err)
rs.ProcessFrame(time.Now(), StreamTypeRTP, byts)
rs.ProcessPacketRTP(time.Now(), byts)
err = base.InterleavedFrame{
Channel: 1,

View File

@@ -617,7 +617,13 @@ func (cc *ClientConn) runBackgroundPlayTCP() error {
now := time.Now()
atomic.StoreInt64(&lastFrameTime, now.Unix())
cc.tracks[trackID].rtcpReceiver.ProcessFrame(now, streamType, frame.Payload)
if streamType == StreamTypeRTP {
cc.tracks[trackID].rtcpReceiver.ProcessPacketRTP(now, frame.Payload)
} else {
cc.tracks[trackID].rtcpReceiver.ProcessPacketRTCP(now, frame.Payload)
}
cc.pullReadCB()(trackID, streamType, frame.Payload)
}
}()
@@ -1683,7 +1689,11 @@ func (cc *ClientConn) WriteFrame(trackID int, streamType StreamType, payload []b
}
if cc.tracks[trackID].rtcpSender != nil {
cc.tracks[trackID].rtcpSender.ProcessFrame(now, streamType, payload)
if streamType == StreamTypeRTP {
cc.tracks[trackID].rtcpSender.ProcessPacketRTP(now, payload)
} else {
cc.tracks[trackID].rtcpSender.ProcessPacketRTCP(now, payload)
}
}
switch *cc.protocol {

View File

@@ -166,7 +166,13 @@ func (l *clientConnUDPListener) run() {
now := time.Now()
atomic.StoreInt64(l.lastFrameTime, now.Unix())
l.cc.tracks[l.trackID].rtcpReceiver.ProcessFrame(now, l.streamType, buf[:n])
if l.streamType == StreamTypeRTP {
l.cc.tracks[l.trackID].rtcpReceiver.ProcessPacketRTP(now, buf[:n])
} else {
l.cc.tracks[l.trackID].rtcpReceiver.ProcessPacketRTCP(now, buf[:n])
}
l.cc.pullReadCB()(l.trackID, l.streamType, buf[:n])
}
} else { // record

View File

@@ -7,8 +7,6 @@ import (
"time"
"github.com/pion/rtcp"
"github.com/aler9/gortsplib/pkg/base"
)
func randUint32() uint32 {
@@ -53,83 +51,6 @@ func New(receiverSSRC *uint32, clockRate int) *RTCPReceiver {
}
}
// ProcessFrame extracts the needed data from RTP or RTCP packets.
func (rr *RTCPReceiver) ProcessFrame(ts time.Time, streamType base.StreamType, payload []byte) {
rr.mutex.Lock()
defer rr.mutex.Unlock()
if streamType == base.StreamTypeRTP {
// do not parse the entire packet, extract only the fields we need
if len(payload) >= 8 {
sequenceNumber := uint16(payload[2])<<8 | uint16(payload[3])
rtpTime := uint32(payload[4])<<24 | uint32(payload[5])<<16 | uint32(payload[6])<<8 | uint32(payload[7])
// first frame
if !rr.firstRTPReceived {
rr.firstRTPReceived = true
rr.totalSinceReport = 1
rr.lastSequenceNumber = sequenceNumber
rr.lastRTPTimeRTP = rtpTime
rr.lastRTPTimeTime = ts
// subsequent frames
} else {
diff := int32(sequenceNumber) - int32(rr.lastSequenceNumber)
// following frame or following frame after an overflow
if diff > 0 || diff < -0x0FFF {
// overflow
if diff < -0x0FFF {
rr.sequenceNumberCycles++
}
// detect lost frames
if sequenceNumber != (rr.lastSequenceNumber + 1) {
rr.totalLost += uint32(uint16(diff) - 1)
rr.totalLostSinceReport += uint32(uint16(diff) - 1)
// allow up to 24 bits
if rr.totalLost > 0xFFFFFF {
rr.totalLost = 0xFFFFFF
}
if rr.totalLostSinceReport > 0xFFFFFF {
rr.totalLostSinceReport = 0xFFFFFF
}
}
// compute jitter
// https://tools.ietf.org/html/rfc3550#page-39
D := ts.Sub(rr.lastRTPTimeTime).Seconds()*rr.clockRate -
(float64(rtpTime) - float64(rr.lastRTPTimeRTP))
if D < 0 {
D = -D
}
rr.jitter += (D - rr.jitter) / 16
rr.totalSinceReport += uint32(uint16(diff))
rr.lastSequenceNumber = sequenceNumber
rr.lastRTPTimeRTP = rtpTime
rr.lastRTPTimeTime = ts
}
// ignore invalid frames (diff = 0) or reordered frames (diff < 0)
}
}
} else {
// we can afford to unmarshal all RTCP packets
// since they are sent with a frequency much lower than the one of RTP packets
frames, err := rtcp.Unmarshal(payload)
if err == nil {
for _, frame := range frames {
if sr, ok := (frame).(*rtcp.SenderReport); ok {
rr.senderSSRC = sr.SSRC
rr.lastSenderReport = uint32(sr.NTPTime >> 16)
rr.lastSenderReportTime = ts
}
}
}
}
}
// Report generates a RTCP receiver report.
func (rr *RTCPReceiver) Report(ts time.Time) []byte {
rr.mutex.Lock()
@@ -165,3 +86,84 @@ func (rr *RTCPReceiver) Report(ts time.Time) []byte {
return byts
}
// ProcessPacketRTP extracts the needed data from RTP packets.
func (rr *RTCPReceiver) ProcessPacketRTP(ts time.Time, payload []byte) {
rr.mutex.Lock()
defer rr.mutex.Unlock()
// do not parse the entire packet, extract only the fields we need
if len(payload) >= 8 {
sequenceNumber := uint16(payload[2])<<8 | uint16(payload[3])
rtpTime := uint32(payload[4])<<24 | uint32(payload[5])<<16 | uint32(payload[6])<<8 | uint32(payload[7])
// first frame
if !rr.firstRTPReceived {
rr.firstRTPReceived = true
rr.totalSinceReport = 1
rr.lastSequenceNumber = sequenceNumber
rr.lastRTPTimeRTP = rtpTime
rr.lastRTPTimeTime = ts
// subsequent frames
} else {
diff := int32(sequenceNumber) - int32(rr.lastSequenceNumber)
// following frame or following frame after an overflow
if diff > 0 || diff < -0x0FFF {
// overflow
if diff < -0x0FFF {
rr.sequenceNumberCycles++
}
// detect lost frames
if sequenceNumber != (rr.lastSequenceNumber + 1) {
rr.totalLost += uint32(uint16(diff) - 1)
rr.totalLostSinceReport += uint32(uint16(diff) - 1)
// allow up to 24 bits
if rr.totalLost > 0xFFFFFF {
rr.totalLost = 0xFFFFFF
}
if rr.totalLostSinceReport > 0xFFFFFF {
rr.totalLostSinceReport = 0xFFFFFF
}
}
// compute jitter
// https://tools.ietf.org/html/rfc3550#page-39
D := ts.Sub(rr.lastRTPTimeTime).Seconds()*rr.clockRate -
(float64(rtpTime) - float64(rr.lastRTPTimeRTP))
if D < 0 {
D = -D
}
rr.jitter += (D - rr.jitter) / 16
rr.totalSinceReport += uint32(uint16(diff))
rr.lastSequenceNumber = sequenceNumber
rr.lastRTPTimeRTP = rtpTime
rr.lastRTPTimeTime = ts
}
// ignore invalid frames (diff = 0) or reordered frames (diff < 0)
}
}
}
// ProcessPacketRTCP extracts the needed data from RTCP packets.
func (rr *RTCPReceiver) ProcessPacketRTCP(ts time.Time, payload []byte) {
rr.mutex.Lock()
defer rr.mutex.Unlock()
// we can afford to unmarshal all RTCP packets
// since they are sent with a frequency much lower than the one of RTP packets
frames, err := rtcp.Unmarshal(payload)
if err == nil {
for _, frame := range frames {
if sr, ok := (frame).(*rtcp.SenderReport); ok {
rr.senderSSRC = sr.SSRC
rr.lastSenderReport = uint32(sr.NTPTime >> 16)
rr.lastSenderReportTime = ts
}
}
}
}

View File

@@ -7,8 +7,6 @@ import (
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
"github.com/aler9/gortsplib/pkg/base"
)
func TestRTCPReceiverBase(t *testing.T) {
@@ -24,7 +22,7 @@ func TestRTCPReceiverBase(t *testing.T) {
}
byts, _ := srPkt.Marshal()
ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
rr.ProcessFrame(ts, base.StreamTypeRTCP, byts)
rr.ProcessPacketRTCP(ts, byts)
rtpPkt := rtp.Packet{
Header: rtp.Header{
@@ -39,7 +37,7 @@ func TestRTCPReceiverBase(t *testing.T) {
}
byts, _ = rtpPkt.Marshal()
ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
rr.ProcessFrame(ts, base.StreamTypeRTP, byts)
rr.ProcessPacketRTP(ts, byts)
rtpPkt = rtp.Packet{
Header: rtp.Header{
@@ -54,7 +52,7 @@ func TestRTCPReceiverBase(t *testing.T) {
}
byts, _ = rtpPkt.Marshal()
ts = time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC)
rr.ProcessFrame(ts, base.StreamTypeRTP, byts)
rr.ProcessPacketRTP(ts, byts)
expectedPkt := rtcp.ReceiverReport{
SSRC: 0x65f83afb,
@@ -85,7 +83,7 @@ func TestRTCPReceiverOverflow(t *testing.T) {
}
byts, _ := srPkt.Marshal()
ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
rr.ProcessFrame(ts, base.StreamTypeRTCP, byts)
rr.ProcessPacketRTCP(ts, byts)
rtpPkt := rtp.Packet{
Header: rtp.Header{
@@ -100,7 +98,7 @@ func TestRTCPReceiverOverflow(t *testing.T) {
}
byts, _ = rtpPkt.Marshal()
ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
rr.ProcessFrame(ts, base.StreamTypeRTP, byts)
rr.ProcessPacketRTP(ts, byts)
rtpPkt = rtp.Packet{
Header: rtp.Header{
@@ -115,7 +113,7 @@ func TestRTCPReceiverOverflow(t *testing.T) {
}
byts, _ = rtpPkt.Marshal()
ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
rr.ProcessFrame(ts, base.StreamTypeRTP, byts)
rr.ProcessPacketRTP(ts, byts)
expectedPkt := rtcp.ReceiverReport{
SSRC: 0x65f83afb,
@@ -146,7 +144,7 @@ func TestRTCPReceiverPacketLost(t *testing.T) {
}
byts, _ := srPkt.Marshal()
ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
rr.ProcessFrame(ts, base.StreamTypeRTCP, byts)
rr.ProcessPacketRTCP(ts, byts)
rtpPkt := rtp.Packet{
Header: rtp.Header{
@@ -161,7 +159,7 @@ func TestRTCPReceiverPacketLost(t *testing.T) {
}
byts, _ = rtpPkt.Marshal()
ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
rr.ProcessFrame(ts, base.StreamTypeRTP, byts)
rr.ProcessPacketRTP(ts, byts)
rtpPkt = rtp.Packet{
Header: rtp.Header{
@@ -176,7 +174,7 @@ func TestRTCPReceiverPacketLost(t *testing.T) {
}
byts, _ = rtpPkt.Marshal()
ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
rr.ProcessFrame(ts, base.StreamTypeRTP, byts)
rr.ProcessPacketRTP(ts, byts)
expectedPkt := rtcp.ReceiverReport{
SSRC: 0x65f83afb,
@@ -212,7 +210,7 @@ func TestRTCPReceiverOverflowPacketLost(t *testing.T) {
}
byts, _ := srPkt.Marshal()
ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
rr.ProcessFrame(ts, base.StreamTypeRTCP, byts)
rr.ProcessPacketRTCP(ts, byts)
rtpPkt := rtp.Packet{
Header: rtp.Header{
@@ -227,7 +225,7 @@ func TestRTCPReceiverOverflowPacketLost(t *testing.T) {
}
byts, _ = rtpPkt.Marshal()
ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
rr.ProcessFrame(ts, base.StreamTypeRTP, byts)
rr.ProcessPacketRTP(ts, byts)
rtpPkt = rtp.Packet{
Header: rtp.Header{
@@ -242,7 +240,7 @@ func TestRTCPReceiverOverflowPacketLost(t *testing.T) {
}
byts, _ = rtpPkt.Marshal()
ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
rr.ProcessFrame(ts, base.StreamTypeRTP, byts)
rr.ProcessPacketRTP(ts, byts)
expectedPkt := rtcp.ReceiverReport{
SSRC: 0x65f83afb,
@@ -278,7 +276,7 @@ func TestRTCPReceiverReorderedPackets(t *testing.T) {
}
byts, _ := srPkt.Marshal()
ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
rr.ProcessFrame(ts, base.StreamTypeRTCP, byts)
rr.ProcessPacketRTCP(ts, byts)
rtpPkt := rtp.Packet{
Header: rtp.Header{
@@ -293,7 +291,7 @@ func TestRTCPReceiverReorderedPackets(t *testing.T) {
}
byts, _ = rtpPkt.Marshal()
ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
rr.ProcessFrame(ts, base.StreamTypeRTP, byts)
rr.ProcessPacketRTP(ts, byts)
rtpPkt = rtp.Packet{
Header: rtp.Header{
@@ -308,7 +306,7 @@ func TestRTCPReceiverReorderedPackets(t *testing.T) {
}
byts, _ = rtpPkt.Marshal()
ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
rr.ProcessFrame(ts, base.StreamTypeRTP, byts)
rr.ProcessPacketRTP(ts, byts)
expectedPkt := rtcp.ReceiverReport{
SSRC: 0x65f83afb,
@@ -339,7 +337,7 @@ func TestRTCPReceiverJitter(t *testing.T) {
}
byts, _ := srPkt.Marshal()
ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
rr.ProcessFrame(ts, base.StreamTypeRTCP, byts)
rr.ProcessPacketRTCP(ts, byts)
rtpPkt := rtp.Packet{
Header: rtp.Header{
@@ -354,7 +352,7 @@ func TestRTCPReceiverJitter(t *testing.T) {
}
byts, _ = rtpPkt.Marshal()
ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
rr.ProcessFrame(ts, base.StreamTypeRTP, byts)
rr.ProcessPacketRTP(ts, byts)
rtpPkt = rtp.Packet{
Header: rtp.Header{
@@ -369,7 +367,7 @@ func TestRTCPReceiverJitter(t *testing.T) {
}
byts, _ = rtpPkt.Marshal()
ts = time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC)
rr.ProcessFrame(ts, base.StreamTypeRTP, byts)
rr.ProcessPacketRTP(ts, byts)
expectedPkt := rtcp.ReceiverReport{
SSRC: 0x65f83afb,

View File

@@ -7,8 +7,6 @@ import (
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/aler9/gortsplib/pkg/base"
)
// RTCPSender is a utility to generate RTCP sender reports.
@@ -32,32 +30,8 @@ func New(clockRate int) *RTCPSender {
}
}
// ProcessFrame extracts the needed data from RTP or RTCP packets.
func (rs *RTCPSender) ProcessFrame(ts time.Time, streamType base.StreamType, payload []byte) {
rs.mutex.Lock()
defer rs.mutex.Unlock()
if streamType == base.StreamTypeRTP {
pkt := rtp.Packet{}
err := pkt.Unmarshal(payload)
if err == nil {
if !rs.firstRTPReceived {
rs.firstRTPReceived = true
rs.senderSSRC = pkt.SSRC
}
// always update time to minimize errors
rs.lastRTPTimeRTP = pkt.Timestamp
rs.lastRTPTimeTime = ts
rs.packetCount++
rs.octetCount += uint32(len(pkt.Payload))
}
}
}
// Report generates a RTCP sender report.
// It returns nil if no packets has been passed to ProcessFrame yet.
// It returns nil if no packets has been passed to ProcessPacketRTP yet.
func (rs *RTCPSender) Report(ts time.Time) []byte {
rs.mutex.Lock()
defer rs.mutex.Unlock()
@@ -89,3 +63,29 @@ func (rs *RTCPSender) Report(ts time.Time) []byte {
return byts
}
// ProcessPacketRTP extracts the needed data from RTP packets.
func (rs *RTCPSender) ProcessPacketRTP(ts time.Time, payload []byte) {
rs.mutex.Lock()
defer rs.mutex.Unlock()
pkt := rtp.Packet{}
err := pkt.Unmarshal(payload)
if err == nil {
if !rs.firstRTPReceived {
rs.firstRTPReceived = true
rs.senderSSRC = pkt.SSRC
}
// always update time to minimize errors
rs.lastRTPTimeRTP = pkt.Timestamp
rs.lastRTPTimeTime = ts
rs.packetCount++
rs.octetCount += uint32(len(pkt.Payload))
}
}
// ProcessPacketRTCP extracts the needed data from RTCP packets.
func (rs *RTCPSender) ProcessPacketRTCP(ts time.Time, payload []byte) {
}

View File

@@ -7,8 +7,6 @@ import (
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
"github.com/aler9/gortsplib/pkg/base"
)
func TestRTCPSender(t *testing.T) {
@@ -27,7 +25,7 @@ func TestRTCPSender(t *testing.T) {
}
byts, _ := rtpPkt.Marshal()
ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
rs.ProcessFrame(ts, base.StreamTypeRTP, byts)
rs.ProcessPacketRTP(ts, byts)
rtpPkt = rtp.Packet{
Header: rtp.Header{
@@ -42,7 +40,7 @@ func TestRTCPSender(t *testing.T) {
}
byts, _ = rtpPkt.Marshal()
ts = time.Date(2008, 0o5, 20, 22, 15, 20, 500000000, time.UTC)
rs.ProcessFrame(ts, base.StreamTypeRTP, byts)
rs.ProcessPacketRTP(ts, byts)
expectedPkt := rtcp.SenderReport{
SSRC: 0xba9da416,

View File

@@ -155,8 +155,13 @@ func (sc *ServerConn) run() {
// forward frame only if it has been set up
if trackID, ok := sc.tcpSession.setuppedTracksByChannel[channel]; ok {
if sc.tcpFrameIsRecording {
sc.tcpSession.announcedTracks[trackID].rtcpReceiver.ProcessFrame(
time.Now(), streamType, frame.Payload)
if streamType == StreamTypeRTP {
sc.tcpSession.announcedTracks[trackID].rtcpReceiver.ProcessPacketRTP(
time.Now(), frame.Payload)
} else {
sc.tcpSession.announcedTracks[trackID].rtcpReceiver.ProcessPacketRTCP(
time.Now(), frame.Payload)
}
}
if streamType == StreamTypeRTP {

View File

@@ -205,7 +205,12 @@ func (u *serverUDPListener) run() {
if clientData.isPublishing {
now := time.Now()
atomic.StoreInt64(clientData.ss.udpLastFrameTime, now.Unix())
clientData.ss.announcedTracks[clientData.trackID].rtcpReceiver.ProcessFrame(now, u.streamType, buf[:n])
if u.streamType == StreamTypeRTP {
clientData.ss.announcedTracks[clientData.trackID].rtcpReceiver.ProcessPacketRTP(now, buf[:n])
} else {
clientData.ss.announcedTracks[clientData.trackID].rtcpReceiver.ProcessPacketRTCP(now, buf[:n])
}
}
if u.streamType == StreamTypeRTP {