fix crash when reading tracks with zero clock rate (bluenviron/mediamtx#4476) (#772)

also prevents RTCP sender and RTCP reports from being emitted when
track has clock rate set to zero.
This commit is contained in:
Alessandro Ros
2025-05-01 18:08:13 +02:00
committed by GitHub
parent bfacd10d35
commit 3e555e2d18
6 changed files with 273 additions and 55 deletions

View File

@@ -146,7 +146,7 @@ func (cm *clientMedia) stop() {
}
}
func (cm *clientMedia) findFormatWithSSRC(ssrc uint32) *clientFormat {
func (cm *clientMedia) findFormatBySSRC(ssrc uint32) *clientFormat {
for _, format := range cm.formats {
stats := format.rtcpReceiver.Stats()
if stats != nil && stats.RemoteSSRC == ssrc {
@@ -226,7 +226,7 @@ func (cm *clientMedia) readPacketRTCPTCPPlay(payload []byte) bool {
for _, pkt := range packets {
if sr, ok := pkt.(*rtcp.SenderReport); ok {
format := cm.findFormatWithSSRC(sr.SSRC)
format := cm.findFormatBySSRC(sr.SSRC)
if format != nil {
format.rtcpReceiver.ProcessSenderReport(sr, now)
}
@@ -311,7 +311,7 @@ func (cm *clientMedia) readPacketRTCPUDPPlay(payload []byte) bool {
for _, pkt := range packets {
if sr, ok := pkt.(*rtcp.SenderReport); ok {
format := cm.findFormatWithSSRC(sr.SSRC)
format := cm.findFormatBySSRC(sr.SSRC)
if format != nil {
format.rtcpReceiver.ProcessSenderReport(sr, now)
}

View File

@@ -107,6 +107,12 @@ func (rr *RTCPReceiver) Initialize() error {
return nil
}
// Close closes the RTCPReceiver.
func (rr *RTCPReceiver) Close() {
close(rr.terminate)
<-rr.done
}
func (rr *RTCPReceiver) run() {
defer close(rr.done)
@@ -131,7 +137,7 @@ func (rr *RTCPReceiver) report() rtcp.Packet {
rr.mutex.Lock()
defer rr.mutex.Unlock()
if !rr.firstRTPPacketReceived {
if !rr.firstRTPPacketReceived || rr.ClockRate == 0 {
return nil
}
@@ -168,12 +174,6 @@ func (rr *RTCPReceiver) report() rtcp.Packet {
return report
}
// Close closes the RTCPReceiver.
func (rr *RTCPReceiver) Close() {
close(rr.terminate)
<-rr.done
}
// ProcessPacket extracts the needed data from RTP packets.
func (rr *RTCPReceiver) ProcessPacket(pkt *rtp.Packet, system time.Time, ptsEqualsDTS bool) error {
rr.mutex.Lock()
@@ -223,7 +223,7 @@ func (rr *RTCPReceiver) ProcessPacket(pkt *rtp.Packet, system time.Time, ptsEqua
rr.lastSequenceNumber = pkt.SequenceNumber
if ptsEqualsDTS {
if rr.timeInitialized {
if rr.timeInitialized && rr.ClockRate != 0 {
// update jitter
// https://tools.ietf.org/html/rfc3550#page-39
D := system.Sub(rr.lastTimeSystem).Seconds()*float64(rr.ClockRate) -
@@ -255,7 +255,7 @@ func (rr *RTCPReceiver) ProcessSenderReport(sr *rtcp.SenderReport, system time.T
}
func (rr *RTCPReceiver) packetNTPUnsafe(ts uint32) (time.Time, bool) {
if !rr.firstSenderReportReceived {
if !rr.firstSenderReportReceived || rr.ClockRate == 0 {
return time.Time{}, false
}

View File

@@ -13,8 +13,8 @@ func uint32Ptr(v uint32) *uint32 {
return &v
}
func TestRTCPReceiverBase(t *testing.T) {
done := make(chan struct{})
func TestRTCPReceiver(t *testing.T) {
pktGenerated := make(chan rtcp.Packet)
rr := &RTCPReceiver{
ClockRate: 90000,
@@ -24,24 +24,155 @@ func TestRTCPReceiverBase(t *testing.T) {
return time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC)
},
WritePacketRTCP: func(pkt rtcp.Packet) {
require.Equal(t, &rtcp.ReceiverReport{
SSRC: 0x65f83afb,
Reports: []rtcp.ReceptionReport{
{
SSRC: 0xba9da416,
LastSequenceNumber: 947,
LastSenderReport: 0x887a17ce,
Delay: 2 * 65536,
},
},
}, pkt)
close(done)
pktGenerated <- pkt
},
}
err := rr.Initialize()
require.NoError(t, err)
defer rr.Close()
stats := rr.Stats()
require.Nil(t, stats)
rtpPkt := rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 945,
Timestamp: 0xafb45733,
SSRC: 0xba9da416,
},
Payload: []byte("\x00\x00"),
}
ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
err = rr.ProcessPacket(&rtpPkt, ts, true)
require.NoError(t, err)
stats = rr.Stats()
require.Equal(t, &Stats{
RemoteSSRC: 0xba9da416,
LastRTP: 0xafb45733,
LastSequenceNumber: 945,
}, stats)
srPkt := rtcp.SenderReport{
SSRC: 0xba9da416,
NTPTime: func() uint64 {
d := time.Date(2008, 5, 20, 22, 15, 20, 0, time.UTC)
s := uint64(d.UnixNano()) + 2208988800*1000000000
return (s/1000000000)<<32 | (s % 1000000000)
}(),
RTPTime: 0xafb45733,
PacketCount: 714,
OctetCount: 859127,
}
ts = time.Date(2008, 5, 20, 22, 15, 20, 0, time.UTC)
rr.ProcessSenderReport(&srPkt, ts)
stats = rr.Stats()
require.Equal(t, &Stats{
RemoteSSRC: 0xba9da416,
LastRTP: 0xafb45733,
LastSequenceNumber: 945,
LastNTP: time.Date(2008, 5, 20, 22, 15, 20, 0, time.UTC).Local(),
}, stats)
rtpPkt = rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 946,
Timestamp: 0xafb45733,
SSRC: 0xba9da416,
},
Payload: []byte("\x00\x00"),
}
ts = time.Date(2008, 5, 20, 22, 15, 20, 0, time.UTC)
err = rr.ProcessPacket(&rtpPkt, ts, true)
require.NoError(t, err)
rtpPkt = rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 947,
Timestamp: 0xafb45733 + 90000,
SSRC: 0xba9da416,
},
Payload: []byte("\x00\x00"),
}
ts = time.Date(2008, 0o5, 20, 22, 15, 21, 0, time.UTC)
err = rr.ProcessPacket(&rtpPkt, ts, true)
require.NoError(t, err)
pkt := <-pktGenerated
require.Equal(t, &rtcp.ReceiverReport{
SSRC: 0x65f83afb,
Reports: []rtcp.ReceptionReport{
{
SSRC: 0xba9da416,
LastSequenceNumber: 947,
LastSenderReport: 3422027776,
Delay: 2 * 65536,
},
},
}, pkt)
stats = rr.Stats()
require.Equal(t, &Stats{
RemoteSSRC: 0xba9da416,
LastRTP: 2947921603,
LastSequenceNumber: 947,
LastNTP: time.Date(2008, 5, 20, 22, 15, 21, 0, time.UTC).Local(),
}, stats)
}
func TestRTCPReceiverZeroClockRate(t *testing.T) {
pktGenerated := make(chan rtcp.Packet)
rr := &RTCPReceiver{
ClockRate: 0,
LocalSSRC: uint32Ptr(0x65f83afb),
Period: 500 * time.Millisecond,
TimeNow: func() time.Time {
return time.Date(2008, 0o5, 20, 22, 15, 22, 0, time.UTC)
},
WritePacketRTCP: func(pkt rtcp.Packet) {
pktGenerated <- pkt
},
}
err := rr.Initialize()
require.NoError(t, err)
defer rr.Close()
stats := rr.Stats()
require.Nil(t, stats)
rtpPkt := rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 945,
Timestamp: 0xafb45733,
SSRC: 0xba9da416,
},
Payload: []byte("\x00\x00"),
}
ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
err = rr.ProcessPacket(&rtpPkt, ts, true)
require.NoError(t, err)
stats = rr.Stats()
require.Equal(t, &Stats{
RemoteSSRC: 0xba9da416,
LastRTP: 0xafb45733,
LastSequenceNumber: 945,
}, stats)
srPkt := rtcp.SenderReport{
SSRC: 0xba9da416,
NTPTime: 0xe363887a17ced916,
@@ -49,10 +180,17 @@ func TestRTCPReceiverBase(t *testing.T) {
PacketCount: 714,
OctetCount: 859127,
}
ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
rr.ProcessSenderReport(&srPkt, ts)
rtpPkt := rtp.Packet{
stats = rr.Stats()
require.Equal(t, &Stats{
RemoteSSRC: 0xba9da416,
LastRTP: 0xafb45733,
LastSequenceNumber: 945,
}, stats)
rtpPkt = rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
@@ -82,7 +220,18 @@ func TestRTCPReceiverBase(t *testing.T) {
err = rr.ProcessPacket(&rtpPkt, ts, true)
require.NoError(t, err)
<-done
select {
case <-pktGenerated:
t.Error("should not happen")
case <-time.After(500 * time.Millisecond):
}
stats = rr.Stats()
require.Equal(t, &Stats{
RemoteSSRC: 0xba9da416,
LastRTP: 2947921603,
LastSequenceNumber: 947,
}, stats)
}
func TestRTCPReceiverOverflow(t *testing.T) {

View File

@@ -71,6 +71,12 @@ func (rs *RTCPSender) Initialize() {
go rs.run()
}
// Close closes the RTCPSender.
func (rs *RTCPSender) Close() {
close(rs.terminate)
<-rs.done
}
func (rs *RTCPSender) run() {
defer close(rs.done)
@@ -95,7 +101,7 @@ func (rs *RTCPSender) report() rtcp.Packet {
rs.mutex.Lock()
defer rs.mutex.Unlock()
if !rs.firstRTPPacketSent {
if !rs.firstRTPPacketSent || rs.ClockRate == 0 {
return nil
}
@@ -112,12 +118,6 @@ func (rs *RTCPSender) report() rtcp.Packet {
}
}
// Close closes the RTCPSender.
func (rs *RTCPSender) Close() {
close(rs.terminate)
<-rs.done
}
// ProcessPacket extracts data from RTP packets.
func (rs *RTCPSender) ProcessPacket(pkt *rtp.Packet, ntp time.Time, ptsEqualsDTS bool) {
rs.mutex.Lock()

View File

@@ -20,7 +20,7 @@ func TestRTCPSender(t *testing.T) {
curTime = v
}
sent := make(chan struct{})
pktGenerated := make(chan rtcp.Packet)
rs := &RTCPSender{
ClockRate: 90000,
@@ -31,25 +31,15 @@ func TestRTCPSender(t *testing.T) {
return curTime
},
WritePacketRTCP: func(pkt rtcp.Packet) {
require.Equal(t, &rtcp.SenderReport{
SSRC: 0xba9da416,
NTPTime: func() uint64 {
// timeDiff = (24 - 22) = 2
// 21 + 2 = 23
d := time.Date(2008, 5, 20, 22, 15, 23, 0, time.UTC)
s := uint64(d.UnixNano()) + 2208988800*1000000000
return (s/1000000000)<<32 | (s % 1000000000)
}(),
RTPTime: 1287987768 + 2*90000,
PacketCount: 3,
OctetCount: 6,
}, pkt)
close(sent)
pktGenerated <- pkt
},
}
rs.Initialize()
defer rs.Close()
stats := rs.Stats()
require.Nil(t, stats)
setCurTime(time.Date(2008, 5, 20, 22, 16, 20, 0, time.UTC))
rtpPkt := rtp.Packet{
Header: rtp.Header{
@@ -96,5 +86,84 @@ func TestRTCPSender(t *testing.T) {
setCurTime(time.Date(2008, 5, 20, 22, 16, 24, 0, time.UTC))
<-sent
pkt := <-pktGenerated
require.Equal(t, &rtcp.SenderReport{
SSRC: 0xba9da416,
NTPTime: func() uint64 {
d := time.Date(2008, 5, 20, 22, 15, 23, 0, time.UTC)
s := uint64(d.UnixNano()) + 2208988800*1000000000
return (s/1000000000)<<32 | (s % 1000000000)
}(),
RTPTime: 1287987768 + 2*90000,
PacketCount: 3,
OctetCount: 6,
}, pkt)
stats = rs.Stats()
require.Equal(t, &Stats{
LocalSSRC: 0xba9da416,
LastSequenceNumber: 948,
LastRTP: 1287987768,
LastNTP: time.Date(2008, time.May, 20, 22, 15, 21, 0, time.UTC),
}, stats)
}
func TestRTCPSenderZeroClockRate(t *testing.T) {
var curTime time.Time
var mutex sync.Mutex
setCurTime := func(v time.Time) {
mutex.Lock()
defer mutex.Unlock()
curTime = v
}
pktGenerated := make(chan rtcp.Packet)
rs := &RTCPSender{
ClockRate: 0,
Period: 100 * time.Millisecond,
TimeNow: func() time.Time {
mutex.Lock()
defer mutex.Unlock()
return curTime
},
WritePacketRTCP: func(pkt rtcp.Packet) {
pktGenerated <- pkt
},
}
rs.Initialize()
defer rs.Close()
stats := rs.Stats()
require.Nil(t, stats)
setCurTime(time.Date(2008, 5, 20, 22, 16, 20, 0, time.UTC))
rtpPkt := rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 946,
Timestamp: 1287987768,
SSRC: 0xba9da416,
},
Payload: []byte("\x00\x00"),
}
ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
rs.ProcessPacket(&rtpPkt, ts, true)
select {
case <-pktGenerated:
t.Error("should not happen")
case <-time.After(500 * time.Millisecond):
}
stats = rs.Stats()
require.Equal(t, &Stats{
LocalSSRC: 0xba9da416,
LastSequenceNumber: 946,
LastRTP: 1287987768,
LastNTP: time.Date(2008, time.May, 20, 22, 15, 20, 0, time.UTC),
}, stats)
}

View File

@@ -112,7 +112,7 @@ func (sm *serverSessionMedia) stop() {
}
}
func (sm *serverSessionMedia) findFormatWithSSRC(ssrc uint32) *serverSessionFormat {
func (sm *serverSessionMedia) findFormatBySSRC(ssrc uint32) *serverSessionFormat {
for _, format := range sm.formats {
stats := format.rtcpReceiver.Stats()
if stats != nil && stats.RemoteSSRC == ssrc {
@@ -223,7 +223,7 @@ func (sm *serverSessionMedia) readPacketRTCPUDPRecord(payload []byte) bool {
for _, pkt := range packets {
if sr, ok := pkt.(*rtcp.SenderReport); ok {
format := sm.findFormatWithSSRC(sr.SSRC)
format := sm.findFormatBySSRC(sr.SSRC)
if format != nil {
format.rtcpReceiver.ProcessSenderReport(sr, now)
}
@@ -303,7 +303,7 @@ func (sm *serverSessionMedia) readPacketRTCPTCPRecord(payload []byte) bool {
for _, pkt := range packets {
if sr, ok := pkt.(*rtcp.SenderReport); ok {
format := sm.findFormatWithSSRC(sr.SSRC)
format := sm.findFormatBySSRC(sr.SSRC)
if format != nil {
format.rtcpReceiver.ProcessSenderReport(sr, now)
}