mirror of
https://github.com/aler9/gortsplib
synced 2025-10-05 15:16:51 +08:00
expose back rtcpreceiver, rtcpsender, rtpreorderer, rtplossdetector (#755)
This commit is contained in:
@@ -20,8 +20,6 @@ import (
|
||||
"github.com/pion/rtcp"
|
||||
"github.com/pion/rtp"
|
||||
|
||||
"github.com/bluenviron/gortsplib/v4/internal/rtcpreceiver"
|
||||
"github.com/bluenviron/gortsplib/v4/internal/rtcpsender"
|
||||
"github.com/bluenviron/gortsplib/v4/pkg/auth"
|
||||
"github.com/bluenviron/gortsplib/v4/pkg/base"
|
||||
"github.com/bluenviron/gortsplib/v4/pkg/bytecounter"
|
||||
@@ -30,6 +28,8 @@ import (
|
||||
"github.com/bluenviron/gortsplib/v4/pkg/format"
|
||||
"github.com/bluenviron/gortsplib/v4/pkg/headers"
|
||||
"github.com/bluenviron/gortsplib/v4/pkg/liberrors"
|
||||
"github.com/bluenviron/gortsplib/v4/pkg/rtcpreceiver"
|
||||
"github.com/bluenviron/gortsplib/v4/pkg/rtcpsender"
|
||||
"github.com/bluenviron/gortsplib/v4/pkg/rtptime"
|
||||
"github.com/bluenviron/gortsplib/v4/pkg/sdp"
|
||||
)
|
||||
@@ -1989,7 +1989,7 @@ func (c *Client) WritePacketRTPWithNTP(medi *description.Media, pkt *rtp.Packet,
|
||||
cm := c.setuppedMedias[medi]
|
||||
cf := cm.formats[pkt.PayloadType]
|
||||
|
||||
cf.rtcpSender.ProcessPacketRTP(pkt, ntp, cf.format.PTSEqualsDTS(pkt))
|
||||
cf.rtcpSender.ProcessPacket(pkt, ntp, cf.format.PTSEqualsDTS(pkt))
|
||||
|
||||
ok := c.writer.push(func() error {
|
||||
return cf.writePacketRTPInQueue(byts)
|
||||
|
@@ -7,11 +7,11 @@ import (
|
||||
"github.com/pion/rtcp"
|
||||
"github.com/pion/rtp"
|
||||
|
||||
"github.com/bluenviron/gortsplib/v4/internal/rtcpreceiver"
|
||||
"github.com/bluenviron/gortsplib/v4/internal/rtcpsender"
|
||||
"github.com/bluenviron/gortsplib/v4/internal/rtplossdetector"
|
||||
"github.com/bluenviron/gortsplib/v4/internal/rtpreorderer"
|
||||
"github.com/bluenviron/gortsplib/v4/pkg/format"
|
||||
"github.com/bluenviron/gortsplib/v4/pkg/rtcpreceiver"
|
||||
"github.com/bluenviron/gortsplib/v4/pkg/rtcpsender"
|
||||
"github.com/bluenviron/gortsplib/v4/pkg/rtplossdetector"
|
||||
"github.com/bluenviron/gortsplib/v4/pkg/rtpreorderer"
|
||||
)
|
||||
|
||||
type clientFormat struct {
|
||||
@@ -93,7 +93,7 @@ func (cf *clientFormat) stop() {
|
||||
func (cf *clientFormat) readPacketRTPUDP(pkt *rtp.Packet) {
|
||||
packets, lost := cf.udpReorderer.Process(pkt)
|
||||
if lost != 0 {
|
||||
cf.handlePacketsLost(lost)
|
||||
cf.handlePacketsLost(uint64(lost))
|
||||
// do not return
|
||||
}
|
||||
|
||||
@@ -107,7 +107,7 @@ func (cf *clientFormat) readPacketRTPUDP(pkt *rtp.Packet) {
|
||||
func (cf *clientFormat) readPacketRTPTCP(pkt *rtp.Packet) {
|
||||
lost := cf.tcpLossDetector.Process(pkt)
|
||||
if lost != 0 {
|
||||
cf.handlePacketsLost(lost)
|
||||
cf.handlePacketsLost(uint64(lost))
|
||||
// do not return
|
||||
}
|
||||
|
||||
@@ -117,7 +117,7 @@ func (cf *clientFormat) readPacketRTPTCP(pkt *rtp.Packet) {
|
||||
}
|
||||
|
||||
func (cf *clientFormat) handlePacketRTP(pkt *rtp.Packet, now time.Time) {
|
||||
err := cf.rtcpReceiver.ProcessPacketRTP(pkt, now, cf.format.PTSEqualsDTS(pkt))
|
||||
err := cf.rtcpReceiver.ProcessPacket(pkt, now, cf.format.PTSEqualsDTS(pkt))
|
||||
if err != nil {
|
||||
cf.cm.onPacketRTPDecodeError(err)
|
||||
return
|
||||
|
@@ -1,278 +0,0 @@
|
||||
// Package rtcpreceiver contains a utility to generate RTCP receiver reports.
|
||||
package rtcpreceiver
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/pion/rtcp"
|
||||
"github.com/pion/rtp"
|
||||
)
|
||||
|
||||
// seconds since 1st January 1900
|
||||
// higher 32 bits are the integer part, lower 32 bits are the fractional part
|
||||
func ntpTimeRTCPToGo(v uint64) time.Time {
|
||||
nano := int64((v>>32)*1000000000+(v&0xFFFFFFFF)) - 2208988800*1000000000
|
||||
return time.Unix(0, nano)
|
||||
}
|
||||
|
||||
func randUint32() (uint32, error) {
|
||||
var b [4]byte
|
||||
_, err := rand.Read(b[:])
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return uint32(b[0])<<24 | uint32(b[1])<<16 | uint32(b[2])<<8 | uint32(b[3]), nil
|
||||
}
|
||||
|
||||
// RTCPReceiver is a utility to generate RTCP receiver reports.
|
||||
type RTCPReceiver struct {
|
||||
ClockRate int
|
||||
LocalSSRC *uint32
|
||||
Period time.Duration
|
||||
TimeNow func() time.Time
|
||||
WritePacketRTCP func(rtcp.Packet)
|
||||
|
||||
mutex sync.RWMutex
|
||||
|
||||
// data from RTP packets
|
||||
firstRTPPacketReceived bool
|
||||
timeInitialized bool
|
||||
sequenceNumberCycles uint16
|
||||
lastSequenceNumber uint16
|
||||
remoteSSRC uint32
|
||||
lastTimeRTP uint32
|
||||
lastTimeSystem time.Time
|
||||
totalLost uint32
|
||||
totalLostSinceReport uint32
|
||||
totalSinceReport uint32
|
||||
jitter float64
|
||||
|
||||
// data from RTCP packets
|
||||
firstSenderReportReceived bool
|
||||
lastSenderReportTimeNTP uint64
|
||||
lastSenderReportTimeRTP uint32
|
||||
lastSenderReportTimeSystem time.Time
|
||||
|
||||
terminate chan struct{}
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
// Initialize initializes RTCPReceiver.
|
||||
func (rr *RTCPReceiver) Initialize() error {
|
||||
if rr.LocalSSRC == nil {
|
||||
v, err := randUint32()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rr.LocalSSRC = &v
|
||||
}
|
||||
|
||||
if rr.TimeNow == nil {
|
||||
rr.TimeNow = time.Now
|
||||
}
|
||||
|
||||
rr.terminate = make(chan struct{})
|
||||
rr.done = make(chan struct{})
|
||||
|
||||
go rr.run()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close closes the RTCPReceiver.
|
||||
func (rr *RTCPReceiver) Close() {
|
||||
close(rr.terminate)
|
||||
<-rr.done
|
||||
}
|
||||
|
||||
func (rr *RTCPReceiver) run() {
|
||||
defer close(rr.done)
|
||||
|
||||
t := time.NewTicker(rr.Period)
|
||||
defer t.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-t.C:
|
||||
report := rr.report()
|
||||
if report != nil {
|
||||
rr.WritePacketRTCP(report)
|
||||
}
|
||||
|
||||
case <-rr.terminate:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (rr *RTCPReceiver) report() rtcp.Packet {
|
||||
rr.mutex.Lock()
|
||||
defer rr.mutex.Unlock()
|
||||
|
||||
if !rr.firstRTPPacketReceived {
|
||||
return nil
|
||||
}
|
||||
|
||||
system := rr.TimeNow()
|
||||
|
||||
report := &rtcp.ReceiverReport{
|
||||
SSRC: *rr.LocalSSRC,
|
||||
Reports: []rtcp.ReceptionReport{
|
||||
{
|
||||
SSRC: rr.remoteSSRC,
|
||||
LastSequenceNumber: uint32(rr.sequenceNumberCycles)<<16 | uint32(rr.lastSequenceNumber),
|
||||
// equivalent to taking the integer part after multiplying the
|
||||
// loss fraction by 256
|
||||
FractionLost: uint8(float64(rr.totalLostSinceReport*256) / float64(rr.totalSinceReport)),
|
||||
TotalLost: rr.totalLost,
|
||||
Jitter: uint32(rr.jitter),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
if rr.firstSenderReportReceived {
|
||||
// middle 32 bits out of 64 in the NTP timestamp of last sender report
|
||||
report.Reports[0].LastSenderReport = uint32(rr.lastSenderReportTimeNTP >> 16)
|
||||
|
||||
// delay, expressed in units of 1/65536 seconds, between
|
||||
// receiving the last SR packet from source SSRC_n and sending this
|
||||
// reception report block
|
||||
report.Reports[0].Delay = uint32(system.Sub(rr.lastSenderReportTimeSystem).Seconds() * 65536)
|
||||
}
|
||||
|
||||
rr.totalLostSinceReport = 0
|
||||
rr.totalSinceReport = 0
|
||||
|
||||
return report
|
||||
}
|
||||
|
||||
// ProcessPacketRTP extracts the needed data from RTP packets.
|
||||
func (rr *RTCPReceiver) ProcessPacketRTP(pkt *rtp.Packet, system time.Time, ptsEqualsDTS bool) error {
|
||||
rr.mutex.Lock()
|
||||
defer rr.mutex.Unlock()
|
||||
|
||||
// first packet
|
||||
if !rr.firstRTPPacketReceived {
|
||||
rr.firstRTPPacketReceived = true
|
||||
rr.totalSinceReport = 1
|
||||
rr.lastSequenceNumber = pkt.SequenceNumber
|
||||
rr.remoteSSRC = pkt.SSRC
|
||||
|
||||
if ptsEqualsDTS {
|
||||
rr.timeInitialized = true
|
||||
rr.lastTimeRTP = pkt.Timestamp
|
||||
rr.lastTimeSystem = system
|
||||
}
|
||||
|
||||
// subsequent packets
|
||||
} else {
|
||||
if pkt.SSRC != rr.remoteSSRC {
|
||||
return fmt.Errorf("received packet with wrong SSRC %d, expected %d", pkt.SSRC, rr.remoteSSRC)
|
||||
}
|
||||
|
||||
diff := int32(pkt.SequenceNumber) - int32(rr.lastSequenceNumber)
|
||||
|
||||
// overflow
|
||||
if diff < -0x0FFF {
|
||||
rr.sequenceNumberCycles++
|
||||
}
|
||||
|
||||
// detect lost packets
|
||||
if pkt.SequenceNumber != (rr.lastSequenceNumber + 1) {
|
||||
rr.totalLost += uint32(uint16(diff) - 1)
|
||||
rr.totalLostSinceReport += uint32(uint16(diff) - 1)
|
||||
|
||||
// allow up to 24 bits
|
||||
if rr.totalLost > 0xFFFFFF {
|
||||
rr.totalLost = 0xFFFFFF
|
||||
}
|
||||
if rr.totalLostSinceReport > 0xFFFFFF {
|
||||
rr.totalLostSinceReport = 0xFFFFFF
|
||||
}
|
||||
}
|
||||
|
||||
rr.totalSinceReport += uint32(uint16(diff))
|
||||
rr.lastSequenceNumber = pkt.SequenceNumber
|
||||
|
||||
if ptsEqualsDTS {
|
||||
if rr.timeInitialized {
|
||||
// update jitter
|
||||
// https://tools.ietf.org/html/rfc3550#page-39
|
||||
D := system.Sub(rr.lastTimeSystem).Seconds()*float64(rr.ClockRate) -
|
||||
(float64(pkt.Timestamp) - float64(rr.lastTimeRTP))
|
||||
if D < 0 {
|
||||
D = -D
|
||||
}
|
||||
rr.jitter += (D - rr.jitter) / 16
|
||||
}
|
||||
|
||||
rr.timeInitialized = true
|
||||
rr.lastTimeRTP = pkt.Timestamp
|
||||
rr.lastTimeSystem = system
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ProcessSenderReport extracts the needed data from RTCP sender reports.
|
||||
func (rr *RTCPReceiver) ProcessSenderReport(sr *rtcp.SenderReport, system time.Time) {
|
||||
rr.mutex.Lock()
|
||||
defer rr.mutex.Unlock()
|
||||
|
||||
rr.firstSenderReportReceived = true
|
||||
rr.lastSenderReportTimeNTP = sr.NTPTime
|
||||
rr.lastSenderReportTimeRTP = sr.RTPTime
|
||||
rr.lastSenderReportTimeSystem = system
|
||||
}
|
||||
|
||||
func (rr *RTCPReceiver) packetNTPUnsafe(ts uint32) (time.Time, bool) {
|
||||
if !rr.firstSenderReportReceived {
|
||||
return time.Time{}, false
|
||||
}
|
||||
|
||||
timeDiff := int32(ts - rr.lastSenderReportTimeRTP)
|
||||
timeDiffGo := (time.Duration(timeDiff) * time.Second) / time.Duration(rr.ClockRate)
|
||||
|
||||
return ntpTimeRTCPToGo(rr.lastSenderReportTimeNTP).Add(timeDiffGo), true
|
||||
}
|
||||
|
||||
// PacketNTP returns the NTP timestamp of the packet.
|
||||
func (rr *RTCPReceiver) PacketNTP(ts uint32) (time.Time, bool) {
|
||||
rr.mutex.Lock()
|
||||
defer rr.mutex.Unlock()
|
||||
|
||||
return rr.packetNTPUnsafe(ts)
|
||||
}
|
||||
|
||||
// Stats are statistics.
|
||||
type Stats struct {
|
||||
RemoteSSRC uint32
|
||||
LastSequenceNumber uint16
|
||||
LastRTP uint32
|
||||
LastNTP time.Time
|
||||
Jitter float64
|
||||
}
|
||||
|
||||
// Stats returns statistics.
|
||||
func (rr *RTCPReceiver) Stats() *Stats {
|
||||
rr.mutex.RLock()
|
||||
defer rr.mutex.RUnlock()
|
||||
|
||||
if !rr.firstRTPPacketReceived {
|
||||
return nil
|
||||
}
|
||||
|
||||
ntp, _ := rr.packetNTPUnsafe(rr.lastTimeRTP)
|
||||
|
||||
return &Stats{
|
||||
RemoteSSRC: rr.remoteSSRC,
|
||||
LastSequenceNumber: rr.lastSequenceNumber,
|
||||
LastRTP: rr.lastTimeRTP,
|
||||
LastNTP: ntp,
|
||||
Jitter: rr.jitter,
|
||||
}
|
||||
}
|
@@ -1,402 +0,0 @@
|
||||
package rtcpreceiver
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/pion/rtcp"
|
||||
"github.com/pion/rtp"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func uint32Ptr(v uint32) *uint32 {
|
||||
return &v
|
||||
}
|
||||
|
||||
func TestRTCPReceiverBase(t *testing.T) {
|
||||
done := make(chan struct{})
|
||||
|
||||
rr := &RTCPReceiver{
|
||||
ClockRate: 90000,
|
||||
LocalSSRC: uint32Ptr(0x65f83afb),
|
||||
Period: 500 * time.Millisecond,
|
||||
TimeNow: func() time.Time {
|
||||
return time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC)
|
||||
},
|
||||
WritePacketRTCP: func(pkt rtcp.Packet) {
|
||||
require.Equal(t, &rtcp.ReceiverReport{
|
||||
SSRC: 0x65f83afb,
|
||||
Reports: []rtcp.ReceptionReport{
|
||||
{
|
||||
SSRC: 0xba9da416,
|
||||
LastSequenceNumber: 947,
|
||||
LastSenderReport: 0x887a17ce,
|
||||
Delay: 2 * 65536,
|
||||
},
|
||||
},
|
||||
}, pkt)
|
||||
close(done)
|
||||
},
|
||||
}
|
||||
err := rr.Initialize()
|
||||
require.NoError(t, err)
|
||||
defer rr.Close()
|
||||
|
||||
srPkt := rtcp.SenderReport{
|
||||
SSRC: 0xba9da416,
|
||||
NTPTime: 0xe363887a17ced916,
|
||||
RTPTime: 0xafb45733,
|
||||
PacketCount: 714,
|
||||
OctetCount: 859127,
|
||||
}
|
||||
ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
|
||||
rr.ProcessSenderReport(&srPkt, ts)
|
||||
|
||||
rtpPkt := rtp.Packet{
|
||||
Header: rtp.Header{
|
||||
Version: 2,
|
||||
Marker: true,
|
||||
PayloadType: 96,
|
||||
SequenceNumber: 946,
|
||||
Timestamp: 0xafb45733,
|
||||
SSRC: 0xba9da416,
|
||||
},
|
||||
Payload: []byte("\x00\x00"),
|
||||
}
|
||||
ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
|
||||
err = rr.ProcessPacketRTP(&rtpPkt, ts, true)
|
||||
require.NoError(t, err)
|
||||
|
||||
rtpPkt = rtp.Packet{
|
||||
Header: rtp.Header{
|
||||
Version: 2,
|
||||
Marker: true,
|
||||
PayloadType: 96,
|
||||
SequenceNumber: 947,
|
||||
Timestamp: 0xafb45733 + 90000,
|
||||
SSRC: 0xba9da416,
|
||||
},
|
||||
Payload: []byte("\x00\x00"),
|
||||
}
|
||||
ts = time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC)
|
||||
err = rr.ProcessPacketRTP(&rtpPkt, ts, true)
|
||||
require.NoError(t, err)
|
||||
|
||||
<-done
|
||||
}
|
||||
|
||||
func TestRTCPReceiverOverflow(t *testing.T) {
|
||||
done := make(chan struct{})
|
||||
|
||||
rr := &RTCPReceiver{
|
||||
ClockRate: 90000,
|
||||
LocalSSRC: uint32Ptr(0x65f83afb),
|
||||
Period: 250 * time.Millisecond,
|
||||
TimeNow: func() time.Time {
|
||||
return time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC)
|
||||
},
|
||||
WritePacketRTCP: func(pkt rtcp.Packet) {
|
||||
require.Equal(t, &rtcp.ReceiverReport{
|
||||
SSRC: 0x65f83afb,
|
||||
Reports: []rtcp.ReceptionReport{
|
||||
{
|
||||
SSRC: 0xba9da416,
|
||||
LastSequenceNumber: 1 << 16,
|
||||
LastSenderReport: 0x887a17ce,
|
||||
Delay: 1 * 65536,
|
||||
},
|
||||
},
|
||||
}, pkt)
|
||||
close(done)
|
||||
},
|
||||
}
|
||||
err := rr.Initialize()
|
||||
require.NoError(t, err)
|
||||
defer rr.Close()
|
||||
|
||||
time.Sleep(400 * time.Millisecond)
|
||||
|
||||
srPkt := rtcp.SenderReport{
|
||||
SSRC: 0xba9da416,
|
||||
NTPTime: 0xe363887a17ced916,
|
||||
RTPTime: 1287981738,
|
||||
PacketCount: 714,
|
||||
OctetCount: 859127,
|
||||
}
|
||||
ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
|
||||
rr.ProcessSenderReport(&srPkt, ts)
|
||||
|
||||
rtpPkt := rtp.Packet{
|
||||
Header: rtp.Header{
|
||||
Version: 2,
|
||||
Marker: true,
|
||||
PayloadType: 96,
|
||||
SequenceNumber: 0xffff,
|
||||
Timestamp: 0xafb45733,
|
||||
SSRC: 0xba9da416,
|
||||
},
|
||||
Payload: []byte("\x00\x00"),
|
||||
}
|
||||
ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
|
||||
err = rr.ProcessPacketRTP(&rtpPkt, ts, true)
|
||||
require.NoError(t, err)
|
||||
|
||||
rtpPkt = rtp.Packet{
|
||||
Header: rtp.Header{
|
||||
Version: 2,
|
||||
Marker: true,
|
||||
PayloadType: 96,
|
||||
SequenceNumber: 0x0000,
|
||||
Timestamp: 0xafb45733,
|
||||
SSRC: 0xba9da416,
|
||||
},
|
||||
Payload: []byte("\x00\x00"),
|
||||
}
|
||||
ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
|
||||
err = rr.ProcessPacketRTP(&rtpPkt, ts, true)
|
||||
require.NoError(t, err)
|
||||
|
||||
<-done
|
||||
}
|
||||
|
||||
func TestRTCPReceiverPacketsLost(t *testing.T) {
|
||||
done := make(chan struct{})
|
||||
|
||||
rr := &RTCPReceiver{
|
||||
ClockRate: 90000,
|
||||
LocalSSRC: uint32Ptr(0x65f83afb),
|
||||
Period: 500 * time.Millisecond,
|
||||
TimeNow: func() time.Time {
|
||||
return time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC)
|
||||
},
|
||||
WritePacketRTCP: func(pkt rtcp.Packet) {
|
||||
require.Equal(t, &rtcp.ReceiverReport{
|
||||
SSRC: 0x65f83afb,
|
||||
Reports: []rtcp.ReceptionReport{
|
||||
{
|
||||
SSRC: 0xba9da416,
|
||||
LastSequenceNumber: 0x0122,
|
||||
LastSenderReport: 0x887a17ce,
|
||||
FractionLost: func() uint8 {
|
||||
v := float64(1) / 3
|
||||
return uint8(v * 256)
|
||||
}(),
|
||||
TotalLost: 1,
|
||||
Delay: 1 * 65536,
|
||||
},
|
||||
},
|
||||
}, pkt)
|
||||
close(done)
|
||||
},
|
||||
}
|
||||
err := rr.Initialize()
|
||||
require.NoError(t, err)
|
||||
defer rr.Close()
|
||||
|
||||
srPkt := rtcp.SenderReport{
|
||||
SSRC: 0xba9da416,
|
||||
NTPTime: 0xe363887a17ced916,
|
||||
RTPTime: 1287981738,
|
||||
PacketCount: 714,
|
||||
OctetCount: 859127,
|
||||
}
|
||||
ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
|
||||
rr.ProcessSenderReport(&srPkt, ts)
|
||||
|
||||
rtpPkt := rtp.Packet{
|
||||
Header: rtp.Header{
|
||||
Version: 2,
|
||||
Marker: true,
|
||||
PayloadType: 96,
|
||||
SequenceNumber: 0x0120,
|
||||
Timestamp: 0xafb45733,
|
||||
SSRC: 0xba9da416,
|
||||
},
|
||||
Payload: []byte("\x00\x00"),
|
||||
}
|
||||
ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
|
||||
err = rr.ProcessPacketRTP(&rtpPkt, ts, true)
|
||||
require.NoError(t, err)
|
||||
|
||||
rtpPkt = rtp.Packet{
|
||||
Header: rtp.Header{
|
||||
Version: 2,
|
||||
Marker: true,
|
||||
PayloadType: 96,
|
||||
SequenceNumber: 0x0122,
|
||||
Timestamp: 0xafb45733,
|
||||
SSRC: 0xba9da416,
|
||||
},
|
||||
Payload: []byte("\x00\x00"),
|
||||
}
|
||||
ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
|
||||
err = rr.ProcessPacketRTP(&rtpPkt, ts, true)
|
||||
require.NoError(t, err)
|
||||
|
||||
<-done
|
||||
}
|
||||
|
||||
func TestRTCPReceiverOverflowPacketsLost(t *testing.T) {
|
||||
done := make(chan struct{})
|
||||
|
||||
rr := &RTCPReceiver{
|
||||
ClockRate: 90000,
|
||||
LocalSSRC: uint32Ptr(0x65f83afb),
|
||||
Period: 500 * time.Millisecond,
|
||||
TimeNow: func() time.Time {
|
||||
return time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC)
|
||||
},
|
||||
WritePacketRTCP: func(pkt rtcp.Packet) {
|
||||
require.Equal(t, &rtcp.ReceiverReport{
|
||||
SSRC: 0x65f83afb,
|
||||
Reports: []rtcp.ReceptionReport{
|
||||
{
|
||||
SSRC: 0xba9da416,
|
||||
LastSequenceNumber: 1<<16 | 0x0002,
|
||||
LastSenderReport: 0x887a17ce,
|
||||
FractionLost: func() uint8 {
|
||||
v := float64(2) / 4
|
||||
return uint8(v * 256)
|
||||
}(),
|
||||
TotalLost: 2,
|
||||
Delay: 1 * 65536,
|
||||
},
|
||||
},
|
||||
}, pkt)
|
||||
close(done)
|
||||
},
|
||||
}
|
||||
err := rr.Initialize()
|
||||
require.NoError(t, err)
|
||||
defer rr.Close()
|
||||
|
||||
srPkt := rtcp.SenderReport{
|
||||
SSRC: 0xba9da416,
|
||||
NTPTime: 0xe363887a17ced916,
|
||||
RTPTime: 1287981738,
|
||||
PacketCount: 714,
|
||||
OctetCount: 859127,
|
||||
}
|
||||
ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
|
||||
rr.ProcessSenderReport(&srPkt, ts)
|
||||
|
||||
rtpPkt := rtp.Packet{
|
||||
Header: rtp.Header{
|
||||
Version: 2,
|
||||
Marker: true,
|
||||
PayloadType: 96,
|
||||
SequenceNumber: 0xffff,
|
||||
Timestamp: 0xafb45733,
|
||||
SSRC: 0xba9da416,
|
||||
},
|
||||
Payload: []byte("\x00\x00"),
|
||||
}
|
||||
ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
|
||||
err = rr.ProcessPacketRTP(&rtpPkt, ts, true)
|
||||
require.NoError(t, err)
|
||||
|
||||
rtpPkt = rtp.Packet{
|
||||
Header: rtp.Header{
|
||||
Version: 2,
|
||||
Marker: true,
|
||||
PayloadType: 96,
|
||||
SequenceNumber: 0x0002,
|
||||
Timestamp: 0xafb45733,
|
||||
SSRC: 0xba9da416,
|
||||
},
|
||||
Payload: []byte("\x00\x00"),
|
||||
}
|
||||
ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
|
||||
err = rr.ProcessPacketRTP(&rtpPkt, ts, true)
|
||||
require.NoError(t, err)
|
||||
|
||||
<-done
|
||||
}
|
||||
|
||||
func TestRTCPReceiverJitter(t *testing.T) {
|
||||
done := make(chan struct{})
|
||||
|
||||
rr := &RTCPReceiver{
|
||||
ClockRate: 90000,
|
||||
LocalSSRC: uint32Ptr(0x65f83afb),
|
||||
Period: 500 * time.Millisecond,
|
||||
TimeNow: func() time.Time {
|
||||
return time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC)
|
||||
},
|
||||
WritePacketRTCP: func(pkt rtcp.Packet) {
|
||||
require.Equal(t, &rtcp.ReceiverReport{
|
||||
SSRC: 0x65f83afb,
|
||||
Reports: []rtcp.ReceptionReport{
|
||||
{
|
||||
SSRC: 0xba9da416,
|
||||
LastSequenceNumber: 948,
|
||||
LastSenderReport: 0x887a17ce,
|
||||
Delay: 2 * 65536,
|
||||
Jitter: 45000 / 16,
|
||||
},
|
||||
},
|
||||
}, pkt)
|
||||
close(done)
|
||||
},
|
||||
}
|
||||
err := rr.Initialize()
|
||||
require.NoError(t, err)
|
||||
defer rr.Close()
|
||||
|
||||
srPkt := rtcp.SenderReport{
|
||||
SSRC: 0xba9da416,
|
||||
NTPTime: 0xe363887a17ced916,
|
||||
RTPTime: 0xafb45733,
|
||||
PacketCount: 714,
|
||||
OctetCount: 859127,
|
||||
}
|
||||
ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
|
||||
rr.ProcessSenderReport(&srPkt, ts)
|
||||
|
||||
rtpPkt := rtp.Packet{
|
||||
Header: rtp.Header{
|
||||
Version: 2,
|
||||
Marker: true,
|
||||
PayloadType: 96,
|
||||
SequenceNumber: 946,
|
||||
Timestamp: 0xafb45733,
|
||||
SSRC: 0xba9da416,
|
||||
},
|
||||
Payload: []byte("\x00\x00"),
|
||||
}
|
||||
ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
|
||||
err = rr.ProcessPacketRTP(&rtpPkt, ts, true)
|
||||
require.NoError(t, err)
|
||||
|
||||
rtpPkt = rtp.Packet{
|
||||
Header: rtp.Header{
|
||||
Version: 2,
|
||||
Marker: true,
|
||||
PayloadType: 96,
|
||||
SequenceNumber: 947,
|
||||
Timestamp: 0xafb45733 + 45000,
|
||||
SSRC: 0xba9da416,
|
||||
},
|
||||
Payload: []byte("\x00\x00"),
|
||||
}
|
||||
ts = time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC)
|
||||
err = rr.ProcessPacketRTP(&rtpPkt, ts, true)
|
||||
require.NoError(t, err)
|
||||
|
||||
rtpPkt = rtp.Packet{
|
||||
Header: rtp.Header{
|
||||
Version: 2,
|
||||
Marker: true,
|
||||
PayloadType: 96,
|
||||
SequenceNumber: 948,
|
||||
Timestamp: 0xafb45733,
|
||||
SSRC: 0xba9da416,
|
||||
},
|
||||
Payload: []byte("\x00\x00"),
|
||||
}
|
||||
ts = time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC)
|
||||
err = rr.ProcessPacketRTP(&rtpPkt, ts, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
<-done
|
||||
}
|
@@ -1,143 +0,0 @@
|
||||
// Package rtcpsender contains a utility to generate RTCP sender reports.
|
||||
package rtcpsender
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/pion/rtcp"
|
||||
"github.com/pion/rtp"
|
||||
)
|
||||
|
||||
// seconds since 1st January 1900
|
||||
// higher 32 bits are the integer part, lower 32 bits are the fractional part
|
||||
func ntpTimeGoToRTCP(v time.Time) uint64 {
|
||||
s := uint64(v.UnixNano()) + 2208988800*1000000000
|
||||
return (s/1000000000)<<32 | (s % 1000000000)
|
||||
}
|
||||
|
||||
// RTCPSender is a utility to generate RTCP sender reports.
|
||||
type RTCPSender struct {
|
||||
ClockRate int
|
||||
Period time.Duration
|
||||
TimeNow func() time.Time
|
||||
WritePacketRTCP func(rtcp.Packet)
|
||||
|
||||
mutex sync.RWMutex
|
||||
|
||||
// data from RTP packets
|
||||
firstRTPPacketSent bool
|
||||
lastTimeRTP uint32
|
||||
lastTimeNTP time.Time
|
||||
lastTimeSystem time.Time
|
||||
localSSRC uint32
|
||||
lastSequenceNumber uint16
|
||||
packetCount uint32
|
||||
octetCount uint32
|
||||
|
||||
terminate chan struct{}
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
// Initialize initializes a RTCPSender.
|
||||
func (rs *RTCPSender) Initialize() {
|
||||
if rs.TimeNow == nil {
|
||||
rs.TimeNow = time.Now
|
||||
}
|
||||
|
||||
rs.terminate = make(chan struct{})
|
||||
rs.done = make(chan struct{})
|
||||
|
||||
go rs.run()
|
||||
}
|
||||
|
||||
// Close closes the RTCPSender.
|
||||
func (rs *RTCPSender) Close() {
|
||||
close(rs.terminate)
|
||||
<-rs.done
|
||||
}
|
||||
|
||||
func (rs *RTCPSender) run() {
|
||||
defer close(rs.done)
|
||||
|
||||
t := time.NewTicker(rs.Period)
|
||||
defer t.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-t.C:
|
||||
report := rs.report()
|
||||
if report != nil {
|
||||
rs.WritePacketRTCP(report)
|
||||
}
|
||||
|
||||
case <-rs.terminate:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (rs *RTCPSender) report() rtcp.Packet {
|
||||
rs.mutex.Lock()
|
||||
defer rs.mutex.Unlock()
|
||||
|
||||
if !rs.firstRTPPacketSent {
|
||||
return nil
|
||||
}
|
||||
|
||||
systemTimeDiff := rs.TimeNow().Sub(rs.lastTimeSystem)
|
||||
ntpTime := rs.lastTimeNTP.Add(systemTimeDiff)
|
||||
rtpTime := rs.lastTimeRTP + uint32(systemTimeDiff.Seconds()*float64(rs.ClockRate))
|
||||
|
||||
return &rtcp.SenderReport{
|
||||
SSRC: rs.localSSRC,
|
||||
NTPTime: ntpTimeGoToRTCP(ntpTime),
|
||||
RTPTime: rtpTime,
|
||||
PacketCount: rs.packetCount,
|
||||
OctetCount: rs.octetCount,
|
||||
}
|
||||
}
|
||||
|
||||
// ProcessPacketRTP extracts data from RTP packets.
|
||||
func (rs *RTCPSender) ProcessPacketRTP(pkt *rtp.Packet, ntp time.Time, ptsEqualsDTS bool) {
|
||||
rs.mutex.Lock()
|
||||
defer rs.mutex.Unlock()
|
||||
|
||||
if ptsEqualsDTS {
|
||||
rs.firstRTPPacketSent = true
|
||||
rs.lastTimeRTP = pkt.Timestamp
|
||||
rs.lastTimeNTP = ntp
|
||||
rs.lastTimeSystem = rs.TimeNow()
|
||||
rs.localSSRC = pkt.SSRC
|
||||
}
|
||||
|
||||
rs.lastSequenceNumber = pkt.SequenceNumber
|
||||
|
||||
rs.packetCount++
|
||||
rs.octetCount += uint32(len(pkt.Payload))
|
||||
}
|
||||
|
||||
// Stats are statistics.
|
||||
type Stats struct {
|
||||
LocalSSRC uint32
|
||||
LastSequenceNumber uint16
|
||||
LastRTP uint32
|
||||
LastNTP time.Time
|
||||
}
|
||||
|
||||
// Stats returns statistics.
|
||||
func (rs *RTCPSender) Stats() *Stats {
|
||||
rs.mutex.RLock()
|
||||
defer rs.mutex.RUnlock()
|
||||
|
||||
if !rs.firstRTPPacketSent {
|
||||
return nil
|
||||
}
|
||||
|
||||
return &Stats{
|
||||
LocalSSRC: rs.localSSRC,
|
||||
LastSequenceNumber: rs.lastSequenceNumber,
|
||||
LastRTP: rs.lastTimeRTP,
|
||||
LastNTP: rs.lastTimeNTP,
|
||||
}
|
||||
}
|
@@ -1,100 +0,0 @@
|
||||
package rtcpsender
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/pion/rtcp"
|
||||
"github.com/pion/rtp"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestRTCPSender(t *testing.T) {
|
||||
var curTime time.Time
|
||||
var mutex sync.Mutex
|
||||
|
||||
setCurTime := func(v time.Time) {
|
||||
mutex.Lock()
|
||||
defer mutex.Unlock()
|
||||
curTime = v
|
||||
}
|
||||
|
||||
sent := make(chan struct{})
|
||||
|
||||
rs := &RTCPSender{
|
||||
ClockRate: 90000,
|
||||
Period: 100 * time.Millisecond,
|
||||
TimeNow: func() time.Time {
|
||||
mutex.Lock()
|
||||
defer mutex.Unlock()
|
||||
return curTime
|
||||
},
|
||||
WritePacketRTCP: func(pkt rtcp.Packet) {
|
||||
require.Equal(t, &rtcp.SenderReport{
|
||||
SSRC: 0xba9da416,
|
||||
NTPTime: func() uint64 {
|
||||
// timeDiff = (24 - 22) = 2
|
||||
// 21 + 2 = 23
|
||||
d := time.Date(2008, 5, 20, 22, 15, 23, 0, time.UTC)
|
||||
s := uint64(d.UnixNano()) + 2208988800*1000000000
|
||||
return (s/1000000000)<<32 | (s % 1000000000)
|
||||
}(),
|
||||
RTPTime: 1287987768 + 2*90000,
|
||||
PacketCount: 3,
|
||||
OctetCount: 6,
|
||||
}, pkt)
|
||||
close(sent)
|
||||
},
|
||||
}
|
||||
rs.Initialize()
|
||||
defer rs.Close()
|
||||
|
||||
setCurTime(time.Date(2008, 5, 20, 22, 16, 20, 0, time.UTC))
|
||||
rtpPkt := rtp.Packet{
|
||||
Header: rtp.Header{
|
||||
Version: 2,
|
||||
Marker: true,
|
||||
PayloadType: 96,
|
||||
SequenceNumber: 946,
|
||||
Timestamp: 1287987768,
|
||||
SSRC: 0xba9da416,
|
||||
},
|
||||
Payload: []byte("\x00\x00"),
|
||||
}
|
||||
ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
|
||||
rs.ProcessPacketRTP(&rtpPkt, ts, true)
|
||||
|
||||
setCurTime(time.Date(2008, 5, 20, 22, 16, 22, 0, time.UTC))
|
||||
rtpPkt = rtp.Packet{
|
||||
Header: rtp.Header{
|
||||
Version: 2,
|
||||
Marker: true,
|
||||
PayloadType: 96,
|
||||
SequenceNumber: 947,
|
||||
Timestamp: 1287987768,
|
||||
SSRC: 0xba9da416,
|
||||
},
|
||||
Payload: []byte("\x00\x00"),
|
||||
}
|
||||
ts = time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC)
|
||||
rs.ProcessPacketRTP(&rtpPkt, ts, true)
|
||||
|
||||
rtpPkt = rtp.Packet{
|
||||
Header: rtp.Header{
|
||||
Version: 2,
|
||||
Marker: true,
|
||||
PayloadType: 96,
|
||||
SequenceNumber: 948,
|
||||
Timestamp: 1287987768,
|
||||
SSRC: 0xba9da416,
|
||||
},
|
||||
Payload: []byte("\x00\x00"),
|
||||
}
|
||||
ts = time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC)
|
||||
rs.ProcessPacketRTP(&rtpPkt, ts, false)
|
||||
|
||||
setCurTime(time.Date(2008, 5, 20, 22, 16, 24, 0, time.UTC))
|
||||
|
||||
<-sent
|
||||
}
|
@@ -1,31 +0,0 @@
|
||||
// Package rtplossdetector implements an algorithm that detects lost packets.
|
||||
package rtplossdetector
|
||||
|
||||
import (
|
||||
"github.com/pion/rtp"
|
||||
)
|
||||
|
||||
// LossDetector detects lost packets.
|
||||
type LossDetector struct {
|
||||
initialized bool
|
||||
expectedSeqNum uint16
|
||||
}
|
||||
|
||||
// Process processes a RTP packet.
|
||||
// It returns the number of lost packets.
|
||||
func (r *LossDetector) Process(pkt *rtp.Packet) uint64 {
|
||||
if !r.initialized {
|
||||
r.initialized = true
|
||||
r.expectedSeqNum = pkt.SequenceNumber + 1
|
||||
return 0
|
||||
}
|
||||
|
||||
if pkt.SequenceNumber != r.expectedSeqNum {
|
||||
diff := pkt.SequenceNumber - r.expectedSeqNum
|
||||
r.expectedSeqNum = pkt.SequenceNumber + 1
|
||||
return uint64(diff)
|
||||
}
|
||||
|
||||
r.expectedSeqNum = pkt.SequenceNumber + 1
|
||||
return 0
|
||||
}
|
@@ -1,33 +0,0 @@
|
||||
package rtplossdetector
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/pion/rtp"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestLossDetector(t *testing.T) {
|
||||
d := &LossDetector{}
|
||||
|
||||
c := d.Process(&rtp.Packet{
|
||||
Header: rtp.Header{
|
||||
SequenceNumber: 65530,
|
||||
},
|
||||
})
|
||||
require.Equal(t, uint64(0), c)
|
||||
|
||||
c = d.Process(&rtp.Packet{
|
||||
Header: rtp.Header{
|
||||
SequenceNumber: 65531,
|
||||
},
|
||||
})
|
||||
require.Equal(t, uint64(0), c)
|
||||
|
||||
c = d.Process(&rtp.Packet{
|
||||
Header: rtp.Header{
|
||||
SequenceNumber: 65535,
|
||||
},
|
||||
})
|
||||
require.Equal(t, uint64(3), c)
|
||||
}
|
@@ -1,133 +0,0 @@
|
||||
// Package rtpreorderer implements a filter to reorder incoming RTP packets.
|
||||
package rtpreorderer
|
||||
|
||||
import (
|
||||
"github.com/pion/rtp"
|
||||
)
|
||||
|
||||
const (
|
||||
bufferSize = 64
|
||||
)
|
||||
|
||||
// Reorderer filters incoming RTP packets, in order to
|
||||
// - order packets
|
||||
// - remove duplicate packets
|
||||
type Reorderer struct {
|
||||
initialized bool
|
||||
expectedSeqNum uint16
|
||||
buffer []*rtp.Packet
|
||||
absPos uint16
|
||||
negativeCount int
|
||||
}
|
||||
|
||||
// Initialize initializes a Reorderer.
|
||||
func (r *Reorderer) Initialize() {
|
||||
r.buffer = make([]*rtp.Packet, bufferSize)
|
||||
}
|
||||
|
||||
// Process processes a RTP packet.
|
||||
// It returns a sequence of ordered packets and the number of lost packets.
|
||||
func (r *Reorderer) Process(pkt *rtp.Packet) ([]*rtp.Packet, uint64) {
|
||||
if !r.initialized {
|
||||
r.initialized = true
|
||||
r.expectedSeqNum = pkt.SequenceNumber + 1
|
||||
return []*rtp.Packet{pkt}, 0
|
||||
}
|
||||
|
||||
relPos := int16(pkt.SequenceNumber - r.expectedSeqNum)
|
||||
|
||||
// packet is a duplicate or has been sent
|
||||
// before the first packet processed by Reorderer.
|
||||
// discard.
|
||||
if relPos < 0 {
|
||||
r.negativeCount++
|
||||
|
||||
// stream has been resetted, therefore reset reorderer too
|
||||
if r.negativeCount > bufferSize {
|
||||
r.negativeCount = 0
|
||||
|
||||
// clear buffer
|
||||
for i := uint16(0); i < bufferSize; i++ {
|
||||
p := (r.absPos + i) & (bufferSize - 1)
|
||||
r.buffer[p] = nil
|
||||
}
|
||||
|
||||
// reset position
|
||||
r.expectedSeqNum = pkt.SequenceNumber + 1
|
||||
return []*rtp.Packet{pkt}, 0
|
||||
}
|
||||
|
||||
return nil, 0
|
||||
}
|
||||
r.negativeCount = 0
|
||||
|
||||
// there's a missing packet and buffer is full.
|
||||
// return entire buffer and clear it.
|
||||
if relPos >= bufferSize {
|
||||
n := 1
|
||||
for i := uint16(0); i < bufferSize; i++ {
|
||||
p := (r.absPos + i) & (bufferSize - 1)
|
||||
if r.buffer[p] != nil {
|
||||
n++
|
||||
}
|
||||
}
|
||||
|
||||
ret := make([]*rtp.Packet, n)
|
||||
pos := 0
|
||||
|
||||
for i := uint16(0); i < bufferSize; i++ {
|
||||
p := (r.absPos + i) & (bufferSize - 1)
|
||||
if r.buffer[p] != nil {
|
||||
ret[pos], r.buffer[p] = r.buffer[p], nil
|
||||
pos++
|
||||
}
|
||||
}
|
||||
|
||||
ret[pos] = pkt
|
||||
|
||||
r.expectedSeqNum = pkt.SequenceNumber + 1
|
||||
return ret, uint64(int(relPos) - n + 1)
|
||||
}
|
||||
|
||||
// there's a missing packet
|
||||
if relPos != 0 {
|
||||
p := (r.absPos + uint16(relPos)) & (bufferSize - 1)
|
||||
|
||||
// current packet is a duplicate. discard
|
||||
if r.buffer[p] != nil {
|
||||
return nil, 0
|
||||
}
|
||||
|
||||
// put current packet in buffer
|
||||
r.buffer[p] = pkt
|
||||
return nil, 0
|
||||
}
|
||||
|
||||
// all packets have been received correctly.
|
||||
// return them
|
||||
|
||||
n := uint16(1)
|
||||
for {
|
||||
p := (r.absPos + n) & (bufferSize - 1)
|
||||
if r.buffer[p] == nil {
|
||||
break
|
||||
}
|
||||
n++
|
||||
}
|
||||
|
||||
ret := make([]*rtp.Packet, n)
|
||||
|
||||
ret[0] = pkt
|
||||
r.absPos++
|
||||
r.absPos &= (bufferSize - 1)
|
||||
|
||||
for i := uint16(1); i < n; i++ {
|
||||
ret[i], r.buffer[r.absPos] = r.buffer[r.absPos], nil
|
||||
r.absPos++
|
||||
r.absPos &= (bufferSize - 1)
|
||||
}
|
||||
|
||||
r.expectedSeqNum = pkt.SequenceNumber + n
|
||||
|
||||
return ret, 0
|
||||
}
|
@@ -2,19 +2,67 @@
|
||||
package rtcpreceiver
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/bluenviron/gortsplib/v4/internal/rtcpreceiver"
|
||||
"github.com/pion/rtcp"
|
||||
"github.com/pion/rtp"
|
||||
)
|
||||
|
||||
// seconds since 1st January 1900
|
||||
// higher 32 bits are the integer part, lower 32 bits are the fractional part
|
||||
func ntpTimeRTCPToGo(v uint64) time.Time {
|
||||
nano := int64((v>>32)*1000000000+(v&0xFFFFFFFF)) - 2208988800*1000000000
|
||||
return time.Unix(0, nano)
|
||||
}
|
||||
|
||||
func randUint32() (uint32, error) {
|
||||
var b [4]byte
|
||||
_, err := rand.Read(b[:])
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return uint32(b[0])<<24 | uint32(b[1])<<16 | uint32(b[2])<<8 | uint32(b[3]), nil
|
||||
}
|
||||
|
||||
// RTCPReceiver is a utility to generate RTCP receiver reports.
|
||||
//
|
||||
// Deprecated: will be removed in the next version.
|
||||
type RTCPReceiver rtcpreceiver.RTCPReceiver
|
||||
type RTCPReceiver struct {
|
||||
ClockRate int
|
||||
LocalSSRC *uint32
|
||||
Period time.Duration
|
||||
TimeNow func() time.Time
|
||||
WritePacketRTCP func(rtcp.Packet)
|
||||
|
||||
mutex sync.RWMutex
|
||||
|
||||
// data from RTP packets
|
||||
firstRTPPacketReceived bool
|
||||
timeInitialized bool
|
||||
sequenceNumberCycles uint16
|
||||
lastSequenceNumber uint16
|
||||
remoteSSRC uint32
|
||||
lastTimeRTP uint32
|
||||
lastTimeSystem time.Time
|
||||
totalLost uint32
|
||||
totalLostSinceReport uint32
|
||||
totalSinceReport uint32
|
||||
jitter float64
|
||||
|
||||
// data from RTCP packets
|
||||
firstSenderReportReceived bool
|
||||
lastSenderReportTimeNTP uint64
|
||||
lastSenderReportTimeRTP uint32
|
||||
lastSenderReportTimeSystem time.Time
|
||||
|
||||
terminate chan struct{}
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
// New allocates a RTCPReceiver.
|
||||
//
|
||||
// Deprecated: replaced by Initialize().
|
||||
func New(
|
||||
clockRate int,
|
||||
receiverSSRC *uint32,
|
||||
@@ -22,7 +70,7 @@ func New(
|
||||
timeNow func() time.Time,
|
||||
writePacketRTCP func(rtcp.Packet),
|
||||
) (*RTCPReceiver, error) {
|
||||
rr := &rtcpreceiver.RTCPReceiver{
|
||||
rr := &RTCPReceiver{
|
||||
ClockRate: clockRate,
|
||||
LocalSSRC: receiverSSRC,
|
||||
Period: period,
|
||||
@@ -34,34 +82,233 @@ func New(
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return (*RTCPReceiver)(rr), nil
|
||||
return rr, nil
|
||||
}
|
||||
|
||||
// Initialize initializes RTCPReceiver.
|
||||
func (rr *RTCPReceiver) Initialize() error {
|
||||
if rr.LocalSSRC == nil {
|
||||
v, err := randUint32()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rr.LocalSSRC = &v
|
||||
}
|
||||
|
||||
if rr.TimeNow == nil {
|
||||
rr.TimeNow = time.Now
|
||||
}
|
||||
|
||||
rr.terminate = make(chan struct{})
|
||||
rr.done = make(chan struct{})
|
||||
|
||||
go rr.run()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rr *RTCPReceiver) run() {
|
||||
defer close(rr.done)
|
||||
|
||||
t := time.NewTicker(rr.Period)
|
||||
defer t.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-t.C:
|
||||
report := rr.report()
|
||||
if report != nil {
|
||||
rr.WritePacketRTCP(report)
|
||||
}
|
||||
|
||||
case <-rr.terminate:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (rr *RTCPReceiver) report() rtcp.Packet {
|
||||
rr.mutex.Lock()
|
||||
defer rr.mutex.Unlock()
|
||||
|
||||
if !rr.firstRTPPacketReceived {
|
||||
return nil
|
||||
}
|
||||
|
||||
system := rr.TimeNow()
|
||||
|
||||
report := &rtcp.ReceiverReport{
|
||||
SSRC: *rr.LocalSSRC,
|
||||
Reports: []rtcp.ReceptionReport{
|
||||
{
|
||||
SSRC: rr.remoteSSRC,
|
||||
LastSequenceNumber: uint32(rr.sequenceNumberCycles)<<16 | uint32(rr.lastSequenceNumber),
|
||||
// equivalent to taking the integer part after multiplying the
|
||||
// loss fraction by 256
|
||||
FractionLost: uint8(float64(rr.totalLostSinceReport*256) / float64(rr.totalSinceReport)),
|
||||
TotalLost: rr.totalLost,
|
||||
Jitter: uint32(rr.jitter),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
if rr.firstSenderReportReceived {
|
||||
// middle 32 bits out of 64 in the NTP timestamp of last sender report
|
||||
report.Reports[0].LastSenderReport = uint32(rr.lastSenderReportTimeNTP >> 16)
|
||||
|
||||
// delay, expressed in units of 1/65536 seconds, between
|
||||
// receiving the last SR packet from source SSRC_n and sending this
|
||||
// reception report block
|
||||
report.Reports[0].Delay = uint32(system.Sub(rr.lastSenderReportTimeSystem).Seconds() * 65536)
|
||||
}
|
||||
|
||||
rr.totalLostSinceReport = 0
|
||||
rr.totalSinceReport = 0
|
||||
|
||||
return report
|
||||
}
|
||||
|
||||
// Close closes the RTCPReceiver.
|
||||
func (rr *RTCPReceiver) Close() {
|
||||
(*rtcpreceiver.RTCPReceiver)(rr).Close()
|
||||
close(rr.terminate)
|
||||
<-rr.done
|
||||
}
|
||||
|
||||
// ProcessPacket extracts the needed data from RTP packets.
|
||||
func (rr *RTCPReceiver) ProcessPacket(pkt *rtp.Packet, system time.Time, ptsEqualsDTS bool) error {
|
||||
return (*rtcpreceiver.RTCPReceiver)(rr).ProcessPacketRTP(pkt, system, ptsEqualsDTS)
|
||||
rr.mutex.Lock()
|
||||
defer rr.mutex.Unlock()
|
||||
|
||||
// first packet
|
||||
if !rr.firstRTPPacketReceived {
|
||||
rr.firstRTPPacketReceived = true
|
||||
rr.totalSinceReport = 1
|
||||
rr.lastSequenceNumber = pkt.SequenceNumber
|
||||
rr.remoteSSRC = pkt.SSRC
|
||||
|
||||
if ptsEqualsDTS {
|
||||
rr.timeInitialized = true
|
||||
rr.lastTimeRTP = pkt.Timestamp
|
||||
rr.lastTimeSystem = system
|
||||
}
|
||||
|
||||
// subsequent packets
|
||||
} else {
|
||||
if pkt.SSRC != rr.remoteSSRC {
|
||||
return fmt.Errorf("received packet with wrong SSRC %d, expected %d", pkt.SSRC, rr.remoteSSRC)
|
||||
}
|
||||
|
||||
diff := int32(pkt.SequenceNumber) - int32(rr.lastSequenceNumber)
|
||||
|
||||
// overflow
|
||||
if diff < -0x0FFF {
|
||||
rr.sequenceNumberCycles++
|
||||
}
|
||||
|
||||
// detect lost packets
|
||||
if pkt.SequenceNumber != (rr.lastSequenceNumber + 1) {
|
||||
rr.totalLost += uint32(uint16(diff) - 1)
|
||||
rr.totalLostSinceReport += uint32(uint16(diff) - 1)
|
||||
|
||||
// allow up to 24 bits
|
||||
if rr.totalLost > 0xFFFFFF {
|
||||
rr.totalLost = 0xFFFFFF
|
||||
}
|
||||
if rr.totalLostSinceReport > 0xFFFFFF {
|
||||
rr.totalLostSinceReport = 0xFFFFFF
|
||||
}
|
||||
}
|
||||
|
||||
rr.totalSinceReport += uint32(uint16(diff))
|
||||
rr.lastSequenceNumber = pkt.SequenceNumber
|
||||
|
||||
if ptsEqualsDTS {
|
||||
if rr.timeInitialized {
|
||||
// update jitter
|
||||
// https://tools.ietf.org/html/rfc3550#page-39
|
||||
D := system.Sub(rr.lastTimeSystem).Seconds()*float64(rr.ClockRate) -
|
||||
(float64(pkt.Timestamp) - float64(rr.lastTimeRTP))
|
||||
if D < 0 {
|
||||
D = -D
|
||||
}
|
||||
rr.jitter += (D - rr.jitter) / 16
|
||||
}
|
||||
|
||||
rr.timeInitialized = true
|
||||
rr.lastTimeRTP = pkt.Timestamp
|
||||
rr.lastTimeSystem = system
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ProcessSenderReport extracts the needed data from RTCP sender reports.
|
||||
func (rr *RTCPReceiver) ProcessSenderReport(sr *rtcp.SenderReport, system time.Time) {
|
||||
(*rtcpreceiver.RTCPReceiver)(rr).ProcessSenderReport(sr, system)
|
||||
rr.mutex.Lock()
|
||||
defer rr.mutex.Unlock()
|
||||
|
||||
rr.firstSenderReportReceived = true
|
||||
rr.lastSenderReportTimeNTP = sr.NTPTime
|
||||
rr.lastSenderReportTimeRTP = sr.RTPTime
|
||||
rr.lastSenderReportTimeSystem = system
|
||||
}
|
||||
|
||||
func (rr *RTCPReceiver) packetNTPUnsafe(ts uint32) (time.Time, bool) {
|
||||
if !rr.firstSenderReportReceived {
|
||||
return time.Time{}, false
|
||||
}
|
||||
|
||||
timeDiff := int32(ts - rr.lastSenderReportTimeRTP)
|
||||
timeDiffGo := (time.Duration(timeDiff) * time.Second) / time.Duration(rr.ClockRate)
|
||||
|
||||
return ntpTimeRTCPToGo(rr.lastSenderReportTimeNTP).Add(timeDiffGo), true
|
||||
}
|
||||
|
||||
// PacketNTP returns the NTP timestamp of the packet.
|
||||
func (rr *RTCPReceiver) PacketNTP(ts uint32) (time.Time, bool) {
|
||||
return (*rtcpreceiver.RTCPReceiver)(rr).PacketNTP(ts)
|
||||
rr.mutex.Lock()
|
||||
defer rr.mutex.Unlock()
|
||||
|
||||
return rr.packetNTPUnsafe(ts)
|
||||
}
|
||||
|
||||
// SenderSSRC returns the SSRC of outgoing RTP packets.
|
||||
//
|
||||
// Deprecated: replaced by Stats().
|
||||
func (rr *RTCPReceiver) SenderSSRC() (uint32, bool) {
|
||||
stats := (*rtcpreceiver.RTCPReceiver)(rr).Stats()
|
||||
stats := rr.Stats()
|
||||
if stats == nil {
|
||||
return 0, false
|
||||
}
|
||||
return stats.RemoteSSRC, true
|
||||
}
|
||||
|
||||
// Stats are statistics.
|
||||
type Stats struct {
|
||||
RemoteSSRC uint32
|
||||
LastSequenceNumber uint16
|
||||
LastRTP uint32
|
||||
LastNTP time.Time
|
||||
Jitter float64
|
||||
}
|
||||
|
||||
// Stats returns statistics.
|
||||
func (rr *RTCPReceiver) Stats() *Stats {
|
||||
rr.mutex.RLock()
|
||||
defer rr.mutex.RUnlock()
|
||||
|
||||
if !rr.firstRTPPacketReceived {
|
||||
return nil
|
||||
}
|
||||
|
||||
ntp, _ := rr.packetNTPUnsafe(rr.lastTimeRTP)
|
||||
|
||||
return &Stats{
|
||||
RemoteSSRC: rr.remoteSSRC,
|
||||
LastSequenceNumber: rr.lastSequenceNumber,
|
||||
LastRTP: rr.lastTimeRTP,
|
||||
LastNTP: ntp,
|
||||
Jitter: rr.jitter,
|
||||
}
|
||||
}
|
||||
|
@@ -16,14 +16,14 @@ func uint32Ptr(v uint32) *uint32 {
|
||||
func TestRTCPReceiverBase(t *testing.T) {
|
||||
done := make(chan struct{})
|
||||
|
||||
rr, err := New(
|
||||
90000,
|
||||
uint32Ptr(0x65f83afb),
|
||||
500*time.Millisecond,
|
||||
func() time.Time {
|
||||
rr := &RTCPReceiver{
|
||||
ClockRate: 90000,
|
||||
LocalSSRC: uint32Ptr(0x65f83afb),
|
||||
Period: 500 * time.Millisecond,
|
||||
TimeNow: func() time.Time {
|
||||
return time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC)
|
||||
},
|
||||
func(pkt rtcp.Packet) {
|
||||
WritePacketRTCP: func(pkt rtcp.Packet) {
|
||||
require.Equal(t, &rtcp.ReceiverReport{
|
||||
SSRC: 0x65f83afb,
|
||||
Reports: []rtcp.ReceptionReport{
|
||||
@@ -36,7 +36,9 @@ func TestRTCPReceiverBase(t *testing.T) {
|
||||
},
|
||||
}, pkt)
|
||||
close(done)
|
||||
})
|
||||
},
|
||||
}
|
||||
err := rr.Initialize()
|
||||
require.NoError(t, err)
|
||||
defer rr.Close()
|
||||
|
||||
@@ -86,14 +88,14 @@ func TestRTCPReceiverBase(t *testing.T) {
|
||||
func TestRTCPReceiverOverflow(t *testing.T) {
|
||||
done := make(chan struct{})
|
||||
|
||||
rr, err := New(
|
||||
90000,
|
||||
uint32Ptr(0x65f83afb),
|
||||
250*time.Millisecond,
|
||||
func() time.Time {
|
||||
rr := &RTCPReceiver{
|
||||
ClockRate: 90000,
|
||||
LocalSSRC: uint32Ptr(0x65f83afb),
|
||||
Period: 250 * time.Millisecond,
|
||||
TimeNow: func() time.Time {
|
||||
return time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC)
|
||||
},
|
||||
func(pkt rtcp.Packet) {
|
||||
WritePacketRTCP: func(pkt rtcp.Packet) {
|
||||
require.Equal(t, &rtcp.ReceiverReport{
|
||||
SSRC: 0x65f83afb,
|
||||
Reports: []rtcp.ReceptionReport{
|
||||
@@ -106,7 +108,9 @@ func TestRTCPReceiverOverflow(t *testing.T) {
|
||||
},
|
||||
}, pkt)
|
||||
close(done)
|
||||
})
|
||||
},
|
||||
}
|
||||
err := rr.Initialize()
|
||||
require.NoError(t, err)
|
||||
defer rr.Close()
|
||||
|
||||
@@ -158,14 +162,14 @@ func TestRTCPReceiverOverflow(t *testing.T) {
|
||||
func TestRTCPReceiverPacketsLost(t *testing.T) {
|
||||
done := make(chan struct{})
|
||||
|
||||
rr, err := New(
|
||||
90000,
|
||||
uint32Ptr(0x65f83afb),
|
||||
500*time.Millisecond,
|
||||
func() time.Time {
|
||||
rr := &RTCPReceiver{
|
||||
ClockRate: 90000,
|
||||
LocalSSRC: uint32Ptr(0x65f83afb),
|
||||
Period: 500 * time.Millisecond,
|
||||
TimeNow: func() time.Time {
|
||||
return time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC)
|
||||
},
|
||||
func(pkt rtcp.Packet) {
|
||||
WritePacketRTCP: func(pkt rtcp.Packet) {
|
||||
require.Equal(t, &rtcp.ReceiverReport{
|
||||
SSRC: 0x65f83afb,
|
||||
Reports: []rtcp.ReceptionReport{
|
||||
@@ -183,7 +187,9 @@ func TestRTCPReceiverPacketsLost(t *testing.T) {
|
||||
},
|
||||
}, pkt)
|
||||
close(done)
|
||||
})
|
||||
},
|
||||
}
|
||||
err := rr.Initialize()
|
||||
require.NoError(t, err)
|
||||
defer rr.Close()
|
||||
|
||||
@@ -233,14 +239,14 @@ func TestRTCPReceiverPacketsLost(t *testing.T) {
|
||||
func TestRTCPReceiverOverflowPacketsLost(t *testing.T) {
|
||||
done := make(chan struct{})
|
||||
|
||||
rr, err := New(
|
||||
90000,
|
||||
uint32Ptr(0x65f83afb),
|
||||
500*time.Millisecond,
|
||||
func() time.Time {
|
||||
rr := &RTCPReceiver{
|
||||
ClockRate: 90000,
|
||||
LocalSSRC: uint32Ptr(0x65f83afb),
|
||||
Period: 500 * time.Millisecond,
|
||||
TimeNow: func() time.Time {
|
||||
return time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC)
|
||||
},
|
||||
func(pkt rtcp.Packet) {
|
||||
WritePacketRTCP: func(pkt rtcp.Packet) {
|
||||
require.Equal(t, &rtcp.ReceiverReport{
|
||||
SSRC: 0x65f83afb,
|
||||
Reports: []rtcp.ReceptionReport{
|
||||
@@ -258,7 +264,9 @@ func TestRTCPReceiverOverflowPacketsLost(t *testing.T) {
|
||||
},
|
||||
}, pkt)
|
||||
close(done)
|
||||
})
|
||||
},
|
||||
}
|
||||
err := rr.Initialize()
|
||||
require.NoError(t, err)
|
||||
defer rr.Close()
|
||||
|
||||
@@ -308,14 +316,14 @@ func TestRTCPReceiverOverflowPacketsLost(t *testing.T) {
|
||||
func TestRTCPReceiverJitter(t *testing.T) {
|
||||
done := make(chan struct{})
|
||||
|
||||
rr, err := New(
|
||||
90000,
|
||||
uint32Ptr(0x65f83afb),
|
||||
500*time.Millisecond,
|
||||
func() time.Time {
|
||||
rr := &RTCPReceiver{
|
||||
ClockRate: 90000,
|
||||
LocalSSRC: uint32Ptr(0x65f83afb),
|
||||
Period: 500 * time.Millisecond,
|
||||
TimeNow: func() time.Time {
|
||||
return time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC)
|
||||
},
|
||||
func(pkt rtcp.Packet) {
|
||||
WritePacketRTCP: func(pkt rtcp.Packet) {
|
||||
require.Equal(t, &rtcp.ReceiverReport{
|
||||
SSRC: 0x65f83afb,
|
||||
Reports: []rtcp.ReceptionReport{
|
||||
@@ -329,7 +337,9 @@ func TestRTCPReceiverJitter(t *testing.T) {
|
||||
},
|
||||
}, pkt)
|
||||
close(done)
|
||||
})
|
||||
},
|
||||
}
|
||||
err := rr.Initialize()
|
||||
require.NoError(t, err)
|
||||
defer rr.Close()
|
||||
|
||||
|
@@ -2,26 +2,53 @@
|
||||
package rtcpsender
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/bluenviron/gortsplib/v4/internal/rtcpsender"
|
||||
"github.com/pion/rtcp"
|
||||
"github.com/pion/rtp"
|
||||
)
|
||||
|
||||
// seconds since 1st January 1900
|
||||
// higher 32 bits are the integer part, lower 32 bits are the fractional part
|
||||
func ntpTimeGoToRTCP(v time.Time) uint64 {
|
||||
s := uint64(v.UnixNano()) + 2208988800*1000000000
|
||||
return (s/1000000000)<<32 | (s % 1000000000)
|
||||
}
|
||||
|
||||
// RTCPSender is a utility to generate RTCP sender reports.
|
||||
//
|
||||
// Deprecated: will be removed in the next version.
|
||||
type RTCPSender rtcpsender.RTCPSender
|
||||
type RTCPSender struct {
|
||||
ClockRate int
|
||||
Period time.Duration
|
||||
TimeNow func() time.Time
|
||||
WritePacketRTCP func(rtcp.Packet)
|
||||
|
||||
mutex sync.RWMutex
|
||||
|
||||
// data from RTP packets
|
||||
firstRTPPacketSent bool
|
||||
lastTimeRTP uint32
|
||||
lastTimeNTP time.Time
|
||||
lastTimeSystem time.Time
|
||||
localSSRC uint32
|
||||
lastSequenceNumber uint16
|
||||
packetCount uint32
|
||||
octetCount uint32
|
||||
|
||||
terminate chan struct{}
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
// New allocates a RTCPSender.
|
||||
//
|
||||
// Deprecated: replaced by Initialize().
|
||||
func New(
|
||||
clockRate int,
|
||||
period time.Duration,
|
||||
timeNow func() time.Time,
|
||||
writePacketRTCP func(rtcp.Packet),
|
||||
) *RTCPSender {
|
||||
rs := &rtcpsender.RTCPSender{
|
||||
rs := &RTCPSender{
|
||||
ClockRate: clockRate,
|
||||
Period: period,
|
||||
TimeNow: timeNow,
|
||||
@@ -29,22 +56,92 @@ func New(
|
||||
}
|
||||
rs.Initialize()
|
||||
|
||||
return (*RTCPSender)(rs)
|
||||
return rs
|
||||
}
|
||||
|
||||
// Initialize initializes a RTCPSender.
|
||||
func (rs *RTCPSender) Initialize() {
|
||||
if rs.TimeNow == nil {
|
||||
rs.TimeNow = time.Now
|
||||
}
|
||||
|
||||
rs.terminate = make(chan struct{})
|
||||
rs.done = make(chan struct{})
|
||||
|
||||
go rs.run()
|
||||
}
|
||||
|
||||
func (rs *RTCPSender) run() {
|
||||
defer close(rs.done)
|
||||
|
||||
t := time.NewTicker(rs.Period)
|
||||
defer t.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-t.C:
|
||||
report := rs.report()
|
||||
if report != nil {
|
||||
rs.WritePacketRTCP(report)
|
||||
}
|
||||
|
||||
case <-rs.terminate:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (rs *RTCPSender) report() rtcp.Packet {
|
||||
rs.mutex.Lock()
|
||||
defer rs.mutex.Unlock()
|
||||
|
||||
if !rs.firstRTPPacketSent {
|
||||
return nil
|
||||
}
|
||||
|
||||
systemTimeDiff := rs.TimeNow().Sub(rs.lastTimeSystem)
|
||||
ntpTime := rs.lastTimeNTP.Add(systemTimeDiff)
|
||||
rtpTime := rs.lastTimeRTP + uint32(systemTimeDiff.Seconds()*float64(rs.ClockRate))
|
||||
|
||||
return &rtcp.SenderReport{
|
||||
SSRC: rs.localSSRC,
|
||||
NTPTime: ntpTimeGoToRTCP(ntpTime),
|
||||
RTPTime: rtpTime,
|
||||
PacketCount: rs.packetCount,
|
||||
OctetCount: rs.octetCount,
|
||||
}
|
||||
}
|
||||
|
||||
// Close closes the RTCPSender.
|
||||
func (rs *RTCPSender) Close() {
|
||||
(*rtcpsender.RTCPSender)(rs).Close()
|
||||
close(rs.terminate)
|
||||
<-rs.done
|
||||
}
|
||||
|
||||
// ProcessPacket extracts data from RTP packets.
|
||||
func (rs *RTCPSender) ProcessPacket(pkt *rtp.Packet, ntp time.Time, ptsEqualsDTS bool) {
|
||||
(*rtcpsender.RTCPSender)(rs).ProcessPacketRTP(pkt, ntp, ptsEqualsDTS)
|
||||
rs.mutex.Lock()
|
||||
defer rs.mutex.Unlock()
|
||||
|
||||
if ptsEqualsDTS {
|
||||
rs.firstRTPPacketSent = true
|
||||
rs.lastTimeRTP = pkt.Timestamp
|
||||
rs.lastTimeNTP = ntp
|
||||
rs.lastTimeSystem = rs.TimeNow()
|
||||
rs.localSSRC = pkt.SSRC
|
||||
}
|
||||
|
||||
rs.lastSequenceNumber = pkt.SequenceNumber
|
||||
|
||||
rs.packetCount++
|
||||
rs.octetCount += uint32(len(pkt.Payload))
|
||||
}
|
||||
|
||||
// SenderSSRC returns the SSRC of outgoing RTP packets.
|
||||
//
|
||||
// Deprecated: replaced by Stats().
|
||||
func (rs *RTCPSender) SenderSSRC() (uint32, bool) {
|
||||
stats := (*rtcpsender.RTCPSender)(rs).Stats()
|
||||
stats := rs.Stats()
|
||||
if stats == nil {
|
||||
return 0, false
|
||||
}
|
||||
@@ -53,11 +150,38 @@ func (rs *RTCPSender) SenderSSRC() (uint32, bool) {
|
||||
}
|
||||
|
||||
// LastPacketData returns metadata of the last RTP packet.
|
||||
//
|
||||
// Deprecated: replaced by Stats().
|
||||
func (rs *RTCPSender) LastPacketData() (uint16, uint32, time.Time, bool) {
|
||||
stats := (*rtcpsender.RTCPSender)(rs).Stats()
|
||||
stats := rs.Stats()
|
||||
if stats == nil {
|
||||
return 0, 0, time.Time{}, false
|
||||
}
|
||||
|
||||
return stats.LastSequenceNumber, stats.LastRTP, stats.LastNTP, true
|
||||
}
|
||||
|
||||
// Stats are statistics.
|
||||
type Stats struct {
|
||||
LocalSSRC uint32
|
||||
LastSequenceNumber uint16
|
||||
LastRTP uint32
|
||||
LastNTP time.Time
|
||||
}
|
||||
|
||||
// Stats returns statistics.
|
||||
func (rs *RTCPSender) Stats() *Stats {
|
||||
rs.mutex.RLock()
|
||||
defer rs.mutex.RUnlock()
|
||||
|
||||
if !rs.firstRTPPacketSent {
|
||||
return nil
|
||||
}
|
||||
|
||||
return &Stats{
|
||||
LocalSSRC: rs.localSSRC,
|
||||
LastSequenceNumber: rs.lastSequenceNumber,
|
||||
LastRTP: rs.lastTimeRTP,
|
||||
LastNTP: rs.lastTimeNTP,
|
||||
}
|
||||
}
|
||||
|
@@ -22,15 +22,15 @@ func TestRTCPSender(t *testing.T) {
|
||||
|
||||
sent := make(chan struct{})
|
||||
|
||||
rs := New(
|
||||
90000,
|
||||
100*time.Millisecond,
|
||||
func() time.Time {
|
||||
rs := &RTCPSender{
|
||||
ClockRate: 90000,
|
||||
Period: 100 * time.Millisecond,
|
||||
TimeNow: func() time.Time {
|
||||
mutex.Lock()
|
||||
defer mutex.Unlock()
|
||||
return curTime
|
||||
},
|
||||
func(pkt rtcp.Packet) {
|
||||
WritePacketRTCP: func(pkt rtcp.Packet) {
|
||||
require.Equal(t, &rtcp.SenderReport{
|
||||
SSRC: 0xba9da416,
|
||||
NTPTime: func() uint64 {
|
||||
@@ -45,7 +45,9 @@ func TestRTCPSender(t *testing.T) {
|
||||
OctetCount: 6,
|
||||
}, pkt)
|
||||
close(sent)
|
||||
})
|
||||
},
|
||||
}
|
||||
rs.Initialize()
|
||||
defer rs.Close()
|
||||
|
||||
setCurTime(time.Date(2008, 5, 20, 22, 16, 20, 0, time.UTC))
|
||||
|
@@ -2,16 +2,18 @@
|
||||
package rtplossdetector
|
||||
|
||||
import (
|
||||
"github.com/bluenviron/gortsplib/v4/internal/rtplossdetector"
|
||||
"github.com/pion/rtp"
|
||||
)
|
||||
|
||||
// LossDetector detects lost packets.
|
||||
//
|
||||
// Deprecated: will be removed in the next version.
|
||||
type LossDetector rtplossdetector.LossDetector
|
||||
type LossDetector struct {
|
||||
initialized bool
|
||||
expectedSeqNum uint16
|
||||
}
|
||||
|
||||
// New allocates a LossDetector.
|
||||
//
|
||||
// Deprecated: Useless.
|
||||
func New() *LossDetector {
|
||||
return &LossDetector{}
|
||||
}
|
||||
@@ -19,5 +21,18 @@ func New() *LossDetector {
|
||||
// Process processes a RTP packet.
|
||||
// It returns the number of lost packets.
|
||||
func (r *LossDetector) Process(pkt *rtp.Packet) uint {
|
||||
return uint((*rtplossdetector.LossDetector)(r).Process(pkt))
|
||||
if !r.initialized {
|
||||
r.initialized = true
|
||||
r.expectedSeqNum = pkt.SequenceNumber + 1
|
||||
return 0
|
||||
}
|
||||
|
||||
if pkt.SequenceNumber != r.expectedSeqNum {
|
||||
diff := pkt.SequenceNumber - r.expectedSeqNum
|
||||
r.expectedSeqNum = pkt.SequenceNumber + 1
|
||||
return uint(diff)
|
||||
}
|
||||
|
||||
r.expectedSeqNum = pkt.SequenceNumber + 1
|
||||
return 0
|
||||
}
|
||||
|
@@ -2,27 +2,141 @@
|
||||
package rtpreorderer
|
||||
|
||||
import (
|
||||
"github.com/bluenviron/gortsplib/v4/internal/rtpreorderer"
|
||||
"github.com/pion/rtp"
|
||||
)
|
||||
|
||||
const (
|
||||
bufferSize = 64
|
||||
)
|
||||
|
||||
// Reorderer filters incoming RTP packets, in order to
|
||||
// - order packets
|
||||
// - remove duplicate packets
|
||||
//
|
||||
// Deprecated: will be removed in the next version.
|
||||
type Reorderer rtpreorderer.Reorderer
|
||||
type Reorderer struct {
|
||||
initialized bool
|
||||
expectedSeqNum uint16
|
||||
buffer []*rtp.Packet
|
||||
absPos uint16
|
||||
negativeCount int
|
||||
}
|
||||
|
||||
// New allocates a Reorderer.
|
||||
//
|
||||
// Deprecated: replaced by Initialize().
|
||||
func New() *Reorderer {
|
||||
r := &rtpreorderer.Reorderer{}
|
||||
r := &Reorderer{}
|
||||
r.Initialize()
|
||||
return (*Reorderer)(r)
|
||||
return r
|
||||
}
|
||||
|
||||
// Initialize initializes a Reorderer.
|
||||
func (r *Reorderer) Initialize() {
|
||||
r.buffer = make([]*rtp.Packet, bufferSize)
|
||||
}
|
||||
|
||||
// Process processes a RTP packet.
|
||||
// It returns a sequence of ordered packets and the number of lost packets.
|
||||
func (r *Reorderer) Process(pkt *rtp.Packet) ([]*rtp.Packet, uint) {
|
||||
v1, v2 := (*rtpreorderer.Reorderer)(r).Process(pkt)
|
||||
return v1, uint(v2)
|
||||
if !r.initialized {
|
||||
r.initialized = true
|
||||
r.expectedSeqNum = pkt.SequenceNumber + 1
|
||||
return []*rtp.Packet{pkt}, 0
|
||||
}
|
||||
|
||||
relPos := int16(pkt.SequenceNumber - r.expectedSeqNum)
|
||||
|
||||
// packet is a duplicate or has been sent
|
||||
// before the first packet processed by Reorderer.
|
||||
// discard.
|
||||
if relPos < 0 {
|
||||
r.negativeCount++
|
||||
|
||||
// stream has been resetted, therefore reset reorderer too
|
||||
if r.negativeCount > bufferSize {
|
||||
r.negativeCount = 0
|
||||
|
||||
// clear buffer
|
||||
for i := uint16(0); i < bufferSize; i++ {
|
||||
p := (r.absPos + i) & (bufferSize - 1)
|
||||
r.buffer[p] = nil
|
||||
}
|
||||
|
||||
// reset position
|
||||
r.expectedSeqNum = pkt.SequenceNumber + 1
|
||||
return []*rtp.Packet{pkt}, 0
|
||||
}
|
||||
|
||||
return nil, 0
|
||||
}
|
||||
r.negativeCount = 0
|
||||
|
||||
// there's a missing packet and buffer is full.
|
||||
// return entire buffer and clear it.
|
||||
if relPos >= bufferSize {
|
||||
n := 1
|
||||
for i := uint16(0); i < bufferSize; i++ {
|
||||
p := (r.absPos + i) & (bufferSize - 1)
|
||||
if r.buffer[p] != nil {
|
||||
n++
|
||||
}
|
||||
}
|
||||
|
||||
ret := make([]*rtp.Packet, n)
|
||||
pos := 0
|
||||
|
||||
for i := uint16(0); i < bufferSize; i++ {
|
||||
p := (r.absPos + i) & (bufferSize - 1)
|
||||
if r.buffer[p] != nil {
|
||||
ret[pos], r.buffer[p] = r.buffer[p], nil
|
||||
pos++
|
||||
}
|
||||
}
|
||||
|
||||
ret[pos] = pkt
|
||||
|
||||
r.expectedSeqNum = pkt.SequenceNumber + 1
|
||||
return ret, uint(int(relPos) - n + 1)
|
||||
}
|
||||
|
||||
// there's a missing packet
|
||||
if relPos != 0 {
|
||||
p := (r.absPos + uint16(relPos)) & (bufferSize - 1)
|
||||
|
||||
// current packet is a duplicate. discard
|
||||
if r.buffer[p] != nil {
|
||||
return nil, 0
|
||||
}
|
||||
|
||||
// put current packet in buffer
|
||||
r.buffer[p] = pkt
|
||||
return nil, 0
|
||||
}
|
||||
|
||||
// all packets have been received correctly.
|
||||
// return them
|
||||
|
||||
n := uint16(1)
|
||||
for {
|
||||
p := (r.absPos + n) & (bufferSize - 1)
|
||||
if r.buffer[p] == nil {
|
||||
break
|
||||
}
|
||||
n++
|
||||
}
|
||||
|
||||
ret := make([]*rtp.Packet, n)
|
||||
|
||||
ret[0] = pkt
|
||||
r.absPos++
|
||||
r.absPos &= (bufferSize - 1)
|
||||
|
||||
for i := uint16(1); i < n; i++ {
|
||||
ret[i], r.buffer[r.absPos] = r.buffer[r.absPos], nil
|
||||
r.absPos++
|
||||
r.absPos &= (bufferSize - 1)
|
||||
}
|
||||
|
||||
r.expectedSeqNum = pkt.SequenceNumber + n
|
||||
|
||||
return ret, 0
|
||||
}
|
||||
|
@@ -164,7 +164,7 @@ func TestReorder(t *testing.T) {
|
||||
for _, entry := range sequence {
|
||||
out, missing := r.Process(entry.in)
|
||||
require.Equal(t, entry.out, out)
|
||||
require.Equal(t, uint64(0), missing)
|
||||
require.Equal(t, uint(0), missing)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -173,7 +173,7 @@ func TestBufferIsFull(t *testing.T) {
|
||||
r.Initialize()
|
||||
r.absPos = 25
|
||||
sn := uint16(1564)
|
||||
toMiss := uint64(34)
|
||||
toMiss := uint(34)
|
||||
|
||||
out, missing := r.Process(&rtp.Packet{
|
||||
Header: rtp.Header{
|
||||
@@ -185,19 +185,19 @@ func TestBufferIsFull(t *testing.T) {
|
||||
SequenceNumber: sn,
|
||||
},
|
||||
}}, out)
|
||||
require.Equal(t, uint64(0), missing)
|
||||
require.Equal(t, uint(0), missing)
|
||||
sn++
|
||||
|
||||
var expected []*rtp.Packet
|
||||
|
||||
for i := uint64(0); i < 64-toMiss; i++ {
|
||||
for i := uint(0); i < 64-toMiss; i++ {
|
||||
out, missing = r.Process(&rtp.Packet{
|
||||
Header: rtp.Header{
|
||||
SequenceNumber: sn + uint16(toMiss),
|
||||
},
|
||||
})
|
||||
require.Equal(t, []*rtp.Packet(nil), out)
|
||||
require.Equal(t, uint64(0), missing)
|
||||
require.Equal(t, uint(0), missing)
|
||||
|
||||
expected = append(expected, &rtp.Packet{
|
||||
Header: rtp.Header{
|
||||
@@ -242,7 +242,7 @@ func TestReset(t *testing.T) {
|
||||
},
|
||||
})
|
||||
require.Equal(t, []*rtp.Packet(nil), out)
|
||||
require.Equal(t, uint64(0), missing)
|
||||
require.Equal(t, uint(0), missing)
|
||||
sn++
|
||||
}
|
||||
|
||||
@@ -256,5 +256,5 @@ func TestReset(t *testing.T) {
|
||||
SequenceNumber: sn,
|
||||
},
|
||||
}}, out)
|
||||
require.Equal(t, uint64(0), missing)
|
||||
require.Equal(t, uint(0), missing)
|
||||
}
|
@@ -6,16 +6,13 @@ endif
|
||||
test-examples:
|
||||
go build -o /dev/null ./examples/...
|
||||
|
||||
test-internal:
|
||||
go test -v $(RACE) -coverprofile=coverage-internal.txt ./internal/...
|
||||
|
||||
test-pkg:
|
||||
go test -v $(RACE) -coverprofile=coverage-pkg.txt ./pkg/...
|
||||
|
||||
test-root:
|
||||
go test -v $(RACE) -coverprofile=coverage-root.txt .
|
||||
|
||||
test-nodocker: test-examples test-internal test-pkg test-root
|
||||
test-nodocker: test-examples test-pkg test-root
|
||||
|
||||
define DOCKERFILE_TEST
|
||||
ARG ARCH
|
||||
|
@@ -15,13 +15,13 @@ import (
|
||||
"github.com/pion/rtcp"
|
||||
"github.com/pion/rtp"
|
||||
|
||||
"github.com/bluenviron/gortsplib/v4/internal/rtcpreceiver"
|
||||
"github.com/bluenviron/gortsplib/v4/internal/rtcpsender"
|
||||
"github.com/bluenviron/gortsplib/v4/pkg/base"
|
||||
"github.com/bluenviron/gortsplib/v4/pkg/description"
|
||||
"github.com/bluenviron/gortsplib/v4/pkg/format"
|
||||
"github.com/bluenviron/gortsplib/v4/pkg/headers"
|
||||
"github.com/bluenviron/gortsplib/v4/pkg/liberrors"
|
||||
"github.com/bluenviron/gortsplib/v4/pkg/rtcpreceiver"
|
||||
"github.com/bluenviron/gortsplib/v4/pkg/rtcpsender"
|
||||
"github.com/bluenviron/gortsplib/v4/pkg/rtptime"
|
||||
"github.com/bluenviron/gortsplib/v4/pkg/sdp"
|
||||
)
|
||||
|
@@ -8,11 +8,11 @@ import (
|
||||
"github.com/pion/rtcp"
|
||||
"github.com/pion/rtp"
|
||||
|
||||
"github.com/bluenviron/gortsplib/v4/internal/rtcpreceiver"
|
||||
"github.com/bluenviron/gortsplib/v4/internal/rtplossdetector"
|
||||
"github.com/bluenviron/gortsplib/v4/internal/rtpreorderer"
|
||||
"github.com/bluenviron/gortsplib/v4/pkg/format"
|
||||
"github.com/bluenviron/gortsplib/v4/pkg/liberrors"
|
||||
"github.com/bluenviron/gortsplib/v4/pkg/rtcpreceiver"
|
||||
"github.com/bluenviron/gortsplib/v4/pkg/rtplossdetector"
|
||||
"github.com/bluenviron/gortsplib/v4/pkg/rtpreorderer"
|
||||
)
|
||||
|
||||
type serverSessionFormat struct {
|
||||
@@ -79,7 +79,7 @@ func (sf *serverSessionFormat) stop() {
|
||||
func (sf *serverSessionFormat) readPacketRTPUDP(pkt *rtp.Packet, now time.Time) {
|
||||
packets, lost := sf.udpReorderer.Process(pkt)
|
||||
if lost != 0 {
|
||||
sf.onPacketRTPLost(lost)
|
||||
sf.onPacketRTPLost(uint64(lost))
|
||||
// do not return
|
||||
}
|
||||
|
||||
@@ -91,7 +91,7 @@ func (sf *serverSessionFormat) readPacketRTPUDP(pkt *rtp.Packet, now time.Time)
|
||||
func (sf *serverSessionFormat) readPacketRTPTCP(pkt *rtp.Packet) {
|
||||
lost := sf.tcpLossDetector.Process(pkt)
|
||||
if lost != 0 {
|
||||
sf.onPacketRTPLost(lost)
|
||||
sf.onPacketRTPLost(uint64(lost))
|
||||
// do not return
|
||||
}
|
||||
|
||||
@@ -101,7 +101,7 @@ func (sf *serverSessionFormat) readPacketRTPTCP(pkt *rtp.Packet) {
|
||||
}
|
||||
|
||||
func (sf *serverSessionFormat) handlePacketRTP(pkt *rtp.Packet, now time.Time) {
|
||||
err := sf.rtcpReceiver.ProcessPacketRTP(pkt, now, sf.format.PTSEqualsDTS(pkt))
|
||||
err := sf.rtcpReceiver.ProcessPacket(pkt, now, sf.format.PTSEqualsDTS(pkt))
|
||||
if err != nil {
|
||||
sf.sm.onPacketRTPDecodeError(err)
|
||||
return
|
||||
|
@@ -7,8 +7,8 @@ import (
|
||||
"github.com/pion/rtcp"
|
||||
"github.com/pion/rtp"
|
||||
|
||||
"github.com/bluenviron/gortsplib/v4/internal/rtcpsender"
|
||||
"github.com/bluenviron/gortsplib/v4/pkg/format"
|
||||
"github.com/bluenviron/gortsplib/v4/pkg/rtcpsender"
|
||||
)
|
||||
|
||||
type serverStreamFormat struct {
|
||||
@@ -36,7 +36,7 @@ func (sf *serverStreamFormat) initialize() {
|
||||
}
|
||||
|
||||
func (sf *serverStreamFormat) writePacketRTP(byts []byte, pkt *rtp.Packet, ntp time.Time) error {
|
||||
sf.rtcpSender.ProcessPacketRTP(pkt, ntp, sf.format.PTSEqualsDTS(pkt))
|
||||
sf.rtcpSender.ProcessPacket(pkt, ntp, sf.format.PTSEqualsDTS(pkt))
|
||||
|
||||
le := uint64(len(byts))
|
||||
|
||||
|
Reference in New Issue
Block a user