unexport RTCPSender, RTCPReceiver, LossDetector, Reorderer (#667)

This commit is contained in:
Alessandro Ros
2024-12-24 21:21:11 +01:00
committed by GitHub
parent 93b012b3ca
commit 8c4a3ca018
16 changed files with 1186 additions and 505 deletions

View File

@@ -4,12 +4,12 @@ import (
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/bluenviron/gortsplib/v4/internal/rtcpreceiver"
"github.com/bluenviron/gortsplib/v4/internal/rtcpsender"
"github.com/bluenviron/gortsplib/v4/internal/rtplossdetector"
"github.com/bluenviron/gortsplib/v4/internal/rtpreorderer"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/gortsplib/v4/pkg/liberrors"
"github.com/bluenviron/gortsplib/v4/pkg/rtcpreceiver"
"github.com/bluenviron/gortsplib/v4/pkg/rtcpsender"
"github.com/bluenviron/gortsplib/v4/pkg/rtplossdetector"
"github.com/bluenviron/gortsplib/v4/pkg/rtpreorderer"
)
type clientFormat struct {
@@ -25,33 +25,36 @@ type clientFormat struct {
func (cf *clientFormat) start() {
if cf.cm.c.state == clientStateRecord || cf.cm.media.IsBackChannel {
cf.rtcpSender = rtcpsender.New(
cf.format.ClockRate(),
cf.cm.c.senderReportPeriod,
cf.cm.c.timeNow,
func(pkt rtcp.Packet) {
cf.rtcpSender = &rtcpsender.RTCPSender{
ClockRate: cf.format.ClockRate(),
Period: cf.cm.c.senderReportPeriod,
TimeNow: cf.cm.c.timeNow,
WritePacketRTCP: func(pkt rtcp.Packet) {
if !cf.cm.c.DisableRTCPSenderReports {
cf.cm.c.WritePacketRTCP(cf.cm.media, pkt) //nolint:errcheck
}
})
},
}
cf.rtcpSender.Initialize()
} else {
if cf.cm.udpRTPListener != nil {
cf.udpReorderer = rtpreorderer.New()
cf.udpReorderer = &rtpreorderer.Reorderer{}
cf.udpReorderer.Initialize()
} else {
cf.tcpLossDetector = rtplossdetector.New()
cf.tcpLossDetector = &rtplossdetector.LossDetector{}
}
var err error
cf.rtcpReceiver, err = rtcpreceiver.New(
cf.format.ClockRate(),
nil,
cf.cm.c.receiverReportPeriod,
cf.cm.c.timeNow,
func(pkt rtcp.Packet) {
cf.rtcpReceiver = &rtcpreceiver.RTCPReceiver{
ClockRate: cf.format.ClockRate(),
Period: cf.cm.c.receiverReportPeriod,
TimeNow: cf.cm.c.timeNow,
WritePacketRTCP: func(pkt rtcp.Packet) {
if cf.cm.udpRTPListener != nil {
cf.cm.c.WritePacketRTCP(cf.cm.media, pkt) //nolint:errcheck
}
})
},
}
err := cf.rtcpReceiver.Initialize()
if err != nil {
panic(err)
}

View File

@@ -0,0 +1,252 @@
// Package rtcpreceiver contains a utility to generate RTCP receiver reports.
package rtcpreceiver
import (
"crypto/rand"
"fmt"
"sync"
"time"
"github.com/pion/rtcp"
"github.com/pion/rtp"
)
// seconds since 1st January 1900
// higher 32 bits are the integer part, lower 32 bits are the fractional part
func ntpTimeRTCPToGo(v uint64) time.Time {
nano := int64((v>>32)*1000000000+(v&0xFFFFFFFF)) - 2208988800*1000000000
return time.Unix(0, nano)
}
func randUint32() (uint32, error) {
var b [4]byte
_, err := rand.Read(b[:])
if err != nil {
return 0, err
}
return uint32(b[0])<<24 | uint32(b[1])<<16 | uint32(b[2])<<8 | uint32(b[3]), nil
}
// RTCPReceiver is a utility to generate RTCP receiver reports.
type RTCPReceiver struct {
ClockRate int
ReceiverSSRC *uint32
Period time.Duration
TimeNow func() time.Time
WritePacketRTCP func(rtcp.Packet)
mutex sync.RWMutex
// data from RTP packets
firstRTPPacketReceived bool
timeInitialized bool
sequenceNumberCycles uint16
lastSequenceNumber uint16
senderSSRC uint32
lastTimeRTP uint32
lastTimeSystem time.Time
totalLost uint32
totalLostSinceReport uint32
totalSinceReport uint32
jitter float64
// data from RTCP packets
firstSenderReportReceived bool
lastSenderReportTimeNTP uint64
lastSenderReportTimeRTP uint32
lastSenderReportTimeSystem time.Time
terminate chan struct{}
done chan struct{}
}
// Initialize initializes RTCPReceiver.
func (rr *RTCPReceiver) Initialize() error {
if rr.ReceiverSSRC == nil {
v, err := randUint32()
if err != nil {
return err
}
rr.ReceiverSSRC = &v
}
if rr.TimeNow == nil {
rr.TimeNow = time.Now
}
rr.terminate = make(chan struct{})
rr.done = make(chan struct{})
go rr.run()
return nil
}
// Close closes the RTCPReceiver.
func (rr *RTCPReceiver) Close() {
close(rr.terminate)
<-rr.done
}
func (rr *RTCPReceiver) run() {
defer close(rr.done)
t := time.NewTicker(rr.Period)
defer t.Stop()
for {
select {
case <-t.C:
report := rr.report()
if report != nil {
rr.WritePacketRTCP(report)
}
case <-rr.terminate:
return
}
}
}
func (rr *RTCPReceiver) report() rtcp.Packet {
rr.mutex.Lock()
defer rr.mutex.Unlock()
if !rr.firstRTPPacketReceived {
return nil
}
system := rr.TimeNow()
report := &rtcp.ReceiverReport{
SSRC: *rr.ReceiverSSRC,
Reports: []rtcp.ReceptionReport{
{
SSRC: rr.senderSSRC,
LastSequenceNumber: uint32(rr.sequenceNumberCycles)<<16 | uint32(rr.lastSequenceNumber),
// equivalent to taking the integer part after multiplying the
// loss fraction by 256
FractionLost: uint8(float64(rr.totalLostSinceReport*256) / float64(rr.totalSinceReport)),
TotalLost: rr.totalLost,
Jitter: uint32(rr.jitter),
},
},
}
if rr.firstSenderReportReceived {
// middle 32 bits out of 64 in the NTP timestamp of last sender report
report.Reports[0].LastSenderReport = uint32(rr.lastSenderReportTimeNTP >> 16)
// delay, expressed in units of 1/65536 seconds, between
// receiving the last SR packet from source SSRC_n and sending this
// reception report block
report.Reports[0].Delay = uint32(system.Sub(rr.lastSenderReportTimeSystem).Seconds() * 65536)
}
rr.totalLostSinceReport = 0
rr.totalSinceReport = 0
return report
}
// ProcessPacket extracts the needed data from RTP packets.
func (rr *RTCPReceiver) ProcessPacket(pkt *rtp.Packet, system time.Time, ptsEqualsDTS bool) error {
rr.mutex.Lock()
defer rr.mutex.Unlock()
// first packet
if !rr.firstRTPPacketReceived {
rr.firstRTPPacketReceived = true
rr.totalSinceReport = 1
rr.lastSequenceNumber = pkt.SequenceNumber
rr.senderSSRC = pkt.SSRC
if ptsEqualsDTS {
rr.timeInitialized = true
rr.lastTimeRTP = pkt.Timestamp
rr.lastTimeSystem = system
}
// subsequent packets
} else {
if pkt.SSRC != rr.senderSSRC {
return fmt.Errorf("received packet with wrong SSRC %d, expected %d", pkt.SSRC, rr.senderSSRC)
}
diff := int32(pkt.SequenceNumber) - int32(rr.lastSequenceNumber)
// overflow
if diff < -0x0FFF {
rr.sequenceNumberCycles++
}
// detect lost packets
if pkt.SequenceNumber != (rr.lastSequenceNumber + 1) {
rr.totalLost += uint32(uint16(diff) - 1)
rr.totalLostSinceReport += uint32(uint16(diff) - 1)
// allow up to 24 bits
if rr.totalLost > 0xFFFFFF {
rr.totalLost = 0xFFFFFF
}
if rr.totalLostSinceReport > 0xFFFFFF {
rr.totalLostSinceReport = 0xFFFFFF
}
}
rr.totalSinceReport += uint32(uint16(diff))
rr.lastSequenceNumber = pkt.SequenceNumber
if ptsEqualsDTS {
if rr.timeInitialized {
// update jitter
// https://tools.ietf.org/html/rfc3550#page-39
D := system.Sub(rr.lastTimeSystem).Seconds()*float64(rr.ClockRate) -
(float64(pkt.Timestamp) - float64(rr.lastTimeRTP))
if D < 0 {
D = -D
}
rr.jitter += (D - rr.jitter) / 16
}
rr.timeInitialized = true
rr.lastTimeRTP = pkt.Timestamp
rr.lastTimeSystem = system
}
}
return nil
}
// ProcessSenderReport extracts the needed data from RTCP sender reports.
func (rr *RTCPReceiver) ProcessSenderReport(sr *rtcp.SenderReport, system time.Time) {
rr.mutex.Lock()
defer rr.mutex.Unlock()
rr.firstSenderReportReceived = true
rr.lastSenderReportTimeNTP = sr.NTPTime
rr.lastSenderReportTimeRTP = sr.RTPTime
rr.lastSenderReportTimeSystem = system
}
// PacketNTP returns the NTP timestamp of the packet.
func (rr *RTCPReceiver) PacketNTP(ts uint32) (time.Time, bool) {
rr.mutex.Lock()
defer rr.mutex.Unlock()
if !rr.firstSenderReportReceived {
return time.Time{}, false
}
timeDiff := int32(ts - rr.lastSenderReportTimeRTP)
timeDiffGo := (time.Duration(timeDiff) * time.Second) / time.Duration(rr.ClockRate)
return ntpTimeRTCPToGo(rr.lastSenderReportTimeNTP).Add(timeDiffGo), true
}
// SenderSSRC returns the SSRC of outgoing RTP packets.
func (rr *RTCPReceiver) SenderSSRC() (uint32, bool) {
rr.mutex.RLock()
defer rr.mutex.RUnlock()
return rr.senderSSRC, rr.firstRTPPacketReceived
}

View File

@@ -0,0 +1,402 @@
package rtcpreceiver
import (
"testing"
"time"
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
)
func uint32Ptr(v uint32) *uint32 {
return &v
}
func TestRTCPReceiverBase(t *testing.T) {
done := make(chan struct{})
rr := &RTCPReceiver{
ClockRate: 90000,
ReceiverSSRC: uint32Ptr(0x65f83afb),
Period: 500 * time.Millisecond,
TimeNow: func() time.Time {
return time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC)
},
WritePacketRTCP: func(pkt rtcp.Packet) {
require.Equal(t, &rtcp.ReceiverReport{
SSRC: 0x65f83afb,
Reports: []rtcp.ReceptionReport{
{
SSRC: 0xba9da416,
LastSequenceNumber: 947,
LastSenderReport: 0x887a17ce,
Delay: 2 * 65536,
},
},
}, pkt)
close(done)
},
}
err := rr.Initialize()
require.NoError(t, err)
defer rr.Close()
srPkt := rtcp.SenderReport{
SSRC: 0xba9da416,
NTPTime: 0xe363887a17ced916,
RTPTime: 0xafb45733,
PacketCount: 714,
OctetCount: 859127,
}
ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
rr.ProcessSenderReport(&srPkt, ts)
rtpPkt := rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 946,
Timestamp: 0xafb45733,
SSRC: 0xba9da416,
},
Payload: []byte("\x00\x00"),
}
ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
err = rr.ProcessPacket(&rtpPkt, ts, true)
require.NoError(t, err)
rtpPkt = rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 947,
Timestamp: 0xafb45733 + 90000,
SSRC: 0xba9da416,
},
Payload: []byte("\x00\x00"),
}
ts = time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC)
err = rr.ProcessPacket(&rtpPkt, ts, true)
require.NoError(t, err)
<-done
}
func TestRTCPReceiverOverflow(t *testing.T) {
done := make(chan struct{})
rr := &RTCPReceiver{
ClockRate: 90000,
ReceiverSSRC: uint32Ptr(0x65f83afb),
Period: 250 * time.Millisecond,
TimeNow: func() time.Time {
return time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC)
},
WritePacketRTCP: func(pkt rtcp.Packet) {
require.Equal(t, &rtcp.ReceiverReport{
SSRC: 0x65f83afb,
Reports: []rtcp.ReceptionReport{
{
SSRC: 0xba9da416,
LastSequenceNumber: 1 << 16,
LastSenderReport: 0x887a17ce,
Delay: 1 * 65536,
},
},
}, pkt)
close(done)
},
}
err := rr.Initialize()
require.NoError(t, err)
defer rr.Close()
time.Sleep(400 * time.Millisecond)
srPkt := rtcp.SenderReport{
SSRC: 0xba9da416,
NTPTime: 0xe363887a17ced916,
RTPTime: 1287981738,
PacketCount: 714,
OctetCount: 859127,
}
ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
rr.ProcessSenderReport(&srPkt, ts)
rtpPkt := rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 0xffff,
Timestamp: 0xafb45733,
SSRC: 0xba9da416,
},
Payload: []byte("\x00\x00"),
}
ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
err = rr.ProcessPacket(&rtpPkt, ts, true)
require.NoError(t, err)
rtpPkt = rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 0x0000,
Timestamp: 0xafb45733,
SSRC: 0xba9da416,
},
Payload: []byte("\x00\x00"),
}
ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
err = rr.ProcessPacket(&rtpPkt, ts, true)
require.NoError(t, err)
<-done
}
func TestRTCPReceiverPacketLost(t *testing.T) {
done := make(chan struct{})
rr := &RTCPReceiver{
ClockRate: 90000,
ReceiverSSRC: uint32Ptr(0x65f83afb),
Period: 500 * time.Millisecond,
TimeNow: func() time.Time {
return time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC)
},
WritePacketRTCP: func(pkt rtcp.Packet) {
require.Equal(t, &rtcp.ReceiverReport{
SSRC: 0x65f83afb,
Reports: []rtcp.ReceptionReport{
{
SSRC: 0xba9da416,
LastSequenceNumber: 0x0122,
LastSenderReport: 0x887a17ce,
FractionLost: func() uint8 {
v := float64(1) / 3
return uint8(v * 256)
}(),
TotalLost: 1,
Delay: 1 * 65536,
},
},
}, pkt)
close(done)
},
}
err := rr.Initialize()
require.NoError(t, err)
defer rr.Close()
srPkt := rtcp.SenderReport{
SSRC: 0xba9da416,
NTPTime: 0xe363887a17ced916,
RTPTime: 1287981738,
PacketCount: 714,
OctetCount: 859127,
}
ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
rr.ProcessSenderReport(&srPkt, ts)
rtpPkt := rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 0x0120,
Timestamp: 0xafb45733,
SSRC: 0xba9da416,
},
Payload: []byte("\x00\x00"),
}
ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
err = rr.ProcessPacket(&rtpPkt, ts, true)
require.NoError(t, err)
rtpPkt = rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 0x0122,
Timestamp: 0xafb45733,
SSRC: 0xba9da416,
},
Payload: []byte("\x00\x00"),
}
ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
err = rr.ProcessPacket(&rtpPkt, ts, true)
require.NoError(t, err)
<-done
}
func TestRTCPReceiverOverflowPacketLost(t *testing.T) {
done := make(chan struct{})
rr := &RTCPReceiver{
ClockRate: 90000,
ReceiverSSRC: uint32Ptr(0x65f83afb),
Period: 500 * time.Millisecond,
TimeNow: func() time.Time {
return time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC)
},
WritePacketRTCP: func(pkt rtcp.Packet) {
require.Equal(t, &rtcp.ReceiverReport{
SSRC: 0x65f83afb,
Reports: []rtcp.ReceptionReport{
{
SSRC: 0xba9da416,
LastSequenceNumber: 1<<16 | 0x0002,
LastSenderReport: 0x887a17ce,
FractionLost: func() uint8 {
v := float64(2) / 4
return uint8(v * 256)
}(),
TotalLost: 2,
Delay: 1 * 65536,
},
},
}, pkt)
close(done)
},
}
err := rr.Initialize()
require.NoError(t, err)
defer rr.Close()
srPkt := rtcp.SenderReport{
SSRC: 0xba9da416,
NTPTime: 0xe363887a17ced916,
RTPTime: 1287981738,
PacketCount: 714,
OctetCount: 859127,
}
ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
rr.ProcessSenderReport(&srPkt, ts)
rtpPkt := rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 0xffff,
Timestamp: 0xafb45733,
SSRC: 0xba9da416,
},
Payload: []byte("\x00\x00"),
}
ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
err = rr.ProcessPacket(&rtpPkt, ts, true)
require.NoError(t, err)
rtpPkt = rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 0x0002,
Timestamp: 0xafb45733,
SSRC: 0xba9da416,
},
Payload: []byte("\x00\x00"),
}
ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
err = rr.ProcessPacket(&rtpPkt, ts, true)
require.NoError(t, err)
<-done
}
func TestRTCPReceiverJitter(t *testing.T) {
done := make(chan struct{})
rr := &RTCPReceiver{
ClockRate: 90000,
ReceiverSSRC: uint32Ptr(0x65f83afb),
Period: 500 * time.Millisecond,
TimeNow: func() time.Time {
return time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC)
},
WritePacketRTCP: func(pkt rtcp.Packet) {
require.Equal(t, &rtcp.ReceiverReport{
SSRC: 0x65f83afb,
Reports: []rtcp.ReceptionReport{
{
SSRC: 0xba9da416,
LastSequenceNumber: 948,
LastSenderReport: 0x887a17ce,
Delay: 2 * 65536,
Jitter: 45000 / 16,
},
},
}, pkt)
close(done)
},
}
err := rr.Initialize()
require.NoError(t, err)
defer rr.Close()
srPkt := rtcp.SenderReport{
SSRC: 0xba9da416,
NTPTime: 0xe363887a17ced916,
RTPTime: 0xafb45733,
PacketCount: 714,
OctetCount: 859127,
}
ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
rr.ProcessSenderReport(&srPkt, ts)
rtpPkt := rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 946,
Timestamp: 0xafb45733,
SSRC: 0xba9da416,
},
Payload: []byte("\x00\x00"),
}
ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
err = rr.ProcessPacket(&rtpPkt, ts, true)
require.NoError(t, err)
rtpPkt = rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 947,
Timestamp: 0xafb45733 + 45000,
SSRC: 0xba9da416,
},
Payload: []byte("\x00\x00"),
}
ts = time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC)
err = rr.ProcessPacket(&rtpPkt, ts, true)
require.NoError(t, err)
rtpPkt = rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 948,
Timestamp: 0xafb45733,
SSRC: 0xba9da416,
},
Payload: []byte("\x00\x00"),
}
ts = time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC)
err = rr.ProcessPacket(&rtpPkt, ts, false)
require.NoError(t, err)
<-done
}

View File

@@ -0,0 +1,132 @@
// Package rtcpsender contains a utility to generate RTCP sender reports.
package rtcpsender
import (
"sync"
"time"
"github.com/pion/rtcp"
"github.com/pion/rtp"
)
// seconds since 1st January 1900
// higher 32 bits are the integer part, lower 32 bits are the fractional part
func ntpTimeGoToRTCP(v time.Time) uint64 {
s := uint64(v.UnixNano()) + 2208988800*1000000000
return (s/1000000000)<<32 | (s % 1000000000)
}
// RTCPSender is a utility to generate RTCP sender reports.
type RTCPSender struct {
ClockRate int
Period time.Duration
TimeNow func() time.Time
WritePacketRTCP func(rtcp.Packet)
mutex sync.RWMutex
// data from RTP packets
initialized bool
lastTimeRTP uint32
lastTimeNTP time.Time
lastTimeSystem time.Time
senderSSRC uint32
lastSequenceNumber uint16
packetCount uint32
octetCount uint32
terminate chan struct{}
done chan struct{}
}
// Initialize initializes a RTCPSender.
func (rs *RTCPSender) Initialize() {
if rs.TimeNow == nil {
rs.TimeNow = time.Now
}
rs.terminate = make(chan struct{})
rs.done = make(chan struct{})
go rs.run()
}
// Close closes the RTCPSender.
func (rs *RTCPSender) Close() {
close(rs.terminate)
<-rs.done
}
func (rs *RTCPSender) run() {
defer close(rs.done)
t := time.NewTicker(rs.Period)
defer t.Stop()
for {
select {
case <-t.C:
report := rs.report()
if report != nil {
rs.WritePacketRTCP(report)
}
case <-rs.terminate:
return
}
}
}
func (rs *RTCPSender) report() rtcp.Packet {
rs.mutex.Lock()
defer rs.mutex.Unlock()
if !rs.initialized {
return nil
}
systemTimeDiff := rs.TimeNow().Sub(rs.lastTimeSystem)
ntpTime := rs.lastTimeNTP.Add(systemTimeDiff)
rtpTime := rs.lastTimeRTP + uint32(systemTimeDiff.Seconds()*float64(rs.ClockRate))
return &rtcp.SenderReport{
SSRC: rs.senderSSRC,
NTPTime: ntpTimeGoToRTCP(ntpTime),
RTPTime: rtpTime,
PacketCount: rs.packetCount,
OctetCount: rs.octetCount,
}
}
// ProcessPacket extracts data from RTP packets.
func (rs *RTCPSender) ProcessPacket(pkt *rtp.Packet, ntp time.Time, ptsEqualsDTS bool) {
rs.mutex.Lock()
defer rs.mutex.Unlock()
if ptsEqualsDTS {
rs.initialized = true
rs.lastTimeRTP = pkt.Timestamp
rs.lastTimeNTP = ntp
rs.lastTimeSystem = rs.TimeNow()
rs.senderSSRC = pkt.SSRC
}
rs.lastSequenceNumber = pkt.SequenceNumber
rs.packetCount++
rs.octetCount += uint32(len(pkt.Payload))
}
// SenderSSRC returns the SSRC of outgoing RTP packets.
func (rs *RTCPSender) SenderSSRC() (uint32, bool) {
rs.mutex.RLock()
defer rs.mutex.RUnlock()
return rs.senderSSRC, rs.initialized
}
// LastPacketData returns metadata of the last RTP packet.
func (rs *RTCPSender) LastPacketData() (uint16, uint32, time.Time, bool) {
rs.mutex.RLock()
defer rs.mutex.RUnlock()
return rs.lastSequenceNumber, rs.lastTimeRTP, rs.lastTimeNTP, rs.initialized
}

View File

@@ -0,0 +1,100 @@
package rtcpsender
import (
"sync"
"testing"
"time"
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
)
func TestRTCPSender(t *testing.T) {
var curTime time.Time
var mutex sync.Mutex
setCurTime := func(v time.Time) {
mutex.Lock()
defer mutex.Unlock()
curTime = v
}
sent := make(chan struct{})
rs := &RTCPSender{
ClockRate: 90000,
Period: 100 * time.Millisecond,
TimeNow: func() time.Time {
mutex.Lock()
defer mutex.Unlock()
return curTime
},
WritePacketRTCP: func(pkt rtcp.Packet) {
require.Equal(t, &rtcp.SenderReport{
SSRC: 0xba9da416,
NTPTime: func() uint64 {
// timeDiff = (24 - 22) = 2
// 21 + 2 = 23
d := time.Date(2008, 5, 20, 22, 15, 23, 0, time.UTC)
s := uint64(d.UnixNano()) + 2208988800*1000000000
return (s/1000000000)<<32 | (s % 1000000000)
}(),
RTPTime: 1287987768 + 2*90000,
PacketCount: 3,
OctetCount: 6,
}, pkt)
close(sent)
},
}
rs.Initialize()
defer rs.Close()
setCurTime(time.Date(2008, 5, 20, 22, 16, 20, 0, time.UTC))
rtpPkt := rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 946,
Timestamp: 1287987768,
SSRC: 0xba9da416,
},
Payload: []byte("\x00\x00"),
}
ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
rs.ProcessPacket(&rtpPkt, ts, true)
setCurTime(time.Date(2008, 5, 20, 22, 16, 22, 0, time.UTC))
rtpPkt = rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 947,
Timestamp: 1287987768,
SSRC: 0xba9da416,
},
Payload: []byte("\x00\x00"),
}
ts = time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC)
rs.ProcessPacket(&rtpPkt, ts, true)
rtpPkt = rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 948,
Timestamp: 1287987768,
SSRC: 0xba9da416,
},
Payload: []byte("\x00\x00"),
}
ts = time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC)
rs.ProcessPacket(&rtpPkt, ts, false)
setCurTime(time.Date(2008, 5, 20, 22, 16, 24, 0, time.UTC))
<-sent
}

View File

@@ -0,0 +1,31 @@
// Package rtplossdetector implements an algorithm that detects lost packets.
package rtplossdetector
import (
"github.com/pion/rtp"
)
// LossDetector detects lost packets.
type LossDetector struct {
initialized bool
expectedSeqNum uint16
}
// Process processes a RTP packet.
// It returns the number of lost packets.
func (r *LossDetector) Process(pkt *rtp.Packet) int {
if !r.initialized {
r.initialized = true
r.expectedSeqNum = pkt.SequenceNumber + 1
return 0
}
if pkt.SequenceNumber != r.expectedSeqNum {
diff := pkt.SequenceNumber - r.expectedSeqNum
r.expectedSeqNum = pkt.SequenceNumber + 1
return int(diff)
}
r.expectedSeqNum = pkt.SequenceNumber + 1
return 0
}

View File

@@ -0,0 +1,33 @@
package rtplossdetector
import (
"testing"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
)
func TestLossDetector(t *testing.T) {
d := &LossDetector{}
c := d.Process(&rtp.Packet{
Header: rtp.Header{
SequenceNumber: 65530,
},
})
require.Equal(t, 0, c)
c = d.Process(&rtp.Packet{
Header: rtp.Header{
SequenceNumber: 65531,
},
})
require.Equal(t, 0, c)
c = d.Process(&rtp.Packet{
Header: rtp.Header{
SequenceNumber: 65535,
},
})
require.Equal(t, 3, c)
}

View File

@@ -0,0 +1,133 @@
// Package rtpreorderer implements a filter to reorder incoming RTP packets.
package rtpreorderer
import (
"github.com/pion/rtp"
)
const (
bufferSize = 64
)
// Reorderer filters incoming RTP packets, in order to
// - order packets
// - remove duplicate packets
type Reorderer struct {
initialized bool
expectedSeqNum uint16
buffer []*rtp.Packet
absPos uint16
negativeCount int
}
// Initialize initializes a Reorderer.
func (r *Reorderer) Initialize() {
r.buffer = make([]*rtp.Packet, bufferSize)
}
// Process processes a RTP packet.
// It returns a sequence of ordered packets and the number of lost packets.
func (r *Reorderer) Process(pkt *rtp.Packet) ([]*rtp.Packet, int) {
if !r.initialized {
r.initialized = true
r.expectedSeqNum = pkt.SequenceNumber + 1
return []*rtp.Packet{pkt}, 0
}
relPos := int16(pkt.SequenceNumber - r.expectedSeqNum)
// packet is a duplicate or has been sent
// before the first packet processed by Reorderer.
// discard.
if relPos < 0 {
r.negativeCount++
// stream has been resetted, therefore reset reorderer too
if r.negativeCount > bufferSize {
r.negativeCount = 0
// clear buffer
for i := uint16(0); i < bufferSize; i++ {
p := (r.absPos + i) & (bufferSize - 1)
r.buffer[p] = nil
}
// reset position
r.expectedSeqNum = pkt.SequenceNumber + 1
return []*rtp.Packet{pkt}, 0
}
return nil, 0
}
r.negativeCount = 0
// there's a missing packet and buffer is full.
// return entire buffer and clear it.
if relPos >= bufferSize {
n := 1
for i := uint16(0); i < bufferSize; i++ {
p := (r.absPos + i) & (bufferSize - 1)
if r.buffer[p] != nil {
n++
}
}
ret := make([]*rtp.Packet, n)
pos := 0
for i := uint16(0); i < bufferSize; i++ {
p := (r.absPos + i) & (bufferSize - 1)
if r.buffer[p] != nil {
ret[pos], r.buffer[p] = r.buffer[p], nil
pos++
}
}
ret[pos] = pkt
r.expectedSeqNum = pkt.SequenceNumber + 1
return ret, int(relPos) - n + 1
}
// there's a missing packet
if relPos != 0 {
p := (r.absPos + uint16(relPos)) & (bufferSize - 1)
// current packet is a duplicate. discard
if r.buffer[p] != nil {
return nil, 0
}
// put current packet in buffer
r.buffer[p] = pkt
return nil, 0
}
// all packets have been received correctly.
// return them
n := uint16(1)
for {
p := (r.absPos + n) & (bufferSize - 1)
if r.buffer[p] == nil {
break
}
n++
}
ret := make([]*rtp.Packet, n)
ret[0] = pkt
r.absPos++
r.absPos &= (bufferSize - 1)
for i := uint16(1); i < n; i++ {
ret[i], r.buffer[r.absPos] = r.buffer[r.absPos], nil
r.absPos++
r.absPos &= (bufferSize - 1)
}
r.expectedSeqNum = pkt.SequenceNumber + n
return ret, 0
}

View File

@@ -157,7 +157,8 @@ func TestReorder(t *testing.T) {
},
}
r := New()
r := &Reorderer{}
r.Initialize()
r.absPos = 40
for _, entry := range sequence {
@@ -168,7 +169,8 @@ func TestReorder(t *testing.T) {
}
func TestBufferIsFull(t *testing.T) {
r := New()
r := &Reorderer{}
r.Initialize()
r.absPos = 25
sn := uint16(1564)
toMiss := 34
@@ -222,7 +224,8 @@ func TestBufferIsFull(t *testing.T) {
}
func TestReset(t *testing.T) {
r := New()
r := &Reorderer{}
r.Initialize()
sn := uint16(1234)
r.Process(&rtp.Packet{

View File

@@ -2,62 +2,17 @@
package rtcpreceiver
import (
"crypto/rand"
"fmt"
"sync"
"time"
"github.com/bluenviron/gortsplib/v4/internal/rtcpreceiver"
"github.com/pion/rtcp"
"github.com/pion/rtp"
)
// seconds since 1st January 1900
// higher 32 bits are the integer part, lower 32 bits are the fractional part
func ntpTimeRTCPToGo(v uint64) time.Time {
nano := int64((v>>32)*1000000000+(v&0xFFFFFFFF)) - 2208988800*1000000000
return time.Unix(0, nano)
}
func randUint32() (uint32, error) {
var b [4]byte
_, err := rand.Read(b[:])
if err != nil {
return 0, err
}
return uint32(b[0])<<24 | uint32(b[1])<<16 | uint32(b[2])<<8 | uint32(b[3]), nil
}
// RTCPReceiver is a utility to generate RTCP receiver reports.
type RTCPReceiver struct {
clockRate float64
receiverSSRC uint32
period time.Duration
timeNow func() time.Time
writePacketRTCP func(rtcp.Packet)
mutex sync.RWMutex
// data from RTP packets
firstRTPPacketReceived bool
timeInitialized bool
sequenceNumberCycles uint16
lastSequenceNumber uint16
senderSSRC uint32
lastTimeRTP uint32
lastTimeSystem time.Time
totalLost uint32
totalLostSinceReport uint32
totalSinceReport uint32
jitter float64
// data from RTCP packets
firstSenderReportReceived bool
lastSenderReportTimeNTP uint64
lastSenderReportTimeRTP uint32
lastSenderReportTimeSystem time.Time
terminate chan struct{}
done chan struct{}
}
//
// Deprecated: will be removed in the next version.
type RTCPReceiver rtcpreceiver.RTCPReceiver
// New allocates a RTCPReceiver.
func New(
@@ -67,198 +22,42 @@ func New(
timeNow func() time.Time,
writePacketRTCP func(rtcp.Packet),
) (*RTCPReceiver, error) {
if receiverSSRC == nil {
v, err := randUint32()
if err != nil {
return nil, err
}
receiverSSRC = &v
rr := &rtcpreceiver.RTCPReceiver{
ClockRate: clockRate,
ReceiverSSRC: receiverSSRC,
Period: period,
TimeNow: timeNow,
WritePacketRTCP: writePacketRTCP,
}
err := rr.Initialize()
if err != nil {
return nil, err
}
if timeNow == nil {
timeNow = time.Now
}
rr := &RTCPReceiver{
clockRate: float64(clockRate),
receiverSSRC: *receiverSSRC,
period: period,
timeNow: timeNow,
writePacketRTCP: writePacketRTCP,
terminate: make(chan struct{}),
done: make(chan struct{}),
}
go rr.run()
return rr, nil
return (*RTCPReceiver)(rr), nil
}
// Close closes the RTCPReceiver.
func (rr *RTCPReceiver) Close() {
close(rr.terminate)
<-rr.done
}
func (rr *RTCPReceiver) run() {
defer close(rr.done)
t := time.NewTicker(rr.period)
defer t.Stop()
for {
select {
case <-t.C:
report := rr.report()
if report != nil {
rr.writePacketRTCP(report)
}
case <-rr.terminate:
return
}
}
}
func (rr *RTCPReceiver) report() rtcp.Packet {
rr.mutex.Lock()
defer rr.mutex.Unlock()
if !rr.firstRTPPacketReceived {
return nil
}
system := rr.timeNow()
report := &rtcp.ReceiverReport{
SSRC: rr.receiverSSRC,
Reports: []rtcp.ReceptionReport{
{
SSRC: rr.senderSSRC,
LastSequenceNumber: uint32(rr.sequenceNumberCycles)<<16 | uint32(rr.lastSequenceNumber),
// equivalent to taking the integer part after multiplying the
// loss fraction by 256
FractionLost: uint8(float64(rr.totalLostSinceReport*256) / float64(rr.totalSinceReport)),
TotalLost: rr.totalLost,
Jitter: uint32(rr.jitter),
},
},
}
if rr.firstSenderReportReceived {
// middle 32 bits out of 64 in the NTP timestamp of last sender report
report.Reports[0].LastSenderReport = uint32(rr.lastSenderReportTimeNTP >> 16)
// delay, expressed in units of 1/65536 seconds, between
// receiving the last SR packet from source SSRC_n and sending this
// reception report block
report.Reports[0].Delay = uint32(system.Sub(rr.lastSenderReportTimeSystem).Seconds() * 65536)
}
rr.totalLostSinceReport = 0
rr.totalSinceReport = 0
return report
(*rtcpreceiver.RTCPReceiver)(rr).Close()
}
// ProcessPacket extracts the needed data from RTP packets.
func (rr *RTCPReceiver) ProcessPacket(pkt *rtp.Packet, system time.Time, ptsEqualsDTS bool) error {
rr.mutex.Lock()
defer rr.mutex.Unlock()
// first packet
if !rr.firstRTPPacketReceived {
rr.firstRTPPacketReceived = true
rr.totalSinceReport = 1
rr.lastSequenceNumber = pkt.SequenceNumber
rr.senderSSRC = pkt.SSRC
if ptsEqualsDTS {
rr.timeInitialized = true
rr.lastTimeRTP = pkt.Timestamp
rr.lastTimeSystem = system
}
// subsequent packets
} else {
if pkt.SSRC != rr.senderSSRC {
return fmt.Errorf("received packet with wrong SSRC %d, expected %d", pkt.SSRC, rr.senderSSRC)
}
diff := int32(pkt.SequenceNumber) - int32(rr.lastSequenceNumber)
// overflow
if diff < -0x0FFF {
rr.sequenceNumberCycles++
}
// detect lost packets
if pkt.SequenceNumber != (rr.lastSequenceNumber + 1) {
rr.totalLost += uint32(uint16(diff) - 1)
rr.totalLostSinceReport += uint32(uint16(diff) - 1)
// allow up to 24 bits
if rr.totalLost > 0xFFFFFF {
rr.totalLost = 0xFFFFFF
}
if rr.totalLostSinceReport > 0xFFFFFF {
rr.totalLostSinceReport = 0xFFFFFF
}
}
rr.totalSinceReport += uint32(uint16(diff))
rr.lastSequenceNumber = pkt.SequenceNumber
if ptsEqualsDTS {
if rr.timeInitialized {
// update jitter
// https://tools.ietf.org/html/rfc3550#page-39
D := system.Sub(rr.lastTimeSystem).Seconds()*rr.clockRate -
(float64(pkt.Timestamp) - float64(rr.lastTimeRTP))
if D < 0 {
D = -D
}
rr.jitter += (D - rr.jitter) / 16
}
rr.timeInitialized = true
rr.lastTimeRTP = pkt.Timestamp
rr.lastTimeSystem = system
}
}
return nil
return (*rtcpreceiver.RTCPReceiver)(rr).ProcessPacket(pkt, system, ptsEqualsDTS)
}
// ProcessSenderReport extracts the needed data from RTCP sender reports.
func (rr *RTCPReceiver) ProcessSenderReport(sr *rtcp.SenderReport, system time.Time) {
rr.mutex.Lock()
defer rr.mutex.Unlock()
rr.firstSenderReportReceived = true
rr.lastSenderReportTimeNTP = sr.NTPTime
rr.lastSenderReportTimeRTP = sr.RTPTime
rr.lastSenderReportTimeSystem = system
(*rtcpreceiver.RTCPReceiver)(rr).ProcessSenderReport(sr, system)
}
// PacketNTP returns the NTP timestamp of the packet.
func (rr *RTCPReceiver) PacketNTP(ts uint32) (time.Time, bool) {
rr.mutex.Lock()
defer rr.mutex.Unlock()
if !rr.firstSenderReportReceived {
return time.Time{}, false
}
timeDiff := int32(ts - rr.lastSenderReportTimeRTP)
timeDiffGo := (time.Duration(timeDiff) * time.Second) / time.Duration(rr.clockRate)
return ntpTimeRTCPToGo(rr.lastSenderReportTimeNTP).Add(timeDiffGo), true
return (*rtcpreceiver.RTCPReceiver)(rr).PacketNTP(ts)
}
// SenderSSRC returns the SSRC of outgoing RTP packets.
func (rr *RTCPReceiver) SenderSSRC() (uint32, bool) {
rr.mutex.RLock()
defer rr.mutex.RUnlock()
return rr.senderSSRC, rr.firstRTPPacketReceived
return (*rtcpreceiver.RTCPReceiver)(rr).SenderSSRC()
}

View File

@@ -2,41 +2,17 @@
package rtcpsender
import (
"sync"
"time"
"github.com/bluenviron/gortsplib/v4/internal/rtcpsender"
"github.com/pion/rtcp"
"github.com/pion/rtp"
)
// seconds since 1st January 1900
// higher 32 bits are the integer part, lower 32 bits are the fractional part
func ntpTimeGoToRTCP(v time.Time) uint64 {
s := uint64(v.UnixNano()) + 2208988800*1000000000
return (s/1000000000)<<32 | (s % 1000000000)
}
// RTCPSender is a utility to generate RTCP sender reports.
type RTCPSender struct {
clockRate float64
period time.Duration
timeNow func() time.Time
writePacketRTCP func(rtcp.Packet)
mutex sync.RWMutex
// data from RTP packets
initialized bool
lastTimeRTP uint32
lastTimeNTP time.Time
lastTimeSystem time.Time
senderSSRC uint32
lastSequenceNumber uint16
packetCount uint32
octetCount uint32
terminate chan struct{}
done chan struct{}
}
//
// Deprecated: will be removed in the next version.
type RTCPSender rtcpsender.RTCPSender
// New allocates a RTCPSender.
func New(
@@ -45,100 +21,33 @@ func New(
timeNow func() time.Time,
writePacketRTCP func(rtcp.Packet),
) *RTCPSender {
if timeNow == nil {
timeNow = time.Now
rs := &rtcpsender.RTCPSender{
ClockRate: clockRate,
Period: period,
TimeNow: timeNow,
WritePacketRTCP: writePacketRTCP,
}
rs.Initialize()
rs := &RTCPSender{
clockRate: float64(clockRate),
period: period,
timeNow: timeNow,
writePacketRTCP: writePacketRTCP,
terminate: make(chan struct{}),
done: make(chan struct{}),
}
go rs.run()
return rs
return (*RTCPSender)(rs)
}
// Close closes the RTCPSender.
func (rs *RTCPSender) Close() {
close(rs.terminate)
<-rs.done
}
func (rs *RTCPSender) run() {
defer close(rs.done)
t := time.NewTicker(rs.period)
defer t.Stop()
for {
select {
case <-t.C:
report := rs.report()
if report != nil {
rs.writePacketRTCP(report)
}
case <-rs.terminate:
return
}
}
}
func (rs *RTCPSender) report() rtcp.Packet {
rs.mutex.Lock()
defer rs.mutex.Unlock()
if !rs.initialized {
return nil
}
systemTimeDiff := rs.timeNow().Sub(rs.lastTimeSystem)
ntpTime := rs.lastTimeNTP.Add(systemTimeDiff)
rtpTime := rs.lastTimeRTP + uint32(systemTimeDiff.Seconds()*rs.clockRate)
return &rtcp.SenderReport{
SSRC: rs.senderSSRC,
NTPTime: ntpTimeGoToRTCP(ntpTime),
RTPTime: rtpTime,
PacketCount: rs.packetCount,
OctetCount: rs.octetCount,
}
(*rtcpsender.RTCPSender)(rs).Close()
}
// ProcessPacket extracts data from RTP packets.
func (rs *RTCPSender) ProcessPacket(pkt *rtp.Packet, ntp time.Time, ptsEqualsDTS bool) {
rs.mutex.Lock()
defer rs.mutex.Unlock()
if ptsEqualsDTS {
rs.initialized = true
rs.lastTimeRTP = pkt.Timestamp
rs.lastTimeNTP = ntp
rs.lastTimeSystem = rs.timeNow()
rs.senderSSRC = pkt.SSRC
}
rs.lastSequenceNumber = pkt.SequenceNumber
rs.packetCount++
rs.octetCount += uint32(len(pkt.Payload))
(*rtcpsender.RTCPSender)(rs).ProcessPacket(pkt, ntp, ptsEqualsDTS)
}
// SenderSSRC returns the SSRC of outgoing RTP packets.
func (rs *RTCPSender) SenderSSRC() (uint32, bool) {
rs.mutex.RLock()
defer rs.mutex.RUnlock()
return rs.senderSSRC, rs.initialized
return (*rtcpsender.RTCPSender)(rs).SenderSSRC()
}
// LastPacketData returns metadata of the last RTP packet.
func (rs *RTCPSender) LastPacketData() (uint16, uint32, time.Time, bool) {
rs.mutex.RLock()
defer rs.mutex.RUnlock()
return rs.lastSequenceNumber, rs.lastTimeRTP, rs.lastTimeNTP, rs.initialized
return (*rtcpsender.RTCPSender)(rs).LastPacketData()
}

View File

@@ -2,14 +2,14 @@
package rtplossdetector
import (
"github.com/bluenviron/gortsplib/v4/internal/rtplossdetector"
"github.com/pion/rtp"
)
// LossDetector detects lost packets.
type LossDetector struct {
initialized bool
expectedSeqNum uint16
}
//
// Deprecated: will be removed in the next version.
type LossDetector rtplossdetector.LossDetector
// New allocates a LossDetector.
func New() *LossDetector {
@@ -19,18 +19,5 @@ func New() *LossDetector {
// Process processes a RTP packet.
// It returns the number of lost packets.
func (r *LossDetector) Process(pkt *rtp.Packet) int {
if !r.initialized {
r.initialized = true
r.expectedSeqNum = pkt.SequenceNumber + 1
return 0
}
if pkt.SequenceNumber != r.expectedSeqNum {
diff := pkt.SequenceNumber - r.expectedSeqNum
r.expectedSeqNum = pkt.SequenceNumber + 1
return int(diff)
}
r.expectedSeqNum = pkt.SequenceNumber + 1
return 0
return (*rtplossdetector.LossDetector)(r).Process(pkt)
}

View File

@@ -2,134 +2,26 @@
package rtpreorderer
import (
"github.com/bluenviron/gortsplib/v4/internal/rtpreorderer"
"github.com/pion/rtp"
)
const (
bufferSize = 64
)
// Reorderer filters incoming RTP packets, in order to
// - order packets
// - remove duplicate packets
type Reorderer struct {
initialized bool
expectedSeqNum uint16
buffer []*rtp.Packet
absPos uint16
negativeCount int
}
//
// Deprecated: will be removed in the next version.
type Reorderer rtpreorderer.Reorderer
// New allocates a Reorderer.
func New() *Reorderer {
return &Reorderer{
buffer: make([]*rtp.Packet, bufferSize),
}
r := &rtpreorderer.Reorderer{}
r.Initialize()
return (*Reorderer)(r)
}
// Process processes a RTP packet.
// It returns a sequence of ordered packets and the number of lost packets.
func (r *Reorderer) Process(pkt *rtp.Packet) ([]*rtp.Packet, int) {
if !r.initialized {
r.initialized = true
r.expectedSeqNum = pkt.SequenceNumber + 1
return []*rtp.Packet{pkt}, 0
}
relPos := int16(pkt.SequenceNumber - r.expectedSeqNum)
// packet is a duplicate or has been sent
// before the first packet processed by Reorderer.
// discard.
if relPos < 0 {
r.negativeCount++
// stream has been resetted, therefore reset reorderer too
if r.negativeCount > bufferSize {
r.negativeCount = 0
// clear buffer
for i := uint16(0); i < bufferSize; i++ {
p := (r.absPos + i) & (bufferSize - 1)
r.buffer[p] = nil
}
// reset position
r.expectedSeqNum = pkt.SequenceNumber + 1
return []*rtp.Packet{pkt}, 0
}
return nil, 0
}
r.negativeCount = 0
// there's a missing packet and buffer is full.
// return entire buffer and clear it.
if relPos >= bufferSize {
n := 1
for i := uint16(0); i < bufferSize; i++ {
p := (r.absPos + i) & (bufferSize - 1)
if r.buffer[p] != nil {
n++
}
}
ret := make([]*rtp.Packet, n)
pos := 0
for i := uint16(0); i < bufferSize; i++ {
p := (r.absPos + i) & (bufferSize - 1)
if r.buffer[p] != nil {
ret[pos], r.buffer[p] = r.buffer[p], nil
pos++
}
}
ret[pos] = pkt
r.expectedSeqNum = pkt.SequenceNumber + 1
return ret, int(relPos) - n + 1
}
// there's a missing packet
if relPos != 0 {
p := (r.absPos + uint16(relPos)) & (bufferSize - 1)
// current packet is a duplicate. discard
if r.buffer[p] != nil {
return nil, 0
}
// put current packet in buffer
r.buffer[p] = pkt
return nil, 0
}
// all packets have been received correctly.
// return them
n := uint16(1)
for {
p := (r.absPos + n) & (bufferSize - 1)
if r.buffer[p] == nil {
break
}
n++
}
ret := make([]*rtp.Packet, n)
ret[0] = pkt
r.absPos++
r.absPos &= (bufferSize - 1)
for i := uint16(1); i < n; i++ {
ret[i], r.buffer[r.absPos] = r.buffer[r.absPos], nil
r.absPos++
r.absPos &= (bufferSize - 1)
}
r.expectedSeqNum = pkt.SequenceNumber + n
return ret, 0
return (*rtpreorderer.Reorderer)(r).Process(pkt)
}

View File

@@ -6,13 +6,16 @@ endif
test-examples:
go build -o /dev/null ./examples/...
test-internal:
go test -v $(RACE) -coverprofile=coverage-internal.txt ./internal/...
test-pkg:
go test -v $(RACE) -coverprofile=coverage-pkg.txt ./pkg/...
test-root:
go test -v $(RACE) -coverprofile=coverage-root.txt .
test-nodocker: test-examples test-pkg test-root
test-nodocker: test-examples test-internal test-pkg test-root
define DOCKERFILE_TEST
ARG ARCH

View File

@@ -6,11 +6,11 @@ import (
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/bluenviron/gortsplib/v4/internal/rtcpreceiver"
"github.com/bluenviron/gortsplib/v4/internal/rtplossdetector"
"github.com/bluenviron/gortsplib/v4/internal/rtpreorderer"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/gortsplib/v4/pkg/liberrors"
"github.com/bluenviron/gortsplib/v4/pkg/rtcpreceiver"
"github.com/bluenviron/gortsplib/v4/pkg/rtplossdetector"
"github.com/bluenviron/gortsplib/v4/pkg/rtpreorderer"
)
type serverSessionFormat struct {
@@ -26,22 +26,23 @@ type serverSessionFormat struct {
func (sf *serverSessionFormat) start() {
if sf.sm.ss.state != ServerSessionStatePlay {
if *sf.sm.ss.setuppedTransport == TransportUDP || *sf.sm.ss.setuppedTransport == TransportUDPMulticast {
sf.udpReorderer = rtpreorderer.New()
sf.udpReorderer = &rtpreorderer.Reorderer{}
sf.udpReorderer.Initialize()
} else {
sf.tcpLossDetector = rtplossdetector.New()
sf.tcpLossDetector = &rtplossdetector.LossDetector{}
}
var err error
sf.rtcpReceiver, err = rtcpreceiver.New(
sf.format.ClockRate(),
nil,
sf.sm.ss.s.receiverReportPeriod,
sf.sm.ss.s.timeNow,
func(pkt rtcp.Packet) {
sf.rtcpReceiver = &rtcpreceiver.RTCPReceiver{
ClockRate: sf.format.ClockRate(),
Period: sf.sm.ss.s.receiverReportPeriod,
TimeNow: sf.sm.ss.s.timeNow,
WritePacketRTCP: func(pkt rtcp.Packet) {
if *sf.sm.ss.setuppedTransport == TransportUDP || *sf.sm.ss.setuppedTransport == TransportUDPMulticast {
sf.sm.ss.WritePacketRTCP(sf.sm.media, pkt) //nolint:errcheck
}
})
},
}
err := sf.rtcpReceiver.Initialize()
if err != nil {
panic(err)
}

View File

@@ -7,8 +7,8 @@ import (
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/bluenviron/gortsplib/v4/internal/rtcpsender"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/gortsplib/v4/pkg/rtcpsender"
)
type serverStreamFormat struct {
@@ -19,16 +19,17 @@ type serverStreamFormat struct {
}
func (sf *serverStreamFormat) initialize() {
sf.rtcpSender = rtcpsender.New(
sf.format.ClockRate(),
sf.sm.st.s.senderReportPeriod,
sf.sm.st.s.timeNow,
func(pkt rtcp.Packet) {
sf.rtcpSender = &rtcpsender.RTCPSender{
ClockRate: sf.format.ClockRate(),
Period: sf.sm.st.s.senderReportPeriod,
TimeNow: sf.sm.st.s.timeNow,
WritePacketRTCP: func(pkt rtcp.Packet) {
if !sf.sm.st.s.DisableRTCPSenderReports {
sf.sm.st.WritePacketRTCP(sf.sm.media, pkt) //nolint:errcheck
}
},
)
}
sf.rtcpSender.Initialize()
}
func (sf *serverStreamFormat) writePacketRTP(byts []byte, pkt *rtp.Packet, ntp time.Time) error {