reduce usage of rtsp-simple-server in client tests

This commit is contained in:
aler9
2021-03-13 13:49:21 +01:00
parent a9a93f4fac
commit 792d74e6ef
3 changed files with 384 additions and 172 deletions

View File

@@ -2,6 +2,7 @@ package gortsplib
import (
"bufio"
"crypto/tls"
"net"
"os"
"os/exec"
@@ -63,7 +64,7 @@ func (c *container) wait() int {
return int(code)
}
func TestClientDialRead(t *testing.T) {
func TestClientRead(t *testing.T) {
for _, ca := range []struct {
encrypted bool
proto string
@@ -81,39 +82,142 @@ func TestClientDialRead(t *testing.T) {
t.Run(encryptedStr+"_"+ca.proto, func(t *testing.T) {
var scheme string
var port string
var serverConf string
if !ca.encrypted {
scheme = "rtsp"
port = "8554"
serverConf = "{}"
} else {
var l net.Listener
if ca.encrypted {
scheme = "rtsps"
port = "8555"
serverConf = "readTimeout: 20s\n" +
"protocols: [tcp]\n" +
"encryption: yes\n"
li, err := net.Listen("tcp", "localhost:8554")
require.NoError(t, err)
defer li.Close()
cert, err := tls.X509KeyPair(serverCert, serverKey)
require.NoError(t, err)
l = tls.NewListener(li, &tls.Config{Certificates: []tls.Certificate{cert}})
} else {
scheme = "rtsp"
var err error
l, err = net.Listen("tcp", "localhost:8554")
require.NoError(t, err)
defer l.Close()
}
cnt1, err := newContainer("rtsp-simple-server", "server", []string{serverConf})
require.NoError(t, err)
defer cnt1.close()
serverDone := make(chan struct{})
defer func() { <-serverDone }()
go func() {
defer close(serverDone)
time.Sleep(1 * time.Second)
conn, err := l.Accept()
require.NoError(t, err)
defer conn.Close()
bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
cnt2, err := newContainer("ffmpeg", "publish", []string{
"-re",
"-stream_loop", "-1",
"-i", "emptyvideo.ts",
"-c", "copy",
"-f", "rtsp",
"-rtsp_transport", "udp",
scheme + "://localhost:" + port + "/teststream",
})
require.NoError(t, err)
defer cnt2.close()
var req base.Request
err = req.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.Options, req.Method)
time.Sleep(1 * time.Second)
err = base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Public": base.HeaderValue{strings.Join([]string{
string(base.Describe),
string(base.Setup),
string(base.Play),
}, ", ")},
},
}.Write(bconn.Writer)
require.NoError(t, err)
err = req.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.Describe, req.Method)
track, err := NewTrackH264(96, []byte("123456"), []byte("123456"))
require.NoError(t, err)
sdp := Tracks{track}.Write()
err = base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Content-Type": base.HeaderValue{"application/sdp"},
},
Body: sdp,
}.Write(bconn.Writer)
require.NoError(t, err)
err = req.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.Setup, req.Method)
th, err := headers.ReadTransport(req.Header["Transport"])
require.NoError(t, err)
if ca.proto == "udp" {
err = base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Transport": headers.Transport{
Protocol: StreamProtocolUDP,
Delivery: func() *base.StreamDelivery {
v := base.StreamDeliveryUnicast
return &v
}(),
ClientPorts: th.ClientPorts,
ServerPorts: &[2]int{34556, 34557},
}.Write(),
},
}.Write(bconn.Writer)
require.NoError(t, err)
} else {
err = base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Transport": headers.Transport{
Protocol: StreamProtocolTCP,
Delivery: func() *base.StreamDelivery {
v := base.StreamDeliveryUnicast
return &v
}(),
ClientPorts: th.ClientPorts,
InterleavedIds: &[2]int{0, 1},
}.Write(),
},
}.Write(bconn.Writer)
require.NoError(t, err)
}
err = req.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.Play, req.Method)
err = base.Response{
StatusCode: base.StatusOK,
}.Write(bconn.Writer)
require.NoError(t, err)
if ca.proto == "udp" {
l1, err := net.ListenPacket("udp", "localhost:34556")
require.NoError(t, err)
defer l1.Close()
l1.WriteTo([]byte("\x00\x00\x00\x00"), &net.UDPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: th.ClientPorts[0],
})
} else {
err = base.InterleavedFrame{
TrackID: 0,
StreamType: StreamTypeRTP,
Payload: []byte("\x00\x00\x00\x00"),
}.Write(bconn.Writer)
require.NoError(t, err)
}
}()
conf := ClientConf{
StreamProtocol: func() *StreamProtocol {
@@ -126,15 +230,12 @@ func TestClientDialRead(t *testing.T) {
}(),
}
conn, err := conf.DialRead(scheme + "://localhost:" + port + "/teststream")
conn, err := conf.DialRead(scheme + "://localhost:8554/teststream")
require.NoError(t, err)
var firstFrame int32
frameRecv := make(chan struct{})
done := conn.ReadFrames(func(id int, typ StreamType, payload []byte) {
if atomic.SwapInt32(&firstFrame, 1) == 0 {
close(frameRecv)
}
close(frameRecv)
})
<-frameRecv
@@ -149,7 +250,7 @@ func TestClientDialRead(t *testing.T) {
}
}
func TestClientDialReadNoServerPorts(t *testing.T) {
func TestClientReadNoServerPorts(t *testing.T) {
for _, ca := range []string{
"zero",
"no",
@@ -240,8 +341,6 @@ func TestClientDialReadNoServerPorts(t *testing.T) {
}.Write(bconn.Writer)
require.NoError(t, err)
time.Sleep(1 * time.Second)
l1, err := net.ListenPacket("udp", "localhost:0")
require.NoError(t, err)
defer l1.Close()
@@ -271,40 +370,108 @@ func TestClientDialReadNoServerPorts(t *testing.T) {
}
}
func TestClientDialReadAutomaticProtocol(t *testing.T) {
cnt1, err := newContainer("rtsp-simple-server", "server", []string{
"protocols: [tcp]\n",
})
func TestClientReadAutomaticProtocol(t *testing.T) {
l, err := net.Listen("tcp", "localhost:8554")
require.NoError(t, err)
defer cnt1.close()
defer l.Close()
time.Sleep(1 * time.Second)
serverDone := make(chan struct{})
defer func() { <-serverDone }()
go func() {
defer close(serverDone)
cnt2, err := newContainer("ffmpeg", "publish", []string{
"-re",
"-stream_loop", "-1",
"-i", "emptyvideo.ts",
"-c", "copy",
"-f", "rtsp",
"-rtsp_transport", "tcp",
"rtsp://localhost:8554/teststream",
})
require.NoError(t, err)
defer cnt2.close()
conn, err := l.Accept()
require.NoError(t, err)
defer conn.Close()
bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
time.Sleep(1 * time.Second)
var req base.Request
err = req.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.Options, req.Method)
err = base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Public": base.HeaderValue{strings.Join([]string{
string(base.Describe),
string(base.Setup),
string(base.Play),
}, ", ")},
},
}.Write(bconn.Writer)
require.NoError(t, err)
err = req.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.Describe, req.Method)
track, err := NewTrackH264(96, []byte("123456"), []byte("123456"))
require.NoError(t, err)
sdp := Tracks{track}.Write()
err = base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Content-Type": base.HeaderValue{"application/sdp"},
},
Body: sdp,
}.Write(bconn.Writer)
require.NoError(t, err)
err = req.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.Setup, req.Method)
err = base.Response{
StatusCode: base.StatusUnsupportedTransport,
}.Write(bconn.Writer)
require.NoError(t, err)
err = req.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.Setup, req.Method)
err = base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Transport": headers.Transport{
Protocol: StreamProtocolTCP,
Delivery: func() *base.StreamDelivery {
v := base.StreamDeliveryUnicast
return &v
}(),
InterleavedIds: &[2]int{0, 1},
}.Write(),
},
}.Write(bconn.Writer)
require.NoError(t, err)
err = req.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.Play, req.Method)
err = base.Response{
StatusCode: base.StatusOK,
}.Write(bconn.Writer)
require.NoError(t, err)
err = base.InterleavedFrame{
TrackID: 0,
StreamType: StreamTypeRTP,
Payload: []byte("\x00\x00\x00\x00"),
}.Write(bconn.Writer)
require.NoError(t, err)
}()
conf := ClientConf{StreamProtocol: nil}
conn, err := conf.DialRead("rtsp://localhost:8554/teststream")
require.NoError(t, err)
var firstFrame int32
frameRecv := make(chan struct{})
done := conn.ReadFrames(func(id int, typ StreamType, payload []byte) {
if atomic.SwapInt32(&firstFrame, 1) == 0 {
close(frameRecv)
}
close(frameRecv)
})
<-frameRecv
@@ -312,42 +479,137 @@ func TestClientDialReadAutomaticProtocol(t *testing.T) {
<-done
}
func TestClientDialReadRedirect(t *testing.T) {
cnt1, err := newContainer("rtsp-simple-server", "server", []string{
"paths:\n" +
" path1:\n" +
" source: redirect\n" +
" sourceRedirect: rtsp://localhost:8554/path2\n" +
" path2:\n",
})
func TestClientReadRedirect(t *testing.T) {
l, err := net.Listen("tcp", "localhost:8554")
require.NoError(t, err)
defer cnt1.close()
defer l.Close()
time.Sleep(1 * time.Second)
serverDone := make(chan struct{})
defer func() { <-serverDone }()
go func() {
defer close(serverDone)
cnt2, err := newContainer("ffmpeg", "publish", []string{
"-re",
"-stream_loop", "-1",
"-i", "emptyvideo.ts",
"-c", "copy",
"-f", "rtsp",
"-rtsp_transport", "udp",
"rtsp://localhost:8554/path2",
})
require.NoError(t, err)
defer cnt2.close()
conn, err := l.Accept()
require.NoError(t, err)
bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
time.Sleep(1 * time.Second)
var req base.Request
err = req.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.Options, req.Method)
err = base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Public": base.HeaderValue{strings.Join([]string{
string(base.Describe),
string(base.Setup),
string(base.Play),
}, ", ")},
},
}.Write(bconn.Writer)
require.NoError(t, err)
err = req.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.Describe, req.Method)
err = base.Response{
StatusCode: base.StatusMovedPermanently,
Header: base.Header{
"Location": base.HeaderValue{"rtsp://localhost:8554/test"},
},
}.Write(bconn.Writer)
require.NoError(t, err)
conn.Close()
conn, err = l.Accept()
require.NoError(t, err)
defer conn.Close()
bconn = bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
err = req.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.Options, req.Method)
err = base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Public": base.HeaderValue{strings.Join([]string{
string(base.Describe),
string(base.Setup),
string(base.Play),
}, ", ")},
},
}.Write(bconn.Writer)
require.NoError(t, err)
err = req.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.Describe, req.Method)
track, err := NewTrackH264(96, []byte("123456"), []byte("123456"))
require.NoError(t, err)
sdp := Tracks{track}.Write()
err = base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Content-Type": base.HeaderValue{"application/sdp"},
},
Body: sdp,
}.Write(bconn.Writer)
require.NoError(t, err)
err = req.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.Setup, req.Method)
th, err := headers.ReadTransport(req.Header["Transport"])
require.NoError(t, err)
err = base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Transport": headers.Transport{
Protocol: StreamProtocolUDP,
Delivery: func() *base.StreamDelivery {
v := base.StreamDeliveryUnicast
return &v
}(),
ClientPorts: th.ClientPorts,
ServerPorts: &[2]int{34556, 34557},
}.Write(),
},
}.Write(bconn.Writer)
require.NoError(t, err)
err = req.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.Play, req.Method)
err = base.Response{
StatusCode: base.StatusOK,
}.Write(bconn.Writer)
require.NoError(t, err)
l1, err := net.ListenPacket("udp", "localhost:34556")
require.NoError(t, err)
defer l1.Close()
l1.WriteTo([]byte("\x00\x00\x00\x00"), &net.UDPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: th.ClientPorts[0],
})
}()
conn, err := DialRead("rtsp://localhost:8554/path1")
require.NoError(t, err)
var firstFrame int32
frameRecv := make(chan struct{})
done := conn.ReadFrames(func(id int, typ StreamType, payload []byte) {
if atomic.SwapInt32(&firstFrame, 1) == 0 {
close(frameRecv)
}
close(frameRecv)
})
<-frameRecv
@@ -355,7 +617,7 @@ func TestClientDialReadRedirect(t *testing.T) {
<-done
}
func TestClientDialReadPause(t *testing.T) {
func TestClientReadPause(t *testing.T) {
for _, proto := range []string{
"udp",
"tcp",
@@ -426,7 +688,7 @@ func TestClientDialReadPause(t *testing.T) {
}
}
func TestClientDialPublishSerial(t *testing.T) {
func TestClientPublishSerial(t *testing.T) {
for _, proto := range []string{
"udp",
"tcp",
@@ -487,7 +749,7 @@ func TestClientDialPublishSerial(t *testing.T) {
}
}
func TestClientDialPublishParallel(t *testing.T) {
func TestClientPublishParallel(t *testing.T) {
for _, ca := range []struct {
proto string
server string
@@ -600,7 +862,7 @@ func TestClientDialPublishParallel(t *testing.T) {
}
}
func TestClientDialPublishPauseSerial(t *testing.T) {
func TestClientPublishPauseSerial(t *testing.T) {
for _, proto := range []string{
"udp",
"tcp",
@@ -672,7 +934,7 @@ func TestClientDialPublishPauseSerial(t *testing.T) {
}
}
func TestClientDialPublishPauseParallel(t *testing.T) {
func TestClientPublishPauseParallel(t *testing.T) {
for _, proto := range []string{
"udp",
"tcp",