Consider first packet when reading Simulcast IDs (#3144)

The code currently ignores the first packet when reading Simulcast IDs
from a new SSRC, and probes only subsequent packets. This commit makes
it so that we consider the first packet as well (which we already have
read). Helps if the publisher only sends Simulcast IDs on the first
packet.
This commit is contained in:
Kostya Vasilyev
2025-11-17 18:11:03 -08:00
committed by GitHub
parent 17287fd0f0
commit e7e3b36fce
3 changed files with 128 additions and 7 deletions

View File

@@ -1785,7 +1785,20 @@ func (pc *PeerConnection) handleIncomingSSRC(rtpStream io.Reader, ssrc SSRC) err
return err
}
// try to read simulcast IDs from the packet we already have
var mid, rid, rsid string
if _, err = handleUnknownRTPPacket(
b[:i], uint8(midExtensionID), //nolint:gosec // G115
uint8(streamIDExtensionID), //nolint:gosec // G115
uint8(repairStreamIDExtensionID), //nolint:gosec // G115
&mid,
&rid,
&rsid,
); err != nil {
return err
}
// if the first packet didn't contain simuilcast IDs, then probe more packets
var paddingOnly bool
for readCount := 0; readCount <= simulcastProbeCount; readCount++ {
if mid == "" || (rid == "" && rsid == "") {
@@ -1799,7 +1812,7 @@ func (pc *PeerConnection) handleIncomingSSRC(rtpStream io.Reader, ssrc SSRC) err
return err
}
if _, paddingOnly, err = handleUnknownRTPPacket(
if paddingOnly, err = handleUnknownRTPPacket(
b[:i], uint8(midExtensionID), //nolint:gosec // G115
uint8(streamIDExtensionID), //nolint:gosec // G115
uint8(repairStreamIDExtensionID), //nolint:gosec // G115

View File

@@ -1397,7 +1397,8 @@ func TestPeerConnection_Start_Right_Receiver(t *testing.T) {
closePairNow(t, pcOffer, pcAnswer)
}
func TestPeerConnection_Simulcast_Probe(t *testing.T) { //nolint:cyclop
//nolint:cyclop,maintidx
func TestPeerConnection_Simulcast_Probe(t *testing.T) {
lim := test.TimeOut(time.Second * 30) //nolint
defer lim.Stop()
@@ -1457,6 +1458,114 @@ func TestPeerConnection_Simulcast_Probe(t *testing.T) { //nolint:cyclop
close(testFinished)
})
// Assert that we can send just one packet with Simulcast IDs (using extensions) and they will be properly received
t.Run("ExtractIDs", func(t *testing.T) {
offerer, answerer, err := newPair()
assert.NoError(t, err)
rids := []string{"layer_1", "layer_2", "layer_3"}
ridSelected := rids[0]
onTrackCalled := &atomic.Bool{}
answerer.OnTrack(func(remote *TrackRemote, receiver *RTPReceiver) {
assert.Equal(t, remote.rid, ridSelected)
onTrackCalled.Store(true)
})
vp8WriterA, err := NewTrackLocalStaticRTP(
RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion1", WithRTPStreamID(rids[0]),
)
assert.NoError(t, err)
vp8WriterB, err := NewTrackLocalStaticRTP(
RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion1", WithRTPStreamID(rids[1]),
)
assert.NoError(t, err)
vp8WriterC, err := NewTrackLocalStaticRTP(
RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion1", WithRTPStreamID(rids[2]),
)
assert.NoError(t, err)
sender, err := offerer.AddTrack(vp8WriterA)
assert.NoError(t, err)
assert.NotNil(t, sender)
assert.NoError(t, sender.AddEncoding(vp8WriterB))
assert.NoError(t, sender.AddEncoding(vp8WriterC))
assert.NoError(t, signalPair(offerer, answerer))
peerConnectionConnected := untilConnectionState(PeerConnectionStateConnected, offerer, answerer)
peerConnectionConnected.Wait()
parameters := sender.GetParameters()
var midID, ridID uint8
for _, extension := range parameters.HeaderExtensions {
switch extension.URI {
case sdp.SDESMidURI:
midID = uint8(extension.ID) //nolint:gosec // G115
case sdp.SDESRTPStreamIDURI:
ridID = uint8(extension.ID) //nolint:gosec // G115
}
}
assert.NotZero(t, midID)
assert.NotZero(t, ridID)
ticker := time.NewTicker(time.Millisecond * 20)
defer ticker.Stop()
testFinished := make(chan struct{})
seenOneStream, seenOneStreamCancel := context.WithCancel(context.Background())
go func() {
sentOnePacket := false
senderTrack := vp8WriterA
for {
select {
case <-testFinished:
return
case <-ticker.C:
answerer.dtlsTransport.lock.Lock()
if len(answerer.dtlsTransport.simulcastStreams) >= 1 {
seenOneStreamCancel()
}
answerer.dtlsTransport.lock.Unlock()
senderTrack.mu.Lock()
// We send just one packet with the RID, that's the point of this test
if !sentOnePacket && len(senderTrack.bindings) > 0 {
sentOnePacket = true
header := &rtp.Header{
Version: 2,
SSRC: util.RandUint32(),
}
header.Extension = true
header.ExtensionProfile = 0x1000
assert.NoError(t, header.SetExtension(midID, []byte("0")))
assert.NoError(t, header.SetExtension(ridID, []byte(ridSelected)))
_, err = senderTrack.bindings[0].writeStream.WriteRTP(header, []byte{0, 1, 2, 3, 4, 5})
assert.NoError(t, err)
}
senderTrack.mu.Unlock()
}
}
}()
<-seenOneStream.Done()
assert.Equal(t, true, onTrackCalled.Load())
closePairNow(t, offerer, answerer)
close(testFinished)
})
// Assert that NonSimulcast Traffic isn't incorrectly broken by the probe
t.Run("Break NonSimulcast", func(t *testing.T) {
unhandledSimulcastError := make(chan struct{})

View File

@@ -428,10 +428,10 @@ func handleUnknownRTPPacket(
streamIDExtensionID,
repairStreamIDExtensionID uint8,
mid, rid, rsid *string,
) (payloadType PayloadType, paddingOnly bool, err error) {
) (paddingOnly bool, err error) {
rp := &rtp.Packet{}
if err = rp.Unmarshal(buf); err != nil {
return 0, false, err
return false, err
}
if rp.Padding && len(rp.Payload) == 0 {
@@ -439,10 +439,9 @@ func handleUnknownRTPPacket(
}
if !rp.Header.Extension {
return payloadType, paddingOnly, nil
return paddingOnly, nil
}
payloadType = PayloadType(rp.PayloadType)
if payload := rp.GetExtension(midExtensionID); payload != nil {
*mid = string(payload)
}
@@ -455,5 +454,5 @@ func handleUnknownRTPPacket(
*rsid = string(payload)
}
return payloadType, paddingOnly, nil
return paddingOnly, nil
}