diff --git a/client_format.go b/client_format.go index 978ed77d..d0311765 100644 --- a/client_format.go +++ b/client_format.go @@ -12,8 +12,6 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/liberrors" "github.com/bluenviron/gortsplib/v4/pkg/rtcpreceiver" "github.com/bluenviron/gortsplib/v4/pkg/rtcpsender" - "github.com/bluenviron/gortsplib/v4/pkg/rtplossdetector" - "github.com/bluenviron/gortsplib/v4/pkg/rtpreorderer" ) func clientPickLocalSSRC(cf *clientFormat) (uint32, error) { @@ -47,10 +45,8 @@ type clientFormat struct { onPacketRTP OnPacketRTPFunc localSSRC uint32 - udpReorderer *rtpreorderer.Reorderer // play - tcpLossDetector *rtplossdetector.LossDetector // play - rtcpReceiver *rtcpreceiver.RTCPReceiver // play - rtcpSender *rtcpsender.RTCPSender // record or back channel + rtcpReceiver *rtcpreceiver.RTCPReceiver // play + rtcpSender *rtcpsender.RTCPSender // record or back channel writePacketRTPInQueue func([]byte) error rtpPacketsReceived *uint64 rtpPacketsSent *uint64 @@ -95,18 +91,12 @@ func (cf *clientFormat) start() { } cf.rtcpSender.Initialize() } else { - if cf.cm.udpRTPListener != nil { - cf.udpReorderer = &rtpreorderer.Reorderer{} - cf.udpReorderer.Initialize() - } else { - cf.tcpLossDetector = &rtplossdetector.LossDetector{} - } - cf.rtcpReceiver = &rtcpreceiver.RTCPReceiver{ - ClockRate: cf.format.ClockRate(), - LocalSSRC: &cf.localSSRC, - Period: cf.cm.c.receiverReportPeriod, - TimeNow: cf.cm.c.timeNow, + ClockRate: cf.format.ClockRate(), + LocalSSRC: &cf.localSSRC, + UnrealiableTransport: (cf.cm.udpRTPListener != nil), + Period: cf.cm.c.receiverReportPeriod, + TimeNow: cf.cm.c.timeNow, WritePacketRTCP: func(pkt rtcp.Packet) { if cf.cm.udpRTPListener != nil && cf.cm.udpRTCPListener.writeAddr != nil { cf.cm.c.WritePacketRTCP(cf.cm.media, pkt) //nolint:errcheck @@ -142,46 +132,32 @@ func (cf *clientFormat) remoteSSRC() (uint32, bool) { } func (cf *clientFormat) readPacketRTPUDP(pkt *rtp.Packet) { - packets, lost := cf.udpReorderer.Process(pkt) - if lost != 0 { - cf.handlePacketsLost(uint64(lost)) - // do not return - } - now := cf.cm.c.timeNow() - - for _, pkt := range packets { - cf.handlePacketRTP(pkt, now) - } + cf.handlePacketRTP(pkt, now) } func (cf *clientFormat) readPacketRTPTCP(pkt *rtp.Packet) { - lost := cf.tcpLossDetector.Process(pkt) - if lost != 0 { - cf.handlePacketsLost(uint64(lost)) - // do not return - } - now := cf.cm.c.timeNow() - cf.handlePacketRTP(pkt, now) } func (cf *clientFormat) handlePacketRTP(pkt *rtp.Packet, now time.Time) { - err := cf.rtcpReceiver.ProcessPacket(pkt, now, cf.format.PTSEqualsDTS(pkt)) + pkts, lost, err := cf.rtcpReceiver.ProcessPacket2(pkt, now, cf.format.PTSEqualsDTS(pkt)) if err != nil { cf.cm.onPacketRTPDecodeError(err) return } - atomic.AddUint64(cf.rtpPacketsReceived, 1) + if lost != 0 { + atomic.AddUint64(cf.rtpPacketsLost, lost) + cf.cm.c.OnPacketsLost(lost) + } - cf.onPacketRTP(pkt) -} + atomic.AddUint64(cf.rtpPacketsReceived, uint64(len(pkts))) -func (cf *clientFormat) handlePacketsLost(lost uint64) { - atomic.AddUint64(cf.rtpPacketsLost, lost) - cf.cm.c.OnPacketsLost(lost) + for _, pkt := range pkts { + cf.onPacketRTP(pkt) + } } func (cf *clientFormat) writePacketRTP(pkt *rtp.Packet, ntp time.Time) error { diff --git a/pkg/rtcpreceiver/rtcpreceiver.go b/pkg/rtcpreceiver/rtcpreceiver.go index ddaf4e9c..15207fd8 100644 --- a/pkg/rtcpreceiver/rtcpreceiver.go +++ b/pkg/rtcpreceiver/rtcpreceiver.go @@ -27,12 +27,34 @@ func randUint32() (uint32, error) { return uint32(b[0])<<24 | uint32(b[1])<<16 | uint32(b[2])<<8 | uint32(b[3]), nil } -// RTCPReceiver is a utility to generate RTCP receiver reports. +// RTCPReceiver is a utility to receive RTP packets. It is in charge of: +// - removing packets with wrong SSRC +// - removing duplicate packets (when transport is unreliable) +// - reordering packets (when transport is unrealiable) +// - counting lost packets +// - generating RTCP receiver reports type RTCPReceiver struct { - ClockRate int - LocalSSRC *uint32 - Period time.Duration - TimeNow func() time.Time + // Track clock rate. + ClockRate int + + // Local SSRC + LocalSSRC *uint32 + + // Whether the transport is unrealiable. + // This enables removing duplicate packets and reordering packets. + UnrealiableTransport bool + + // size of the buffer for reordering packets. + // It defaults to 64. + BufferSize int + + // Period of RTCP receiver reports. + Period time.Duration + + // time.Now function. + TimeNow func() time.Time + + // Called when a RTCP receiver report is ready to be written. WritePacketRTCP func(rtcp.Packet) mutex sync.RWMutex @@ -40,8 +62,11 @@ type RTCPReceiver struct { // data from RTP packets firstRTPPacketReceived bool timeInitialized bool + buffer []*rtp.Packet + absPos uint16 + negativeCount int sequenceNumberCycles uint16 - lastSequenceNumber uint16 + lastValidSeqNum uint16 remoteSSRC uint32 lastTimeRTP uint32 lastTimeSystem time.Time @@ -97,10 +122,22 @@ func (rr *RTCPReceiver) Initialize() error { rr.LocalSSRC = &v } + if rr.BufferSize == 0 { + rr.BufferSize = 64 + } + + if rr.Period == 0 { + return fmt.Errorf("invalid Period") + } + if rr.TimeNow == nil { rr.TimeNow = time.Now } + if rr.UnrealiableTransport { + rr.buffer = make([]*rtp.Packet, rr.BufferSize) + } + rr.terminate = make(chan struct{}) rr.done = make(chan struct{}) @@ -136,8 +173,8 @@ func (rr *RTCPReceiver) run() { } func (rr *RTCPReceiver) report() rtcp.Packet { - rr.mutex.Lock() - defer rr.mutex.Unlock() + rr.mutex.RLock() + defer rr.mutex.RUnlock() if !rr.firstRTPPacketReceived || rr.ClockRate == 0 { return nil @@ -150,7 +187,7 @@ func (rr *RTCPReceiver) report() rtcp.Packet { Reports: []rtcp.ReceptionReport{ { SSRC: rr.remoteSSRC, - LastSequenceNumber: uint32(rr.sequenceNumberCycles)<<16 | uint32(rr.lastSequenceNumber), + LastSequenceNumber: uint32(rr.sequenceNumberCycles)<<16 | uint32(rr.lastValidSeqNum), // equivalent to taking the integer part after multiplying the // loss fraction by 256 FractionLost: uint8(float64(rr.totalLostSinceReport*256) / float64(rr.totalSinceReport)), @@ -177,7 +214,20 @@ func (rr *RTCPReceiver) report() rtcp.Packet { } // ProcessPacket extracts the needed data from RTP packets. +// +// Deprecated: replaced by ProcessPacket2. func (rr *RTCPReceiver) ProcessPacket(pkt *rtp.Packet, system time.Time, ptsEqualsDTS bool) error { + _, _, err := rr.ProcessPacket2(pkt, system, ptsEqualsDTS) + return err +} + +// ProcessPacket2 processes an incoming RTP packet. +// It returns reordered packets and number of lost packets. +func (rr *RTCPReceiver) ProcessPacket2( + pkt *rtp.Packet, + system time.Time, + ptsEqualsDTS bool, +) ([]*rtp.Packet, uint64, error) { rr.mutex.Lock() defer rr.mutex.Unlock() @@ -185,7 +235,7 @@ func (rr *RTCPReceiver) ProcessPacket(pkt *rtp.Packet, system time.Time, ptsEqua if !rr.firstRTPPacketReceived { rr.firstRTPPacketReceived = true rr.totalSinceReport = 1 - rr.lastSequenceNumber = pkt.SequenceNumber + rr.lastValidSeqNum = pkt.SequenceNumber rr.remoteSSRC = pkt.SSRC if ptsEqualsDTS { @@ -194,35 +244,44 @@ func (rr *RTCPReceiver) ProcessPacket(pkt *rtp.Packet, system time.Time, ptsEqua rr.lastTimeSystem = system } - // subsequent packets - } else { - if pkt.SSRC != rr.remoteSSRC { - return fmt.Errorf("received packet with wrong SSRC %d, expected %d", pkt.SSRC, rr.remoteSSRC) - } + return []*rtp.Packet{pkt}, 0, nil + } - diff := int32(pkt.SequenceNumber) - int32(rr.lastSequenceNumber) + if pkt.SSRC != rr.remoteSSRC { + return nil, 0, fmt.Errorf("received packet with wrong SSRC %d, expected %d", pkt.SSRC, rr.remoteSSRC) + } + + var pkts []*rtp.Packet + var lost uint64 + + if rr.UnrealiableTransport { + pkts, lost = rr.reorder(pkt) + } else { + pkts = []*rtp.Packet{pkt} + lost = uint64(pkt.SequenceNumber - rr.lastValidSeqNum - 1) + } + + rr.totalLost += uint32(lost) + rr.totalLostSinceReport += uint32(lost) + + // allow up to 24 bits + if rr.totalLost > 0xFFFFFF { + rr.totalLost = 0xFFFFFF + } + if rr.totalLostSinceReport > 0xFFFFFF { + rr.totalLostSinceReport = 0xFFFFFF + } + + for _, pkt := range pkts { + diff := int32(pkt.SequenceNumber) - int32(rr.lastValidSeqNum) // overflow if diff < -0x0FFF { rr.sequenceNumberCycles++ } - // detect lost packets - if pkt.SequenceNumber != (rr.lastSequenceNumber + 1) { - rr.totalLost += uint32(uint16(diff) - 1) - rr.totalLostSinceReport += uint32(uint16(diff) - 1) - - // allow up to 24 bits - if rr.totalLost > 0xFFFFFF { - rr.totalLost = 0xFFFFFF - } - if rr.totalLostSinceReport > 0xFFFFFF { - rr.totalLostSinceReport = 0xFFFFFF - } - } - rr.totalSinceReport += uint32(uint16(diff)) - rr.lastSequenceNumber = pkt.SequenceNumber + rr.lastValidSeqNum = pkt.SequenceNumber if ptsEqualsDTS { if rr.timeInitialized && rr.ClockRate != 0 { @@ -242,10 +301,105 @@ func (rr *RTCPReceiver) ProcessPacket(pkt *rtp.Packet, system time.Time, ptsEqua } } - return nil + return pkts, lost, nil } -// ProcessSenderReport extracts the needed data from RTCP sender reports. +func (rr *RTCPReceiver) reorder(pkt *rtp.Packet) ([]*rtp.Packet, uint64) { + relPos := int16(pkt.SequenceNumber - rr.lastValidSeqNum - 1) // rr.expectedSeqNum) + + // packet is a duplicate or has been sent + // before the first packet processed by Reorderer. + // discard. + if relPos < 0 { + rr.negativeCount++ + + // stream has been resetted, therefore reset reorderer too + if rr.negativeCount > len(rr.buffer) { + rr.negativeCount = 0 + + // clear buffer + for i := uint16(0); i < uint16(len(rr.buffer)); i++ { + p := (rr.absPos + i) & (uint16(len(rr.buffer)) - 1) + rr.buffer[p] = nil + } + + // reset position. + return []*rtp.Packet{pkt}, 0 + } + + return nil, 0 + } + + rr.negativeCount = 0 + + // there's a missing packet and buffer is full. + // return entire buffer and clear it. + if relPos >= int16(len(rr.buffer)) { + n := 1 + for i := uint16(0); i < uint16(len(rr.buffer)); i++ { + p := (rr.absPos + i) & (uint16(len(rr.buffer)) - 1) + if rr.buffer[p] != nil { + n++ + } + } + + ret := make([]*rtp.Packet, n) + pos := 0 + + for i := uint16(0); i < uint16(len(rr.buffer)); i++ { + p := (rr.absPos + i) & (uint16(len(rr.buffer)) - 1) + if rr.buffer[p] != nil { + ret[pos], rr.buffer[p] = rr.buffer[p], nil + pos++ + } + } + + ret[pos] = pkt + + return ret, uint64(int(relPos) - n + 1) + } + + // there's a missing packet + if relPos != 0 { + p := (rr.absPos + uint16(relPos)) & (uint16(len(rr.buffer)) - 1) + + // current packet is a duplicate. discard + if rr.buffer[p] != nil { + return nil, 0 + } + + // put current packet in buffer + rr.buffer[p] = pkt + return nil, 0 + } + + // all packets have been received correctly. + // return them. + + n := uint16(1) + for { + p := (rr.absPos + n) & (uint16(len(rr.buffer)) - 1) + if rr.buffer[p] == nil { + break + } + n++ + } + + ret := make([]*rtp.Packet, n) + ret[0] = pkt + rr.absPos++ + rr.absPos &= (uint16(len(rr.buffer)) - 1) + + for i := uint16(1); i < n; i++ { + ret[i], rr.buffer[rr.absPos] = rr.buffer[rr.absPos], nil + rr.absPos++ + rr.absPos &= (uint16(len(rr.buffer)) - 1) + } + + return ret, 0 +} + +// ProcessSenderReport processes an incoming RTCP sender report. func (rr *RTCPReceiver) ProcessSenderReport(sr *rtcp.SenderReport, system time.Time) { rr.mutex.Lock() defer rr.mutex.Unlock() @@ -269,13 +423,13 @@ func (rr *RTCPReceiver) packetNTPUnsafe(ts uint32) (time.Time, bool) { // PacketNTP returns the NTP (absolute timestamp) of the packet. func (rr *RTCPReceiver) PacketNTP(ts uint32) (time.Time, bool) { - rr.mutex.Lock() - defer rr.mutex.Unlock() + rr.mutex.RLock() + defer rr.mutex.RUnlock() return rr.packetNTPUnsafe(ts) } -// SenderSSRC returns the SSRC of outgoing RTP packets. +// SenderSSRC returns the SSRC of incoming RTP packets. // // Deprecated: replaced by Stats(). func (rr *RTCPReceiver) SenderSSRC() (uint32, bool) { @@ -308,7 +462,7 @@ func (rr *RTCPReceiver) Stats() *Stats { return &Stats{ RemoteSSRC: rr.remoteSSRC, - LastSequenceNumber: rr.lastSequenceNumber, + LastSequenceNumber: rr.lastValidSeqNum, LastRTP: rr.lastTimeRTP, LastNTP: ntp, Jitter: rr.jitter, diff --git a/pkg/rtcpreceiver/rtcpreceiver_test.go b/pkg/rtcpreceiver/rtcpreceiver_test.go index 4328c88f..ea65f97a 100644 --- a/pkg/rtcpreceiver/rtcpreceiver_test.go +++ b/pkg/rtcpreceiver/rtcpreceiver_test.go @@ -13,27 +13,25 @@ func uint32Ptr(v uint32) *uint32 { return &v } -func TestRTCPReceiver(t *testing.T) { - pktGenerated := make(chan rtcp.Packet) +func TestErrorInvalidPeriod(t *testing.T) { + rr := &RTCPReceiver{ + ClockRate: 90000, + LocalSSRC: uint32Ptr(0x65f83afb), + } + err := rr.Initialize() + require.EqualError(t, err, "invalid Period") +} +func TestErrorDifferentSSRC(t *testing.T) { rr := &RTCPReceiver{ ClockRate: 90000, LocalSSRC: uint32Ptr(0x65f83afb), Period: 500 * time.Millisecond, - TimeNow: func() time.Time { - return time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC) - }, - WritePacketRTCP: func(pkt rtcp.Packet) { - pktGenerated <- pkt - }, } err := rr.Initialize() require.NoError(t, err) defer rr.Close() - stats := rr.Stats() - require.Nil(t, stats) - rtpPkt := rtp.Packet{ Header: rtp.Header{ Version: 2, @@ -41,7 +39,7 @@ func TestRTCPReceiver(t *testing.T) { PayloadType: 96, SequenceNumber: 945, Timestamp: 0xafb45733, - SSRC: 0xba9da416, + SSRC: 1434523, }, Payload: []byte("\x00\x00"), } @@ -49,88 +47,159 @@ func TestRTCPReceiver(t *testing.T) { err = rr.ProcessPacket(&rtpPkt, ts, true) require.NoError(t, err) - stats = rr.Stats() - require.Equal(t, &Stats{ - RemoteSSRC: 0xba9da416, - LastRTP: 0xafb45733, - LastSequenceNumber: 945, - }, stats) - - srPkt := rtcp.SenderReport{ - SSRC: 0xba9da416, - NTPTime: func() uint64 { - d := time.Date(2008, 5, 20, 22, 15, 20, 0, time.UTC) - s := uint64(d.UnixNano()) + 2208988800*1000000000 - return (s/1000000000)<<32 | (s % 1000000000) - }(), - RTPTime: 0xafb45733, - PacketCount: 714, - OctetCount: 859127, - } - ts = time.Date(2008, 5, 20, 22, 15, 20, 0, time.UTC) - rr.ProcessSenderReport(&srPkt, ts) - - stats = rr.Stats() - require.Equal(t, &Stats{ - RemoteSSRC: 0xba9da416, - LastRTP: 0xafb45733, - LastSequenceNumber: 945, - LastNTP: time.Date(2008, 5, 20, 22, 15, 20, 0, time.UTC).Local(), - }, stats) - rtpPkt = rtp.Packet{ Header: rtp.Header{ Version: 2, Marker: true, PayloadType: 96, - SequenceNumber: 946, + SequenceNumber: 945, Timestamp: 0xafb45733, - SSRC: 0xba9da416, + SSRC: 754623214, }, Payload: []byte("\x00\x00"), } - ts = time.Date(2008, 5, 20, 22, 15, 20, 0, time.UTC) + ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) err = rr.ProcessPacket(&rtpPkt, ts, true) - require.NoError(t, err) - - rtpPkt = rtp.Packet{ - Header: rtp.Header{ - Version: 2, - Marker: true, - PayloadType: 96, - SequenceNumber: 947, - Timestamp: 0xafb45733 + 90000, - SSRC: 0xba9da416, - }, - Payload: []byte("\x00\x00"), - } - ts = time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC) - err = rr.ProcessPacket(&rtpPkt, ts, true) - require.NoError(t, err) - - pkt := <-pktGenerated - require.Equal(t, &rtcp.ReceiverReport{ - SSRC: 0x65f83afb, - Reports: []rtcp.ReceptionReport{ - { - SSRC: 0xba9da416, - LastSequenceNumber: 947, - LastSenderReport: 3422027776, - Delay: 2 * 65536, - }, - }, - }, pkt) - - stats = rr.Stats() - require.Equal(t, &Stats{ - RemoteSSRC: 0xba9da416, - LastRTP: 2947921603, - LastSequenceNumber: 947, - LastNTP: time.Date(2008, 5, 20, 22, 15, 21, 0, time.UTC).Local(), - }, stats) + require.EqualError(t, err, "received packet with wrong SSRC 754623214, expected 1434523") } -func TestRTCPReceiverZeroClockRate(t *testing.T) { +func TestStatsBeforeData(t *testing.T) { + rr := &RTCPReceiver{ + ClockRate: 90000, + LocalSSRC: uint32Ptr(0x65f83afb), + Period: 500 * time.Millisecond, + } + err := rr.Initialize() + require.NoError(t, err) + defer rr.Close() + + stats := rr.Stats() + require.Nil(t, stats) +} + +func TestStandard(t *testing.T) { + for _, ca := range []string{ + "reliable", + "unrealiable", + } { + t.Run(ca, func(t *testing.T) { + pktGenerated := make(chan rtcp.Packet) + + rr := &RTCPReceiver{ + ClockRate: 90000, + LocalSSRC: uint32Ptr(0x65f83afb), + UnrealiableTransport: ca == "unrealiable", + Period: 500 * time.Millisecond, + TimeNow: func() time.Time { + return time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC) + }, + WritePacketRTCP: func(pkt rtcp.Packet) { + pktGenerated <- pkt + }, + } + err := rr.Initialize() + require.NoError(t, err) + defer rr.Close() + + rtpPkt := rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 945, + Timestamp: 0xafb45733, + SSRC: 0xba9da416, + }, + Payload: []byte("\x00\x00"), + } + ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) + err = rr.ProcessPacket(&rtpPkt, ts, true) + require.NoError(t, err) + + stats := rr.Stats() + require.Equal(t, &Stats{ + RemoteSSRC: 0xba9da416, + LastRTP: 0xafb45733, + LastSequenceNumber: 945, + }, stats) + + srPkt := rtcp.SenderReport{ + SSRC: 0xba9da416, + NTPTime: func() uint64 { + d := time.Date(2008, 5, 20, 22, 15, 20, 0, time.UTC) + s := uint64(d.UnixNano()) + 2208988800*1000000000 + return (s/1000000000)<<32 | (s % 1000000000) + }(), + RTPTime: 0xafb45733, + PacketCount: 714, + OctetCount: 859127, + } + ts = time.Date(2008, 5, 20, 22, 15, 20, 0, time.UTC) + rr.ProcessSenderReport(&srPkt, ts) + + stats = rr.Stats() + require.Equal(t, &Stats{ + RemoteSSRC: 0xba9da416, + LastRTP: 0xafb45733, + LastSequenceNumber: 945, + LastNTP: time.Date(2008, 5, 20, 22, 15, 20, 0, time.UTC).Local(), + }, stats) + + rtpPkt = rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 946, + Timestamp: 0xafb45733, + SSRC: 0xba9da416, + }, + Payload: []byte("\x00\x00"), + } + ts = time.Date(2008, 5, 20, 22, 15, 20, 0, time.UTC) + err = rr.ProcessPacket(&rtpPkt, ts, true) + require.NoError(t, err) + + rtpPkt = rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 947, + Timestamp: 0xafb45733 + 90000, + SSRC: 0xba9da416, + }, + Payload: []byte("\x00\x00"), + } + ts = time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC) + err = rr.ProcessPacket(&rtpPkt, ts, true) + require.NoError(t, err) + + pkt := <-pktGenerated + require.Equal(t, &rtcp.ReceiverReport{ + SSRC: 0x65f83afb, + Reports: []rtcp.ReceptionReport{ + { + SSRC: 0xba9da416, + LastSequenceNumber: 947, + LastSenderReport: 3422027776, + Delay: 2 * 65536, + }, + }, + }, pkt) + + stats = rr.Stats() + require.Equal(t, &Stats{ + RemoteSSRC: 0xba9da416, + LastRTP: 2947921603, + LastSequenceNumber: 947, + LastNTP: time.Date(2008, 5, 20, 22, 15, 21, 0, time.UTC).Local(), + }, stats) + }) + } +} + +func TestZeroClockRate(t *testing.T) { pktGenerated := make(chan rtcp.Packet) rr := &RTCPReceiver{ @@ -234,8 +303,8 @@ func TestRTCPReceiverZeroClockRate(t *testing.T) { }, stats) } -func TestRTCPReceiverOverflow(t *testing.T) { - done := make(chan struct{}) +func TestSequenceNumberOverflow(t *testing.T) { + rtcpGenerated := make(chan struct{}) rr := &RTCPReceiver{ ClockRate: 90000, @@ -256,7 +325,7 @@ func TestRTCPReceiverOverflow(t *testing.T) { }, }, }, pkt) - close(done) + close(rtcpGenerated) }, } err := rr.Initialize() @@ -305,10 +374,98 @@ func TestRTCPReceiverOverflow(t *testing.T) { err = rr.ProcessPacket(&rtpPkt, ts, true) require.NoError(t, err) - <-done + <-rtcpGenerated } -func TestRTCPReceiverPacketsLost(t *testing.T) { +func TestJitter(t *testing.T) { + rtcpGenerated := make(chan struct{}) + + rr := &RTCPReceiver{ + ClockRate: 90000, + LocalSSRC: uint32Ptr(0x65f83afb), + Period: 500 * time.Millisecond, + TimeNow: func() time.Time { + return time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC) + }, + WritePacketRTCP: func(pkt rtcp.Packet) { + require.Equal(t, &rtcp.ReceiverReport{ + SSRC: 0x65f83afb, + Reports: []rtcp.ReceptionReport{ + { + SSRC: 0xba9da416, + LastSequenceNumber: 948, + LastSenderReport: 0x887a17ce, + Delay: 2 * 65536, + Jitter: 45000 / 16, + }, + }, + }, pkt) + close(rtcpGenerated) + }, + } + err := rr.Initialize() + require.NoError(t, err) + defer rr.Close() + + srPkt := rtcp.SenderReport{ + SSRC: 0xba9da416, + NTPTime: 0xe363887a17ced916, + RTPTime: 0xafb45733, + PacketCount: 714, + OctetCount: 859127, + } + ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) + rr.ProcessSenderReport(&srPkt, ts) + + rtpPkt := rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 946, + Timestamp: 0xafb45733, + SSRC: 0xba9da416, + }, + Payload: []byte("\x00\x00"), + } + ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) + err = rr.ProcessPacket(&rtpPkt, ts, true) + require.NoError(t, err) + + rtpPkt = rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 947, + Timestamp: 0xafb45733 + 45000, + SSRC: 0xba9da416, + }, + Payload: []byte("\x00\x00"), + } + ts = time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC) + err = rr.ProcessPacket(&rtpPkt, ts, true) + require.NoError(t, err) + + rtpPkt = rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 948, + Timestamp: 0xafb45733, + SSRC: 0xba9da416, + }, + Payload: []byte("\x00\x00"), + } + ts = time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC) + err = rr.ProcessPacket(&rtpPkt, ts, false) + require.NoError(t, err) + + <-rtcpGenerated +} + +func TestReliablePacketsLost(t *testing.T) { done := make(chan struct{}) rr := &RTCPReceiver{ @@ -385,7 +542,7 @@ func TestRTCPReceiverPacketsLost(t *testing.T) { <-done } -func TestRTCPReceiverOverflowPacketsLost(t *testing.T) { +func TestReliableOverflowAndPacketsLost(t *testing.T) { done := make(chan struct{}) rr := &RTCPReceiver{ @@ -462,90 +619,373 @@ func TestRTCPReceiverOverflowPacketsLost(t *testing.T) { <-done } -func TestRTCPReceiverJitter(t *testing.T) { - done := make(chan struct{}) - - rr := &RTCPReceiver{ - ClockRate: 90000, - LocalSSRC: uint32Ptr(0x65f83afb), - Period: 500 * time.Millisecond, - TimeNow: func() time.Time { - return time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC) +func TestUnrealiableReorder(t *testing.T) { + sequence := []struct { + in *rtp.Packet + out []*rtp.Packet + }{ + { + // first packet + &rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 65530, + }, + }, + []*rtp.Packet{{ + Header: rtp.Header{ + SequenceNumber: 65530, + }, + }}, }, - WritePacketRTCP: func(pkt rtcp.Packet) { - require.Equal(t, &rtcp.ReceiverReport{ - SSRC: 0x65f83afb, - Reports: []rtcp.ReceptionReport{ - { - SSRC: 0xba9da416, - LastSequenceNumber: 948, - LastSenderReport: 0x887a17ce, - Delay: 2 * 65536, - Jitter: 45000 / 16, + { + // packet sent before first packet + &rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 65529, + }, + }, + []*rtp.Packet(nil), + }, + { + // ok + &rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 65531, + }, + }, + []*rtp.Packet{{ + Header: rtp.Header{ + SequenceNumber: 65531, + }, + }}, + }, + { + // duplicated + &rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 65531, + }, + }, + []*rtp.Packet(nil), + }, + { + // gap + &rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 65535, + }, + }, + []*rtp.Packet(nil), + }, + { + // unordered + &rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 65533, + PayloadType: 96, + }, + }, + []*rtp.Packet(nil), + }, + { + // unordered + duplicated + &rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 65533, + PayloadType: 97, + }, + }, + []*rtp.Packet(nil), + }, + { + // unordered + &rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 65532, + }, + }, + []*rtp.Packet{ + { + Header: rtp.Header{ + SequenceNumber: 65532, }, }, - }, pkt) - close(done) + { + Header: rtp.Header{ + SequenceNumber: 65533, + PayloadType: 96, + }, + }, + }, + }, + { + // unordered + &rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 65534, + }, + }, + []*rtp.Packet{ + { + Header: rtp.Header{ + SequenceNumber: 65534, + }, + }, + { + Header: rtp.Header{ + SequenceNumber: 65535, + }, + }, + }, + }, + { + // overflow + gap + &rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 1, + }, + }, + []*rtp.Packet(nil), + }, + { + // unordered + &rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 0, + }, + }, + []*rtp.Packet{ + { + Header: rtp.Header{ + SequenceNumber: 0, + }, + }, + { + Header: rtp.Header{ + SequenceNumber: 1, + }, + }, + }, + }, + } + + rr := &RTCPReceiver{ + ClockRate: 90000, + LocalSSRC: uint32Ptr(0x65f83afb), + UnrealiableTransport: true, + Period: 500 * time.Millisecond, + } + err := rr.Initialize() + require.NoError(t, err) + defer rr.Close() + + rr.absPos = 40 + + for _, entry := range sequence { + var out []*rtp.Packet + var missing uint64 + out, missing, err = rr.ProcessPacket2(entry.in, time.Time{}, true) + require.NoError(t, err) + require.Equal(t, entry.out, out) + require.Equal(t, uint64(0), missing) + } +} + +func TestUnrealiableBufferFull(t *testing.T) { + rtcpReceived := make(chan struct{}) + + rr := &RTCPReceiver{ + ClockRate: 90000, + LocalSSRC: uint32Ptr(0x65f83afb), + UnrealiableTransport: true, + Period: 500 * time.Millisecond, + WritePacketRTCP: func(p rtcp.Packet) { + require.Equal(t, &rtcp.ReceiverReport{ + SSRC: 1710766843, + Reports: []rtcp.ReceptionReport{{ + SSRC: 0, + FractionLost: 131, + TotalLost: 34, + LastSequenceNumber: 1629, + Jitter: 0, + LastSenderReport: 0, + Delay: 0, + }}, + }, p) + close(rtcpReceived) }, } err := rr.Initialize() require.NoError(t, err) defer rr.Close() - srPkt := rtcp.SenderReport{ - SSRC: 0xba9da416, - NTPTime: 0xe363887a17ced916, - RTPTime: 0xafb45733, - PacketCount: 714, - OctetCount: 859127, - } - ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) - rr.ProcessSenderReport(&srPkt, ts) + rr.absPos = 25 + sn := uint16(1564) + toMiss := uint64(34) - rtpPkt := rtp.Packet{ + out, missing, err := rr.ProcessPacket2(&rtp.Packet{ Header: rtp.Header{ - Version: 2, - Marker: true, - PayloadType: 96, - SequenceNumber: 946, - Timestamp: 0xafb45733, - SSRC: 0xba9da416, + SequenceNumber: sn, }, - Payload: []byte("\x00\x00"), + }, time.Time{}, true) + require.NoError(t, err) + require.Equal(t, []*rtp.Packet{{ + Header: rtp.Header{ + SequenceNumber: sn, + }, + }}, out) + require.Equal(t, uint64(0), missing) + sn++ + + var expected []*rtp.Packet + + for i := uint64(0); i < 64-toMiss; i++ { + out, missing, err = rr.ProcessPacket2(&rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: sn + uint16(toMiss), + }, + }, time.Time{}, true) + require.NoError(t, err) + require.Equal(t, []*rtp.Packet(nil), out) + require.Equal(t, uint64(0), missing) + + expected = append(expected, &rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: sn + uint16(toMiss), + }, + }) + sn++ } - ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) - err = rr.ProcessPacket(&rtpPkt, ts, true) + + out, missing, err = rr.ProcessPacket2(&rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: sn + uint16(toMiss), + }, + }, time.Time{}, true) require.NoError(t, err) - rtpPkt = rtp.Packet{ + require.Equal(t, toMiss, missing) + expected = append(expected, &rtp.Packet{ Header: rtp.Header{ - Version: 2, - Marker: true, - PayloadType: 96, - SequenceNumber: 947, - Timestamp: 0xafb45733 + 45000, - SSRC: 0xba9da416, + SequenceNumber: sn + uint16(toMiss), }, - Payload: []byte("\x00\x00"), - } - ts = time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC) - err = rr.ProcessPacket(&rtpPkt, ts, true) - require.NoError(t, err) + }) + require.Equal(t, expected, out) - rtpPkt = rtp.Packet{ - Header: rtp.Header{ - Version: 2, - Marker: true, - PayloadType: 96, - SequenceNumber: 948, - Timestamp: 0xafb45733, - SSRC: 0xba9da416, - }, - Payload: []byte("\x00\x00"), - } - ts = time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC) - err = rr.ProcessPacket(&rtpPkt, ts, false) - require.NoError(t, err) - - <-done + <-rtcpReceived +} + +func TestUnrealiableReset(t *testing.T) { + rtcpGenerated := make(chan struct{}) + + rr := &RTCPReceiver{ + ClockRate: 90000, + LocalSSRC: uint32Ptr(0x65f83afb), + UnrealiableTransport: true, + Period: 500 * time.Millisecond, + WritePacketRTCP: func(p rtcp.Packet) { + require.Equal(t, &rtcp.ReceiverReport{ + SSRC: 1710766843, + Reports: []rtcp.ReceptionReport{{ + SSRC: 0, + LastSequenceNumber: 0x10000 + 40064, + }}, + }, p) + close(rtcpGenerated) + }, + } + err := rr.Initialize() + require.NoError(t, err) + defer rr.Close() + + _, _, err = rr.ProcessPacket2(&rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 50000, + }, + }, time.Time{}, true) + require.NoError(t, err) + + sn := uint16(40000) + + for range 64 { + var out []*rtp.Packet + var missing uint64 + out, missing, err = rr.ProcessPacket2(&rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: sn, + }, + }, time.Time{}, true) + require.NoError(t, err) + require.Equal(t, []*rtp.Packet(nil), out) + require.Equal(t, uint64(0), missing) + sn++ + } + + out, missing, err := rr.ProcessPacket2(&rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: sn, + }, + }, time.Time{}, true) + require.NoError(t, err) + require.Equal(t, []*rtp.Packet{{ + Header: rtp.Header{ + SequenceNumber: sn, + }, + }}, out) + require.Equal(t, uint64(0), missing) + + <-rtcpGenerated +} + +func TestUnrealiableCustomBufferSize(t *testing.T) { + customSize := 128 + + rr := &RTCPReceiver{ + ClockRate: 90000, + LocalSSRC: uint32Ptr(0x65f83afb), + UnrealiableTransport: true, + BufferSize: customSize, + Period: 500 * time.Millisecond, + } + err := rr.Initialize() + require.NoError(t, err) + defer rr.Close() + + // Set absPos to an arbitrary value. + rr.absPos = 10 + + // Process first packet; behaves as usual. + firstSeq := uint16(50) + out, missing, err := rr.ProcessPacket2(&rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: firstSeq, + }, + }, time.Time{}, true) + require.NoError(t, err) + require.Equal(t, []*rtp.Packet{{ + Header: rtp.Header{ + SequenceNumber: firstSeq, + }, + }}, out) + require.Equal(t, uint64(0), missing) + + // At this point, expectedSeqNum == firstSeq + 1 (i.e. 51). + // Now, send a packet with a gap larger than the custom buffer size. + // For BufferSize = 128, let's send a packet with SequenceNumber = 51 + 130 = 181. + nextSeq := uint16(181) + out, missing, err = rr.ProcessPacket2(&rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: nextSeq, + }, + }, time.Time{}, true) + require.NoError(t, err) + + // Since there are no packets buffered, n remains 1. + // relPos = 181 - 51 = 130; so missing should be 130 + require.Equal(t, uint64(130), missing) + require.Equal(t, []*rtp.Packet{{ + Header: rtp.Header{ + SequenceNumber: nextSeq, + }, + }}, out) } diff --git a/pkg/rtcpsender/rtcpsender.go b/pkg/rtcpsender/rtcpsender.go index 14d7a020..8de68f45 100644 --- a/pkg/rtcpsender/rtcpsender.go +++ b/pkg/rtcpsender/rtcpsender.go @@ -16,7 +16,8 @@ func ntpTimeGoToRTCP(v time.Time) uint64 { return (s/1000000000)<<32 | (s % 1000000000) } -// RTCPSender is a utility to generate RTCP sender reports. +// RTCPSender is a utility to send RTP packets. +// It is in charge of generating RTCP sender reports. type RTCPSender struct { ClockRate int Period time.Duration diff --git a/pkg/rtplossdetector/lossdetector.go b/pkg/rtplossdetector/lossdetector.go index 219af570..a18c3be3 100644 --- a/pkg/rtplossdetector/lossdetector.go +++ b/pkg/rtplossdetector/lossdetector.go @@ -6,6 +6,8 @@ import ( ) // LossDetector detects lost packets. +// +// Deprecated: merged into rtcpreceiver.Receiver. type LossDetector struct { initialized bool expectedSeqNum uint16 diff --git a/pkg/rtpreorderer/reorderer.go b/pkg/rtpreorderer/reorderer.go index 016fe376..fcfc6b91 100644 --- a/pkg/rtpreorderer/reorderer.go +++ b/pkg/rtpreorderer/reorderer.go @@ -12,6 +12,8 @@ const ( // Reorderer filters incoming RTP packets, in order to // - order packets // - remove duplicate packets +// +// Deprecated: merged into rtcpreceiver.Receiver. type Reorderer struct { initialized bool expectedSeqNum uint16 diff --git a/server_session_format.go b/server_session_format.go index 3352de49..853e1c26 100644 --- a/server_session_format.go +++ b/server_session_format.go @@ -12,8 +12,6 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/gortsplib/v4/pkg/liberrors" "github.com/bluenviron/gortsplib/v4/pkg/rtcpreceiver" - "github.com/bluenviron/gortsplib/v4/pkg/rtplossdetector" - "github.com/bluenviron/gortsplib/v4/pkg/rtpreorderer" ) func serverSessionPickLocalSSRC(sf *serverSessionFormat) (uint32, error) { @@ -47,8 +45,6 @@ type serverSessionFormat struct { onPacketRTP OnPacketRTPFunc localSSRC uint32 - udpReorderer *rtpreorderer.Reorderer // publish or back channel - tcpLossDetector *rtplossdetector.LossDetector rtcpReceiver *rtcpreceiver.RTCPReceiver writePacketRTPInQueue func([]byte) error rtpPacketsReceived *uint64 @@ -75,29 +71,23 @@ func (sf *serverSessionFormat) initialize() error { } func (sf *serverSessionFormat) start() { - switch *sf.sm.ss.setuppedTransport { - case TransportUDP, TransportUDPMulticast: - sf.writePacketRTPInQueue = sf.writePacketRTPInQueueUDP + udp := *sf.sm.ss.setuppedTransport == TransportUDP || *sf.sm.ss.setuppedTransport == TransportUDPMulticast - default: + if udp { + sf.writePacketRTPInQueue = sf.writePacketRTPInQueueUDP + } else { sf.writePacketRTPInQueue = sf.writePacketRTPInQueueTCP } if sf.sm.ss.state == ServerSessionStateRecord || sf.sm.media.IsBackChannel { - if *sf.sm.ss.setuppedTransport == TransportUDP || *sf.sm.ss.setuppedTransport == TransportUDPMulticast { - sf.udpReorderer = &rtpreorderer.Reorderer{} - sf.udpReorderer.Initialize() - } else { - sf.tcpLossDetector = &rtplossdetector.LossDetector{} - } - sf.rtcpReceiver = &rtcpreceiver.RTCPReceiver{ - ClockRate: sf.format.ClockRate(), - LocalSSRC: &sf.localSSRC, - Period: sf.sm.ss.s.receiverReportPeriod, - TimeNow: sf.sm.ss.s.timeNow, + ClockRate: sf.format.ClockRate(), + LocalSSRC: &sf.localSSRC, + UnrealiableTransport: udp, + Period: sf.sm.ss.s.receiverReportPeriod, + TimeNow: sf.sm.ss.s.timeNow, WritePacketRTCP: func(pkt rtcp.Packet) { - if *sf.sm.ss.setuppedTransport == TransportUDP || *sf.sm.ss.setuppedTransport == TransportUDPMulticast { + if udp { sf.sm.ss.WritePacketRTCP(sf.sm.media, pkt) //nolint:errcheck } }, @@ -127,63 +117,50 @@ func (sf *serverSessionFormat) remoteSSRC() (uint32, bool) { } func (sf *serverSessionFormat) readPacketRTPUDP(pkt *rtp.Packet, now time.Time) { - packets, lost := sf.udpReorderer.Process(pkt) - if lost != 0 { - sf.onPacketRTPLost(uint64(lost)) - // do not return - } - - for _, pkt := range packets { - sf.handlePacketRTP(pkt, now) - } + sf.handlePacketRTP(pkt, now) } func (sf *serverSessionFormat) readPacketRTPTCP(pkt *rtp.Packet) { - lost := sf.tcpLossDetector.Process(pkt) - if lost != 0 { - sf.onPacketRTPLost(uint64(lost)) - // do not return - } - now := sf.sm.ss.s.timeNow() - sf.handlePacketRTP(pkt, now) } func (sf *serverSessionFormat) handlePacketRTP(pkt *rtp.Packet, now time.Time) { - err := sf.rtcpReceiver.ProcessPacket(pkt, now, sf.format.PTSEqualsDTS(pkt)) + pkts, lost, err := sf.rtcpReceiver.ProcessPacket2(pkt, now, sf.format.PTSEqualsDTS(pkt)) if err != nil { sf.sm.onPacketRTPDecodeError(err) return } - atomic.AddUint64(sf.rtpPacketsReceived, 1) + if lost != 0 { + atomic.AddUint64(sf.rtpPacketsLost, lost) - sf.onPacketRTP(pkt) -} + if h, ok := sf.sm.ss.s.Handler.(ServerHandlerOnPacketsLost); ok { + h.OnPacketsLost(&ServerHandlerOnPacketsLostCtx{ + Session: sf.sm.ss, + Lost: lost, + }) + } else if h, ok2 := sf.sm.ss.s.Handler.(ServerHandlerOnPacketLost); ok2 { + h.OnPacketLost(&ServerHandlerOnPacketLostCtx{ + Session: sf.sm.ss, + Error: liberrors.ErrServerRTPPacketsLost{Lost: uint(lost)}, //nolint:staticcheck + }) + } else { + log.Printf("%d RTP %s lost", + lost, + func() string { + if lost == 1 { + return "packet" + } + return "packets" + }()) + } + } -func (sf *serverSessionFormat) onPacketRTPLost(lost uint64) { - atomic.AddUint64(sf.rtpPacketsLost, lost) + atomic.AddUint64(sf.rtpPacketsReceived, uint64(len(pkts))) - if h, ok := sf.sm.ss.s.Handler.(ServerHandlerOnPacketsLost); ok { - h.OnPacketsLost(&ServerHandlerOnPacketsLostCtx{ - Session: sf.sm.ss, - Lost: lost, - }) - } else if h, ok2 := sf.sm.ss.s.Handler.(ServerHandlerOnPacketLost); ok2 { - h.OnPacketLost(&ServerHandlerOnPacketLostCtx{ - Session: sf.sm.ss, - Error: liberrors.ErrServerRTPPacketsLost{Lost: uint(lost)}, //nolint:staticcheck - }) - } else { - log.Printf("%d RTP %s lost", - lost, - func() string { - if lost == 1 { - return "packet" - } - return "packets" - }()) + for _, pkt := range pkts { + sf.onPacketRTP(pkt) } }