mirror of
https://github.com/pion/webrtc.git
synced 2025-09-27 03:25:58 +08:00

The way currently DTLS fingerprints and ICE credentials are picked is causing interop issues as described in #2621 Peers which don't use Bundle can use different fingerprints and credentials in each media section. Even though is not (yet) supported by Pion, receiving an SDP offer from such a peer is valid. Additionally if Bundle is being used the group attribute determines which media section is the master bundle section, which establishes the transport. Currently Pion always just uses the first credentials/fingerprint it can find in the SDP, which results in not spec compliant behavior. This PR attempts to fix the above issues and make Pion more spec compliant and interoperable. Fixes #2621
1860 lines
50 KiB
Go
1860 lines
50 KiB
Go
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
|
|
// SPDX-License-Identifier: MIT
|
|
|
|
//go:build !js
|
|
// +build !js
|
|
|
|
package webrtc
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"math/rand"
|
|
"regexp"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/pion/logging"
|
|
"github.com/pion/rtcp"
|
|
"github.com/pion/rtp"
|
|
"github.com/pion/sdp/v3"
|
|
"github.com/pion/transport/v3/test"
|
|
"github.com/pion/transport/v3/vnet"
|
|
"github.com/pion/webrtc/v4/internal/util"
|
|
"github.com/pion/webrtc/v4/pkg/media"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
var (
|
|
errIncomingTrackIDInvalid = errors.New("incoming Track ID is invalid")
|
|
errIncomingTrackLabelInvalid = errors.New("incoming Track Label is invalid")
|
|
errNoTransceiverwithMid = errors.New("no transceiver with mid")
|
|
)
|
|
|
|
/*
|
|
Integration test for bi-directional peers
|
|
|
|
This asserts we can send RTP and RTCP both ways, and blocks until
|
|
each side gets something (and asserts payload contents)
|
|
*/
|
|
// nolint: gocyclo
|
|
func TestPeerConnection_Media_Sample(t *testing.T) {
|
|
const (
|
|
expectedTrackID = "video"
|
|
expectedStreamID = "pion"
|
|
)
|
|
|
|
lim := test.TimeOut(time.Second * 30)
|
|
defer lim.Stop()
|
|
|
|
report := test.CheckRoutines(t)
|
|
defer report()
|
|
|
|
pcOffer, pcAnswer, err := newPair()
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
awaitRTPRecv := make(chan bool)
|
|
awaitRTPRecvClosed := make(chan bool)
|
|
awaitRTPSend := make(chan bool)
|
|
|
|
awaitRTCPSenderRecv := make(chan bool)
|
|
awaitRTCPSenderSend := make(chan error)
|
|
|
|
awaitRTCPReceiverRecv := make(chan error)
|
|
awaitRTCPReceiverSend := make(chan error)
|
|
|
|
trackMetadataValid := make(chan error)
|
|
|
|
pcAnswer.OnTrack(func(track *TrackRemote, receiver *RTPReceiver) {
|
|
if track.ID() != expectedTrackID {
|
|
trackMetadataValid <- fmt.Errorf("%w: expected(%s) actual(%s)", errIncomingTrackIDInvalid, expectedTrackID, track.ID())
|
|
return
|
|
}
|
|
|
|
if track.StreamID() != expectedStreamID {
|
|
trackMetadataValid <- fmt.Errorf("%w: expected(%s) actual(%s)", errIncomingTrackLabelInvalid, expectedStreamID, track.StreamID())
|
|
return
|
|
}
|
|
close(trackMetadataValid)
|
|
|
|
go func() {
|
|
for {
|
|
time.Sleep(time.Millisecond * 100)
|
|
if routineErr := pcAnswer.WriteRTCP([]rtcp.Packet{&rtcp.RapidResynchronizationRequest{SenderSSRC: uint32(track.SSRC()), MediaSSRC: uint32(track.SSRC())}}); routineErr != nil {
|
|
awaitRTCPReceiverSend <- routineErr
|
|
return
|
|
}
|
|
|
|
select {
|
|
case <-awaitRTCPSenderRecv:
|
|
close(awaitRTCPReceiverSend)
|
|
return
|
|
default:
|
|
}
|
|
}
|
|
}()
|
|
|
|
go func() {
|
|
_, _, routineErr := receiver.Read(make([]byte, 1400))
|
|
if routineErr != nil {
|
|
awaitRTCPReceiverRecv <- routineErr
|
|
} else {
|
|
close(awaitRTCPReceiverRecv)
|
|
}
|
|
}()
|
|
|
|
haveClosedAwaitRTPRecv := false
|
|
for {
|
|
p, _, routineErr := track.ReadRTP()
|
|
if routineErr != nil {
|
|
close(awaitRTPRecvClosed)
|
|
return
|
|
} else if bytes.Equal(p.Payload, []byte{0x10, 0x00}) && !haveClosedAwaitRTPRecv {
|
|
haveClosedAwaitRTPRecv = true
|
|
close(awaitRTPRecv)
|
|
}
|
|
}
|
|
})
|
|
|
|
vp8Track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, expectedTrackID, expectedStreamID)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
sender, err := pcOffer.AddTrack(vp8Track)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
go func() {
|
|
for {
|
|
time.Sleep(time.Millisecond * 100)
|
|
if pcOffer.ICEConnectionState() != ICEConnectionStateConnected {
|
|
continue
|
|
}
|
|
if routineErr := vp8Track.WriteSample(media.Sample{Data: []byte{0x00}, Duration: time.Second}); routineErr != nil {
|
|
fmt.Println(routineErr)
|
|
}
|
|
|
|
select {
|
|
case <-awaitRTPRecv:
|
|
close(awaitRTPSend)
|
|
return
|
|
default:
|
|
}
|
|
}
|
|
}()
|
|
|
|
go func() {
|
|
parameters := sender.GetParameters()
|
|
|
|
for {
|
|
time.Sleep(time.Millisecond * 100)
|
|
if routineErr := pcOffer.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{SenderSSRC: uint32(parameters.Encodings[0].SSRC), MediaSSRC: uint32(parameters.Encodings[0].SSRC)}}); routineErr != nil {
|
|
awaitRTCPSenderSend <- routineErr
|
|
}
|
|
|
|
select {
|
|
case <-awaitRTCPReceiverRecv:
|
|
close(awaitRTCPSenderSend)
|
|
return
|
|
default:
|
|
}
|
|
}
|
|
}()
|
|
|
|
go func() {
|
|
if _, _, routineErr := sender.Read(make([]byte, 1400)); routineErr == nil {
|
|
close(awaitRTCPSenderRecv)
|
|
}
|
|
}()
|
|
|
|
assert.NoError(t, signalPair(pcOffer, pcAnswer))
|
|
|
|
err, ok := <-trackMetadataValid
|
|
if ok {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
<-awaitRTPRecv
|
|
<-awaitRTPSend
|
|
|
|
<-awaitRTCPSenderRecv
|
|
if err, ok = <-awaitRTCPSenderSend; ok {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
<-awaitRTCPReceiverRecv
|
|
if err, ok = <-awaitRTCPReceiverSend; ok {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
closePairNow(t, pcOffer, pcAnswer)
|
|
<-awaitRTPRecvClosed
|
|
}
|
|
|
|
/*
|
|
PeerConnection should be able to be torn down at anytime
|
|
This test adds an input track and asserts
|
|
|
|
* OnTrack doesn't fire since no video packets will arrive
|
|
* No goroutine leaks
|
|
* No deadlocks on shutdown
|
|
*/
|
|
func TestPeerConnection_Media_Shutdown(t *testing.T) {
|
|
iceCompleteAnswer := make(chan struct{})
|
|
iceCompleteOffer := make(chan struct{})
|
|
|
|
lim := test.TimeOut(time.Second * 30)
|
|
defer lim.Stop()
|
|
|
|
report := test.CheckRoutines(t)
|
|
defer report()
|
|
|
|
pcOffer, pcAnswer, err := newPair()
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
_, err = pcOffer.AddTransceiverFromKind(RTPCodecTypeVideo, RTPTransceiverInit{Direction: RTPTransceiverDirectionRecvonly})
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
_, err = pcAnswer.AddTransceiverFromKind(RTPCodecTypeAudio, RTPTransceiverInit{Direction: RTPTransceiverDirectionRecvonly})
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
opusTrack, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeOpus}, "audio", "pion1")
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
vp8Track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2")
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
if _, err = pcOffer.AddTrack(opusTrack); err != nil {
|
|
t.Fatal(err)
|
|
} else if _, err = pcAnswer.AddTrack(vp8Track); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
var onTrackFiredLock sync.Mutex
|
|
onTrackFired := false
|
|
|
|
pcAnswer.OnTrack(func(*TrackRemote, *RTPReceiver) {
|
|
onTrackFiredLock.Lock()
|
|
defer onTrackFiredLock.Unlock()
|
|
onTrackFired = true
|
|
})
|
|
|
|
pcAnswer.OnICEConnectionStateChange(func(iceState ICEConnectionState) {
|
|
if iceState == ICEConnectionStateConnected {
|
|
close(iceCompleteAnswer)
|
|
}
|
|
})
|
|
pcOffer.OnICEConnectionStateChange(func(iceState ICEConnectionState) {
|
|
if iceState == ICEConnectionStateConnected {
|
|
close(iceCompleteOffer)
|
|
}
|
|
})
|
|
|
|
err = signalPair(pcOffer, pcAnswer)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
<-iceCompleteAnswer
|
|
<-iceCompleteOffer
|
|
|
|
// Each PeerConnection should have one sender, one receiver and one transceiver
|
|
for _, pc := range []*PeerConnection{pcOffer, pcAnswer} {
|
|
senders := pc.GetSenders()
|
|
if len(senders) != 1 {
|
|
t.Errorf("Each PeerConnection should have one RTPSender, we have %d", len(senders))
|
|
}
|
|
|
|
receivers := pc.GetReceivers()
|
|
if len(receivers) != 2 {
|
|
t.Errorf("Each PeerConnection should have two RTPReceivers, we have %d", len(receivers))
|
|
}
|
|
|
|
transceivers := pc.GetTransceivers()
|
|
if len(transceivers) != 2 {
|
|
t.Errorf("Each PeerConnection should have two RTPTransceivers, we have %d", len(transceivers))
|
|
}
|
|
}
|
|
|
|
closePairNow(t, pcOffer, pcAnswer)
|
|
|
|
onTrackFiredLock.Lock()
|
|
if onTrackFired {
|
|
t.Fatalf("PeerConnection OnTrack fired even though we got no packets")
|
|
}
|
|
onTrackFiredLock.Unlock()
|
|
}
|
|
|
|
/*
|
|
Integration test for behavior around media and disconnected peers
|
|
|
|
* Sending RTP and RTCP to a disconnected Peer shouldn't return an error
|
|
*/
|
|
func TestPeerConnection_Media_Disconnected(t *testing.T) {
|
|
lim := test.TimeOut(time.Second * 30)
|
|
defer lim.Stop()
|
|
|
|
report := test.CheckRoutines(t)
|
|
defer report()
|
|
|
|
s := SettingEngine{}
|
|
s.SetICETimeouts(time.Second/2, time.Second/2, time.Second/8)
|
|
|
|
m := &MediaEngine{}
|
|
assert.NoError(t, m.RegisterDefaultCodecs())
|
|
|
|
pcOffer, pcAnswer, wan := createVNetPair(t, nil)
|
|
|
|
keepPackets := &atomicBool{}
|
|
keepPackets.set(true)
|
|
|
|
// Add a filter that monitors the traffic on the router
|
|
wan.AddChunkFilter(func(vnet.Chunk) bool {
|
|
return keepPackets.get()
|
|
})
|
|
|
|
vp8Track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2")
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
vp8Sender, err := pcOffer.AddTrack(vp8Track)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
haveDisconnected := make(chan error)
|
|
pcOffer.OnICEConnectionStateChange(func(iceState ICEConnectionState) {
|
|
if iceState == ICEConnectionStateDisconnected {
|
|
close(haveDisconnected)
|
|
} else if iceState == ICEConnectionStateConnected {
|
|
// Assert that DTLS is done by pull remote certificate, don't tear down the PC early
|
|
for {
|
|
if len(vp8Sender.Transport().GetRemoteCertificate()) != 0 {
|
|
if pcAnswer.sctpTransport.association() != nil {
|
|
break
|
|
}
|
|
}
|
|
|
|
time.Sleep(time.Second)
|
|
}
|
|
|
|
keepPackets.set(false)
|
|
}
|
|
})
|
|
|
|
if err = signalPair(pcOffer, pcAnswer); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
err, ok := <-haveDisconnected
|
|
if ok {
|
|
t.Fatal(err)
|
|
}
|
|
for i := 0; i <= 5; i++ {
|
|
if rtpErr := vp8Track.WriteSample(media.Sample{Data: []byte{0x00}, Duration: time.Second}); rtpErr != nil {
|
|
t.Fatal(rtpErr)
|
|
} else if rtcpErr := pcOffer.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: 0}}); rtcpErr != nil {
|
|
t.Fatal(rtcpErr)
|
|
}
|
|
}
|
|
|
|
assert.NoError(t, wan.Stop())
|
|
closePairNow(t, pcOffer, pcAnswer)
|
|
}
|
|
|
|
type undeclaredSsrcLogger struct{ unhandledSimulcastError chan struct{} }
|
|
|
|
func (u *undeclaredSsrcLogger) Trace(string) {}
|
|
func (u *undeclaredSsrcLogger) Tracef(string, ...interface{}) {}
|
|
func (u *undeclaredSsrcLogger) Debug(string) {}
|
|
func (u *undeclaredSsrcLogger) Debugf(string, ...interface{}) {}
|
|
func (u *undeclaredSsrcLogger) Info(string) {}
|
|
func (u *undeclaredSsrcLogger) Infof(string, ...interface{}) {}
|
|
func (u *undeclaredSsrcLogger) Warn(string) {}
|
|
func (u *undeclaredSsrcLogger) Warnf(string, ...interface{}) {}
|
|
func (u *undeclaredSsrcLogger) Error(string) {}
|
|
func (u *undeclaredSsrcLogger) Errorf(format string, _ ...interface{}) {
|
|
if format == incomingUnhandledRTPSsrc {
|
|
close(u.unhandledSimulcastError)
|
|
}
|
|
}
|
|
|
|
type undeclaredSsrcLoggerFactory struct{ unhandledSimulcastError chan struct{} }
|
|
|
|
func (u *undeclaredSsrcLoggerFactory) NewLogger(string) logging.LeveledLogger {
|
|
return &undeclaredSsrcLogger{u.unhandledSimulcastError}
|
|
}
|
|
|
|
// Filter SSRC lines
|
|
func filterSsrc(offer string) (filteredSDP string) {
|
|
scanner := bufio.NewScanner(strings.NewReader(offer))
|
|
for scanner.Scan() {
|
|
l := scanner.Text()
|
|
if strings.HasPrefix(l, "a=ssrc") {
|
|
continue
|
|
}
|
|
|
|
filteredSDP += l + "\n"
|
|
}
|
|
return
|
|
}
|
|
|
|
// If a SessionDescription has a single media section and no SSRC
|
|
// assume that it is meant to handle all RTP packets
|
|
func TestUndeclaredSSRC(t *testing.T) {
|
|
lim := test.TimeOut(time.Second * 30)
|
|
defer lim.Stop()
|
|
|
|
report := test.CheckRoutines(t)
|
|
defer report()
|
|
|
|
t.Run("No SSRC", func(t *testing.T) {
|
|
pcOffer, pcAnswer, err := newPair()
|
|
assert.NoError(t, err)
|
|
|
|
vp8Writer, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2")
|
|
assert.NoError(t, err)
|
|
|
|
_, err = pcOffer.AddTrack(vp8Writer)
|
|
assert.NoError(t, err)
|
|
|
|
onTrackFired := make(chan struct{})
|
|
pcAnswer.OnTrack(func(trackRemote *TrackRemote, _ *RTPReceiver) {
|
|
assert.Equal(t, trackRemote.StreamID(), vp8Writer.StreamID())
|
|
assert.Equal(t, trackRemote.ID(), vp8Writer.ID())
|
|
close(onTrackFired)
|
|
})
|
|
|
|
offer, err := pcOffer.CreateOffer(nil)
|
|
assert.NoError(t, err)
|
|
|
|
offerGatheringComplete := GatheringCompletePromise(pcOffer)
|
|
assert.NoError(t, pcOffer.SetLocalDescription(offer))
|
|
<-offerGatheringComplete
|
|
|
|
offer.SDP = filterSsrc(pcOffer.LocalDescription().SDP)
|
|
assert.NoError(t, pcAnswer.SetRemoteDescription(offer))
|
|
|
|
answer, err := pcAnswer.CreateAnswer(nil)
|
|
assert.NoError(t, err)
|
|
|
|
answerGatheringComplete := GatheringCompletePromise(pcAnswer)
|
|
assert.NoError(t, pcAnswer.SetLocalDescription(answer))
|
|
<-answerGatheringComplete
|
|
|
|
assert.NoError(t, pcOffer.SetRemoteDescription(*pcAnswer.LocalDescription()))
|
|
|
|
sendVideoUntilDone(onTrackFired, t, []*TrackLocalStaticSample{vp8Writer})
|
|
closePairNow(t, pcOffer, pcAnswer)
|
|
})
|
|
|
|
t.Run("Has RID", func(t *testing.T) {
|
|
unhandledSimulcastError := make(chan struct{})
|
|
|
|
m := &MediaEngine{}
|
|
assert.NoError(t, m.RegisterDefaultCodecs())
|
|
|
|
pcOffer, pcAnswer, err := NewAPI(WithSettingEngine(SettingEngine{
|
|
LoggerFactory: &undeclaredSsrcLoggerFactory{unhandledSimulcastError},
|
|
}), WithMediaEngine(m)).newPair(Configuration{})
|
|
assert.NoError(t, err)
|
|
|
|
vp8Writer, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2")
|
|
assert.NoError(t, err)
|
|
|
|
_, err = pcOffer.AddTrack(vp8Writer)
|
|
assert.NoError(t, err)
|
|
|
|
offer, err := pcOffer.CreateOffer(nil)
|
|
assert.NoError(t, err)
|
|
|
|
offerGatheringComplete := GatheringCompletePromise(pcOffer)
|
|
assert.NoError(t, pcOffer.SetLocalDescription(offer))
|
|
<-offerGatheringComplete
|
|
|
|
// Append RID to end of SessionDescription. Will not be considered unhandled anymore
|
|
offer.SDP = filterSsrc(pcOffer.LocalDescription().SDP) + "a=" + sdpAttributeRid + "\r\n"
|
|
assert.NoError(t, pcAnswer.SetRemoteDescription(offer))
|
|
|
|
answer, err := pcAnswer.CreateAnswer(nil)
|
|
assert.NoError(t, err)
|
|
|
|
answerGatheringComplete := GatheringCompletePromise(pcAnswer)
|
|
assert.NoError(t, pcAnswer.SetLocalDescription(answer))
|
|
<-answerGatheringComplete
|
|
|
|
assert.NoError(t, pcOffer.SetRemoteDescription(*pcAnswer.LocalDescription()))
|
|
|
|
sendVideoUntilDone(unhandledSimulcastError, t, []*TrackLocalStaticSample{vp8Writer})
|
|
closePairNow(t, pcOffer, pcAnswer)
|
|
})
|
|
}
|
|
|
|
func TestAddTransceiverFromTrackSendOnly(t *testing.T) {
|
|
lim := test.TimeOut(time.Second * 30)
|
|
defer lim.Stop()
|
|
|
|
report := test.CheckRoutines(t)
|
|
defer report()
|
|
|
|
pc, err := NewPeerConnection(Configuration{})
|
|
if err != nil {
|
|
t.Error(err.Error())
|
|
}
|
|
|
|
track, err := NewTrackLocalStaticSample(
|
|
RTPCodecCapability{MimeType: "audio/Opus"},
|
|
"track-id",
|
|
"stream-id",
|
|
)
|
|
if err != nil {
|
|
t.Error(err.Error())
|
|
}
|
|
|
|
transceiver, err := pc.AddTransceiverFromTrack(track, RTPTransceiverInit{
|
|
Direction: RTPTransceiverDirectionSendonly,
|
|
})
|
|
if err != nil {
|
|
t.Error(err.Error())
|
|
}
|
|
|
|
if transceiver.Receiver() != nil {
|
|
t.Errorf("Transceiver shouldn't have a receiver")
|
|
}
|
|
|
|
if transceiver.Sender() == nil {
|
|
t.Errorf("Transceiver should have a sender")
|
|
}
|
|
|
|
if len(pc.GetTransceivers()) != 1 {
|
|
t.Errorf("PeerConnection should have one transceiver but has %d", len(pc.GetTransceivers()))
|
|
}
|
|
|
|
if len(pc.GetSenders()) != 1 {
|
|
t.Errorf("PeerConnection should have one sender but has %d", len(pc.GetSenders()))
|
|
}
|
|
|
|
offer, err := pc.CreateOffer(nil)
|
|
if err != nil {
|
|
t.Error(err.Error())
|
|
}
|
|
|
|
if !offerMediaHasDirection(offer, RTPCodecTypeAudio, RTPTransceiverDirectionSendonly) {
|
|
t.Errorf("Direction on SDP is not %s", RTPTransceiverDirectionSendonly)
|
|
}
|
|
|
|
assert.NoError(t, pc.Close())
|
|
}
|
|
|
|
func TestAddTransceiverFromTrackSendRecv(t *testing.T) {
|
|
lim := test.TimeOut(time.Second * 30)
|
|
defer lim.Stop()
|
|
|
|
report := test.CheckRoutines(t)
|
|
defer report()
|
|
|
|
pc, err := NewPeerConnection(Configuration{})
|
|
if err != nil {
|
|
t.Error(err.Error())
|
|
}
|
|
|
|
track, err := NewTrackLocalStaticSample(
|
|
RTPCodecCapability{MimeType: "audio/Opus"},
|
|
"track-id",
|
|
"stream-id",
|
|
)
|
|
if err != nil {
|
|
t.Error(err.Error())
|
|
}
|
|
|
|
transceiver, err := pc.AddTransceiverFromTrack(track, RTPTransceiverInit{
|
|
Direction: RTPTransceiverDirectionSendrecv,
|
|
})
|
|
if err != nil {
|
|
t.Error(err.Error())
|
|
}
|
|
|
|
if transceiver.Receiver() == nil {
|
|
t.Errorf("Transceiver should have a receiver")
|
|
}
|
|
|
|
if transceiver.Sender() == nil {
|
|
t.Errorf("Transceiver should have a sender")
|
|
}
|
|
|
|
if len(pc.GetTransceivers()) != 1 {
|
|
t.Errorf("PeerConnection should have one transceiver but has %d", len(pc.GetTransceivers()))
|
|
}
|
|
|
|
offer, err := pc.CreateOffer(nil)
|
|
if err != nil {
|
|
t.Error(err.Error())
|
|
}
|
|
|
|
if !offerMediaHasDirection(offer, RTPCodecTypeAudio, RTPTransceiverDirectionSendrecv) {
|
|
t.Errorf("Direction on SDP is not %s", RTPTransceiverDirectionSendrecv)
|
|
}
|
|
assert.NoError(t, pc.Close())
|
|
}
|
|
|
|
func TestAddTransceiverAddTrack_Reuse(t *testing.T) {
|
|
pc, err := NewPeerConnection(Configuration{})
|
|
assert.NoError(t, err)
|
|
|
|
tr, err := pc.AddTransceiverFromKind(
|
|
RTPCodecTypeVideo,
|
|
RTPTransceiverInit{Direction: RTPTransceiverDirectionRecvonly},
|
|
)
|
|
assert.NoError(t, err)
|
|
|
|
assert.Equal(t, []*RTPTransceiver{tr}, pc.GetTransceivers())
|
|
|
|
addTrack := func() (TrackLocal, *RTPSender) {
|
|
track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "foo", "bar")
|
|
assert.NoError(t, err)
|
|
|
|
sender, err := pc.AddTrack(track)
|
|
assert.NoError(t, err)
|
|
|
|
return track, sender
|
|
}
|
|
|
|
track1, sender1 := addTrack()
|
|
assert.Equal(t, 1, len(pc.GetTransceivers()))
|
|
assert.Equal(t, sender1, tr.Sender())
|
|
assert.Equal(t, track1, tr.Sender().Track())
|
|
require.NoError(t, pc.RemoveTrack(sender1))
|
|
|
|
track2, _ := addTrack()
|
|
assert.Equal(t, 1, len(pc.GetTransceivers()))
|
|
assert.Equal(t, track2, tr.Sender().Track())
|
|
|
|
addTrack()
|
|
assert.Equal(t, 2, len(pc.GetTransceivers()))
|
|
|
|
assert.NoError(t, pc.Close())
|
|
}
|
|
|
|
func TestAddTransceiverAddTrack_NewRTPSender_Error(t *testing.T) {
|
|
pc, err := NewPeerConnection(Configuration{})
|
|
assert.NoError(t, err)
|
|
|
|
_, err = pc.AddTransceiverFromKind(
|
|
RTPCodecTypeVideo,
|
|
RTPTransceiverInit{Direction: RTPTransceiverDirectionRecvonly},
|
|
)
|
|
assert.NoError(t, err)
|
|
|
|
dtlsTransport := pc.dtlsTransport
|
|
pc.dtlsTransport = nil
|
|
|
|
track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "foo", "bar")
|
|
assert.NoError(t, err)
|
|
|
|
_, err = pc.AddTrack(track)
|
|
assert.Error(t, err, "DTLSTransport must not be nil")
|
|
|
|
assert.Equal(t, 1, len(pc.GetTransceivers()))
|
|
|
|
pc.dtlsTransport = dtlsTransport
|
|
assert.NoError(t, pc.Close())
|
|
}
|
|
|
|
func TestRtpSenderReceiver_ReadClose_Error(t *testing.T) {
|
|
pc, err := NewPeerConnection(Configuration{})
|
|
assert.NoError(t, err)
|
|
|
|
tr, err := pc.AddTransceiverFromKind(
|
|
RTPCodecTypeVideo,
|
|
RTPTransceiverInit{Direction: RTPTransceiverDirectionSendrecv},
|
|
)
|
|
assert.NoError(t, err)
|
|
|
|
sender, receiver := tr.Sender(), tr.Receiver()
|
|
assert.NoError(t, sender.Stop())
|
|
_, _, err = sender.Read(make([]byte, 0, 1400))
|
|
assert.ErrorIs(t, err, io.ErrClosedPipe)
|
|
|
|
assert.NoError(t, receiver.Stop())
|
|
_, _, err = receiver.Read(make([]byte, 0, 1400))
|
|
assert.ErrorIs(t, err, io.ErrClosedPipe)
|
|
|
|
assert.NoError(t, pc.Close())
|
|
}
|
|
|
|
// nolint: dupl
|
|
func TestAddTransceiverFromKind(t *testing.T) {
|
|
lim := test.TimeOut(time.Second * 30)
|
|
defer lim.Stop()
|
|
|
|
report := test.CheckRoutines(t)
|
|
defer report()
|
|
|
|
pc, err := NewPeerConnection(Configuration{})
|
|
if err != nil {
|
|
t.Error(err.Error())
|
|
}
|
|
|
|
transceiver, err := pc.AddTransceiverFromKind(RTPCodecTypeVideo, RTPTransceiverInit{
|
|
Direction: RTPTransceiverDirectionRecvonly,
|
|
})
|
|
if err != nil {
|
|
t.Error(err.Error())
|
|
}
|
|
|
|
if transceiver.Receiver() == nil {
|
|
t.Errorf("Transceiver should have a receiver")
|
|
}
|
|
|
|
if transceiver.Sender() != nil {
|
|
t.Errorf("Transceiver shouldn't have a sender")
|
|
}
|
|
|
|
offer, err := pc.CreateOffer(nil)
|
|
if err != nil {
|
|
t.Error(err.Error())
|
|
}
|
|
|
|
if !offerMediaHasDirection(offer, RTPCodecTypeVideo, RTPTransceiverDirectionRecvonly) {
|
|
t.Errorf("Direction on SDP is not %s", RTPTransceiverDirectionRecvonly)
|
|
}
|
|
assert.NoError(t, pc.Close())
|
|
}
|
|
|
|
func TestAddTransceiverFromTrackFailsRecvOnly(t *testing.T) {
|
|
lim := test.TimeOut(time.Second * 30)
|
|
defer lim.Stop()
|
|
|
|
report := test.CheckRoutines(t)
|
|
defer report()
|
|
|
|
pc, err := NewPeerConnection(Configuration{})
|
|
if err != nil {
|
|
t.Error(err.Error())
|
|
}
|
|
|
|
track, err := NewTrackLocalStaticSample(
|
|
RTPCodecCapability{MimeType: MimeTypeH264, SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42001f"},
|
|
"track-id",
|
|
"track-label",
|
|
)
|
|
if err != nil {
|
|
t.Error(err.Error())
|
|
}
|
|
|
|
transceiver, err := pc.AddTransceiverFromTrack(track, RTPTransceiverInit{
|
|
Direction: RTPTransceiverDirectionRecvonly,
|
|
})
|
|
|
|
if transceiver != nil {
|
|
t.Error("AddTransceiverFromTrack shouldn't succeed with Direction RTPTransceiverDirectionRecvonly")
|
|
}
|
|
|
|
assert.NotNil(t, err)
|
|
assert.NoError(t, pc.Close())
|
|
}
|
|
|
|
func TestPlanBMediaExchange(t *testing.T) {
|
|
runTest := func(trackCount int, t *testing.T) {
|
|
addSingleTrack := func(p *PeerConnection) *TrackLocalStaticSample {
|
|
track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, fmt.Sprintf("video-%d", util.RandUint32()), fmt.Sprintf("video-%d", util.RandUint32()))
|
|
assert.NoError(t, err)
|
|
|
|
_, err = p.AddTrack(track)
|
|
assert.NoError(t, err)
|
|
|
|
return track
|
|
}
|
|
|
|
pcOffer, err := NewPeerConnection(Configuration{SDPSemantics: SDPSemanticsPlanB})
|
|
assert.NoError(t, err)
|
|
|
|
pcAnswer, err := NewPeerConnection(Configuration{SDPSemantics: SDPSemanticsPlanB})
|
|
assert.NoError(t, err)
|
|
|
|
var onTrackWaitGroup sync.WaitGroup
|
|
onTrackWaitGroup.Add(trackCount)
|
|
pcAnswer.OnTrack(func(*TrackRemote, *RTPReceiver) {
|
|
onTrackWaitGroup.Done()
|
|
})
|
|
|
|
done := make(chan struct{})
|
|
go func() {
|
|
onTrackWaitGroup.Wait()
|
|
close(done)
|
|
}()
|
|
|
|
_, err = pcAnswer.AddTransceiverFromKind(RTPCodecTypeVideo)
|
|
assert.NoError(t, err)
|
|
|
|
outboundTracks := []*TrackLocalStaticSample{}
|
|
for i := 0; i < trackCount; i++ {
|
|
outboundTracks = append(outboundTracks, addSingleTrack(pcOffer))
|
|
}
|
|
|
|
assert.NoError(t, signalPair(pcOffer, pcAnswer))
|
|
|
|
func() {
|
|
for {
|
|
select {
|
|
case <-time.After(20 * time.Millisecond):
|
|
for _, track := range outboundTracks {
|
|
assert.NoError(t, track.WriteSample(media.Sample{Data: []byte{0x00}, Duration: time.Second}))
|
|
}
|
|
case <-done:
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
closePairNow(t, pcOffer, pcAnswer)
|
|
}
|
|
|
|
lim := test.TimeOut(time.Second * 30)
|
|
defer lim.Stop()
|
|
|
|
report := test.CheckRoutines(t)
|
|
defer report()
|
|
|
|
t.Run("Single Track", func(t *testing.T) {
|
|
runTest(1, t)
|
|
})
|
|
t.Run("Multi Track", func(t *testing.T) {
|
|
runTest(2, t)
|
|
})
|
|
}
|
|
|
|
// TestPeerConnection_Start_Only_Negotiated_Senders tests that only
|
|
// the current negotiated transceivers senders provided in an
|
|
// offer/answer are started
|
|
func TestPeerConnection_Start_Only_Negotiated_Senders(t *testing.T) {
|
|
lim := test.TimeOut(time.Second * 30)
|
|
defer lim.Stop()
|
|
|
|
report := test.CheckRoutines(t)
|
|
defer report()
|
|
|
|
pcOffer, err := NewPeerConnection(Configuration{})
|
|
assert.NoError(t, err)
|
|
defer func() { assert.NoError(t, pcOffer.Close()) }()
|
|
|
|
pcAnswer, err := NewPeerConnection(Configuration{})
|
|
assert.NoError(t, err)
|
|
defer func() { assert.NoError(t, pcAnswer.Close()) }()
|
|
|
|
track1, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion1")
|
|
require.NoError(t, err)
|
|
|
|
sender1, err := pcOffer.AddTrack(track1)
|
|
require.NoError(t, err)
|
|
|
|
offer, err := pcOffer.CreateOffer(nil)
|
|
assert.NoError(t, err)
|
|
|
|
offerGatheringComplete := GatheringCompletePromise(pcOffer)
|
|
assert.NoError(t, pcOffer.SetLocalDescription(offer))
|
|
<-offerGatheringComplete
|
|
assert.NoError(t, pcAnswer.SetRemoteDescription(*pcOffer.LocalDescription()))
|
|
answer, err := pcAnswer.CreateAnswer(nil)
|
|
assert.NoError(t, err)
|
|
answerGatheringComplete := GatheringCompletePromise(pcAnswer)
|
|
assert.NoError(t, pcAnswer.SetLocalDescription(answer))
|
|
<-answerGatheringComplete
|
|
|
|
// Add a new track between providing the offer and applying the answer
|
|
|
|
track2, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2")
|
|
require.NoError(t, err)
|
|
|
|
sender2, err := pcOffer.AddTrack(track2)
|
|
require.NoError(t, err)
|
|
|
|
// apply answer so we'll test generateMatchedSDP
|
|
assert.NoError(t, pcOffer.SetRemoteDescription(*pcAnswer.LocalDescription()))
|
|
|
|
// Wait for senders to be started by startTransports spawned goroutine
|
|
pcOffer.ops.Done()
|
|
|
|
// sender1 should be started but sender2 should not be started
|
|
assert.True(t, sender1.hasSent(), "sender1 is not started but should be started")
|
|
assert.False(t, sender2.hasSent(), "sender2 is started but should not be started")
|
|
}
|
|
|
|
// TestPeerConnection_Start_Right_Receiver tests that the right
|
|
// receiver (the receiver which transceiver has the same media section as the track)
|
|
// is started for the specified track
|
|
func TestPeerConnection_Start_Right_Receiver(t *testing.T) {
|
|
isTransceiverReceiverStarted := func(pc *PeerConnection, mid string) (bool, error) {
|
|
for _, transceiver := range pc.GetTransceivers() {
|
|
if transceiver.Mid() != mid {
|
|
continue
|
|
}
|
|
return transceiver.Receiver() != nil && transceiver.Receiver().haveReceived(), nil
|
|
}
|
|
return false, fmt.Errorf("%w: %q", errNoTransceiverwithMid, mid)
|
|
}
|
|
|
|
lim := test.TimeOut(time.Second * 30)
|
|
defer lim.Stop()
|
|
|
|
report := test.CheckRoutines(t)
|
|
defer report()
|
|
|
|
pcOffer, pcAnswer, err := newPair()
|
|
require.NoError(t, err)
|
|
|
|
_, err = pcAnswer.AddTransceiverFromKind(RTPCodecTypeVideo, RTPTransceiverInit{Direction: RTPTransceiverDirectionRecvonly})
|
|
assert.NoError(t, err)
|
|
|
|
track1, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion1")
|
|
require.NoError(t, err)
|
|
|
|
sender1, err := pcOffer.AddTrack(track1)
|
|
require.NoError(t, err)
|
|
|
|
assert.NoError(t, signalPair(pcOffer, pcAnswer))
|
|
|
|
pcOffer.ops.Done()
|
|
pcAnswer.ops.Done()
|
|
|
|
// transceiver with mid 0 should be started
|
|
started, err := isTransceiverReceiverStarted(pcAnswer, "0")
|
|
assert.NoError(t, err)
|
|
assert.True(t, started, "transceiver with mid 0 should be started")
|
|
|
|
// Remove track
|
|
assert.NoError(t, pcOffer.RemoveTrack(sender1))
|
|
|
|
assert.NoError(t, signalPair(pcOffer, pcAnswer))
|
|
|
|
pcOffer.ops.Done()
|
|
pcAnswer.ops.Done()
|
|
|
|
// transceiver with mid 0 should not be started
|
|
started, err = isTransceiverReceiverStarted(pcAnswer, "0")
|
|
assert.NoError(t, err)
|
|
assert.False(t, started, "transceiver with mid 0 should not be started")
|
|
|
|
// Add a new transceiver (we're not using AddTrack since it'll reuse the transceiver with mid 0)
|
|
_, err = pcOffer.AddTransceiverFromTrack(track1)
|
|
assert.NoError(t, err)
|
|
|
|
_, err = pcAnswer.AddTransceiverFromKind(RTPCodecTypeVideo, RTPTransceiverInit{Direction: RTPTransceiverDirectionRecvonly})
|
|
assert.NoError(t, err)
|
|
|
|
assert.NoError(t, signalPair(pcOffer, pcAnswer))
|
|
|
|
pcOffer.ops.Done()
|
|
pcAnswer.ops.Done()
|
|
|
|
// transceiver with mid 0 should not be started
|
|
started, err = isTransceiverReceiverStarted(pcAnswer, "0")
|
|
assert.NoError(t, err)
|
|
assert.False(t, started, "transceiver with mid 0 should not be started")
|
|
// transceiver with mid 2 should be started
|
|
started, err = isTransceiverReceiverStarted(pcAnswer, "2")
|
|
assert.NoError(t, err)
|
|
assert.True(t, started, "transceiver with mid 2 should be started")
|
|
|
|
closePairNow(t, pcOffer, pcAnswer)
|
|
}
|
|
|
|
func TestPeerConnection_Simulcast_Probe(t *testing.T) {
|
|
lim := test.TimeOut(time.Second * 30) //nolint
|
|
defer lim.Stop()
|
|
|
|
report := test.CheckRoutines(t)
|
|
defer report()
|
|
|
|
// Assert that failed Simulcast probing doesn't cause
|
|
// the handleUndeclaredSSRC to be leaked
|
|
t.Run("Leak", func(t *testing.T) {
|
|
track, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion")
|
|
assert.NoError(t, err)
|
|
|
|
offerer, answerer, err := newPair()
|
|
assert.NoError(t, err)
|
|
|
|
_, err = offerer.AddTrack(track)
|
|
assert.NoError(t, err)
|
|
|
|
ticker := time.NewTicker(time.Millisecond * 20)
|
|
defer ticker.Stop()
|
|
testFinished := make(chan struct{})
|
|
seenFiveStreams, seenFiveStreamsCancel := context.WithCancel(context.Background())
|
|
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-testFinished:
|
|
return
|
|
case <-ticker.C:
|
|
answerer.dtlsTransport.lock.Lock()
|
|
if len(answerer.dtlsTransport.simulcastStreams) >= 5 {
|
|
seenFiveStreamsCancel()
|
|
}
|
|
answerer.dtlsTransport.lock.Unlock()
|
|
|
|
track.mu.Lock()
|
|
if len(track.bindings) == 1 {
|
|
_, err = track.bindings[0].writeStream.WriteRTP(&rtp.Header{
|
|
Version: 2,
|
|
SSRC: util.RandUint32(),
|
|
}, []byte{0, 1, 2, 3, 4, 5})
|
|
assert.NoError(t, err)
|
|
}
|
|
track.mu.Unlock()
|
|
}
|
|
}
|
|
}()
|
|
|
|
assert.NoError(t, signalPair(offerer, answerer))
|
|
|
|
peerConnectionConnected := untilConnectionState(PeerConnectionStateConnected, offerer, answerer)
|
|
peerConnectionConnected.Wait()
|
|
|
|
<-seenFiveStreams.Done()
|
|
|
|
closePairNow(t, offerer, answerer)
|
|
close(testFinished)
|
|
})
|
|
|
|
// Assert that NonSimulcast Traffic isn't incorrectly broken by the probe
|
|
t.Run("Break NonSimulcast", func(t *testing.T) {
|
|
unhandledSimulcastError := make(chan struct{})
|
|
|
|
m := &MediaEngine{}
|
|
assert.NoError(t, m.RegisterDefaultCodecs())
|
|
assert.NoError(t, ConfigureSimulcastExtensionHeaders(m))
|
|
|
|
pcOffer, pcAnswer, err := NewAPI(WithSettingEngine(SettingEngine{
|
|
LoggerFactory: &undeclaredSsrcLoggerFactory{unhandledSimulcastError},
|
|
}), WithMediaEngine(m)).newPair(Configuration{})
|
|
assert.NoError(t, err)
|
|
|
|
firstTrack, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "firstTrack", "firstTrack")
|
|
assert.NoError(t, err)
|
|
|
|
_, err = pcOffer.AddTrack(firstTrack)
|
|
assert.NoError(t, err)
|
|
|
|
secondTrack, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "secondTrack", "secondTrack")
|
|
assert.NoError(t, err)
|
|
|
|
_, err = pcOffer.AddTrack(secondTrack)
|
|
assert.NoError(t, err)
|
|
|
|
assert.NoError(t, signalPairWithModification(pcOffer, pcAnswer, func(sessionDescription string) (filtered string) {
|
|
shouldDiscard := false
|
|
|
|
scanner := bufio.NewScanner(strings.NewReader(sessionDescription))
|
|
for scanner.Scan() {
|
|
if strings.HasPrefix(scanner.Text(), "m=video") {
|
|
shouldDiscard = !shouldDiscard
|
|
} else if strings.HasPrefix(scanner.Text(), "a=group:BUNDLE") {
|
|
filtered += "a=group:BUNDLE 1 2\r\n"
|
|
continue
|
|
}
|
|
|
|
if !shouldDiscard {
|
|
filtered += scanner.Text() + "\r\n"
|
|
}
|
|
}
|
|
return
|
|
}))
|
|
|
|
peerConnectionConnected := untilConnectionState(PeerConnectionStateConnected, pcOffer, pcAnswer)
|
|
peerConnectionConnected.Wait()
|
|
|
|
sequenceNumber := uint16(0)
|
|
sendRTPPacket := func() {
|
|
sequenceNumber++
|
|
assert.NoError(t, firstTrack.WriteRTP(&rtp.Packet{
|
|
Header: rtp.Header{
|
|
Version: 2,
|
|
SequenceNumber: sequenceNumber,
|
|
},
|
|
Payload: []byte{0x00},
|
|
}))
|
|
time.Sleep(20 * time.Millisecond)
|
|
}
|
|
|
|
for ; sequenceNumber <= 5; sequenceNumber++ {
|
|
sendRTPPacket()
|
|
}
|
|
|
|
trackRemoteChan := make(chan *TrackRemote, 1)
|
|
pcAnswer.OnTrack(func(trackRemote *TrackRemote, _ *RTPReceiver) {
|
|
trackRemoteChan <- trackRemote
|
|
})
|
|
|
|
assert.NoError(t, signalPair(pcOffer, pcAnswer))
|
|
|
|
trackRemote := func() *TrackRemote {
|
|
for {
|
|
select {
|
|
case t := <-trackRemoteChan:
|
|
return t
|
|
default:
|
|
sendRTPPacket()
|
|
}
|
|
}
|
|
}()
|
|
|
|
func() {
|
|
for {
|
|
select {
|
|
case <-unhandledSimulcastError:
|
|
return
|
|
default:
|
|
sendRTPPacket()
|
|
}
|
|
}
|
|
}()
|
|
|
|
_, _, err = trackRemote.Read(make([]byte, 1500))
|
|
assert.NoError(t, err)
|
|
|
|
closePairNow(t, pcOffer, pcAnswer)
|
|
})
|
|
}
|
|
|
|
// Assert that CreateOffer returns an error for a RTPSender with no codecs
|
|
// pion/webrtc#1702
|
|
func TestPeerConnection_CreateOffer_NoCodecs(t *testing.T) {
|
|
lim := test.TimeOut(time.Second * 30)
|
|
defer lim.Stop()
|
|
|
|
report := test.CheckRoutines(t)
|
|
defer report()
|
|
|
|
m := &MediaEngine{}
|
|
|
|
pc, err := NewAPI(WithMediaEngine(m)).NewPeerConnection(Configuration{})
|
|
assert.NoError(t, err)
|
|
|
|
track, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion")
|
|
assert.NoError(t, err)
|
|
|
|
_, err = pc.AddTrack(track)
|
|
assert.NoError(t, err)
|
|
|
|
_, err = pc.CreateOffer(nil)
|
|
assert.Equal(t, err, ErrSenderWithNoCodecs)
|
|
|
|
assert.NoError(t, pc.Close())
|
|
}
|
|
|
|
// Assert that AddTrack is thread-safe
|
|
func TestPeerConnection_RaceReplaceTrack(t *testing.T) {
|
|
pc, err := NewPeerConnection(Configuration{})
|
|
assert.NoError(t, err)
|
|
|
|
addTrack := func() *TrackLocalStaticSample {
|
|
track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "foo", "bar")
|
|
assert.NoError(t, err)
|
|
_, err = pc.AddTrack(track)
|
|
assert.NoError(t, err)
|
|
return track
|
|
}
|
|
|
|
for i := 0; i < 10; i++ {
|
|
addTrack()
|
|
}
|
|
for _, tr := range pc.GetTransceivers() {
|
|
assert.NoError(t, pc.RemoveTrack(tr.Sender()))
|
|
}
|
|
|
|
var wg sync.WaitGroup
|
|
tracks := make([]*TrackLocalStaticSample, 10)
|
|
wg.Add(10)
|
|
for i := 0; i < 10; i++ {
|
|
go func(j int) {
|
|
tracks[j] = addTrack()
|
|
wg.Done()
|
|
}(i)
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
for _, track := range tracks {
|
|
have := false
|
|
for _, t := range pc.GetTransceivers() {
|
|
if t.Sender() != nil && t.Sender().Track() == track {
|
|
have = true
|
|
break
|
|
}
|
|
}
|
|
if !have {
|
|
t.Errorf("track was added but not found on senders")
|
|
}
|
|
}
|
|
|
|
assert.NoError(t, pc.Close())
|
|
}
|
|
|
|
func TestPeerConnection_Simulcast(t *testing.T) {
|
|
lim := test.TimeOut(time.Second * 30)
|
|
defer lim.Stop()
|
|
|
|
report := test.CheckRoutines(t)
|
|
defer report()
|
|
|
|
rids := []string{"a", "b", "c"}
|
|
|
|
t.Run("E2E", func(t *testing.T) {
|
|
pcOffer, pcAnswer, err := newPair()
|
|
assert.NoError(t, err)
|
|
|
|
vp8WriterA, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID(rids[0]))
|
|
assert.NoError(t, err)
|
|
|
|
vp8WriterB, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID(rids[1]))
|
|
assert.NoError(t, err)
|
|
|
|
vp8WriterC, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID(rids[2]))
|
|
assert.NoError(t, err)
|
|
|
|
sender, err := pcOffer.AddTrack(vp8WriterA)
|
|
assert.NoError(t, err)
|
|
assert.NotNil(t, sender)
|
|
|
|
assert.NoError(t, sender.AddEncoding(vp8WriterB))
|
|
assert.NoError(t, sender.AddEncoding(vp8WriterC))
|
|
|
|
var ridMapLock sync.RWMutex
|
|
ridMap := map[string]int{}
|
|
|
|
assertRidCorrect := func(t *testing.T) {
|
|
ridMapLock.Lock()
|
|
defer ridMapLock.Unlock()
|
|
|
|
for _, rid := range rids {
|
|
assert.Equal(t, ridMap[rid], 1)
|
|
}
|
|
assert.Equal(t, len(ridMap), 3)
|
|
}
|
|
|
|
ridsFullfilled := func() bool {
|
|
ridMapLock.Lock()
|
|
defer ridMapLock.Unlock()
|
|
|
|
ridCount := len(ridMap)
|
|
return ridCount == 3
|
|
}
|
|
|
|
pcAnswer.OnTrack(func(trackRemote *TrackRemote, _ *RTPReceiver) {
|
|
ridMapLock.Lock()
|
|
defer ridMapLock.Unlock()
|
|
ridMap[trackRemote.RID()] = ridMap[trackRemote.RID()] + 1
|
|
})
|
|
|
|
parameters := sender.GetParameters()
|
|
assert.Equal(t, "a", parameters.Encodings[0].RID)
|
|
assert.Equal(t, "b", parameters.Encodings[1].RID)
|
|
assert.Equal(t, "c", parameters.Encodings[2].RID)
|
|
|
|
var midID, ridID uint8
|
|
for _, extension := range parameters.HeaderExtensions {
|
|
switch extension.URI {
|
|
case sdp.SDESMidURI:
|
|
midID = uint8(extension.ID)
|
|
case sdp.SDESRTPStreamIDURI:
|
|
ridID = uint8(extension.ID)
|
|
}
|
|
}
|
|
assert.NotZero(t, midID)
|
|
assert.NotZero(t, ridID)
|
|
|
|
assert.NoError(t, signalPair(pcOffer, pcAnswer))
|
|
|
|
// padding only packets should not affect simulcast probe
|
|
var sequenceNumber uint16
|
|
for sequenceNumber = 0; sequenceNumber < simulcastProbeCount+10; sequenceNumber++ {
|
|
time.Sleep(20 * time.Millisecond)
|
|
|
|
for _, track := range []*TrackLocalStaticRTP{vp8WriterA, vp8WriterB, vp8WriterC} {
|
|
pkt := &rtp.Packet{
|
|
Header: rtp.Header{
|
|
Version: 2,
|
|
SequenceNumber: sequenceNumber,
|
|
PayloadType: 96,
|
|
Padding: true,
|
|
},
|
|
Payload: []byte{0x00, 0x02},
|
|
}
|
|
|
|
assert.NoError(t, track.WriteRTP(pkt))
|
|
}
|
|
}
|
|
assert.False(t, ridsFullfilled(), "Simulcast probe should not be fulfilled by padding only packets")
|
|
|
|
for ; !ridsFullfilled(); sequenceNumber++ {
|
|
time.Sleep(20 * time.Millisecond)
|
|
|
|
for _, track := range []*TrackLocalStaticRTP{vp8WriterA, vp8WriterB, vp8WriterC} {
|
|
pkt := &rtp.Packet{
|
|
Header: rtp.Header{
|
|
Version: 2,
|
|
SequenceNumber: sequenceNumber,
|
|
PayloadType: 96,
|
|
},
|
|
Payload: []byte{0x00},
|
|
}
|
|
assert.NoError(t, pkt.Header.SetExtension(midID, []byte("0")))
|
|
assert.NoError(t, pkt.Header.SetExtension(ridID, []byte(track.RID())))
|
|
|
|
assert.NoError(t, track.WriteRTP(pkt))
|
|
}
|
|
}
|
|
|
|
assertRidCorrect(t)
|
|
closePairNow(t, pcOffer, pcAnswer)
|
|
})
|
|
|
|
t.Run("RTCP", func(t *testing.T) {
|
|
pcOffer, pcAnswer, err := newPair()
|
|
assert.NoError(t, err)
|
|
|
|
vp8WriterA, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID(rids[0]))
|
|
assert.NoError(t, err)
|
|
|
|
vp8WriterB, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID(rids[1]))
|
|
assert.NoError(t, err)
|
|
|
|
vp8WriterC, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID(rids[2]))
|
|
assert.NoError(t, err)
|
|
|
|
sender, err := pcOffer.AddTrack(vp8WriterA)
|
|
assert.NoError(t, err)
|
|
assert.NotNil(t, sender)
|
|
|
|
assert.NoError(t, sender.AddEncoding(vp8WriterB))
|
|
assert.NoError(t, sender.AddEncoding(vp8WriterC))
|
|
|
|
rtcpCounter := uint64(0)
|
|
pcAnswer.OnTrack(func(trackRemote *TrackRemote, receiver *RTPReceiver) {
|
|
_, _, simulcastReadErr := receiver.ReadSimulcastRTCP(trackRemote.RID())
|
|
assert.NoError(t, simulcastReadErr)
|
|
atomic.AddUint64(&rtcpCounter, 1)
|
|
})
|
|
|
|
var midID, ridID uint8
|
|
for _, extension := range sender.GetParameters().HeaderExtensions {
|
|
switch extension.URI {
|
|
case sdp.SDESMidURI:
|
|
midID = uint8(extension.ID)
|
|
case sdp.SDESRTPStreamIDURI:
|
|
ridID = uint8(extension.ID)
|
|
}
|
|
}
|
|
assert.NotZero(t, midID)
|
|
assert.NotZero(t, ridID)
|
|
|
|
assert.NoError(t, signalPair(pcOffer, pcAnswer))
|
|
|
|
for sequenceNumber := uint16(0); atomic.LoadUint64(&rtcpCounter) < 3; sequenceNumber++ {
|
|
time.Sleep(20 * time.Millisecond)
|
|
|
|
for _, track := range []*TrackLocalStaticRTP{vp8WriterA, vp8WriterB, vp8WriterC} {
|
|
pkt := &rtp.Packet{
|
|
Header: rtp.Header{
|
|
Version: 2,
|
|
SequenceNumber: sequenceNumber,
|
|
PayloadType: 96,
|
|
},
|
|
Payload: []byte{0x00},
|
|
}
|
|
assert.NoError(t, pkt.Header.SetExtension(midID, []byte("0")))
|
|
assert.NoError(t, pkt.Header.SetExtension(ridID, []byte(track.RID())))
|
|
|
|
assert.NoError(t, track.WriteRTP(pkt))
|
|
}
|
|
}
|
|
|
|
closePairNow(t, pcOffer, pcAnswer)
|
|
})
|
|
}
|
|
|
|
type simulcastTestTrackLocal struct {
|
|
*TrackLocalStaticRTP
|
|
}
|
|
|
|
// don't use ssrc&payload in bindings to let the test write different stream packets.
|
|
func (s *simulcastTestTrackLocal) WriteRTP(pkt *rtp.Packet) error {
|
|
packet := getPacketAllocationFromPool()
|
|
|
|
defer resetPacketPoolAllocation(packet)
|
|
|
|
*packet = *pkt
|
|
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
writeErrs := []error{}
|
|
|
|
for _, b := range s.bindings {
|
|
if _, err := b.writeStream.WriteRTP(&packet.Header, packet.Payload); err != nil {
|
|
writeErrs = append(writeErrs, err)
|
|
}
|
|
}
|
|
|
|
return util.FlattenErrs(writeErrs)
|
|
}
|
|
|
|
func TestPeerConnection_Simulcast_RTX(t *testing.T) {
|
|
lim := test.TimeOut(time.Second * 30)
|
|
defer lim.Stop()
|
|
|
|
report := test.CheckRoutines(t)
|
|
defer report()
|
|
|
|
rids := []string{"a", "b"}
|
|
pcOffer, pcAnswer, err := newPair()
|
|
assert.NoError(t, err)
|
|
|
|
vp8WriterAStatic, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID(rids[0]))
|
|
assert.NoError(t, err)
|
|
|
|
vp8WriterBStatic, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID(rids[1]))
|
|
assert.NoError(t, err)
|
|
|
|
vp8WriterA, vp8WriterB := &simulcastTestTrackLocal{vp8WriterAStatic}, &simulcastTestTrackLocal{vp8WriterBStatic}
|
|
|
|
sender, err := pcOffer.AddTrack(vp8WriterA)
|
|
assert.NoError(t, err)
|
|
assert.NotNil(t, sender)
|
|
|
|
assert.NoError(t, sender.AddEncoding(vp8WriterB))
|
|
|
|
var ridMapLock sync.RWMutex
|
|
ridMap := map[string]int{}
|
|
|
|
assertRidCorrect := func(t *testing.T) {
|
|
ridMapLock.Lock()
|
|
defer ridMapLock.Unlock()
|
|
|
|
for _, rid := range rids {
|
|
assert.Equal(t, ridMap[rid], 1)
|
|
}
|
|
assert.Equal(t, len(ridMap), 2)
|
|
}
|
|
|
|
ridsFullfilled := func() bool {
|
|
ridMapLock.Lock()
|
|
defer ridMapLock.Unlock()
|
|
|
|
ridCount := len(ridMap)
|
|
return ridCount == 2
|
|
}
|
|
|
|
var rtxPacketRead atomic.Int32
|
|
var wg sync.WaitGroup
|
|
wg.Add(2)
|
|
|
|
pcAnswer.OnTrack(func(trackRemote *TrackRemote, _ *RTPReceiver) {
|
|
ridMapLock.Lock()
|
|
ridMap[trackRemote.RID()] = ridMap[trackRemote.RID()] + 1
|
|
ridMapLock.Unlock()
|
|
|
|
defer wg.Done()
|
|
|
|
for {
|
|
_, attr, rerr := trackRemote.ReadRTP()
|
|
if rerr != nil {
|
|
break
|
|
}
|
|
if pt, ok := attr.Get(AttributeRtxPayloadType).(byte); ok {
|
|
if pt == 97 {
|
|
rtxPacketRead.Add(1)
|
|
}
|
|
}
|
|
}
|
|
})
|
|
|
|
parameters := sender.GetParameters()
|
|
assert.Equal(t, "a", parameters.Encodings[0].RID)
|
|
assert.Equal(t, "b", parameters.Encodings[1].RID)
|
|
|
|
var midID, ridID, rsid uint8
|
|
for _, extension := range parameters.HeaderExtensions {
|
|
switch extension.URI {
|
|
case sdp.SDESMidURI:
|
|
midID = uint8(extension.ID)
|
|
case sdp.SDESRTPStreamIDURI:
|
|
ridID = uint8(extension.ID)
|
|
case sdesRepairRTPStreamIDURI:
|
|
rsid = uint8(extension.ID)
|
|
}
|
|
}
|
|
assert.NotZero(t, midID)
|
|
assert.NotZero(t, ridID)
|
|
assert.NotZero(t, rsid)
|
|
|
|
err = signalPairWithModification(pcOffer, pcAnswer, func(sdp string) string {
|
|
// Original chrome sdp contains no ssrc info https://pastebin.com/raw/JTjX6zg6
|
|
re := regexp.MustCompile("(?m)[\r\n]+^.*a=ssrc.*$")
|
|
res := re.ReplaceAllString(sdp, "")
|
|
return res
|
|
})
|
|
assert.NoError(t, err)
|
|
|
|
// padding only packets should not affect simulcast probe
|
|
var sequenceNumber uint16
|
|
for sequenceNumber = 0; sequenceNumber < simulcastProbeCount+10; sequenceNumber++ {
|
|
time.Sleep(20 * time.Millisecond)
|
|
|
|
for i, track := range []*simulcastTestTrackLocal{vp8WriterA, vp8WriterB} {
|
|
pkt := &rtp.Packet{
|
|
Header: rtp.Header{
|
|
Version: 2,
|
|
SequenceNumber: sequenceNumber,
|
|
PayloadType: 96,
|
|
Padding: true,
|
|
SSRC: uint32(i + 1),
|
|
},
|
|
Payload: []byte{0x00, 0x02},
|
|
}
|
|
|
|
assert.NoError(t, track.WriteRTP(pkt))
|
|
}
|
|
}
|
|
assert.False(t, ridsFullfilled(), "Simulcast probe should not be fulfilled by padding only packets")
|
|
|
|
for ; !ridsFullfilled(); sequenceNumber++ {
|
|
time.Sleep(20 * time.Millisecond)
|
|
|
|
for i, track := range []*simulcastTestTrackLocal{vp8WriterA, vp8WriterB} {
|
|
pkt := &rtp.Packet{
|
|
Header: rtp.Header{
|
|
Version: 2,
|
|
SequenceNumber: sequenceNumber,
|
|
PayloadType: 96,
|
|
SSRC: uint32(i + 1),
|
|
},
|
|
Payload: []byte{0x00},
|
|
}
|
|
assert.NoError(t, pkt.Header.SetExtension(midID, []byte("0")))
|
|
assert.NoError(t, pkt.Header.SetExtension(ridID, []byte(track.RID())))
|
|
|
|
assert.NoError(t, track.WriteRTP(pkt))
|
|
}
|
|
}
|
|
|
|
assertRidCorrect(t)
|
|
|
|
for i := 0; i < simulcastProbeCount+10; i++ {
|
|
sequenceNumber++
|
|
time.Sleep(10 * time.Millisecond)
|
|
|
|
for j, track := range []*simulcastTestTrackLocal{vp8WriterA, vp8WriterB} {
|
|
pkt := &rtp.Packet{
|
|
Header: rtp.Header{
|
|
Version: 2,
|
|
SequenceNumber: sequenceNumber,
|
|
PayloadType: 97,
|
|
SSRC: uint32(100 + j),
|
|
},
|
|
Payload: []byte{0x00, 0x00, 0x00, 0x00, 0x00},
|
|
}
|
|
assert.NoError(t, pkt.Header.SetExtension(midID, []byte("0")))
|
|
assert.NoError(t, pkt.Header.SetExtension(ridID, []byte(track.RID())))
|
|
assert.NoError(t, pkt.Header.SetExtension(rsid, []byte(track.RID())))
|
|
|
|
assert.NoError(t, track.WriteRTP(pkt))
|
|
}
|
|
}
|
|
|
|
for ; rtxPacketRead.Load() == 0; sequenceNumber++ {
|
|
time.Sleep(20 * time.Millisecond)
|
|
|
|
for i, track := range []*simulcastTestTrackLocal{vp8WriterA, vp8WriterB} {
|
|
pkt := &rtp.Packet{
|
|
Header: rtp.Header{
|
|
Version: 2,
|
|
SequenceNumber: sequenceNumber,
|
|
PayloadType: 96,
|
|
SSRC: uint32(i + 1),
|
|
},
|
|
Payload: []byte{0x00},
|
|
}
|
|
assert.NoError(t, pkt.Header.SetExtension(midID, []byte("0")))
|
|
assert.NoError(t, pkt.Header.SetExtension(ridID, []byte(track.RID())))
|
|
|
|
assert.NoError(t, track.WriteRTP(pkt))
|
|
}
|
|
}
|
|
|
|
closePairNow(t, pcOffer, pcAnswer)
|
|
|
|
wg.Wait()
|
|
|
|
assert.Greater(t, rtxPacketRead.Load(), int32(0), "no rtx packet read")
|
|
}
|
|
|
|
// Everytime we receive a new SSRC we probe it and try to determine the proper way to handle it.
|
|
// In most cases a Track explicitly declares a SSRC and a OnTrack is fired. In two cases we don't
|
|
// know the SSRC ahead of time
|
|
// * Undeclared SSRC in a single media section (https://github.com/pion/webrtc/issues/880)
|
|
// * Simulcast
|
|
//
|
|
// The Undeclared SSRC processing code would run before Simulcast. If a Simulcast Offer/Answer only
|
|
// contained one Media Section we would never fire the OnTrack. We would assume it was a failed
|
|
// Undeclared SSRC processing. This test asserts that we properly handled this.
|
|
func TestPeerConnection_Simulcast_NoDataChannel(t *testing.T) {
|
|
lim := test.TimeOut(time.Second * 30)
|
|
defer lim.Stop()
|
|
|
|
report := test.CheckRoutines(t)
|
|
defer report()
|
|
|
|
pcSender, pcReceiver, err := newPair()
|
|
assert.NoError(t, err)
|
|
|
|
var wg sync.WaitGroup
|
|
wg.Add(4)
|
|
|
|
var connectionWg sync.WaitGroup
|
|
connectionWg.Add(2)
|
|
|
|
connectionStateChangeHandler := func(state PeerConnectionState) {
|
|
if state == PeerConnectionStateConnected {
|
|
connectionWg.Done()
|
|
}
|
|
}
|
|
|
|
pcSender.OnConnectionStateChange(connectionStateChangeHandler)
|
|
pcReceiver.OnConnectionStateChange(connectionStateChangeHandler)
|
|
|
|
pcReceiver.OnTrack(func(*TrackRemote, *RTPReceiver) {
|
|
defer wg.Done()
|
|
})
|
|
|
|
go func() {
|
|
defer wg.Done()
|
|
vp8WriterA, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion", WithRTPStreamID("a"))
|
|
assert.NoError(t, err)
|
|
|
|
sender, err := pcSender.AddTrack(vp8WriterA)
|
|
assert.NoError(t, err)
|
|
assert.NotNil(t, sender)
|
|
|
|
vp8WriterB, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion", WithRTPStreamID("b"))
|
|
assert.NoError(t, err)
|
|
err = sender.AddEncoding(vp8WriterB)
|
|
assert.NoError(t, err)
|
|
|
|
vp8WriterC, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion", WithRTPStreamID("c"))
|
|
assert.NoError(t, err)
|
|
err = sender.AddEncoding(vp8WriterC)
|
|
assert.NoError(t, err)
|
|
|
|
parameters := sender.GetParameters()
|
|
var midID, ridID, rsidID uint8
|
|
for _, extension := range parameters.HeaderExtensions {
|
|
switch extension.URI {
|
|
case sdp.SDESMidURI:
|
|
midID = uint8(extension.ID)
|
|
case sdp.SDESRTPStreamIDURI:
|
|
ridID = uint8(extension.ID)
|
|
case sdesRepairRTPStreamIDURI:
|
|
rsidID = uint8(extension.ID)
|
|
}
|
|
}
|
|
assert.NotZero(t, midID)
|
|
assert.NotZero(t, ridID)
|
|
assert.NotZero(t, rsidID)
|
|
|
|
// signaling
|
|
offerSDP, err := pcSender.CreateOffer(nil)
|
|
assert.NoError(t, err)
|
|
err = pcSender.SetLocalDescription(offerSDP)
|
|
assert.NoError(t, err)
|
|
|
|
err = pcReceiver.SetRemoteDescription(offerSDP)
|
|
assert.NoError(t, err)
|
|
answerSDP, err := pcReceiver.CreateAnswer(nil)
|
|
assert.NoError(t, err)
|
|
|
|
answerGatheringComplete := GatheringCompletePromise(pcReceiver)
|
|
err = pcReceiver.SetLocalDescription(answerSDP)
|
|
assert.NoError(t, err)
|
|
<-answerGatheringComplete
|
|
|
|
assert.NoError(t, pcSender.SetRemoteDescription(*pcReceiver.LocalDescription()))
|
|
|
|
connectionWg.Wait()
|
|
|
|
var seqNo uint16
|
|
for i := 0; i < 100; i++ {
|
|
pkt := &rtp.Packet{
|
|
Header: rtp.Header{
|
|
Version: 2,
|
|
SequenceNumber: seqNo,
|
|
PayloadType: 96,
|
|
},
|
|
Payload: []byte{0x00, 0x00},
|
|
}
|
|
|
|
assert.NoError(t, pkt.SetExtension(ridID, []byte("a")))
|
|
assert.NoError(t, pkt.SetExtension(midID, []byte(sender.rtpTransceiver.Mid())))
|
|
assert.NoError(t, vp8WriterA.WriteRTP(pkt))
|
|
|
|
assert.NoError(t, pkt.SetExtension(ridID, []byte("b")))
|
|
assert.NoError(t, pkt.SetExtension(midID, []byte(sender.rtpTransceiver.Mid())))
|
|
assert.NoError(t, vp8WriterB.WriteRTP(pkt))
|
|
|
|
assert.NoError(t, pkt.SetExtension(ridID, []byte("c")))
|
|
assert.NoError(t, pkt.SetExtension(midID, []byte(sender.rtpTransceiver.Mid())))
|
|
assert.NoError(t, vp8WriterC.WriteRTP(pkt))
|
|
|
|
seqNo++
|
|
}
|
|
}()
|
|
|
|
wg.Wait()
|
|
|
|
closePairNow(t, pcSender, pcReceiver)
|
|
}
|
|
|
|
// Check that PayloadType of 0 is handled correctly. At one point
|
|
// we incorrectly assumed 0 meant an invalid stream and wouldn't update things
|
|
// properly
|
|
func TestPeerConnection_Zero_PayloadType(t *testing.T) {
|
|
lim := test.TimeOut(time.Second * 5)
|
|
defer lim.Stop()
|
|
|
|
report := test.CheckRoutines(t)
|
|
defer report()
|
|
|
|
pcOffer, pcAnswer, err := newPair()
|
|
require.NoError(t, err)
|
|
|
|
audioTrack, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypePCMU}, "audio", "audio")
|
|
require.NoError(t, err)
|
|
|
|
_, err = pcOffer.AddTrack(audioTrack)
|
|
require.NoError(t, err)
|
|
|
|
assert.NoError(t, signalPair(pcOffer, pcAnswer))
|
|
|
|
trackFired := make(chan struct{})
|
|
|
|
pcAnswer.OnTrack(func(track *TrackRemote, _ *RTPReceiver) {
|
|
require.Equal(t, track.Codec().MimeType, MimeTypePCMU)
|
|
close(trackFired)
|
|
})
|
|
|
|
func() {
|
|
ticker := time.NewTicker(20 * time.Millisecond)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-trackFired:
|
|
return
|
|
case <-ticker.C:
|
|
if routineErr := audioTrack.WriteSample(media.Sample{Data: []byte{0x00}, Duration: time.Second}); routineErr != nil {
|
|
fmt.Println(routineErr)
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
closePairNow(t, pcOffer, pcAnswer)
|
|
}
|
|
|
|
// Assert that NACKs work E2E with no extra configuration. If media is sent over a lossy connection
|
|
// the user gets retransmitted RTP packets with no extra configuration
|
|
func Test_PeerConnection_RTX_E2E(t *testing.T) {
|
|
defer test.TimeOut(time.Second * 30).Stop()
|
|
|
|
pcOffer, pcAnswer, wan := createVNetPair(t, nil)
|
|
|
|
wan.AddChunkFilter(func(vnet.Chunk) bool {
|
|
return rand.Intn(5) != 4 //nolint: gosec
|
|
})
|
|
|
|
track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "track-id", "stream-id")
|
|
assert.NoError(t, err)
|
|
|
|
rtpSender, err := pcOffer.AddTrack(track)
|
|
assert.NoError(t, err)
|
|
|
|
go func() {
|
|
rtcpBuf := make([]byte, 1500)
|
|
for {
|
|
if _, _, rtcpErr := rtpSender.Read(rtcpBuf); rtcpErr != nil {
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
rtxSsrc := rtpSender.GetParameters().Encodings[0].RTX.SSRC
|
|
ssrc := rtpSender.GetParameters().Encodings[0].SSRC
|
|
|
|
rtxRead, rtxReadCancel := context.WithCancel(context.Background())
|
|
pcAnswer.OnTrack(func(track *TrackRemote, _ *RTPReceiver) {
|
|
for {
|
|
pkt, attributes, readRTPErr := track.ReadRTP()
|
|
if errors.Is(readRTPErr, io.EOF) {
|
|
return
|
|
} else if pkt.PayloadType == 0 {
|
|
continue
|
|
}
|
|
|
|
assert.NotNil(t, pkt)
|
|
assert.Equal(t, pkt.SSRC, uint32(ssrc))
|
|
assert.Equal(t, pkt.PayloadType, uint8(96))
|
|
|
|
rtxPayloadType := attributes.Get(AttributeRtxPayloadType)
|
|
rtxSequenceNumber := attributes.Get(AttributeRtxSequenceNumber)
|
|
rtxSSRC := attributes.Get(AttributeRtxSsrc)
|
|
if rtxPayloadType != nil && rtxSequenceNumber != nil && rtxSSRC != nil {
|
|
assert.Equal(t, rtxPayloadType, uint8(97))
|
|
assert.Equal(t, rtxSSRC, uint32(rtxSsrc))
|
|
|
|
rtxReadCancel()
|
|
}
|
|
}
|
|
})
|
|
|
|
assert.NoError(t, signalPair(pcOffer, pcAnswer))
|
|
|
|
func() {
|
|
for {
|
|
select {
|
|
case <-time.After(20 * time.Millisecond):
|
|
writeErr := track.WriteSample(media.Sample{Data: []byte{0x00}, Duration: time.Second})
|
|
assert.NoError(t, writeErr)
|
|
case <-rtxRead.Done():
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
assert.NoError(t, wan.Stop())
|
|
closePairNow(t, pcOffer, pcAnswer)
|
|
}
|