Files
gortsplib/server_play_test.go
Alessandro Ros cf2ff2b564 add Transport.Profile, media.Profile (#873)
this will allow to support AVPF in the future.
2025-09-04 18:08:51 +02:00

2731 lines
69 KiB
Go

package gortsplib
import (
"bytes"
"crypto/rand"
"crypto/tls"
"net"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
"golang.org/x/net/ipv4"
"github.com/bluenviron/gortsplib/v4/pkg/base"
"github.com/bluenviron/gortsplib/v4/pkg/conn"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/gortsplib/v4/pkg/headers"
"github.com/bluenviron/gortsplib/v4/pkg/mikey"
"github.com/bluenviron/gortsplib/v4/pkg/ntp"
"github.com/bluenviron/gortsplib/v4/pkg/sdp"
)
func uintPtr(v uint) *uint {
return &v
}
func uint16Ptr(v uint16) *uint16 {
return &v
}
func multicastCapableIP(t *testing.T) string {
intfs, err := net.Interfaces()
require.NoError(t, err)
for _, intf := range intfs {
if (intf.Flags & net.FlagMulticast) != 0 {
var addrs []net.Addr
addrs, err = intf.Addrs()
if err != nil {
continue
}
for _, addr := range addrs {
switch v := addr.(type) {
case *net.IPNet:
return v.IP.String()
case *net.IPAddr:
return v.IP.String()
}
}
}
}
t.Errorf("unable to find a multicast IP")
return ""
}
func mediaURL(t *testing.T, baseURL *base.URL, media *description.Media) *base.URL {
u, err := media.URL(baseURL)
require.NoError(t, err)
return u
}
func doSetup(t *testing.T, conn *conn.Conn, u string,
inTH *headers.Transport, session string,
) (*base.Response, *headers.Transport) {
h := base.Header{
"CSeq": base.HeaderValue{"1"},
"Transport": inTH.Marshal(),
}
if session != "" {
h["Session"] = base.HeaderValue{session}
}
res, err := writeReqReadRes(conn, base.Request{
Method: base.Setup,
URL: mustParseURL(u),
Header: h,
})
require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode)
var th headers.Transport
err = th.Unmarshal(res.Header["Transport"])
require.NoError(t, err)
return res, &th
}
func doPlay(t *testing.T, conn *conn.Conn, u string, session string) *base.Response {
res, err := writeReqReadRes(conn, base.Request{
Method: base.Play,
URL: mustParseURL(u),
Header: base.Header{
"CSeq": base.HeaderValue{"1"},
"Session": base.HeaderValue{session},
},
})
require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode)
return res
}
func doPause(t *testing.T, conn *conn.Conn, u string, session string) {
res, err := writeReqReadRes(conn, base.Request{
Method: base.Pause,
URL: mustParseURL(u),
Header: base.Header{
"CSeq": base.HeaderValue{"1"},
"Session": base.HeaderValue{session},
},
})
require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode)
}
func doTeardown(t *testing.T, conn *conn.Conn, u string, session string) {
res, err := writeReqReadRes(conn, base.Request{
Method: base.Teardown,
URL: mustParseURL(u),
Header: base.Header{
"CSeq": base.HeaderValue{"1"},
"Session": base.HeaderValue{session},
},
})
require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode)
}
func readSession(t *testing.T, res *base.Response) string {
var sx headers.Session
err := sx.Unmarshal(res.Header["Session"])
require.NoError(t, err)
return sx.Session
}
func TestServerPlayPath(t *testing.T) {
for _, ca := range []struct {
name string
setupURL string
playURL string
path string
}{
{
"standard",
"rtsp://localhost:8554/teststream[control]",
"rtsp://localhost:8554/teststream/",
"/teststream",
},
{
"no query, ffmpeg format",
"rtsp://localhost:8554/teststream?testing=123[control]",
"rtsp://localhost:8554/teststream/",
"/teststream",
},
{
"no query, gstreamer format",
"rtsp://localhost:8554/teststream[control]?testing=123",
"rtsp://localhost:8554/teststream/",
"/teststream",
},
{
// this is needed to support reading mpegts with ffmpeg
"no control",
"rtsp://localhost:8554/teststream/",
"rtsp://localhost:8554/teststream/",
"/teststream",
},
{
"no control, query, ffmpeg",
"rtsp://localhost:8554/teststream?testing=123/",
"rtsp://localhost:8554/teststream/",
"/teststream",
},
{
"no control, query, gstreamer",
"rtsp://localhost:8554/teststream/?testing=123",
"rtsp://localhost:8554/teststream/",
"/teststream",
},
{
"subpath",
"rtsp://localhost:8554/test/stream[control]",
"rtsp://localhost:8554/test/stream/",
"/test/stream",
},
{
"subpath, no control",
"rtsp://localhost:8554/test/stream/",
"rtsp://localhost:8554/test/stream/",
"/test/stream",
},
{
"subpath with query, ffmpeg format",
"rtsp://localhost:8554/test/stream?testing=123[control]",
"rtsp://localhost:8554/test/stream",
"/test/stream",
},
{
"subpath with query, gstreamer format",
"rtsp://localhost:8554/test/stream[control]?testing=123",
"rtsp://localhost:8554/test/stream",
"/test/stream",
},
{
"no path, no slash",
"rtsp://localhost:8554[control]",
"rtsp://localhost:8554",
"",
},
{
"single slash",
"rtsp://localhost:8554/[control]",
"rtsp://localhost:8554//",
"/",
},
{
"no control, no path",
"rtsp://localhost:8554/",
"rtsp://localhost:8554/",
"/",
},
{
"no control, no path, no slash",
"rtsp://localhost:8554",
"rtsp://localhost:8554",
"",
},
} {
t.Run(ca.name, func(t *testing.T) {
var stream *ServerStream
s := &Server{
Handler: &testServerHandler{
onDescribe: func(_ *ServerHandlerOnDescribeCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) {
require.Equal(t, ca.path, ctx.Path)
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) {
require.Equal(t, ca.path, ctx.Path)
return &base.Response{
StatusCode: base.StatusOK,
}, nil
},
},
RTSPAddress: "localhost:8554",
}
err := s.Start()
require.NoError(t, err)
defer s.Close()
stream = &ServerStream{
Server: s,
Desc: &description.Session{
Medias: []*description.Media{
testH264Media,
testH264Media,
testH264Media,
testH264Media,
},
},
}
err = stream.Initialize()
require.NoError(t, err)
defer stream.Close()
nconn, err := net.Dial("tcp", "localhost:8554")
require.NoError(t, err)
defer nconn.Close()
conn := conn.NewConn(nconn)
desc := doDescribe(t, conn, false)
th := &headers.Transport{
Protocol: headers.TransportProtocolTCP,
Delivery: deliveryPtr(headers.TransportDeliveryUnicast),
Mode: transportModePtr(headers.TransportModePlay),
InterleavedIDs: &[2]int{0, 1},
}
res, _ := doSetup(t, conn,
strings.ReplaceAll(ca.setupURL, "[control]", "/"+desc.Medias[1].Control),
th, "")
session := readSession(t, res)
doPlay(t, conn, ca.playURL, session)
})
}
}
func TestServerPlaySetupErrors(t *testing.T) {
for _, ca := range []string{
"invalid path",
"closed stream",
"different paths",
"double setup",
"different protocols",
} {
t.Run(ca, func(t *testing.T) {
var stream *ServerStream
nconnClosed := make(chan struct{})
s := &Server{
Handler: &testServerHandler{
onConnClose: func(ctx *ServerHandlerOnConnCloseCtx) {
switch ca {
case "invalid path":
require.EqualError(t, ctx.Error, "invalid SETUP path. "+
"This typically happens when VLC fails a request, and then switches to an unsupported RTSP dialect")
case "closed stream":
require.EqualError(t, ctx.Error, "stream is closed")
case "different paths":
require.EqualError(t, ctx.Error, "can't setup medias with different paths")
case "double setup":
require.EqualError(t, ctx.Error, "media has already been setup")
case "different protocols":
require.EqualError(t, ctx.Error, "can't setup medias with different transports")
}
close(nconnClosed)
},
onDescribe: func(_ *ServerHandlerOnDescribeCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onSetup: func(_ *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
},
RTSPAddress: "localhost:8554",
UDPRTPAddress: "127.0.0.1:8000",
UDPRTCPAddress: "127.0.0.1:8001",
}
err := s.Start()
require.NoError(t, err)
defer s.Close()
stream = &ServerStream{
Server: s,
Desc: &description.Session{Medias: []*description.Media{testH264Media}},
}
err = stream.Initialize()
require.NoError(t, err)
if ca == "closed stream" {
stream.Close()
} else {
defer stream.Close()
}
nconn, err := net.Dial("tcp", "localhost:8554")
require.NoError(t, err)
defer nconn.Close()
conn := conn.NewConn(nconn)
var desc *description.Session
var th *headers.Transport
var res *base.Response
switch ca {
case "invalid path":
th = &headers.Transport{
Protocol: headers.TransportProtocolUDP,
Delivery: deliveryPtr(headers.TransportDeliveryUnicast),
Mode: transportModePtr(headers.TransportModePlay),
ClientPorts: &[2]int{35466, 35467},
}
res, err = writeReqReadRes(conn, base.Request{
Method: base.Setup,
URL: &base.URL{
Scheme: "rtsp",
Path: "/invalid",
},
Header: base.Header{
"CSeq": base.HeaderValue{"2"},
"Transport": th.Marshal(),
},
})
require.NoError(t, err)
require.Equal(t, base.StatusBadRequest, res.StatusCode)
default:
desc = doDescribe(t, conn, false)
th = &headers.Transport{
Protocol: headers.TransportProtocolUDP,
Delivery: deliveryPtr(headers.TransportDeliveryUnicast),
Mode: transportModePtr(headers.TransportModePlay),
ClientPorts: &[2]int{35466, 35467},
}
res, err = writeReqReadRes(conn, base.Request{
Method: base.Setup,
URL: mediaURL(t, desc.BaseURL, desc.Medias[0]),
Header: base.Header{
"CSeq": base.HeaderValue{"2"},
"Transport": th.Marshal(),
},
})
if ca != "closed stream" {
require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode)
}
}
switch ca {
case "closed stream":
require.NoError(t, err)
require.Equal(t, base.StatusBadRequest, res.StatusCode)
case "different paths":
session := readSession(t, res)
th.ClientPorts = &[2]int{35468, 35469}
res, err = writeReqReadRes(conn, base.Request{
Method: base.Setup,
URL: mustParseURL("rtsp://localhost:8554/test12stream/" + desc.Medias[0].Control),
Header: base.Header{
"CSeq": base.HeaderValue{"3"},
"Transport": th.Marshal(),
"Session": base.HeaderValue{session},
},
})
require.NoError(t, err)
require.Equal(t, base.StatusBadRequest, res.StatusCode)
case "double setup":
session := readSession(t, res)
th.ClientPorts = &[2]int{35468, 35469}
res, err = writeReqReadRes(conn, base.Request{
Method: base.Setup,
URL: mediaURL(t, desc.BaseURL, desc.Medias[0]),
Header: base.Header{
"CSeq": base.HeaderValue{"4"},
"Transport": th.Marshal(),
"Session": base.HeaderValue{session},
},
})
require.NoError(t, err)
require.Equal(t, base.StatusBadRequest, res.StatusCode)
case "different protocols":
session := readSession(t, res)
th.Protocol = headers.TransportProtocolTCP
th.InterleavedIDs = &[2]int{0, 1}
res, err = writeReqReadRes(conn, base.Request{
Method: base.Setup,
URL: mediaURL(t, desc.BaseURL, desc.Medias[0]),
Header: base.Header{
"CSeq": base.HeaderValue{"4"},
"Transport": th.Marshal(),
"Session": base.HeaderValue{session},
},
})
require.NoError(t, err)
require.Equal(t, base.StatusBadRequest, res.StatusCode)
}
<-nconnClosed
})
}
}
func TestServerPlaySetupErrorSameUDPPortsAndIP(t *testing.T) {
var stream *ServerStream
first := int32(1)
errorRecv := make(chan struct{})
s := &Server{
Handler: &testServerHandler{
onConnClose: func(ctx *ServerHandlerOnConnCloseCtx) {
if atomic.SwapInt32(&first, 0) == 1 {
require.EqualError(t, ctx.Error,
"UDP ports 35466 and 35467 are already assigned to another reader with the same IP")
close(errorRecv)
}
},
onDescribe: func(_ *ServerHandlerOnDescribeCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onSetup: func(_ *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onPlay: func(_ *ServerHandlerOnPlayCtx) (*base.Response, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, nil
},
},
UDPRTPAddress: "127.0.0.1:8000",
UDPRTCPAddress: "127.0.0.1:8001",
RTSPAddress: "localhost:8554",
}
err := s.Start()
require.NoError(t, err)
defer s.Close()
stream = &ServerStream{
Server: s,
Desc: &description.Session{Medias: []*description.Media{testH264Media}},
}
err = stream.Initialize()
require.NoError(t, err)
defer stream.Close()
for i := 0; i < 2; i++ {
var nconn net.Conn
nconn, err = net.Dial("tcp", "localhost:8554")
require.NoError(t, err)
defer nconn.Close()
conn := conn.NewConn(nconn)
inTH := &headers.Transport{
Delivery: deliveryPtr(headers.TransportDeliveryUnicast),
Mode: transportModePtr(headers.TransportModePlay),
Protocol: headers.TransportProtocolUDP,
ClientPorts: &[2]int{35466, 35467},
}
desc := doDescribe(t, conn, false)
var res *base.Response
res, err = writeReqReadRes(conn, base.Request{
Method: base.Setup,
URL: mediaURL(t, desc.BaseURL, desc.Medias[0]),
Header: base.Header{
"CSeq": base.HeaderValue{"2"},
"Transport": inTH.Marshal(),
},
})
require.NoError(t, err)
if i == 0 {
require.Equal(t, base.StatusOK, res.StatusCode)
} else {
require.Equal(t, base.StatusBadRequest, res.StatusCode)
}
}
<-errorRecv
}
func TestServerPlay(t *testing.T) {
for _, ca := range []struct {
scheme string
transport string
secure string
}{
{
"rtsp",
"udp",
"unsecure",
},
{
"rtsp",
"multicast",
"unsecure",
},
{
"rtsp",
"tcp",
"unsecure",
},
{
"rtsps",
"tcp",
"unsecure",
},
{
"rtsps",
"udp",
"secure",
},
{
"rtsps",
"multicast",
"secure",
},
{
"rtsps",
"tcp",
"secure",
},
} {
t.Run(ca.scheme+"_"+ca.transport+"_"+ca.secure, func(t *testing.T) {
var stream *ServerStream
nconnOpened := make(chan struct{})
nconnClosed := make(chan struct{})
sessionOpened := make(chan struct{})
sessionClosed := make(chan struct{})
framesReceived := make(chan struct{})
counter := uint64(0)
listenIP := multicastCapableIP(t)
s := &Server{
Handler: &testServerHandler{
onConnOpen: func(_ *ServerHandlerOnConnOpenCtx) {
close(nconnOpened)
},
onConnClose: func(ctx *ServerHandlerOnConnCloseCtx) {
s := ctx.Conn.Stats()
require.Greater(t, s.BytesSent, uint64(800))
require.Less(t, s.BytesSent, uint64(1600))
require.Greater(t, s.BytesReceived, uint64(400))
require.Less(t, s.BytesReceived, uint64(950))
close(nconnClosed)
},
onSessionOpen: func(_ *ServerHandlerOnSessionOpenCtx) {
close(sessionOpened)
},
onSessionClose: func(ctx *ServerHandlerOnSessionCloseCtx) {
if ca.transport != "multicast" {
s := ctx.Session.Stats()
require.Greater(t, s.BytesSent, uint64(50))
require.Less(t, s.BytesSent, uint64(130))
require.Greater(t, s.BytesReceived, uint64(15))
require.Less(t, s.BytesReceived, uint64(35))
}
close(sessionClosed)
},
onDescribe: func(_ *ServerHandlerOnDescribeCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onSetup: func(_ *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) {
require.NotNil(t, ctx.Conn.Session())
switch ca.transport {
case "udp":
v := TransportUDP
require.Equal(t, &v, ctx.Session.SetuppedTransport())
case "tcp":
v := TransportTCP
require.Equal(t, &v, ctx.Session.SetuppedTransport())
case "multicast":
v := TransportUDPMulticast
require.Equal(t, &v, ctx.Session.SetuppedTransport())
}
require.Equal(t, "param=value", ctx.Session.SetuppedQuery())
require.Equal(t, stream.Description().Medias, ctx.Session.SetuppedMedias())
// send RTCP packets directly to the session.
// these are sent after the response, only if onPlay returns StatusOK.
if ca.transport != "multicast" {
err := ctx.Session.WritePacketRTCP(stream.Description().Medias[0], &testRTCPPacket)
require.NoError(t, err)
}
ctx.Session.OnPacketRTCPAny(func(medi *description.Media, pkt rtcp.Packet) {
// ignore multicast loopback
if ca.secure == "unsecure" && ca.transport == "multicast" && atomic.AddUint64(&counter, 1) <= 1 {
return
}
require.Equal(t, stream.Description().Medias[0], medi)
require.Equal(t, &testRTCPPacket, pkt)
close(framesReceived)
})
// the session is added to the stream only after onPlay returns
// with StatusOK; therefore we must wait before calling
// ServerStream.WritePacket*()
go func() {
time.Sleep(500 * time.Millisecond)
err := stream.WritePacketRTCP(stream.Description().Medias[0], &testRTCPPacket)
require.NoError(t, err)
err = stream.WritePacketRTP(stream.Description().Medias[0], &testRTPPacket)
require.NoError(t, err)
}()
return &base.Response{
StatusCode: base.StatusOK,
}, nil
},
onGetParameter: func(_ *ServerHandlerOnGetParameterCtx) (*base.Response, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, nil
},
},
RTSPAddress: listenIP + ":8554",
}
switch ca.transport {
case "udp":
s.UDPRTPAddress = "127.0.0.1:8000"
s.UDPRTCPAddress = "127.0.0.1:8001"
case "multicast":
s.MulticastIPRange = "224.1.0.0/16"
s.MulticastRTPPort = 8000
s.MulticastRTCPPort = 8001
}
if ca.scheme == "rtsps" {
cert, err := tls.X509KeyPair(serverCert, serverKey)
require.NoError(t, err)
s.TLSConfig = &tls.Config{Certificates: []tls.Certificate{cert}}
}
err := s.Start()
require.NoError(t, err)
defer s.Close()
stream = &ServerStream{
Server: s,
Desc: &description.Session{Medias: []*description.Media{testH264Media}},
}
err = stream.Initialize()
require.NoError(t, err)
defer stream.Close()
nconn, err := net.Dial("tcp", listenIP+":8554")
require.NoError(t, err)
nconn = func() net.Conn {
if ca.scheme == "rtsps" {
return tls.Client(nconn, &tls.Config{InsecureSkipVerify: true})
}
return nconn
}()
conn := conn.NewConn(nconn)
<-nconnOpened
desc := doDescribe(t, conn, false)
if ca.secure == "secure" {
require.Equal(t, headers.TransportProfileSAVP, desc.Medias[0].Profile)
require.NotEmpty(t, desc.Medias[0].KeyMgmtMikey)
}
inTH := &headers.Transport{
Mode: transportModePtr(headers.TransportModePlay),
}
switch ca.transport {
case "udp":
v := headers.TransportDeliveryUnicast
inTH.Delivery = &v
inTH.Protocol = headers.TransportProtocolUDP
inTH.ClientPorts = &[2]int{35466, 35467}
case "multicast":
v := headers.TransportDeliveryMulticast
inTH.Delivery = &v
inTH.Protocol = headers.TransportProtocolUDP
case "tcp":
v := headers.TransportDeliveryUnicast
inTH.Delivery = &v
inTH.Protocol = headers.TransportProtocolTCP
inTH.InterleavedIDs = &[2]int{5, 6} // odd value
}
h := base.Header{
"CSeq": base.HeaderValue{"1"},
}
var srtpOutCtx *wrappedSRTPContext
if ca.secure == "secure" {
inTH.Profile = headers.TransportProfileSAVP
key := make([]byte, srtpKeyLength)
_, err = rand.Read(key)
require.NoError(t, err)
srtpOutCtx = &wrappedSRTPContext{
key: key,
ssrcs: []uint32{2345423},
}
err = srtpOutCtx.initialize()
require.NoError(t, err)
var mikeyMsg *mikey.Message
mikeyMsg, err = mikeyGenerate(srtpOutCtx)
require.NoError(t, err)
var enc base.HeaderValue
enc, err = headers.KeyMgmt{
URL: mediaURL(t, desc.BaseURL, desc.Medias[0]).String(),
MikeyMessage: mikeyMsg,
}.Marshal()
require.NoError(t, err)
h["KeyMgmt"] = enc
}
h["Transport"] = inTH.Marshal()
res, err := writeReqReadRes(conn, base.Request{
Method: base.Setup,
URL: mediaURL(t, desc.BaseURL, desc.Medias[0]),
Header: h,
})
require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode)
var th headers.Transport
err = th.Unmarshal(res.Header["Transport"])
require.NoError(t, err)
var srtpInCtx *wrappedSRTPContext
if ca.secure == "secure" {
require.Equal(t, headers.TransportProfileSAVP, th.Profile)
var keyMgmt headers.KeyMgmt
err = keyMgmt.Unmarshal(res.Header["KeyMgmt"])
require.NoError(t, err)
pl1, _ := mikeyGetPayload[*mikey.PayloadKEMAC](keyMgmt.MikeyMessage)
pl2, _ := mikeyGetPayload[*mikey.PayloadKEMAC](desc.Medias[0].KeyMgmtMikey)
require.Equal(t, pl1, pl2)
srtpInCtx, err = mikeyToContext(keyMgmt.MikeyMessage)
require.NoError(t, err)
}
var l1 net.PacketConn
var l2 net.PacketConn
switch ca.transport {
case "udp":
require.Equal(t, headers.TransportProtocolUDP, th.Protocol)
require.Equal(t, headers.TransportDeliveryUnicast, *th.Delivery)
l1, err = net.ListenPacket("udp", listenIP+":35466")
require.NoError(t, err)
defer l1.Close()
l2, err = net.ListenPacket("udp", listenIP+":35467")
require.NoError(t, err)
defer l2.Close()
case "multicast": //nolint:dupl
require.Equal(t, headers.TransportProtocolUDP, th.Protocol)
require.Equal(t, headers.TransportDeliveryMulticast, *th.Delivery)
l1, err = net.ListenPacket("udp", "224.0.0.0:"+strconv.FormatInt(int64(th.Ports[0]), 10))
require.NoError(t, err)
defer l1.Close()
p := ipv4.NewPacketConn(l1)
var intfs []net.Interface
intfs, err = net.Interfaces()
require.NoError(t, err)
for _, intf := range intfs {
err = p.JoinGroup(&intf, &net.UDPAddr{IP: *th.Destination})
require.NoError(t, err)
}
l2, err = net.ListenPacket("udp", "224.0.0.0:"+strconv.FormatInt(int64(th.Ports[1]), 10))
require.NoError(t, err)
defer l2.Close()
p = ipv4.NewPacketConn(l2)
intfs, err = net.Interfaces()
require.NoError(t, err)
for _, intf := range intfs {
err = p.JoinGroup(&intf, &net.UDPAddr{IP: *th.Destination})
require.NoError(t, err)
}
case "tcp":
require.Equal(t, headers.TransportProtocolTCP, th.Protocol)
require.Equal(t, headers.TransportDeliveryUnicast, *th.Delivery)
}
<-sessionOpened
session := readSession(t, res)
doPlay(t, conn, "rtsp://"+listenIP+":8554/teststream", session)
// server -> client (direct)
if ca.transport != "multicast" {
var buf []byte
switch ca.transport {
case "udp":
buf = make([]byte, 2048)
var n int
n, _, err = l2.ReadFrom(buf)
require.NoError(t, err)
buf = buf[:n]
case "tcp":
var f *base.InterleavedFrame
f, err = conn.ReadInterleavedFrame()
require.NoError(t, err)
require.Equal(t, 6, f.Channel)
buf = f.Payload
}
if ca.secure == "secure" {
buf, err = srtpInCtx.decryptRTCP(buf, buf, nil)
require.NoError(t, err)
}
require.Equal(t, testRTCPPacketMarshaled, buf)
}
// server -> client (through stream)
var buf1 []byte
var buf2 []byte
switch ca.transport {
case "udp", "multicast":
buf1 = make([]byte, 2048)
var n int
n, _, err = l1.ReadFrom(buf1)
require.NoError(t, err)
buf1 = buf1[:n]
buf2 = make([]byte, 2048)
n, _, err = l2.ReadFrom(buf2)
require.NoError(t, err)
buf2 = buf2[:n]
case "tcp":
var f *base.InterleavedFrame
f, err = conn.ReadInterleavedFrame()
require.NoError(t, err)
require.Equal(t, 6, f.Channel)
buf2 = f.Payload
f, err = conn.ReadInterleavedFrame()
require.NoError(t, err)
require.Equal(t, 5, f.Channel)
buf1 = f.Payload
}
if ca.secure == "secure" {
buf1, err = srtpInCtx.decryptRTP(buf1, buf1, nil)
require.NoError(t, err)
}
var pkt rtp.Packet
err = pkt.Unmarshal(buf1)
require.NoError(t, err)
pkt.SSRC = testRTPPacket.SSRC
require.Equal(t, testRTPPacket, pkt)
if ca.secure == "secure" {
buf2, err = srtpInCtx.decryptRTCP(buf2, buf2, nil)
require.NoError(t, err)
}
require.Equal(t, testRTCPPacketMarshaled, buf2)
// client -> server
buf := testRTCPPacketMarshaled
if ca.secure == "secure" {
encr := make([]byte, 2000)
encr, err = srtpOutCtx.encryptRTCP(encr, buf, nil)
require.NoError(t, err)
buf = encr
}
switch ca.transport {
case "udp":
_, err = l2.WriteTo(buf, &net.UDPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: th.ServerPorts[1],
})
require.NoError(t, err)
case "multicast":
_, err = l2.WriteTo(buf, &net.UDPAddr{
IP: *th.Destination,
Port: th.Ports[1],
})
require.NoError(t, err)
case "tcp":
err = conn.WriteInterleavedFrame(&base.InterleavedFrame{
Channel: 6,
Payload: buf,
}, make([]byte, 1024))
require.NoError(t, err)
}
<-framesReceived
// ping
switch ca.transport {
case "udp", "multicast":
// ping with OPTIONS
res, err = writeReqReadRes(conn, base.Request{
Method: base.Options,
URL: mustParseURL("rtsp://" + listenIP + ":8554/teststream"),
Header: base.Header{
"CSeq": base.HeaderValue{"4"},
"Session": base.HeaderValue{session},
},
})
require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode)
// ping with GET_PARAMETER
res, err = writeReqReadRes(conn, base.Request{
Method: base.GetParameter,
URL: mustParseURL("rtsp://" + listenIP + ":8554/teststream"),
Header: base.Header{
"CSeq": base.HeaderValue{"5"},
"Session": base.HeaderValue{session},
},
})
require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode)
}
doTeardown(t, conn, "rtsp://"+listenIP+":8554/teststream", session)
<-sessionClosed
nconn.Close()
<-nconnClosed
})
}
}
func TestServerPlaySocketError(t *testing.T) {
for _, transport := range []string{
"udp",
"multicast",
"tcp",
} {
t.Run(transport, func(t *testing.T) {
var stream *ServerStream
connClosed := make(chan struct{})
writeDone := make(chan struct{})
listenIP := multicastCapableIP(t)
s := &Server{
Handler: &testServerHandler{
onConnClose: func(_ *ServerHandlerOnConnCloseCtx) {
close(connClosed)
},
onDescribe: func(_ *ServerHandlerOnDescribeCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onSetup: func(_ *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onPlay: func(_ *ServerHandlerOnPlayCtx) (*base.Response, error) {
go func() {
defer close(writeDone)
t := time.NewTicker(50 * time.Millisecond)
defer t.Stop()
for range t.C {
err := stream.WritePacketRTP(stream.Description().Medias[0], &testRTPPacket)
if err != nil {
return
}
}
}()
return &base.Response{
StatusCode: base.StatusOK,
}, nil
},
},
RTSPAddress: listenIP + ":8554",
}
switch transport {
case "udp":
s.UDPRTPAddress = "127.0.0.1:8000"
s.UDPRTCPAddress = "127.0.0.1:8001"
case "multicast":
s.MulticastIPRange = "224.1.0.0/16"
s.MulticastRTPPort = 8000
s.MulticastRTCPPort = 8001
}
err := s.Start()
require.NoError(t, err)
defer s.Close()
stream = &ServerStream{
Server: s,
Desc: &description.Session{Medias: []*description.Media{testH264Media}},
}
err = stream.Initialize()
require.NoError(t, err)
func() {
var nconn net.Conn
nconn, err = net.Dial("tcp", listenIP+":8554")
require.NoError(t, err)
defer nconn.Close()
conn := conn.NewConn(nconn)
desc := doDescribe(t, conn, false)
inTH := &headers.Transport{
Mode: transportModePtr(headers.TransportModePlay),
}
switch transport {
case "udp":
v := headers.TransportDeliveryUnicast
inTH.Delivery = &v
inTH.Protocol = headers.TransportProtocolUDP
inTH.ClientPorts = &[2]int{35466, 35467}
case "multicast":
v := headers.TransportDeliveryMulticast
inTH.Delivery = &v
inTH.Protocol = headers.TransportProtocolUDP
default:
v := headers.TransportDeliveryUnicast
inTH.Delivery = &v
inTH.Protocol = headers.TransportProtocolTCP
inTH.InterleavedIDs = &[2]int{5, 6} // odd value
}
res, th := doSetup(t, conn, mediaURL(t, desc.BaseURL, desc.Medias[0]).String(), inTH, "")
var l1 net.PacketConn
var l2 net.PacketConn
switch transport {
case "udp":
require.Equal(t, headers.TransportProtocolUDP, th.Protocol)
require.Equal(t, headers.TransportDeliveryUnicast, *th.Delivery)
l1, err = net.ListenPacket("udp", listenIP+":35466")
require.NoError(t, err)
defer l1.Close()
l2, err = net.ListenPacket("udp", listenIP+":35467")
require.NoError(t, err)
defer l2.Close()
case "multicast": //nolint:dupl
require.Equal(t, headers.TransportProtocolUDP, th.Protocol)
require.Equal(t, headers.TransportDeliveryMulticast, *th.Delivery)
l1, err = net.ListenPacket("udp", "224.0.0.0:"+strconv.FormatInt(int64(th.Ports[0]), 10))
require.NoError(t, err)
defer l1.Close()
p := ipv4.NewPacketConn(l1)
var intfs []net.Interface
intfs, err = net.Interfaces()
require.NoError(t, err)
for _, intf := range intfs {
err = p.JoinGroup(&intf, &net.UDPAddr{IP: *th.Destination})
require.NoError(t, err)
}
l2, err = net.ListenPacket("udp", "224.0.0.0:"+strconv.FormatInt(int64(th.Ports[1]), 10))
require.NoError(t, err)
defer l2.Close()
p = ipv4.NewPacketConn(l2)
intfs, err = net.Interfaces()
require.NoError(t, err)
for _, intf := range intfs {
err = p.JoinGroup(&intf, &net.UDPAddr{IP: *th.Destination})
require.NoError(t, err)
}
default:
require.Equal(t, headers.TransportProtocolTCP, th.Protocol)
require.Equal(t, headers.TransportDeliveryUnicast, *th.Delivery)
}
session := readSession(t, res)
doPlay(t, conn, "rtsp://"+listenIP+":8554/teststream", session)
}()
<-connClosed
stream.Close()
<-writeDone
})
}
}
func TestServerPlayDecodeErrors(t *testing.T) {
for _, ca := range []struct {
proto string
name string
}{
{"udp", "rtcp invalid"},
{"udp", "rtcp too big"},
{"tcp", "rtcp invalid"},
{"tcp", "rtcp too big"},
} {
t.Run(ca.proto+" "+ca.name, func(t *testing.T) {
var stream *ServerStream
errorRecv := make(chan struct{})
s := &Server{
Handler: &testServerHandler{
onDescribe: func(_ *ServerHandlerOnDescribeCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onSetup: func(_ *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onPlay: func(_ *ServerHandlerOnPlayCtx) (*base.Response, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, nil
},
onDecodeError: func(ctx *ServerHandlerOnDecodeErrorCtx) {
switch {
case ca.proto == "udp" && ca.name == "rtcp invalid":
require.EqualError(t, ctx.Error, "rtcp: packet too short")
case ca.proto == "udp" && ca.name == "rtcp too big":
require.EqualError(t, ctx.Error, "RTCP packet is too big to be read with UDP")
case ca.proto == "tcp" && ca.name == "rtcp invalid":
require.EqualError(t, ctx.Error, "rtcp: packet too short")
case ca.proto == "tcp" && ca.name == "rtcp too big":
require.EqualError(t, ctx.Error, "RTCP packet size (2000) is greater than maximum allowed (1472)")
}
close(errorRecv)
},
},
UDPRTPAddress: "127.0.0.1:8000",
UDPRTCPAddress: "127.0.0.1:8001",
RTSPAddress: "localhost:8554",
}
err := s.Start()
require.NoError(t, err)
defer s.Close()
stream = &ServerStream{
Server: s,
Desc: &description.Session{Medias: []*description.Media{testH264Media}},
}
err = stream.Initialize()
require.NoError(t, err)
defer stream.Close()
nconn, err := net.Dial("tcp", "localhost:8554")
require.NoError(t, err)
defer nconn.Close()
conn := conn.NewConn(nconn)
desc := doDescribe(t, conn, false)
inTH := &headers.Transport{
Mode: transportModePtr(headers.TransportModePlay),
Delivery: deliveryPtr(headers.TransportDeliveryUnicast),
}
if ca.proto == "udp" {
inTH.Protocol = headers.TransportProtocolUDP
inTH.ClientPorts = &[2]int{35466, 35467}
} else {
inTH.Protocol = headers.TransportProtocolTCP
inTH.InterleavedIDs = &[2]int{0, 1}
}
res, resTH := doSetup(t, conn, mediaURL(t, desc.BaseURL, desc.Medias[0]).String(), inTH, "")
var l1 net.PacketConn
var l2 net.PacketConn
if ca.proto == "udp" {
l1, err = net.ListenPacket("udp", "127.0.0.1:35466")
require.NoError(t, err)
defer l1.Close()
l2, err = net.ListenPacket("udp", "127.0.0.1:35467")
require.NoError(t, err)
defer l2.Close()
}
session := readSession(t, res)
doPlay(t, conn, "rtsp://localhost:8554/teststream", session)
switch { //nolint:dupl
case ca.proto == "udp" && ca.name == "rtcp invalid":
_, err = l2.WriteTo([]byte{0x01, 0x02}, &net.UDPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: resTH.ServerPorts[1],
})
require.NoError(t, err)
case ca.proto == "udp" && ca.name == "rtcp too big":
_, err = l2.WriteTo(bytes.Repeat([]byte{0x01, 0x02}, 2000/2), &net.UDPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: resTH.ServerPorts[1],
})
require.NoError(t, err)
case ca.proto == "tcp" && ca.name == "rtcp invalid":
err = conn.WriteInterleavedFrame(&base.InterleavedFrame{
Channel: 1,
Payload: []byte{0x01, 0x02},
}, make([]byte, 2048))
require.NoError(t, err)
case ca.proto == "tcp" && ca.name == "rtcp too big":
err = conn.WriteInterleavedFrame(&base.InterleavedFrame{
Channel: 1,
Payload: bytes.Repeat([]byte{0x01, 0x02}, 2000/2),
}, make([]byte, 2048))
require.NoError(t, err)
}
<-errorRecv
})
}
}
func TestServerPlayRTCPReport(t *testing.T) {
for _, ca := range []string{"udp", "tcp"} {
t.Run(ca, func(t *testing.T) {
var stream *ServerStream
var curTime time.Time
var curTimeMutex sync.Mutex
s := &Server{
Handler: &testServerHandler{
onDescribe: func(_ *ServerHandlerOnDescribeCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onSetup: func(_ *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onPlay: func(_ *ServerHandlerOnPlayCtx) (*base.Response, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, nil
},
},
RTSPAddress: "localhost:8554",
UDPRTPAddress: "127.0.0.1:8000",
UDPRTCPAddress: "127.0.0.1:8001",
timeNow: func() time.Time {
curTimeMutex.Lock()
defer curTimeMutex.Unlock()
return curTime
},
senderReportPeriod: 100 * time.Millisecond,
}
err := s.Start()
require.NoError(t, err)
defer s.Close()
stream = &ServerStream{
Server: s,
Desc: &description.Session{Medias: []*description.Media{testH264Media}},
}
err = stream.Initialize()
require.NoError(t, err)
defer stream.Close()
nconn, err := net.Dial("tcp", "localhost:8554")
require.NoError(t, err)
defer nconn.Close()
conn := conn.NewConn(nconn)
desc := doDescribe(t, conn, false)
inTH := &headers.Transport{
Mode: transportModePtr(headers.TransportModePlay),
Delivery: deliveryPtr(headers.TransportDeliveryUnicast),
}
if ca == "udp" {
inTH.Protocol = headers.TransportProtocolUDP
inTH.ClientPorts = &[2]int{35466, 35467}
} else {
inTH.Protocol = headers.TransportProtocolTCP
inTH.InterleavedIDs = &[2]int{0, 1}
}
res, _ := doSetup(t, conn, mediaURL(t, desc.BaseURL, desc.Medias[0]).String(), inTH, "")
var l1 net.PacketConn
var l2 net.PacketConn
if ca == "udp" {
l1, err = net.ListenPacket("udp", "localhost:35466")
require.NoError(t, err)
defer l1.Close()
l2, err = net.ListenPacket("udp", "localhost:35467")
require.NoError(t, err)
defer l2.Close()
}
session := readSession(t, res)
doPlay(t, conn, "rtsp://localhost:8554/teststream", session)
curTimeMutex.Lock()
curTime = time.Date(2014, 6, 7, 15, 0, 0, 0, time.UTC)
curTimeMutex.Unlock()
err = stream.WritePacketRTPWithNTP(
stream.Description().Medias[0],
&rtp.Packet{
Header: rtp.Header{
Version: 2,
PayloadType: 96,
SSRC: 0x38F27A2F,
Timestamp: 240000,
},
Payload: []byte{0x05}, // IDR
},
time.Date(2017, 8, 10, 12, 22, 0, 0, time.UTC))
require.NoError(t, err)
curTimeMutex.Lock()
curTime = time.Date(2014, 6, 7, 15, 0, 30, 0, time.UTC)
curTimeMutex.Unlock()
var buf []byte
if ca == "udp" {
buf = make([]byte, 2048)
var n int
n, _, err = l2.ReadFrom(buf)
require.NoError(t, err)
buf = buf[:n]
} else {
_, err = conn.ReadInterleavedFrame()
require.NoError(t, err)
var f *base.InterleavedFrame
f, err = conn.ReadInterleavedFrame()
require.NoError(t, err)
require.Equal(t, 1, f.Channel)
buf = f.Payload
}
packets, err := rtcp.Unmarshal(buf)
require.NoError(t, err)
require.Equal(t, []rtcp.Packet{
&rtcp.SenderReport{
SSRC: packets[0].(*rtcp.SenderReport).SSRC,
NTPTime: ntp.Encode(time.Date(2017, 8, 10, 12, 22, 30, 0, time.UTC)),
RTPTime: 240000 + 90000*30,
PacketCount: 1,
OctetCount: 1,
},
}, packets)
doTeardown(t, conn, "rtsp://localhost:8554/teststream", session)
})
}
}
func TestServerPlayVLCMulticast(t *testing.T) {
var stream *ServerStream
listenIP := multicastCapableIP(t)
s := &Server{
Handler: &testServerHandler{
onDescribe: func(_ *ServerHandlerOnDescribeCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
},
RTSPAddress: listenIP + ":8554",
MulticastIPRange: "224.1.0.0/16",
MulticastRTPPort: 8000,
MulticastRTCPPort: 8001,
}
err := s.Start()
require.NoError(t, err)
defer s.Close()
stream = &ServerStream{
Server: s,
Desc: &description.Session{Medias: []*description.Media{testH264Media}},
}
err = stream.Initialize()
require.NoError(t, err)
defer stream.Close()
nconn, err := net.Dial("tcp", listenIP+":8554")
require.NoError(t, err)
conn := conn.NewConn(nconn)
defer nconn.Close()
res, err := writeReqReadRes(conn, base.Request{
Method: base.Describe,
URL: mustParseURL("rtsp://" + listenIP + ":8554/teststream?vlcmulticast"),
Header: base.Header{
"CSeq": base.HeaderValue{"1"},
},
})
require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode)
var desc sdp.SessionDescription
err = desc.Unmarshal(res.Body)
require.NoError(t, err)
require.Equal(t, "224.1.0.0", desc.ConnectionInformation.Address.Address)
}
func TestServerPlayTCPResponseBeforeFrames(t *testing.T) {
var stream *ServerStream
writerDone := make(chan struct{})
writerTerminate := make(chan struct{})
s := &Server{
RTSPAddress: "localhost:8554",
Handler: &testServerHandler{
onConnClose: func(_ *ServerHandlerOnConnCloseCtx) {
close(writerTerminate)
<-writerDone
},
onDescribe: func(_ *ServerHandlerOnDescribeCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onSetup: func(_ *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onPlay: func(_ *ServerHandlerOnPlayCtx) (*base.Response, error) {
go func() {
defer close(writerDone)
err := stream.WritePacketRTP(stream.Description().Medias[0], &testRTPPacket)
require.NoError(t, err)
ti := time.NewTicker(50 * time.Millisecond)
defer ti.Stop()
for {
select {
case <-ti.C:
err = stream.WritePacketRTP(stream.Description().Medias[0], &testRTPPacket)
require.NoError(t, err)
case <-writerTerminate:
return
}
}
}()
time.Sleep(50 * time.Millisecond)
return &base.Response{
StatusCode: base.StatusOK,
}, nil
},
},
}
err := s.Start()
require.NoError(t, err)
defer s.Close()
stream = &ServerStream{
Server: s,
Desc: &description.Session{Medias: []*description.Media{testH264Media}},
}
err = stream.Initialize()
require.NoError(t, err)
defer stream.Close()
nconn, err := net.Dial("tcp", "localhost:8554")
require.NoError(t, err)
defer nconn.Close()
conn := conn.NewConn(nconn)
desc := doDescribe(t, conn, false)
inTH := &headers.Transport{
Protocol: headers.TransportProtocolTCP,
Delivery: deliveryPtr(headers.TransportDeliveryUnicast),
Mode: transportModePtr(headers.TransportModePlay),
InterleavedIDs: &[2]int{0, 1},
}
res, _ := doSetup(t, conn, mediaURL(t, desc.BaseURL, desc.Medias[0]).String(), inTH, "")
session := readSession(t, res)
doPlay(t, conn, "rtsp://localhost:8554/teststream", session)
_, err = conn.ReadInterleavedFrame()
require.NoError(t, err)
}
func TestServerPlayPause(t *testing.T) {
var stream *ServerStream
writerStarted := false
writerDone := make(chan struct{})
writerTerminate := make(chan struct{})
s := &Server{
Handler: &testServerHandler{
onConnClose: func(_ *ServerHandlerOnConnCloseCtx) {
close(writerTerminate)
<-writerDone
},
onDescribe: func(_ *ServerHandlerOnDescribeCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onSetup: func(_ *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onPlay: func(_ *ServerHandlerOnPlayCtx) (*base.Response, error) {
if !writerStarted {
writerStarted = true
go func() {
defer close(writerDone)
ti := time.NewTicker(50 * time.Millisecond)
defer ti.Stop()
for {
select {
case <-ti.C:
err := stream.WritePacketRTP(stream.Description().Medias[0], &testRTPPacket)
require.NoError(t, err)
case <-writerTerminate:
return
}
}
}()
}
return &base.Response{
StatusCode: base.StatusOK,
}, nil
},
onPause: func(_ *ServerHandlerOnPauseCtx) (*base.Response, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, nil
},
},
RTSPAddress: "localhost:8554",
}
err := s.Start()
require.NoError(t, err)
defer s.Close()
stream = &ServerStream{
Server: s,
Desc: &description.Session{Medias: []*description.Media{testH264Media}},
}
err = stream.Initialize()
require.NoError(t, err)
defer stream.Close()
nconn, err := net.Dial("tcp", "localhost:8554")
require.NoError(t, err)
defer nconn.Close()
conn := conn.NewConn(nconn)
desc := doDescribe(t, conn, false)
inTH := &headers.Transport{
Protocol: headers.TransportProtocolTCP,
Delivery: deliveryPtr(headers.TransportDeliveryUnicast),
Mode: transportModePtr(headers.TransportModePlay),
InterleavedIDs: &[2]int{0, 1},
}
res, _ := doSetup(t, conn, mediaURL(t, desc.BaseURL, desc.Medias[0]).String(), inTH, "")
session := readSession(t, res)
doPlay(t, conn, "rtsp://localhost:8554/teststream", session)
doPause(t, conn, "rtsp://localhost:8554/teststream", session)
}
func TestServerPlayPlayPausePausePlay(t *testing.T) {
for _, ca := range []string{"stream", "direct"} {
t.Run(ca, func(t *testing.T) {
var stream *ServerStream
writerStarted := false
writerDone := make(chan struct{})
writerTerminate := make(chan struct{})
s := &Server{
Handler: &testServerHandler{
onConnClose: func(_ *ServerHandlerOnConnCloseCtx) {
close(writerTerminate)
<-writerDone
},
onDescribe: func(_ *ServerHandlerOnDescribeCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onSetup: func(_ *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) {
if !writerStarted {
writerStarted = true
go func() {
defer close(writerDone)
ti := time.NewTicker(50 * time.Millisecond)
defer ti.Stop()
for {
select {
case <-ti.C:
if ca == "stream" {
err := stream.WritePacketRTP(stream.Description().Medias[0], &testRTPPacket)
require.NoError(t, err)
} else {
err := ctx.Session.WritePacketRTP(stream.Description().Medias[0], &testRTPPacket)
require.NoError(t, err)
}
case <-writerTerminate:
return
}
}
}()
}
return &base.Response{
StatusCode: base.StatusOK,
}, nil
},
onPause: func(_ *ServerHandlerOnPauseCtx) (*base.Response, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, nil
},
},
RTSPAddress: "localhost:8554",
}
err := s.Start()
require.NoError(t, err)
defer s.Close()
stream = &ServerStream{
Server: s,
Desc: &description.Session{Medias: []*description.Media{testH264Media}},
}
err = stream.Initialize()
require.NoError(t, err)
defer stream.Close()
nconn, err := net.Dial("tcp", "localhost:8554")
require.NoError(t, err)
defer nconn.Close()
conn := conn.NewConn(nconn)
desc := doDescribe(t, conn, false)
inTH := &headers.Transport{
Protocol: headers.TransportProtocolTCP,
Delivery: deliveryPtr(headers.TransportDeliveryUnicast),
Mode: transportModePtr(headers.TransportModePlay),
InterleavedIDs: &[2]int{0, 1},
}
res, _ := doSetup(t, conn, mediaURL(t, desc.BaseURL, desc.Medias[0]).String(), inTH, "")
session := readSession(t, res)
doPlay(t, conn, "rtsp://localhost:8554/teststream", session)
doPlay(t, conn, "rtsp://localhost:8554/teststream", session)
doPause(t, conn, "rtsp://localhost:8554/teststream", session)
doPause(t, conn, "rtsp://localhost:8554/teststream", session)
time.Sleep(500 * time.Millisecond)
doPlay(t, conn, "rtsp://localhost:8554/teststream", session)
})
}
}
func TestServerPlayTimeout(t *testing.T) {
for _, transport := range []string{
"udp",
"multicast",
// there's no timeout when reading with TCP
} {
t.Run(transport, func(t *testing.T) {
var stream *ServerStream
sessionClosed := make(chan struct{})
s := &Server{
Handler: &testServerHandler{
onSessionClose: func(_ *ServerHandlerOnSessionCloseCtx) {
close(sessionClosed)
},
onDescribe: func(_ *ServerHandlerOnDescribeCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onSetup: func(_ *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onPlay: func(_ *ServerHandlerOnPlayCtx) (*base.Response, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, nil
},
},
ReadTimeout: 1 * time.Second,
sessionTimeout: 1 * time.Second,
RTSPAddress: "localhost:8554",
checkStreamPeriod: 500 * time.Millisecond,
}
switch transport {
case "udp":
s.UDPRTPAddress = "127.0.0.1:8000"
s.UDPRTCPAddress = "127.0.0.1:8001"
case "multicast":
s.MulticastIPRange = "224.1.0.0/16"
s.MulticastRTPPort = 8000
s.MulticastRTCPPort = 8001
}
err := s.Start()
require.NoError(t, err)
defer s.Close()
stream = &ServerStream{
Server: s,
Desc: &description.Session{Medias: []*description.Media{testH264Media}},
}
err = stream.Initialize()
require.NoError(t, err)
defer stream.Close()
nconn, err := net.Dial("tcp", "localhost:8554")
require.NoError(t, err)
defer nconn.Close()
conn := conn.NewConn(nconn)
desc := doDescribe(t, conn, false)
inTH := &headers.Transport{
Mode: transportModePtr(headers.TransportModePlay),
}
switch transport {
case "udp":
v := headers.TransportDeliveryUnicast
inTH.Delivery = &v
inTH.Protocol = headers.TransportProtocolUDP
inTH.ClientPorts = &[2]int{35466, 35467}
case "multicast":
v := headers.TransportDeliveryMulticast
inTH.Delivery = &v
inTH.Protocol = headers.TransportProtocolUDP
}
res, _ := doSetup(t, conn, mediaURL(t, desc.BaseURL, desc.Medias[0]).String(), inTH, "")
session := readSession(t, res)
doPlay(t, conn, "rtsp://localhost:8554/teststream", session)
<-sessionClosed
})
}
}
func TestServerPlayWithoutTeardown(t *testing.T) {
for _, transport := range []string{
"udp",
"tcp",
} {
t.Run(transport, func(t *testing.T) {
var stream *ServerStream
nconnClosed := make(chan struct{})
sessionClosed := make(chan struct{})
s := &Server{
Handler: &testServerHandler{
onConnClose: func(_ *ServerHandlerOnConnCloseCtx) {
close(nconnClosed)
},
onSessionClose: func(_ *ServerHandlerOnSessionCloseCtx) {
close(sessionClosed)
},
onDescribe: func(_ *ServerHandlerOnDescribeCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onSetup: func(_ *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onPlay: func(_ *ServerHandlerOnPlayCtx) (*base.Response, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, nil
},
},
ReadTimeout: 1 * time.Second,
sessionTimeout: 1 * time.Second,
RTSPAddress: "localhost:8554",
}
if transport == "udp" {
s.UDPRTPAddress = "127.0.0.1:8000"
s.UDPRTCPAddress = "127.0.0.1:8001"
}
err := s.Start()
require.NoError(t, err)
defer s.Close()
stream = &ServerStream{
Server: s,
Desc: &description.Session{Medias: []*description.Media{testH264Media}},
}
err = stream.Initialize()
require.NoError(t, err)
defer stream.Close()
nconn, err := net.Dial("tcp", "localhost:8554")
require.NoError(t, err)
defer nconn.Close()
conn := conn.NewConn(nconn)
desc := doDescribe(t, conn, false)
inTH := &headers.Transport{
Delivery: deliveryPtr(headers.TransportDeliveryUnicast),
Mode: transportModePtr(headers.TransportModePlay),
}
if transport == "udp" {
inTH.Protocol = headers.TransportProtocolUDP
inTH.ClientPorts = &[2]int{35466, 35467}
} else {
inTH.Protocol = headers.TransportProtocolTCP
inTH.InterleavedIDs = &[2]int{0, 1}
}
res, _ := doSetup(t, conn, mediaURL(t, desc.BaseURL, desc.Medias[0]).String(), inTH, "")
session := readSession(t, res)
doPlay(t, conn, "rtsp://localhost:8554/teststream", session)
nconn.Close()
<-sessionClosed
<-nconnClosed
})
}
}
func TestServerPlayUDPChangeConn(t *testing.T) {
var stream *ServerStream
s := &Server{
Handler: &testServerHandler{
onDescribe: func(_ *ServerHandlerOnDescribeCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onSetup: func(_ *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onPlay: func(_ *ServerHandlerOnPlayCtx) (*base.Response, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, nil
},
onGetParameter: func(_ *ServerHandlerOnGetParameterCtx) (*base.Response, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, nil
},
},
UDPRTPAddress: "127.0.0.1:8000",
UDPRTCPAddress: "127.0.0.1:8001",
RTSPAddress: "localhost:8554",
}
err := s.Start()
require.NoError(t, err)
defer s.Close()
stream = &ServerStream{
Server: s,
Desc: &description.Session{Medias: []*description.Media{testH264Media}},
}
err = stream.Initialize()
require.NoError(t, err)
defer stream.Close()
sxID := ""
func() {
var nconn net.Conn
nconn, err = net.Dial("tcp", "localhost:8554")
require.NoError(t, err)
defer nconn.Close()
conn := conn.NewConn(nconn)
desc := doDescribe(t, conn, false)
inTH := &headers.Transport{
Delivery: deliveryPtr(headers.TransportDeliveryUnicast),
Mode: transportModePtr(headers.TransportModePlay),
Protocol: headers.TransportProtocolUDP,
ClientPorts: &[2]int{35466, 35467},
}
res, _ := doSetup(t, conn, mediaURL(t, desc.BaseURL, desc.Medias[0]).String(), inTH, "")
session := readSession(t, res)
doPlay(t, conn, "rtsp://localhost:8554/teststream", session)
sxID = session
}()
func() {
var nconn net.Conn
nconn, err = net.Dial("tcp", "localhost:8554")
require.NoError(t, err)
defer nconn.Close()
conn := conn.NewConn(nconn)
var res *base.Response
res, err = writeReqReadRes(conn, base.Request{
Method: base.GetParameter,
URL: mustParseURL("rtsp://localhost:8554/teststream/"),
Header: base.Header{
"CSeq": base.HeaderValue{"4"},
"Session": base.HeaderValue{sxID},
},
})
require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode)
}()
}
func TestServerPlayPartialMedias(t *testing.T) {
var stream *ServerStream
s := &Server{
Handler: &testServerHandler{
onDescribe: func(_ *ServerHandlerOnDescribeCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onSetup: func(_ *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onPlay: func(_ *ServerHandlerOnPlayCtx) (*base.Response, error) {
go func() {
time.Sleep(500 * time.Millisecond)
err := stream.WritePacketRTP(stream.Description().Medias[0], &testRTPPacket)
require.NoError(t, err)
err = stream.WritePacketRTP(stream.Description().Medias[1], &testRTPPacket)
require.NoError(t, err)
}()
return &base.Response{
StatusCode: base.StatusOK,
}, nil
},
},
RTSPAddress: "localhost:8554",
}
err := s.Start()
require.NoError(t, err)
defer s.Close()
stream = &ServerStream{
Server: s,
Desc: &description.Session{Medias: []*description.Media{testH264Media, testH264Media}},
}
err = stream.Initialize()
require.NoError(t, err)
defer stream.Close()
nconn, err := net.Dial("tcp", "localhost:8554")
require.NoError(t, err)
defer nconn.Close()
conn := conn.NewConn(nconn)
desc := doDescribe(t, conn, false)
inTH := &headers.Transport{
Delivery: deliveryPtr(headers.TransportDeliveryUnicast),
Mode: transportModePtr(headers.TransportModePlay),
Protocol: headers.TransportProtocolTCP,
InterleavedIDs: &[2]int{4, 5},
}
res, _ := doSetup(t, conn, mediaURL(t, desc.BaseURL, desc.Medias[0]).String(), inTH, "")
session := readSession(t, res)
doPlay(t, conn, "rtsp://localhost:8554/teststream", session)
f, err := conn.ReadInterleavedFrame()
require.NoError(t, err)
require.Equal(t, 4, f.Channel)
var pkt rtp.Packet
err = pkt.Unmarshal(f.Payload)
require.NoError(t, err)
pkt.SSRC = testRTPPacket.SSRC
require.Equal(t, testRTPPacket, pkt)
}
func TestServerPlayAdditionalInfos(t *testing.T) {
getInfos := func() (*headers.RTPInfo, []*uint32) {
nconn, err := net.Dial("tcp", "localhost:8554")
require.NoError(t, err)
defer nconn.Close()
conn := conn.NewConn(nconn)
desc := doDescribe(t, conn, false)
inTH := &headers.Transport{
Delivery: deliveryPtr(headers.TransportDeliveryUnicast),
Mode: transportModePtr(headers.TransportModePlay),
Protocol: headers.TransportProtocolTCP,
InterleavedIDs: &[2]int{0, 1},
}
res, th := doSetup(t, conn, mediaURL(t, desc.BaseURL, desc.Medias[0]).String(), inTH, "")
ssrcs := make([]*uint32, 2)
ssrcs[0] = th.SSRC
inTH = &headers.Transport{
Delivery: deliveryPtr(headers.TransportDeliveryUnicast),
Mode: transportModePtr(headers.TransportModePlay),
Protocol: headers.TransportProtocolTCP,
InterleavedIDs: &[2]int{2, 3},
}
session := readSession(t, res)
_, th = doSetup(t, conn, mediaURL(t, desc.BaseURL, desc.Medias[1]).String(), inTH, session)
ssrcs[1] = th.SSRC
res = doPlay(t, conn, "rtsp://localhost:8554/teststream", session)
var ri headers.RTPInfo
err = ri.Unmarshal(res.Header["RTP-Info"])
require.NoError(t, err)
return &ri, ssrcs
}
forma := &format.Generic{
PayloadTyp: 96,
RTPMa: "private/90000",
}
err := forma.Init()
require.NoError(t, err)
var stream *ServerStream
s := &Server{
Handler: &testServerHandler{
onDescribe: func(_ *ServerHandlerOnDescribeCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onSetup: func(_ *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onPlay: func(_ *ServerHandlerOnPlayCtx) (*base.Response, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, nil
},
},
RTSPAddress: "localhost:8554",
}
err = s.Start()
require.NoError(t, err)
defer s.Close()
stream = &ServerStream{
Server: s,
Desc: &description.Session{
Medias: []*description.Media{
{
Type: "application",
Formats: []format.Format{forma},
},
{
Type: "application",
Formats: []format.Format{forma},
},
},
},
}
err = stream.Initialize()
require.NoError(t, err)
defer stream.Close()
err = stream.WritePacketRTP(stream.Description().Medias[0], &rtp.Packet{
Header: rtp.Header{
Version: 2,
PayloadType: 96,
SequenceNumber: 556,
Timestamp: 984512368,
SSRC: 96342362,
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
})
require.NoError(t, err)
rtpInfo, ssrcs := getInfos()
require.True(t, strings.HasPrefix(mustParseURL((*rtpInfo)[0].URL).Path, "/teststream/trackID="))
require.Equal(t, &headers.RTPInfo{
&headers.RTPInfoEntry{
URL: (&base.URL{
Scheme: "rtsp",
Host: "localhost:8554",
Path: mustParseURL((*rtpInfo)[0].URL).Path,
}).String(),
SequenceNumber: uint16Ptr(557),
Timestamp: (*rtpInfo)[0].Timestamp,
},
&headers.RTPInfoEntry{
URL: (&base.URL{
Scheme: "rtsp",
Host: "localhost:8554",
Path: mustParseURL((*rtpInfo)[1].URL).Path,
}).String(),
},
}, rtpInfo)
require.Len(t, ssrcs, 2)
require.NotNil(t, ssrcs[0])
require.NotNil(t, ssrcs[1])
err = stream.WritePacketRTP(stream.Description().Medias[1], &rtp.Packet{
Header: rtp.Header{
Version: 2,
PayloadType: 96,
SequenceNumber: 87,
Timestamp: 756436454,
SSRC: 536474323,
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
})
require.NoError(t, err)
rtpInfo, ssrcs = getInfos()
require.True(t, strings.HasPrefix(mustParseURL((*rtpInfo)[0].URL).Path, "/teststream/trackID="))
require.Equal(t, &headers.RTPInfo{
&headers.RTPInfoEntry{
URL: (&base.URL{
Scheme: "rtsp",
Host: "localhost:8554",
Path: mustParseURL((*rtpInfo)[0].URL).Path,
}).String(),
SequenceNumber: uint16Ptr(557),
Timestamp: (*rtpInfo)[0].Timestamp,
},
&headers.RTPInfoEntry{
URL: (&base.URL{
Scheme: "rtsp",
Host: "localhost:8554",
Path: mustParseURL((*rtpInfo)[1].URL).Path,
}).String(),
SequenceNumber: uint16Ptr(88),
Timestamp: (*rtpInfo)[1].Timestamp,
},
}, rtpInfo)
require.Len(t, ssrcs, 2)
require.NotNil(t, ssrcs[0])
require.NotNil(t, ssrcs[1])
}
func TestServerPlayNoInterleavedIDs(t *testing.T) {
forma := &format.Generic{
PayloadTyp: 96,
RTPMa: "private/90000",
}
err := forma.Init()
require.NoError(t, err)
var stream *ServerStream
s := &Server{
Handler: &testServerHandler{
onDescribe: func(_ *ServerHandlerOnDescribeCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onSetup: func(_ *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onPlay: func(_ *ServerHandlerOnPlayCtx) (*base.Response, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, nil
},
},
RTSPAddress: "localhost:8554",
}
err = s.Start()
require.NoError(t, err)
defer s.Close()
stream = &ServerStream{
Server: s,
Desc: &description.Session{
Medias: []*description.Media{
{
Type: "application",
Formats: []format.Format{forma},
},
{
Type: "application",
Formats: []format.Format{forma},
},
},
},
}
err = stream.Initialize()
require.NoError(t, err)
defer stream.Close()
nconn, err := net.Dial("tcp", "localhost:8554")
require.NoError(t, err)
defer nconn.Close()
conn := conn.NewConn(nconn)
desc := doDescribe(t, conn, false)
inTH := &headers.Transport{
Delivery: deliveryPtr(headers.TransportDeliveryUnicast),
Mode: transportModePtr(headers.TransportModePlay),
Protocol: headers.TransportProtocolTCP,
}
res, th := doSetup(t, conn, mediaURL(t, desc.BaseURL, desc.Medias[0]).String(), inTH, "")
require.Equal(t, &[2]int{0, 1}, th.InterleavedIDs)
session := readSession(t, res)
_, th = doSetup(t, conn, mediaURL(t, desc.BaseURL, desc.Medias[1]).String(), inTH, session)
require.Equal(t, &[2]int{2, 3}, th.InterleavedIDs)
doPlay(t, conn, "rtsp://localhost:8554/teststream", session)
for i := range 2 {
err = stream.WritePacketRTP(stream.Description().Medias[i], &testRTPPacket)
require.NoError(t, err)
var f *base.InterleavedFrame
f, err = conn.ReadInterleavedFrame()
require.NoError(t, err)
require.Equal(t, i*2, f.Channel)
var pkt rtp.Packet
err = pkt.Unmarshal(f.Payload)
require.NoError(t, err)
pkt.SSRC = testRTPPacket.SSRC
require.Equal(t, testRTPPacket, pkt)
}
}
func TestServerPlayStreamStats(t *testing.T) {
var stream *ServerStream
s := &Server{
RTSPAddress: "localhost:8554",
MulticastIPRange: "224.1.0.0/16",
MulticastRTPPort: 8000,
MulticastRTCPPort: 8001,
Handler: &testServerHandler{
onDescribe: func(_ *ServerHandlerOnDescribeCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onSetup: func(_ *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onPlay: func(_ *ServerHandlerOnPlayCtx) (*base.Response, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, nil
},
},
}
err := s.Start()
require.NoError(t, err)
defer s.Close()
stream = &ServerStream{
Server: s,
Desc: &description.Session{Medias: []*description.Media{testH264Media}},
}
err = stream.Initialize()
require.NoError(t, err)
defer stream.Close()
for _, transport := range []string{"tcp", "multicast"} {
var nconn net.Conn
nconn, err = net.Dial("tcp", "localhost:8554")
require.NoError(t, err)
defer nconn.Close()
conn := conn.NewConn(nconn)
desc := doDescribe(t, conn, false)
inTH := &headers.Transport{
Mode: transportModePtr(headers.TransportModePlay),
}
if transport == "multicast" {
v := headers.TransportDeliveryMulticast
inTH.Delivery = &v
inTH.Protocol = headers.TransportProtocolUDP
} else {
v := headers.TransportDeliveryUnicast
inTH.Delivery = &v
inTH.Protocol = headers.TransportProtocolTCP
inTH.InterleavedIDs = &[2]int{0, 1}
}
res, _ := doSetup(t, conn, mediaURL(t, desc.BaseURL, desc.Medias[0]).String(), inTH, "")
session := readSession(t, res)
doPlay(t, conn, "rtsp://localhost:8554/teststream", session)
}
err = stream.WritePacketRTP(stream.Description().Medias[0], &testRTPPacket)
require.NoError(t, err)
st := stream.Stats()
require.Equal(t, &ServerStreamStats{
BytesSent: 32,
RTPPacketsSent: 2,
Medias: map[*description.Media]ServerStreamStatsMedia{
stream.Description().Medias[0]: {
BytesSent: 32,
RTCPPacketsSent: 0,
Formats: map[format.Format]ServerStreamStatsFormat{
stream.Description().Medias[0].Formats[0]: {
RTPPacketsSent: 2,
LocalSSRC: st.Medias[stream.Description().Medias[0]].
Formats[stream.Description().Medias[0].Formats[0]].LocalSSRC,
},
},
},
},
}, st)
}
func TestServerPlayBackChannel(t *testing.T) {
for _, transport := range []string{
"udp",
"tcp",
} {
t.Run(transport, func(t *testing.T) {
serverOk := make(chan struct{})
var stream *ServerStream
s := &Server{
Handler: &testServerHandler{
onDescribe: func(_ *ServerHandlerOnDescribeCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onSetup: func(_ *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) {
ctx.Session.OnPacketRTPAny(func(_ *description.Media, _ format.Format, _ *rtp.Packet) {
close(serverOk)
})
return &base.Response{
StatusCode: base.StatusOK,
}, nil
},
},
RTSPAddress: "127.0.0.1:8554",
}
if transport == "udp" {
s.UDPRTPAddress = "127.0.0.1:8000"
s.UDPRTCPAddress = "127.0.0.1:8001"
}
err := s.Start()
require.NoError(t, err)
defer s.Close()
stream = &ServerStream{
Server: s,
Desc: &description.Session{Medias: []*description.Media{
testH264Media,
{
Type: description.MediaTypeAudio,
IsBackChannel: true,
Formats: []format.Format{&format.G711{
PayloadTyp: 8,
MULaw: false,
SampleRate: 8000,
ChannelCount: 1,
}},
},
}},
}
err = stream.Initialize()
require.NoError(t, err)
defer stream.Close()
nconn, err := net.Dial("tcp", "127.0.0.1:8554")
require.NoError(t, err)
defer nconn.Close()
conn := conn.NewConn(nconn)
desc := doDescribe(t, conn, true)
var session string
var serverPorts [2]*[2]int
var l1s [2]net.PacketConn
var l2s [2]net.PacketConn
for i := 0; i < 2; i++ {
inTH := &headers.Transport{
Mode: transportModePtr(headers.TransportModePlay),
}
if transport == "udp" {
v := headers.TransportDeliveryUnicast
inTH.Delivery = &v
inTH.Protocol = headers.TransportProtocolUDP
inTH.ClientPorts = &[2]int{35466 + i*2, 35467 + i*2}
} else {
v := headers.TransportDeliveryUnicast
inTH.Delivery = &v
inTH.Protocol = headers.TransportProtocolTCP
inTH.InterleavedIDs = &[2]int{0 + i*2, 1 + i*2}
}
res, th := doSetup(t, conn, mediaURL(t, desc.BaseURL, desc.Medias[i]).String(), inTH, "")
if transport == "udp" {
serverPorts[i] = th.ServerPorts
l1s[i], err = net.ListenPacket("udp", net.JoinHostPort("127.0.0.1", strconv.FormatInt(int64(35466+i*2), 10)))
require.NoError(t, err)
defer l1s[i].Close()
l2s[i], err = net.ListenPacket("udp", net.JoinHostPort("127.0.0.1", strconv.FormatInt(int64(35467+i*2), 10)))
require.NoError(t, err)
defer l2s[i].Close()
}
session = readSession(t, res)
}
doPlay(t, conn, "rtsp://127.0.0.1:8554/teststream", session)
// client -> server
pkt := &rtp.Packet{
Header: rtp.Header{
Version: 2,
PayloadType: 8,
},
Payload: []byte{1, 2, 3, 4},
}
buf, err := pkt.Marshal()
require.NoError(t, err)
if transport == "udp" {
_, err = l1s[1].WriteTo(buf, &net.UDPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: serverPorts[1][0],
})
require.NoError(t, err)
} else {
err = conn.WriteInterleavedFrame(&base.InterleavedFrame{
Channel: 2,
Payload: buf,
}, make([]byte, 1024))
require.NoError(t, err)
}
<-serverOk
doTeardown(t, conn, "rtsp://127.0.0.1:8554/teststream", session)
})
}
}