diff --git a/client_media.go b/client_media.go index c6c4f3ba..8615d957 100644 --- a/client_media.go +++ b/client_media.go @@ -146,7 +146,7 @@ func (cm *clientMedia) stop() { } } -func (cm *clientMedia) findFormatWithSSRC(ssrc uint32) *clientFormat { +func (cm *clientMedia) findFormatBySSRC(ssrc uint32) *clientFormat { for _, format := range cm.formats { stats := format.rtcpReceiver.Stats() if stats != nil && stats.RemoteSSRC == ssrc { @@ -226,7 +226,7 @@ func (cm *clientMedia) readPacketRTCPTCPPlay(payload []byte) bool { for _, pkt := range packets { if sr, ok := pkt.(*rtcp.SenderReport); ok { - format := cm.findFormatWithSSRC(sr.SSRC) + format := cm.findFormatBySSRC(sr.SSRC) if format != nil { format.rtcpReceiver.ProcessSenderReport(sr, now) } @@ -311,7 +311,7 @@ func (cm *clientMedia) readPacketRTCPUDPPlay(payload []byte) bool { for _, pkt := range packets { if sr, ok := pkt.(*rtcp.SenderReport); ok { - format := cm.findFormatWithSSRC(sr.SSRC) + format := cm.findFormatBySSRC(sr.SSRC) if format != nil { format.rtcpReceiver.ProcessSenderReport(sr, now) } diff --git a/pkg/rtcpreceiver/rtcpreceiver.go b/pkg/rtcpreceiver/rtcpreceiver.go index 931bc671..bdeb60e5 100644 --- a/pkg/rtcpreceiver/rtcpreceiver.go +++ b/pkg/rtcpreceiver/rtcpreceiver.go @@ -107,6 +107,12 @@ func (rr *RTCPReceiver) Initialize() error { return nil } +// Close closes the RTCPReceiver. +func (rr *RTCPReceiver) Close() { + close(rr.terminate) + <-rr.done +} + func (rr *RTCPReceiver) run() { defer close(rr.done) @@ -131,7 +137,7 @@ func (rr *RTCPReceiver) report() rtcp.Packet { rr.mutex.Lock() defer rr.mutex.Unlock() - if !rr.firstRTPPacketReceived { + if !rr.firstRTPPacketReceived || rr.ClockRate == 0 { return nil } @@ -168,12 +174,6 @@ func (rr *RTCPReceiver) report() rtcp.Packet { return report } -// Close closes the RTCPReceiver. -func (rr *RTCPReceiver) Close() { - close(rr.terminate) - <-rr.done -} - // ProcessPacket extracts the needed data from RTP packets. func (rr *RTCPReceiver) ProcessPacket(pkt *rtp.Packet, system time.Time, ptsEqualsDTS bool) error { rr.mutex.Lock() @@ -223,7 +223,7 @@ func (rr *RTCPReceiver) ProcessPacket(pkt *rtp.Packet, system time.Time, ptsEqua rr.lastSequenceNumber = pkt.SequenceNumber if ptsEqualsDTS { - if rr.timeInitialized { + if rr.timeInitialized && rr.ClockRate != 0 { // update jitter // https://tools.ietf.org/html/rfc3550#page-39 D := system.Sub(rr.lastTimeSystem).Seconds()*float64(rr.ClockRate) - @@ -255,7 +255,7 @@ func (rr *RTCPReceiver) ProcessSenderReport(sr *rtcp.SenderReport, system time.T } func (rr *RTCPReceiver) packetNTPUnsafe(ts uint32) (time.Time, bool) { - if !rr.firstSenderReportReceived { + if !rr.firstSenderReportReceived || rr.ClockRate == 0 { return time.Time{}, false } diff --git a/pkg/rtcpreceiver/rtcpreceiver_test.go b/pkg/rtcpreceiver/rtcpreceiver_test.go index 773c9064..4328c88f 100644 --- a/pkg/rtcpreceiver/rtcpreceiver_test.go +++ b/pkg/rtcpreceiver/rtcpreceiver_test.go @@ -13,8 +13,8 @@ func uint32Ptr(v uint32) *uint32 { return &v } -func TestRTCPReceiverBase(t *testing.T) { - done := make(chan struct{}) +func TestRTCPReceiver(t *testing.T) { + pktGenerated := make(chan rtcp.Packet) rr := &RTCPReceiver{ ClockRate: 90000, @@ -24,24 +24,155 @@ func TestRTCPReceiverBase(t *testing.T) { return time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC) }, WritePacketRTCP: func(pkt rtcp.Packet) { - require.Equal(t, &rtcp.ReceiverReport{ - SSRC: 0x65f83afb, - Reports: []rtcp.ReceptionReport{ - { - SSRC: 0xba9da416, - LastSequenceNumber: 947, - LastSenderReport: 0x887a17ce, - Delay: 2 * 65536, - }, - }, - }, pkt) - close(done) + pktGenerated <- pkt }, } err := rr.Initialize() require.NoError(t, err) defer rr.Close() + stats := rr.Stats() + require.Nil(t, stats) + + rtpPkt := rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 945, + Timestamp: 0xafb45733, + SSRC: 0xba9da416, + }, + Payload: []byte("\x00\x00"), + } + ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) + err = rr.ProcessPacket(&rtpPkt, ts, true) + require.NoError(t, err) + + stats = rr.Stats() + require.Equal(t, &Stats{ + RemoteSSRC: 0xba9da416, + LastRTP: 0xafb45733, + LastSequenceNumber: 945, + }, stats) + + srPkt := rtcp.SenderReport{ + SSRC: 0xba9da416, + NTPTime: func() uint64 { + d := time.Date(2008, 5, 20, 22, 15, 20, 0, time.UTC) + s := uint64(d.UnixNano()) + 2208988800*1000000000 + return (s/1000000000)<<32 | (s % 1000000000) + }(), + RTPTime: 0xafb45733, + PacketCount: 714, + OctetCount: 859127, + } + ts = time.Date(2008, 5, 20, 22, 15, 20, 0, time.UTC) + rr.ProcessSenderReport(&srPkt, ts) + + stats = rr.Stats() + require.Equal(t, &Stats{ + RemoteSSRC: 0xba9da416, + LastRTP: 0xafb45733, + LastSequenceNumber: 945, + LastNTP: time.Date(2008, 5, 20, 22, 15, 20, 0, time.UTC).Local(), + }, stats) + + rtpPkt = rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 946, + Timestamp: 0xafb45733, + SSRC: 0xba9da416, + }, + Payload: []byte("\x00\x00"), + } + ts = time.Date(2008, 5, 20, 22, 15, 20, 0, time.UTC) + err = rr.ProcessPacket(&rtpPkt, ts, true) + require.NoError(t, err) + + rtpPkt = rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 947, + Timestamp: 0xafb45733 + 90000, + SSRC: 0xba9da416, + }, + Payload: []byte("\x00\x00"), + } + ts = time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC) + err = rr.ProcessPacket(&rtpPkt, ts, true) + require.NoError(t, err) + + pkt := <-pktGenerated + require.Equal(t, &rtcp.ReceiverReport{ + SSRC: 0x65f83afb, + Reports: []rtcp.ReceptionReport{ + { + SSRC: 0xba9da416, + LastSequenceNumber: 947, + LastSenderReport: 3422027776, + Delay: 2 * 65536, + }, + }, + }, pkt) + + stats = rr.Stats() + require.Equal(t, &Stats{ + RemoteSSRC: 0xba9da416, + LastRTP: 2947921603, + LastSequenceNumber: 947, + LastNTP: time.Date(2008, 5, 20, 22, 15, 21, 0, time.UTC).Local(), + }, stats) +} + +func TestRTCPReceiverZeroClockRate(t *testing.T) { + pktGenerated := make(chan rtcp.Packet) + + rr := &RTCPReceiver{ + ClockRate: 0, + LocalSSRC: uint32Ptr(0x65f83afb), + Period: 500 * time.Millisecond, + TimeNow: func() time.Time { + return time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC) + }, + WritePacketRTCP: func(pkt rtcp.Packet) { + pktGenerated <- pkt + }, + } + err := rr.Initialize() + require.NoError(t, err) + defer rr.Close() + + stats := rr.Stats() + require.Nil(t, stats) + + rtpPkt := rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 945, + Timestamp: 0xafb45733, + SSRC: 0xba9da416, + }, + Payload: []byte("\x00\x00"), + } + ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) + err = rr.ProcessPacket(&rtpPkt, ts, true) + require.NoError(t, err) + + stats = rr.Stats() + require.Equal(t, &Stats{ + RemoteSSRC: 0xba9da416, + LastRTP: 0xafb45733, + LastSequenceNumber: 945, + }, stats) + srPkt := rtcp.SenderReport{ SSRC: 0xba9da416, NTPTime: 0xe363887a17ced916, @@ -49,10 +180,17 @@ func TestRTCPReceiverBase(t *testing.T) { PacketCount: 714, OctetCount: 859127, } - ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) + ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) rr.ProcessSenderReport(&srPkt, ts) - rtpPkt := rtp.Packet{ + stats = rr.Stats() + require.Equal(t, &Stats{ + RemoteSSRC: 0xba9da416, + LastRTP: 0xafb45733, + LastSequenceNumber: 945, + }, stats) + + rtpPkt = rtp.Packet{ Header: rtp.Header{ Version: 2, Marker: true, @@ -82,7 +220,18 @@ func TestRTCPReceiverBase(t *testing.T) { err = rr.ProcessPacket(&rtpPkt, ts, true) require.NoError(t, err) - <-done + select { + case <-pktGenerated: + t.Error("should not happen") + case <-time.After(500 * time.Millisecond): + } + + stats = rr.Stats() + require.Equal(t, &Stats{ + RemoteSSRC: 0xba9da416, + LastRTP: 2947921603, + LastSequenceNumber: 947, + }, stats) } func TestRTCPReceiverOverflow(t *testing.T) { diff --git a/pkg/rtcpsender/rtcpsender.go b/pkg/rtcpsender/rtcpsender.go index 929d7a6a..9cc64aee 100644 --- a/pkg/rtcpsender/rtcpsender.go +++ b/pkg/rtcpsender/rtcpsender.go @@ -71,6 +71,12 @@ func (rs *RTCPSender) Initialize() { go rs.run() } +// Close closes the RTCPSender. +func (rs *RTCPSender) Close() { + close(rs.terminate) + <-rs.done +} + func (rs *RTCPSender) run() { defer close(rs.done) @@ -95,7 +101,7 @@ func (rs *RTCPSender) report() rtcp.Packet { rs.mutex.Lock() defer rs.mutex.Unlock() - if !rs.firstRTPPacketSent { + if !rs.firstRTPPacketSent || rs.ClockRate == 0 { return nil } @@ -112,12 +118,6 @@ func (rs *RTCPSender) report() rtcp.Packet { } } -// Close closes the RTCPSender. -func (rs *RTCPSender) Close() { - close(rs.terminate) - <-rs.done -} - // ProcessPacket extracts data from RTP packets. func (rs *RTCPSender) ProcessPacket(pkt *rtp.Packet, ntp time.Time, ptsEqualsDTS bool) { rs.mutex.Lock() diff --git a/pkg/rtcpsender/rtcpsender_test.go b/pkg/rtcpsender/rtcpsender_test.go index faab96e4..6583c294 100644 --- a/pkg/rtcpsender/rtcpsender_test.go +++ b/pkg/rtcpsender/rtcpsender_test.go @@ -20,7 +20,7 @@ func TestRTCPSender(t *testing.T) { curTime = v } - sent := make(chan struct{}) + pktGenerated := make(chan rtcp.Packet) rs := &RTCPSender{ ClockRate: 90000, @@ -31,25 +31,15 @@ func TestRTCPSender(t *testing.T) { return curTime }, WritePacketRTCP: func(pkt rtcp.Packet) { - require.Equal(t, &rtcp.SenderReport{ - SSRC: 0xba9da416, - NTPTime: func() uint64 { - // timeDiff = (24 - 22) = 2 - // 21 + 2 = 23 - d := time.Date(2008, 5, 20, 22, 15, 23, 0, time.UTC) - s := uint64(d.UnixNano()) + 2208988800*1000000000 - return (s/1000000000)<<32 | (s % 1000000000) - }(), - RTPTime: 1287987768 + 2*90000, - PacketCount: 3, - OctetCount: 6, - }, pkt) - close(sent) + pktGenerated <- pkt }, } rs.Initialize() defer rs.Close() + stats := rs.Stats() + require.Nil(t, stats) + setCurTime(time.Date(2008, 5, 20, 22, 16, 20, 0, time.UTC)) rtpPkt := rtp.Packet{ Header: rtp.Header{ @@ -96,5 +86,84 @@ func TestRTCPSender(t *testing.T) { setCurTime(time.Date(2008, 5, 20, 22, 16, 24, 0, time.UTC)) - <-sent + pkt := <-pktGenerated + require.Equal(t, &rtcp.SenderReport{ + SSRC: 0xba9da416, + NTPTime: func() uint64 { + d := time.Date(2008, 5, 20, 22, 15, 23, 0, time.UTC) + s := uint64(d.UnixNano()) + 2208988800*1000000000 + return (s/1000000000)<<32 | (s % 1000000000) + }(), + RTPTime: 1287987768 + 2*90000, + PacketCount: 3, + OctetCount: 6, + }, pkt) + + stats = rs.Stats() + require.Equal(t, &Stats{ + LocalSSRC: 0xba9da416, + LastSequenceNumber: 948, + LastRTP: 1287987768, + LastNTP: time.Date(2008, time.May, 20, 22, 15, 21, 0, time.UTC), + }, stats) +} + +func TestRTCPSenderZeroClockRate(t *testing.T) { + var curTime time.Time + var mutex sync.Mutex + + setCurTime := func(v time.Time) { + mutex.Lock() + defer mutex.Unlock() + curTime = v + } + + pktGenerated := make(chan rtcp.Packet) + + rs := &RTCPSender{ + ClockRate: 0, + Period: 100 * time.Millisecond, + TimeNow: func() time.Time { + mutex.Lock() + defer mutex.Unlock() + return curTime + }, + WritePacketRTCP: func(pkt rtcp.Packet) { + pktGenerated <- pkt + }, + } + rs.Initialize() + defer rs.Close() + + stats := rs.Stats() + require.Nil(t, stats) + + setCurTime(time.Date(2008, 5, 20, 22, 16, 20, 0, time.UTC)) + rtpPkt := rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 946, + Timestamp: 1287987768, + SSRC: 0xba9da416, + }, + Payload: []byte("\x00\x00"), + } + ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) + rs.ProcessPacket(&rtpPkt, ts, true) + + select { + case <-pktGenerated: + t.Error("should not happen") + case <-time.After(500 * time.Millisecond): + } + + stats = rs.Stats() + require.Equal(t, &Stats{ + LocalSSRC: 0xba9da416, + LastSequenceNumber: 946, + LastRTP: 1287987768, + LastNTP: time.Date(2008, time.May, 20, 22, 15, 20, 0, time.UTC), + }, stats) } diff --git a/server_session_media.go b/server_session_media.go index dfe38273..f94ce5ad 100644 --- a/server_session_media.go +++ b/server_session_media.go @@ -112,7 +112,7 @@ func (sm *serverSessionMedia) stop() { } } -func (sm *serverSessionMedia) findFormatWithSSRC(ssrc uint32) *serverSessionFormat { +func (sm *serverSessionMedia) findFormatBySSRC(ssrc uint32) *serverSessionFormat { for _, format := range sm.formats { stats := format.rtcpReceiver.Stats() if stats != nil && stats.RemoteSSRC == ssrc { @@ -223,7 +223,7 @@ func (sm *serverSessionMedia) readPacketRTCPUDPRecord(payload []byte) bool { for _, pkt := range packets { if sr, ok := pkt.(*rtcp.SenderReport); ok { - format := sm.findFormatWithSSRC(sr.SSRC) + format := sm.findFormatBySSRC(sr.SSRC) if format != nil { format.rtcpReceiver.ProcessSenderReport(sr, now) } @@ -303,7 +303,7 @@ func (sm *serverSessionMedia) readPacketRTCPTCPRecord(payload []byte) bool { for _, pkt := range packets { if sr, ok := pkt.(*rtcp.SenderReport); ok { - format := sm.findFormatWithSSRC(sr.SSRC) + format := sm.findFormatBySSRC(sr.SSRC) if format != nil { format.rtcpReceiver.ProcessSenderReport(sr, now) }