Files
Alessandro Ros b627128d0f remove context from webrtc.PeerConnection arguments (#4854)
contexts are useless since there's already PeerConnection.Close().
2025-08-12 15:19:59 +02:00

445 lines
11 KiB
Go

package whip
import (
"context"
"io"
"net"
"net/http"
"net/url"
"sort"
"testing"
"time"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/protocols/webrtc"
"github.com/bluenviron/mediamtx/internal/test"
"github.com/pion/rtp"
pwebrtc "github.com/pion/webrtc/v4"
"github.com/stretchr/testify/require"
)
func whipOffer(body []byte) *pwebrtc.SessionDescription {
return &pwebrtc.SessionDescription{
Type: pwebrtc.SDPTypeOffer,
SDP: string(body),
}
}
func gatherCodecs(tracks []*webrtc.IncomingTrack) []pwebrtc.RTPCodecParameters {
codecs := make([]pwebrtc.RTPCodecParameters, len(tracks))
for i, track := range tracks {
codecs[i] = track.Codec()
}
return codecs
}
func TestClientRead(t *testing.T) {
for _, ca := range []string{
"audio",
"video+audio",
} {
t.Run(ca, func(t *testing.T) {
var outgoingTracks []*webrtc.OutgoingTrack
switch ca {
case "audio":
outgoingTracks = []*webrtc.OutgoingTrack{{
Caps: pwebrtc.RTPCodecCapability{
MimeType: "audio/opus",
ClockRate: 48000,
Channels: 2,
},
}}
case "video+audio":
outgoingTracks = []*webrtc.OutgoingTrack{
{
Caps: pwebrtc.RTPCodecCapability{
MimeType: "video/H264",
ClockRate: 90000,
},
},
{
Caps: pwebrtc.RTPCodecCapability{
MimeType: "audio/opus",
ClockRate: 48000,
Channels: 2,
},
},
}
}
pc := &webrtc.PeerConnection{
LocalRandomUDP: true,
IPsFromInterfaces: true,
Publish: true,
HandshakeTimeout: conf.Duration(10 * time.Second),
TrackGatherTimeout: conf.Duration(2 * time.Second),
STUNGatherTimeout: conf.Duration(5 * time.Second),
OutgoingTracks: outgoingTracks,
Log: test.NilLogger,
}
err := pc.Start()
require.NoError(t, err)
defer pc.Close()
state := 0
httpServ := &http.Server{
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch state {
case 0:
require.Equal(t, http.MethodOptions, r.Method)
require.Equal(t, "/my/resource", r.URL.Path)
w.Header().Set("Access-Control-Allow-Methods", "OPTIONS, GET, POST, PATCH")
w.Header().Set("Access-Control-Allow-Headers", "Authorization, Content-Type, If-Match")
w.WriteHeader(http.StatusNoContent)
case 1:
require.Equal(t, http.MethodPost, r.Method)
require.Equal(t, "/my/resource", r.URL.Path)
require.Equal(t, "application/sdp", r.Header.Get("Content-Type"))
body, err2 := io.ReadAll(r.Body)
require.NoError(t, err2)
offer := whipOffer(body)
answer, err2 := pc.CreateFullAnswer(offer)
require.NoError(t, err2)
w.Header().Set("Content-Type", "application/sdp")
w.Header().Set("Accept-Patch", "application/trickle-ice-sdpfrag")
w.Header().Set("ETag", "test_etag")
w.Header().Set("Location", "/my/resource/sessionid")
w.WriteHeader(http.StatusCreated)
w.Write([]byte(answer.SDP))
go func() {
err3 := pc.WaitUntilConnected()
require.NoError(t, err3)
for _, track := range outgoingTracks {
err3 = track.WriteRTP(&rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 111,
SequenceNumber: 1123,
Timestamp: 45343,
SSRC: 563424,
},
Payload: []byte{5, 2},
})
require.NoError(t, err3)
}
}()
default:
require.Equal(t, "/my/resource/sessionid", r.URL.Path)
switch r.Method {
case http.MethodPatch:
w.WriteHeader(http.StatusNoContent)
case http.MethodDelete:
w.WriteHeader(http.StatusOK)
default:
t.Errorf("should not happen")
}
}
state++
}),
}
ln, err := net.Listen("tcp", "localhost:9005")
require.NoError(t, err)
go httpServ.Serve(ln)
defer httpServ.Shutdown(context.Background())
u, err := url.Parse("http://localhost:9005/my/resource")
require.NoError(t, err)
cl := &Client{
URL: u,
HTTPClient: &http.Client{},
Log: test.NilLogger,
}
err = cl.Initialize(context.Background())
require.NoError(t, err)
defer cl.Close() //nolint:errcheck
codecs := gatherCodecs(cl.IncomingTracks())
switch ca {
case "audio":
require.Equal(t, []pwebrtc.RTPCodecParameters{
{
RTPCodecCapability: pwebrtc.RTPCodecCapability{
MimeType: pwebrtc.MimeTypeOpus,
ClockRate: 48000,
Channels: 2,
SDPFmtpLine: "minptime=10;useinbandfec=1;stereo=1;sprop-stereo=1",
RTCPFeedback: []pwebrtc.RTCPFeedback{{
Type: "transport-cc",
}},
},
PayloadType: 111,
},
}, codecs)
case "video+audio":
sort.Slice(codecs, func(i, j int) bool {
return codecs[i].PayloadType < codecs[j].PayloadType
})
require.Equal(t, []pwebrtc.RTPCodecParameters{
{
RTPCodecCapability: pwebrtc.RTPCodecCapability{
MimeType: pwebrtc.MimeTypeH264,
ClockRate: 90000,
SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42001f",
RTCPFeedback: codecs[0].RTCPFeedback,
},
PayloadType: 105,
},
{
RTPCodecCapability: pwebrtc.RTPCodecCapability{
MimeType: pwebrtc.MimeTypeOpus,
ClockRate: 48000,
Channels: 2,
SDPFmtpLine: "minptime=10;useinbandfec=1;stereo=1;sprop-stereo=1",
RTCPFeedback: codecs[1].RTCPFeedback,
},
PayloadType: 111,
},
}, codecs)
}
recv := make([]chan struct{}, len(outgoingTracks))
for i := range outgoingTracks {
recv[i] = make(chan struct{})
}
for i, track := range cl.IncomingTracks() {
ci := i
track.OnPacketRTP = func(_ *rtp.Packet, _ time.Time) {
close(recv[ci])
}
}
cl.StartReading()
for _, rv := range recv {
<-rv
}
})
}
}
func TestClientPublish(t *testing.T) {
for _, ca := range []string{"audio", "video+audio"} {
t.Run(ca, func(t *testing.T) {
pc := &webrtc.PeerConnection{
LocalRandomUDP: true,
IPsFromInterfaces: true,
HandshakeTimeout: conf.Duration(10 * time.Second),
TrackGatherTimeout: conf.Duration(2 * time.Second),
STUNGatherTimeout: conf.Duration(5 * time.Second),
Log: test.NilLogger,
}
err := pc.Start()
require.NoError(t, err)
defer pc.Close()
state := 0
var recv []chan struct{}
httpServ := &http.Server{
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch state {
case 0:
require.Equal(t, http.MethodOptions, r.Method)
require.Equal(t, "/my/resource", r.URL.Path)
w.Header().Set("Access-Control-Allow-Methods", "OPTIONS, GET, POST, PATCH")
w.Header().Set("Access-Control-Allow-Headers", "Authorization, Content-Type, If-Match")
w.WriteHeader(http.StatusNoContent)
case 1:
require.Equal(t, http.MethodPost, r.Method)
require.Equal(t, "/my/resource", r.URL.Path)
require.Equal(t, "application/sdp", r.Header.Get("Content-Type"))
body, err2 := io.ReadAll(r.Body)
require.NoError(t, err2)
offer := whipOffer(body)
answer, err2 := pc.CreateFullAnswer(offer)
require.NoError(t, err2)
w.Header().Set("Content-Type", "application/sdp")
w.Header().Set("Accept-Patch", "application/trickle-ice-sdpfrag")
w.Header().Set("ETag", "test_etag")
w.Header().Set("Location", "/my/resource/sessionid")
w.WriteHeader(http.StatusCreated)
w.Write([]byte(answer.SDP))
go func() {
err3 := pc.WaitUntilConnected()
require.NoError(t, err3)
err3 = pc.GatherIncomingTracks()
require.NoError(t, err3)
codecs := gatherCodecs(pc.IncomingTracks())
switch ca {
case "audio":
require.Equal(t, []pwebrtc.RTPCodecParameters{
{
RTPCodecCapability: pwebrtc.RTPCodecCapability{
MimeType: pwebrtc.MimeTypeOpus,
ClockRate: 48000,
Channels: 2,
SDPFmtpLine: "",
RTCPFeedback: codecs[0].RTCPFeedback,
},
PayloadType: 96,
},
}, codecs)
case "video+audio":
sort.Slice(codecs, func(i, j int) bool {
return codecs[i].PayloadType < codecs[j].PayloadType
})
require.Equal(t, []pwebrtc.RTPCodecParameters{
{
RTPCodecCapability: pwebrtc.RTPCodecCapability{
MimeType: pwebrtc.MimeTypeH264,
ClockRate: 90000,
RTCPFeedback: codecs[0].RTCPFeedback,
},
PayloadType: 96,
},
{
RTPCodecCapability: pwebrtc.RTPCodecCapability{
MimeType: pwebrtc.MimeTypeOpus,
ClockRate: 48000,
Channels: 2,
SDPFmtpLine: "",
RTCPFeedback: codecs[1].RTCPFeedback,
},
PayloadType: 97,
},
}, codecs)
}
for i, track := range pc.IncomingTracks() {
ci := i
track.OnPacketRTP = func(_ *rtp.Packet, _ time.Time) {
close(recv[ci])
}
}
pc.StartReading()
}()
default:
require.Equal(t, "/my/resource/sessionid", r.URL.Path)
switch r.Method {
case http.MethodPatch:
w.WriteHeader(http.StatusNoContent)
case http.MethodDelete:
w.WriteHeader(http.StatusOK)
default:
t.Errorf("should not happen")
}
}
state++
}),
}
ln, err := net.Listen("tcp", "localhost:9005")
require.NoError(t, err)
go httpServ.Serve(ln)
defer httpServ.Shutdown(context.Background())
u, err := url.Parse("http://localhost:9005/my/resource")
require.NoError(t, err)
var outgoingTracks []*webrtc.OutgoingTrack
switch ca {
case "audio":
outgoingTracks = []*webrtc.OutgoingTrack{{
Caps: pwebrtc.RTPCodecCapability{
MimeType: "audio/opus",
ClockRate: 48000,
Channels: 2,
},
}}
case "video+audio":
outgoingTracks = []*webrtc.OutgoingTrack{
{
Caps: pwebrtc.RTPCodecCapability{
MimeType: "video/H264",
ClockRate: 90000,
},
},
{
Caps: pwebrtc.RTPCodecCapability{
MimeType: "audio/opus",
ClockRate: 48000,
Channels: 2,
},
},
}
}
recv = make([]chan struct{}, len(outgoingTracks))
for i := range outgoingTracks {
recv[i] = make(chan struct{})
}
cl := &Client{
URL: u,
Publish: true,
OutgoingTracks: outgoingTracks,
HTTPClient: &http.Client{},
Log: test.NilLogger,
}
err = cl.Initialize(context.Background())
require.NoError(t, err)
defer cl.Close() //nolint:errcheck
for _, track := range cl.OutgoingTracks {
err = track.WriteRTP(&rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 111,
SequenceNumber: 1123,
Timestamp: 45343,
SSRC: 563424,
},
Payload: []byte{5, 2},
})
require.NoError(t, err)
}
for _, rv := range recv {
<-rv
}
})
}
}