Properly handle non-media probes

libwebrtc has started sending media probes on an unannounced SSRC(0).
Currently Pion will ignore this as the SSRC hasn't been declared
explicitly and no RID/MID RTP Headers.

This adds a special case to accept SSRC 0 and Read the RTP packets. This
allows the TWCC reports to properly be generated.
This commit is contained in:
Sean DuBois
2024-07-19 14:56:52 -04:00
parent 19d022423d
commit c85269bee3
5 changed files with 96 additions and 16 deletions

View File

@@ -225,12 +225,59 @@ func Test_InterceptorRegistry_Build(t *testing.T) {
}, },
}) })
peerConnectionA, err := NewAPI(WithInterceptorRegistry(ir)).NewPeerConnection(Configuration{}) peerConnectionA, peerConnectionB, err := NewAPI(WithInterceptorRegistry(ir)).newPair(Configuration{})
assert.NoError(t, err)
peerConnectionB, err := NewAPI(WithInterceptorRegistry(ir)).NewPeerConnection(Configuration{})
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, 2, registryBuildCount) assert.Equal(t, 2, registryBuildCount)
closePairNow(t, peerConnectionA, peerConnectionB) closePairNow(t, peerConnectionA, peerConnectionB)
} }
func Test_Interceptor_ZeroSSRC(t *testing.T) {
to := test.TimeOut(time.Second * 20)
defer to.Stop()
report := test.CheckRoutines(t)
defer report()
track, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion")
assert.NoError(t, err)
offerer, answerer, err := newPair()
assert.NoError(t, err)
_, err = offerer.AddTrack(track)
assert.NoError(t, err)
probeReceiverCreated := make(chan struct{})
go func() {
sequenceNumber := uint16(0)
for range time.NewTicker(time.Millisecond * 20).C {
track.mu.Lock()
if len(track.bindings) == 1 {
_, err = track.bindings[0].writeStream.WriteRTP(&rtp.Header{
Version: 2,
SSRC: 0,
SequenceNumber: sequenceNumber,
}, []byte{0, 1, 2, 3, 4, 5})
assert.NoError(t, err)
}
sequenceNumber++
track.mu.Unlock()
if nonMediaBandwidthProbe, ok := answerer.nonMediaBandwidthProbe.Load().(*RTPReceiver); ok {
assert.Equal(t, len(nonMediaBandwidthProbe.Tracks()), 1)
close(probeReceiverCreated)
return
}
}
}()
assert.NoError(t, signalPair(offerer, answerer))
peerConnectionConnected := untilConnectionState(PeerConnectionStateConnected, offerer, answerer)
peerConnectionConnected.Wait()
<-probeReceiverCreated
closePairNow(t, offerer, answerer)
}

View File

@@ -68,7 +68,8 @@ type PeerConnection struct {
// should be defined (see JSEP 3.4.1). // should be defined (see JSEP 3.4.1).
greaterMid int greaterMid int
rtpTransceivers []*RTPTransceiver rtpTransceivers []*RTPTransceiver
nonMediaBandwidthProbe atomic.Value // RTPReceiver
onSignalingStateChangeHandler func(SignalingState) onSignalingStateChangeHandler func(SignalingState)
onICEConnectionStateChangeHandler atomic.Value // func(ICEConnectionState) onICEConnectionStateChangeHandler atomic.Value // func(ICEConnectionState)
@@ -1524,6 +1525,32 @@ func (pc *PeerConnection) handleUndeclaredSSRC(ssrc SSRC, remoteDescription *Ses
return true, nil return true, nil
} }
// Chrome sends probing traffic on SSRC 0. This reads the packets to ensure that we properly
// generate TWCC reports for it. Since this isn't actually media we don't pass this to the user
func (pc *PeerConnection) handleNonMediaBandwidthProbe() {
nonMediaBandwidthProbe, err := pc.api.NewRTPReceiver(RTPCodecTypeVideo, pc.dtlsTransport)
if err != nil {
pc.log.Errorf("handleNonMediaBandwidthProbe failed to create RTPReceiver: %v", err)
return
}
if err = nonMediaBandwidthProbe.Receive(RTPReceiveParameters{
Encodings: []RTPDecodingParameters{{RTPCodingParameters: RTPCodingParameters{}}},
}); err != nil {
pc.log.Errorf("handleNonMediaBandwidthProbe failed to start RTPReceiver: %v", err)
return
}
pc.nonMediaBandwidthProbe.Store(nonMediaBandwidthProbe)
b := make([]byte, pc.api.settingEngine.getReceiveMTU())
for {
if _, _, err = nonMediaBandwidthProbe.readRTP(b, nonMediaBandwidthProbe.Track()); err != nil {
pc.log.Tracef("handleNonMediaBandwidthProbe read exiting: %v", err)
return
}
}
}
func (pc *PeerConnection) handleIncomingSSRC(rtpStream io.Reader, ssrc SSRC) error { //nolint:gocognit func (pc *PeerConnection) handleIncomingSSRC(rtpStream io.Reader, ssrc SSRC) error { //nolint:gocognit
remoteDescription := pc.RemoteDescription() remoteDescription := pc.RemoteDescription()
if remoteDescription == nil { if remoteDescription == nil {
@@ -1656,6 +1683,11 @@ func (pc *PeerConnection) undeclaredRTPMediaProcessor() {
continue continue
} }
if ssrc == 0 {
go pc.handleNonMediaBandwidthProbe()
continue
}
pc.dtlsTransport.storeSimulcastStream(stream) pc.dtlsTransport.storeSimulcastStream(stream)
if atomic.AddUint64(&simulcastRoutineCount, 1) >= simulcastMaxProbeRoutines { if atomic.AddUint64(&simulcastRoutineCount, 1) >= simulcastMaxProbeRoutines {
@@ -2072,6 +2104,9 @@ func (pc *PeerConnection) Close() error {
closeErrs = append(closeErrs, t.Stop()) closeErrs = append(closeErrs, t.Stop())
} }
} }
if nonMediaBandwidthProbe, ok := pc.nonMediaBandwidthProbe.Load().(*RTPReceiver); ok {
closeErrs = append(closeErrs, nonMediaBandwidthProbe.Stop())
}
pc.mu.Unlock() pc.mu.Unlock()
// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #5) // https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #5)

View File

@@ -1528,7 +1528,7 @@ func TestPeerConnection_Simulcast_RTX(t *testing.T) {
SequenceNumber: sequenceNumber, SequenceNumber: sequenceNumber,
PayloadType: 96, PayloadType: 96,
Padding: true, Padding: true,
SSRC: uint32(i), SSRC: uint32(i + 1),
}, },
Payload: []byte{0x00, 0x02}, Payload: []byte{0x00, 0x02},
} }
@@ -1547,7 +1547,7 @@ func TestPeerConnection_Simulcast_RTX(t *testing.T) {
Version: 2, Version: 2,
SequenceNumber: sequenceNumber, SequenceNumber: sequenceNumber,
PayloadType: 96, PayloadType: 96,
SSRC: uint32(i), SSRC: uint32(i + 1),
}, },
Payload: []byte{0x00}, Payload: []byte{0x00},
} }
@@ -1591,7 +1591,7 @@ func TestPeerConnection_Simulcast_RTX(t *testing.T) {
Version: 2, Version: 2,
SequenceNumber: sequenceNumber, SequenceNumber: sequenceNumber,
PayloadType: 96, PayloadType: 96,
SSRC: uint32(i), SSRC: uint32(i + 1),
}, },
Payload: []byte{0x00}, Payload: []byte{0x00},
} }

View File

@@ -1046,7 +1046,7 @@ func TestPeerConnection_Renegotiation_Simulcast(t *testing.T) {
for ssrc, rid := range rids { for ssrc, rid := range rids {
header := &rtp.Header{ header := &rtp.Header{
Version: 2, Version: 2,
SSRC: uint32(ssrc), SSRC: uint32(ssrc + 1),
SequenceNumber: sequenceNumber, SequenceNumber: sequenceNumber,
PayloadType: 96, PayloadType: 96,
} }

View File

@@ -201,7 +201,7 @@ func (r *RTPReceiver) startReceive(parameters RTPReceiveParameters) error {
var t *trackStreams var t *trackStreams
for idx, ts := range r.tracks { for idx, ts := range r.tracks {
if ts.track != nil && parameters.Encodings[i].SSRC != 0 && ts.track.SSRC() == parameters.Encodings[i].SSRC { if ts.track != nil && ts.track.SSRC() == parameters.Encodings[i].SSRC {
t = &r.tracks[idx] t = &r.tracks[idx]
break break
} }
@@ -210,12 +210,10 @@ func (r *RTPReceiver) startReceive(parameters RTPReceiveParameters) error {
return fmt.Errorf("%w: %d", errRTPReceiverWithSSRCTrackStreamNotFound, parameters.Encodings[i].SSRC) return fmt.Errorf("%w: %d", errRTPReceiverWithSSRCTrackStreamNotFound, parameters.Encodings[i].SSRC)
} }
if parameters.Encodings[i].SSRC != 0 { t.streamInfo = createStreamInfo("", parameters.Encodings[i].SSRC, 0, codec, globalParams.HeaderExtensions)
t.streamInfo = createStreamInfo("", parameters.Encodings[i].SSRC, 0, codec, globalParams.HeaderExtensions) var err error
var err error if t.rtpReadStream, t.rtpInterceptor, t.rtcpReadStream, t.rtcpInterceptor, err = r.transport.streamsForSSRC(parameters.Encodings[i].SSRC, *t.streamInfo); err != nil {
if t.rtpReadStream, t.rtpInterceptor, t.rtcpReadStream, t.rtcpInterceptor, err = r.transport.streamsForSSRC(parameters.Encodings[i].SSRC, *t.streamInfo); err != nil { return err
return err
}
} }
if rtxSsrc := parameters.Encodings[i].RTX.SSRC; rtxSsrc != 0 { if rtxSsrc := parameters.Encodings[i].RTX.SSRC; rtxSsrc != 0 {