Add E2E Test for RTX

Assert that generation of NACKs and sending of RTX operates as expected.
This commit is contained in:
Sean DuBois
2024-10-04 23:45:22 -04:00
parent 32f7063f1a
commit 5637661fef
8 changed files with 95 additions and 104 deletions

2
go.mod
View File

@@ -6,7 +6,7 @@ require (
github.com/pion/datachannel v1.5.9
github.com/pion/dtls/v3 v3.0.2
github.com/pion/ice/v4 v4.0.1
github.com/pion/interceptor v0.1.34
github.com/pion/interceptor v0.1.36
github.com/pion/logging v0.2.2
github.com/pion/randutil v0.1.0
github.com/pion/rtcp v1.2.14

4
go.sum
View File

@@ -41,8 +41,8 @@ github.com/pion/dtls/v3 v3.0.2 h1:425DEeJ/jfuTTghhUDW0GtYZYIwwMtnKKJNMcWccTX0=
github.com/pion/dtls/v3 v3.0.2/go.mod h1:dfIXcFkKoujDQ+jtd8M6RgqKK3DuaUilm3YatAbGp5k=
github.com/pion/ice/v4 v4.0.1 h1:2d3tPoTR90F3TcGYeXUwucGlXI3hds96cwv4kjZmb9s=
github.com/pion/ice/v4 v4.0.1/go.mod h1:2dpakjpd7+74L5j3TAe6gvkbI5UIzOgAnkimm9SuHvA=
github.com/pion/interceptor v0.1.34 h1:jb1MG9LTdQ4VVCSZDUbUzjeJNngzz4dBXcr2dL+ejfA=
github.com/pion/interceptor v0.1.34/go.mod h1:JzxbJ4umVTlZAf+/utHzNesY8tmRkM2lVmkS82TTj8Y=
github.com/pion/interceptor v0.1.36 h1:WNOZUs5Vec3+NHeY6uGo4nvbxCcRglrI//DlUwLnl/M=
github.com/pion/interceptor v0.1.36/go.mod h1:JzxbJ4umVTlZAf+/utHzNesY8tmRkM2lVmkS82TTj8Y=
github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
github.com/pion/mdns/v2 v2.0.7 h1:c9kM8ewCgjslaAmicYMFQIde2H9/lrZpjBkN8VwoVtM=

View File

@@ -931,7 +931,7 @@ func TestICERestart_Error_Handling(t *testing.T) {
report := test.CheckRoutines(t)
defer report()
offerPeerConnection, answerPeerConnection, wan := createVNetPair(t)
offerPeerConnection, answerPeerConnection, wan := createVNetPair(t, nil)
pushICEState := func(i ICEConnectionState) { iceStates <- i }
offerPeerConnection.OnICEConnectionStateChange(pushICEState)

View File

@@ -13,6 +13,7 @@ import (
"errors"
"fmt"
"io"
"math/rand"
"regexp"
"strings"
"sync"
@@ -322,7 +323,7 @@ func TestPeerConnection_Media_Disconnected(t *testing.T) {
m := &MediaEngine{}
assert.NoError(t, m.RegisterDefaultCodecs())
pcOffer, pcAnswer, wan := createVNetPair(t)
pcOffer, pcAnswer, wan := createVNetPair(t, nil)
keepPackets := &atomicBool{}
keepPackets.set(true)
@@ -1780,3 +1781,76 @@ func TestPeerConnection_Zero_PayloadType(t *testing.T) {
closePairNow(t, pcOffer, pcAnswer)
}
// Assert that NACKs work E2E with no extra configuration. If media is sent over a lossy connection
// the user gets retransmitted RTP packets with no extra configuration
func Test_PeerConnection_RTX_E2E(t *testing.T) {
defer test.TimeOut(time.Second * 30).Stop()
pcOffer, pcAnswer, wan := createVNetPair(t, nil)
wan.AddChunkFilter(func(vnet.Chunk) bool {
return rand.Intn(5) != 4 //nolint: gosec
})
track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "track-id", "stream-id")
assert.NoError(t, err)
rtpSender, err := pcOffer.AddTrack(track)
assert.NoError(t, err)
go func() {
rtcpBuf := make([]byte, 1500)
for {
if _, _, rtcpErr := rtpSender.Read(rtcpBuf); rtcpErr != nil {
return
}
}
}()
rtxSsrc := rtpSender.GetParameters().Encodings[0].RTX.SSRC
ssrc := rtpSender.GetParameters().Encodings[0].SSRC
rtxRead, rtxReadCancel := context.WithCancel(context.Background())
pcAnswer.OnTrack(func(track *TrackRemote, _ *RTPReceiver) {
for {
pkt, attributes, readRTPErr := track.ReadRTP()
if errors.Is(readRTPErr, io.EOF) {
return
} else if pkt.PayloadType == 0 {
continue
}
assert.NotNil(t, pkt)
assert.Equal(t, pkt.SSRC, uint32(ssrc))
assert.Equal(t, pkt.PayloadType, uint8(96))
rtxPayloadType := attributes.Get(AttributeRtxPayloadType)
rtxSequenceNumber := attributes.Get(AttributeRtxSequenceNumber)
rtxSSRC := attributes.Get(AttributeRtxSsrc)
if rtxPayloadType != nil && rtxSequenceNumber != nil && rtxSSRC != nil {
assert.Equal(t, rtxPayloadType, uint8(97))
assert.Equal(t, rtxSSRC, uint32(rtxSsrc))
rtxReadCancel()
}
}
})
assert.NoError(t, signalPair(pcOffer, pcAnswer))
func() {
for {
select {
case <-time.After(20 * time.Millisecond):
writeErr := track.WriteSample(media.Sample{Data: []byte{0x00}, Duration: time.Second})
assert.NoError(t, writeErr)
case <-rtxRead.Done():
return
}
}
}()
assert.NoError(t, wan.Stop())
closePairNow(t, pcOffer, pcAnswer)
}

View File

@@ -8,21 +8,16 @@ package webrtc
import (
"context"
"encoding/binary"
"errors"
"io"
"testing"
"time"
"github.com/pion/rtp"
"github.com/pion/sdp/v3"
"github.com/pion/transport/v3/test"
"github.com/pion/webrtc/v4/pkg/media"
"github.com/stretchr/testify/assert"
)
func TestSetRTPParameters(t *testing.T) {
sender, receiver, wan := createVNetPair(t)
sender, receiver, wan := createVNetPair(t, nil)
outgoingTrack, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion")
assert.NoError(t, err)
@@ -75,89 +70,3 @@ func TestSetRTPParameters(t *testing.T) {
assert.NoError(t, wan.Stop())
closePairNow(t, sender, receiver)
}
// Assert the behavior of reading a RTX with a distinct SSRC
// All the attributes should be populated and the packet unpacked
func Test_RTX_Read(t *testing.T) {
defer test.TimeOut(time.Second * 30).Stop()
pcOffer, pcAnswer, err := newPair()
assert.NoError(t, err)
track, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "track-id", "stream-id")
assert.NoError(t, err)
rtpSender, err := pcOffer.AddTrack(track)
assert.NoError(t, err)
rtxSsrc := rtpSender.GetParameters().Encodings[0].RTX.SSRC
ssrc := rtpSender.GetParameters().Encodings[0].SSRC
rtxRead, rtxReadCancel := context.WithCancel(context.Background())
pcAnswer.OnTrack(func(track *TrackRemote, _ *RTPReceiver) {
for {
pkt, attributes, readRTPErr := track.ReadRTP()
if errors.Is(readRTPErr, io.EOF) {
return
} else if pkt.PayloadType == 0 {
continue
}
assert.NoError(t, readRTPErr)
assert.NotNil(t, pkt)
assert.Equal(t, pkt.SSRC, uint32(ssrc))
assert.Equal(t, pkt.PayloadType, uint8(96))
assert.Equal(t, pkt.Payload, []byte{0xB, 0xA, 0xD})
rtxPayloadType := attributes.Get(AttributeRtxPayloadType)
rtxSequenceNumber := attributes.Get(AttributeRtxSequenceNumber)
rtxSSRC := attributes.Get(AttributeRtxSsrc)
if rtxPayloadType != nil && rtxSequenceNumber != nil && rtxSSRC != nil {
assert.Equal(t, rtxPayloadType, uint8(97))
assert.Equal(t, rtxSSRC, uint32(rtxSsrc))
assert.Equal(t, rtxSequenceNumber, pkt.SequenceNumber+500)
rtxReadCancel()
}
}
})
assert.NoError(t, signalPair(pcOffer, pcAnswer))
func() {
for i := uint16(0); ; i++ {
pkt := rtp.Packet{
Header: rtp.Header{
Version: 2,
SSRC: uint32(ssrc),
PayloadType: 96,
SequenceNumber: i,
},
Payload: []byte{0xB, 0xA, 0xD},
}
select {
case <-time.After(20 * time.Millisecond):
// Send the original packet
err = track.WriteRTP(&pkt)
assert.NoError(t, err)
rtxPayload := []byte{0x0, 0x0, 0xB, 0xA, 0xD}
binary.BigEndian.PutUint16(rtxPayload[0:2], pkt.Header.SequenceNumber)
// Send the RTX
_, err = track.bindings[0].writeStream.WriteRTP(&rtp.Header{
Version: 2,
SSRC: uint32(rtxSsrc),
PayloadType: 97,
SequenceNumber: i + 500,
}, rtxPayload)
assert.NoError(t, err)
case <-rtxRead.Done():
return
}
}
}()
closePairNow(t, pcOffer, pcAnswer)
}

View File

@@ -11,6 +11,7 @@ import (
"testing"
"time"
"github.com/pion/interceptor"
"github.com/pion/transport/v3/test"
"github.com/pion/webrtc/v4/pkg/media"
"github.com/stretchr/testify/assert"
@@ -25,7 +26,7 @@ func Test_RTPReceiver_SetReadDeadline(t *testing.T) {
report := test.CheckRoutines(t)
defer report()
sender, receiver, wan := createVNetPair(t)
sender, receiver, wan := createVNetPair(t, &interceptor.Registry{})
track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion")
assert.NoError(t, err)

View File

@@ -14,6 +14,7 @@ import (
"testing"
"time"
"github.com/pion/interceptor"
"github.com/pion/transport/v3/test"
"github.com/pion/webrtc/v4/pkg/media"
"github.com/stretchr/testify/assert"
@@ -157,7 +158,7 @@ func Test_RTPSender_SetReadDeadline(t *testing.T) {
report := test.CheckRoutines(t)
defer report()
sender, receiver, wan := createVNetPair(t)
sender, receiver, wan := createVNetPair(t, &interceptor.Registry{})
track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion")
assert.NoError(t, err)

View File

@@ -16,7 +16,7 @@ import (
"github.com/stretchr/testify/assert"
)
func createVNetPair(t *testing.T) (*PeerConnection, *PeerConnection, *vnet.Router) {
func createVNetPair(t *testing.T, interceptorRegistry *interceptor.Registry) (*PeerConnection, *PeerConnection, *vnet.Router) {
// Create a root router
wan, err := vnet.NewRouter(&vnet.RouterConfig{
CIDR: "1.2.3.0/24",
@@ -53,12 +53,18 @@ func createVNetPair(t *testing.T) (*PeerConnection, *PeerConnection, *vnet.Route
// Start the virtual network by calling Start() on the root router
assert.NoError(t, wan.Start())
offerInterceptorRegistry := &interceptor.Registry{}
offerPeerConnection, err := NewAPI(WithSettingEngine(offerSettingEngine), WithInterceptorRegistry(offerInterceptorRegistry)).NewPeerConnection(Configuration{})
offerOptions := []func(*API){WithSettingEngine(offerSettingEngine)}
if interceptorRegistry != nil {
offerOptions = append(offerOptions, WithInterceptorRegistry(interceptorRegistry))
}
offerPeerConnection, err := NewAPI(offerOptions...).NewPeerConnection(Configuration{})
assert.NoError(t, err)
answerInterceptorRegistry := &interceptor.Registry{}
answerPeerConnection, err := NewAPI(WithSettingEngine(answerSettingEngine), WithInterceptorRegistry(answerInterceptorRegistry)).NewPeerConnection(Configuration{})
answerOptions := []func(*API){WithSettingEngine(answerSettingEngine)}
if interceptorRegistry != nil {
answerOptions = append(answerOptions, WithInterceptorRegistry(interceptorRegistry))
}
answerPeerConnection, err := NewAPI(answerOptions...).NewPeerConnection(Configuration{})
assert.NoError(t, err)
return offerPeerConnection, answerPeerConnection, wan