diff --git a/.github/workflows/test_highlevel.yml b/.github/workflows/test_highlevel.yml new file mode 100644 index 00000000..1f46afc7 --- /dev/null +++ b/.github/workflows/test_highlevel.yml @@ -0,0 +1,20 @@ +name: test_highlevel + +on: + push: + branches: [ main ] + pull_request: + branches: [ main ] + +jobs: + test: + runs-on: ubuntu-20.04 + + steps: + - uses: actions/checkout@v2 + + - uses: actions/setup-go@v2 + with: + go-version: "1.18" + + - run: make test-highlevel-nodocker diff --git a/Makefile b/Makefile index 54f6afdf..885abebf 100644 --- a/Makefile +++ b/Makefile @@ -9,11 +9,12 @@ help: @echo "" @echo "available actions:" @echo "" - @echo " mod-tidy run go mod tidy" - @echo " format format source files" - @echo " test run tests" - @echo " lint run linter" - @echo " bench run benchmarks" + @echo " mod-tidy run go mod tidy" + @echo " format format source files" + @echo " test run tests" + @echo " test-highlevel run high-level tests" + @echo " lint run linter" + @echo " bench run benchmarks" @echo "" blank := @@ -39,7 +40,7 @@ format: define DOCKERFILE_TEST FROM $(BASE_IMAGE) -RUN apk add --no-cache make docker-cli git gcc musl-dev pkgconfig ffmpeg-dev +RUN apk add --no-cache make git gcc musl-dev pkgconfig ffmpeg-dev WORKDIR /s COPY go.mod go.sum ./ RUN go mod download @@ -50,8 +51,6 @@ export DOCKERFILE_TEST test: echo "$$DOCKERFILE_TEST" | docker build -q . -f - -t temp docker run --rm -it \ - -v /var/run/docker.sock:/var/run/docker.sock:ro \ - --network=host \ --name temp \ temp \ make test-nodocker @@ -63,12 +62,32 @@ test-pkg: go test -v -race -coverprofile=coverage-pkg.txt ./pkg/... test-root: - $(foreach IMG,$(shell echo testimages/*/ | xargs -n1 basename), \ - docker build -q testimages/$(IMG) -t gortsplib-test-$(IMG)$(NL)) go test -v -race -coverprofile=coverage-root.txt . test-nodocker: test-examples test-pkg test-root +define DOCKERFILE_HIGHLEVEL_TEST +FROM $(BASE_IMAGE) +RUN apk add --no-cache make docker-cli git gcc musl-dev +WORKDIR /s +COPY go.mod go.sum ./ +RUN go mod download +COPY . ./ +endef +export DOCKERFILE_HIGHLEVEL_TEST + +test-highlevel: + echo "$$DOCKERFILE_HIGHLEVEL_TEST" | docker build -q . -f - -t temp + docker run --rm -it \ + -v /var/run/docker.sock:/var/run/docker.sock:ro \ + --network=host \ + --name temp \ + temp \ + make test-highlevel-nodocker + +test-highlevel-nodocker: + go test -v -race -tags enable_highlevel_tests ./internal/highleveltests + lint: docker run --rm -v $(PWD):/app -w /app \ $(LINT_IMAGE) \ diff --git a/testimages/ffmpeg/Dockerfile b/internal/highleveltests/images/ffmpeg/Dockerfile similarity index 100% rename from testimages/ffmpeg/Dockerfile rename to internal/highleveltests/images/ffmpeg/Dockerfile diff --git a/testimages/ffmpeg/emptyvideo.mkv b/internal/highleveltests/images/ffmpeg/emptyvideo.mkv similarity index 100% rename from testimages/ffmpeg/emptyvideo.mkv rename to internal/highleveltests/images/ffmpeg/emptyvideo.mkv diff --git a/testimages/ffmpeg/start.sh b/internal/highleveltests/images/ffmpeg/start.sh similarity index 100% rename from testimages/ffmpeg/start.sh rename to internal/highleveltests/images/ffmpeg/start.sh diff --git a/testimages/gstreamer/Dockerfile b/internal/highleveltests/images/gstreamer/Dockerfile similarity index 100% rename from testimages/gstreamer/Dockerfile rename to internal/highleveltests/images/gstreamer/Dockerfile diff --git a/testimages/gstreamer/emptyvideo.mkv b/internal/highleveltests/images/gstreamer/emptyvideo.mkv similarity index 100% rename from testimages/gstreamer/emptyvideo.mkv rename to internal/highleveltests/images/gstreamer/emptyvideo.mkv diff --git a/testimages/gstreamer/exitafterframe.c b/internal/highleveltests/images/gstreamer/exitafterframe.c similarity index 100% rename from testimages/gstreamer/exitafterframe.c rename to internal/highleveltests/images/gstreamer/exitafterframe.c diff --git a/testimages/gstreamer/start.sh b/internal/highleveltests/images/gstreamer/start.sh similarity index 100% rename from testimages/gstreamer/start.sh rename to internal/highleveltests/images/gstreamer/start.sh diff --git a/internal/highleveltests/server_test.go b/internal/highleveltests/server_test.go new file mode 100644 index 00000000..d1317ea2 --- /dev/null +++ b/internal/highleveltests/server_test.go @@ -0,0 +1,523 @@ +//go:build enable_highlevel_tests +// +build enable_highlevel_tests + +package highleveltests + +import ( + "crypto/tls" + "fmt" + "os" + "os/exec" + "path/filepath" + "strconv" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/aler9/gortsplib" + "github.com/aler9/gortsplib/pkg/base" +) + +var serverCert = []byte(`-----BEGIN CERTIFICATE----- +MIIDkzCCAnugAwIBAgIUHFnymlrkEnz3ThpFvSrqybBepn4wDQYJKoZIhvcNAQEL +BQAwWTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM +GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDESMBAGA1UEAwwJbG9jYWxob3N0MB4X +DTIxMTIwMzIxNDg0MFoXDTMxMTIwMTIxNDg0MFowWTELMAkGA1UEBhMCQVUxEzAR +BgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5 +IEx0ZDESMBAGA1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A +MIIBCgKCAQEAv8h21YDIAYNzewrfQqQTlODJjuUZKxMCO7z1wIapem5I+1I8n+vD +v8qvuyZk1m9CKQPfXxhJz0TT5kECoUY0KaDtykSzfaUK34F9J1d5snDkaOtN48W+ +8l39Wtcvc5JW17jNwabppAkHHYAMQryO8urKLWKbZmLhYCJdYgNqb8ciWPsnYNA0 +zcnKML9zQphh7dxPq1wCsy/c/XZUzxTLAe8hsCKuqpESEX3MMJA9gOLmiOF0JgpT +9h6eqvJU8IK0QMIv3tekJWSBvTLyz4ghENs10sMKKNqR6NWt2SsOloeBkOhIDLOk +byLaPEvugrQsga99uhANRpXp+CHnVeAH8QIDAQABo1MwUTAdBgNVHQ4EFgQUwyEH +cMynEoy1/TnbIhgpEAs038gwHwYDVR0jBBgwFoAUwyEHcMynEoy1/TnbIhgpEAs0 +38gwDwYDVR0TAQH/BAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEAiV56KhDoUVzW +qV1X0QbfLaifimsN3Na3lUgmjcgyUe8rHj09pXuAD/AcQw/zwKzZ6dPtizBeNLN8 +jV1dbJmR7DE3MDlndgMKTOKFsqzHjG9UTXkBGFUEM1shn2GE8XcvDF0AzKU82YjP +B0KswA1NoYTNP2PW4IhZRzv2M+fnmkvc8DSEZ+dxEMg3aJfe/WLPvYjDpFXLvuxl +YnerRQ04hFysh5eogPFpB4KyyPs6jGnQFmZCbFyk9pjKRbDPJc6FkDglkzTB6j3Q +TSfgNJswOiap13vQQKf5Vu7LTuyO4Wjfjr74QNqMLLNIgcC7n2jfQj1g5Xa0bnF5 +G4tLrCLUUw== +-----END CERTIFICATE----- +`) + +var serverKey = []byte(`-----BEGIN PRIVATE KEY----- +MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQC/yHbVgMgBg3N7 +Ct9CpBOU4MmO5RkrEwI7vPXAhql6bkj7Ujyf68O/yq+7JmTWb0IpA99fGEnPRNPm +QQKhRjQpoO3KRLN9pQrfgX0nV3mycORo603jxb7yXf1a1y9zklbXuM3BpumkCQcd +gAxCvI7y6sotYptmYuFgIl1iA2pvxyJY+ydg0DTNycowv3NCmGHt3E+rXAKzL9z9 +dlTPFMsB7yGwIq6qkRIRfcwwkD2A4uaI4XQmClP2Hp6q8lTwgrRAwi/e16QlZIG9 +MvLPiCEQ2zXSwwoo2pHo1a3ZKw6Wh4GQ6EgMs6RvIto8S+6CtCyBr326EA1Glen4 +IedV4AfxAgMBAAECggEAOqcJSNSA1o2oJKo3i374iiCRJAWGw/ilRzXMBtxoOow9 +/7av2czV6fMH+XmNf1M5bafEiaW49Q28rH+XWVFKJK0V7DVEm5l9EMveRcjn7B3A +jSHhiVZxxlfeYwjKd1L7AjB/pMjyTXuBVJFTrplSMpKB0I2GrzJwcOExpAcdZx98 +K0s5pauJH9bE0kI3p585SGQaIjrz0LvAmf6cQ5HhKfahJdWNnKZ/S4Kdqe+JCgyd +NawREHhf3tU01Cd3DOgXn4+5V/Ts6XtqY1RuSvonNv3nyeiOpX8C4cHKD5u2sNOC +3J4xWrrs0W3e8IATgAys56teKbEufHTUx52wNhAbzQKBgQD56W0tPCuaKrsjxsvE +dNHdm/9aQrN1jCJxUcGaxCIioXSyDvpSKcgxQbEqHXRTtJt5/Kadz9omq4vFTVtl +5Gf+3Lrf3ZT82SvYHtlIMdBZLlKwk6MolEa0KGAuJBNJVRIOkm5YjV/3bJebeTIb +WrLEyNCOXFAh3KVzBPU8nJ1aTwKBgQDEdISg3UsSOLBa0BfoJ5FlqGepZSufYgqh +xAJn8EbopnlzfmHBZAhE2+Igh0xcHhQqHThc3OuLtAkWu6fUSLiSA+XjU9TWPpA1 +C/325rhT23fxzYIlYFegR9BToxYhv14ufkcTXRfHRAhffk7K5A2nlJfldDZRmUh2 +5KIjXQ0pvwKBgQCa7S6VgFu3cw4Ym8DuxUzlCTRADGGcWYdwoLJY84YF2fmx+L8N ++ID2qDbgWOooiipocUwJQTWIC4jWg6JJhFNEGCpxZbhbF3aqwFULAHadEq6IcL4R +Bfre7LjTYeHi8C4FgpmNo/b+N/+0jmmVs6BnheZkmq3CkDqxFz3AmYai2QKBgQC1 +kzAmcoJ5U/YD6YO/Khsjx3QQSBb6mCZVf5HtuVIApCVqzuvRUACojEbDY+n61j4y +8pDum64FkKA557Xl6lTVeE7ZPtlgL7EfpnbT5kmGEDobPqPEofg7h0SQmRLSnEqT +VFmjFw7sOQA4Ksjuk7vfIOMHy9KMts0YPpdxcgbBhwKBgQCP8MeRPuhZ26/oIESr +I8ArLEaPebYmLXCT2ZTudGztoyYFxinRGHA4PdamSOKfB1li52wAaqgRA3cSqkUi +kabimVOvrOAWlnvznqXEHPNx6mbbKs08jh+uRRmrOmMrxAobpTqarL2Sdxb6afID +NkxNic7oHgsZpIkZ8HK+QjAAWA== +-----END PRIVATE KEY----- +`) + +type testServerHandler struct { + onConnOpen func(*gortsplib.ServerHandlerOnConnOpenCtx) + onConnClose func(*gortsplib.ServerHandlerOnConnCloseCtx) + onSessionOpen func(*gortsplib.ServerHandlerOnSessionOpenCtx) + onSessionClose func(*gortsplib.ServerHandlerOnSessionCloseCtx) + onDescribe func(*gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) + onAnnounce func(*gortsplib.ServerHandlerOnAnnounceCtx) (*base.Response, error) + onSetup func(*gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) + onPlay func(*gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) + onRecord func(*gortsplib.ServerHandlerOnRecordCtx) (*base.Response, error) + onPause func(*gortsplib.ServerHandlerOnPauseCtx) (*base.Response, error) + onPacketRTP func(*gortsplib.ServerHandlerOnPacketRTPCtx) + onPacketRTCP func(*gortsplib.ServerHandlerOnPacketRTCPCtx) + onSetParameter func(*gortsplib.ServerHandlerOnSetParameterCtx) (*base.Response, error) + onGetParameter func(*gortsplib.ServerHandlerOnGetParameterCtx) (*base.Response, error) +} + +func (sh *testServerHandler) OnConnOpen(ctx *gortsplib.ServerHandlerOnConnOpenCtx) { + if sh.onConnOpen != nil { + sh.onConnOpen(ctx) + } +} + +func (sh *testServerHandler) OnConnClose(ctx *gortsplib.ServerHandlerOnConnCloseCtx) { + if sh.onConnClose != nil { + sh.onConnClose(ctx) + } +} + +func (sh *testServerHandler) OnSessionOpen(ctx *gortsplib.ServerHandlerOnSessionOpenCtx) { + if sh.onSessionOpen != nil { + sh.onSessionOpen(ctx) + } +} + +func (sh *testServerHandler) OnSessionClose(ctx *gortsplib.ServerHandlerOnSessionCloseCtx) { + if sh.onSessionClose != nil { + sh.onSessionClose(ctx) + } +} + +func (sh *testServerHandler) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) { + if sh.onDescribe != nil { + return sh.onDescribe(ctx) + } + return nil, nil, fmt.Errorf("unimplemented") +} + +func (sh *testServerHandler) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) (*base.Response, error) { + if sh.onAnnounce != nil { + return sh.onAnnounce(ctx) + } + return nil, fmt.Errorf("unimplemented") +} + +func (sh *testServerHandler) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) { + if sh.onSetup != nil { + return sh.onSetup(ctx) + } + return nil, nil, fmt.Errorf("unimplemented") +} + +func (sh *testServerHandler) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) { + if sh.onPlay != nil { + return sh.onPlay(ctx) + } + return nil, fmt.Errorf("unimplemented") +} + +func (sh *testServerHandler) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.Response, error) { + if sh.onRecord != nil { + return sh.onRecord(ctx) + } + return nil, fmt.Errorf("unimplemented") +} + +func (sh *testServerHandler) OnPause(ctx *gortsplib.ServerHandlerOnPauseCtx) (*base.Response, error) { + if sh.onPause != nil { + return sh.onPause(ctx) + } + return nil, fmt.Errorf("unimplemented") +} + +func (sh *testServerHandler) OnPacketRTP(ctx *gortsplib.ServerHandlerOnPacketRTPCtx) { + if sh.onPacketRTP != nil { + sh.onPacketRTP(ctx) + } +} + +func (sh *testServerHandler) OnPacketRTCP(ctx *gortsplib.ServerHandlerOnPacketRTCPCtx) { + if sh.onPacketRTCP != nil { + sh.onPacketRTCP(ctx) + } +} + +func (sh *testServerHandler) OnSetParameter(ctx *gortsplib.ServerHandlerOnSetParameterCtx) (*base.Response, error) { + if sh.onSetParameter != nil { + return sh.onSetParameter(ctx) + } + return nil, fmt.Errorf("unimplemented") +} + +func (sh *testServerHandler) OnGetParameter(ctx *gortsplib.ServerHandlerOnGetParameterCtx) (*base.Response, error) { + if sh.onGetParameter != nil { + return sh.onGetParameter(ctx) + } + return nil, fmt.Errorf("unimplemented") +} + +type container struct { + name string +} + +func newContainer(image string, name string, args []string) (*container, error) { + c := &container{ + name: name, + } + + exec.Command("docker", "kill", "gortsplib-test-"+name).Run() + exec.Command("docker", "wait", "gortsplib-test-"+name).Run() + + cmd := []string{ + "docker", "run", + "--network=host", + "--name=gortsplib-test-" + name, + "gortsplib-test-" + image, + } + cmd = append(cmd, args...) + ecmd := exec.Command(cmd[0], cmd[1:]...) + ecmd.Stdout = nil + ecmd.Stderr = os.Stderr + + err := ecmd.Start() + if err != nil { + return nil, err + } + + time.Sleep(1 * time.Second) + + return c, nil +} + +func (c *container) close() { + exec.Command("docker", "kill", "gortsplib-test-"+c.name).Run() + exec.Command("docker", "wait", "gortsplib-test-"+c.name).Run() + exec.Command("docker", "rm", "gortsplib-test-"+c.name).Run() +} + +func (c *container) wait() int { + exec.Command("docker", "wait", "gortsplib-test-"+c.name).Run() + out, _ := exec.Command("docker", "inspect", "gortsplib-test-"+c.name, + "--format={{.State.ExitCode}}").Output() + code, _ := strconv.ParseInt(string(out[:len(out)-1]), 10, 64) + return int(code) +} + +func buildImage(image string) error { + ecmd := exec.Command("docker", "build", filepath.Join("images", image), + "-t", "gortsplib-test-"+image) + ecmd.Stdout = nil + ecmd.Stderr = os.Stderr + return ecmd.Run() +} + +func TestServerPublishRead(t *testing.T) { + files, err := os.ReadDir("images") + require.NoError(t, err) + + for _, file := range files { + err := buildImage(file.Name()) + require.NoError(t, err) + } + + for _, ca := range []struct { + publisherSoft string + publisherProto string + readerSoft string + readerProto string + }{ + {"ffmpeg", "udp", "ffmpeg", "udp"}, + {"ffmpeg", "udp", "gstreamer", "udp"}, + {"gstreamer", "udp", "ffmpeg", "udp"}, + {"gstreamer", "udp", "gstreamer", "udp"}, + + {"ffmpeg", "tcp", "ffmpeg", "tcp"}, + {"ffmpeg", "tcp", "gstreamer", "tcp"}, + {"gstreamer", "tcp", "ffmpeg", "tcp"}, + {"gstreamer", "tcp", "gstreamer", "tcp"}, + + {"ffmpeg", "tcp", "ffmpeg", "udp"}, + {"ffmpeg", "udp", "ffmpeg", "tcp"}, + + {"ffmpeg", "tls", "ffmpeg", "tls"}, + {"ffmpeg", "tls", "gstreamer", "tls"}, + {"gstreamer", "tls", "ffmpeg", "tls"}, + {"gstreamer", "tls", "gstreamer", "tls"}, + + {"ffmpeg", "udp", "ffmpeg", "multicast"}, + {"ffmpeg", "udp", "gstreamer", "multicast"}, + } { + t.Run(ca.publisherSoft+"_"+ca.publisherProto+"_"+ + ca.readerSoft+"_"+ca.readerProto, func(t *testing.T) { + var mutex sync.Mutex + var stream *gortsplib.ServerStream + var publisher *gortsplib.ServerSession + + s := &gortsplib.Server{ + Handler: &testServerHandler{ + onSessionClose: func(ctx *gortsplib.ServerHandlerOnSessionCloseCtx) { + mutex.Lock() + defer mutex.Unlock() + + if stream != nil { + if ctx.Session == publisher { + stream.Close() + stream = nil + } + } + }, + onDescribe: func(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) { + if ctx.Path != "test/stream" { + return &base.Response{ + StatusCode: base.StatusBadRequest, + }, nil, fmt.Errorf("invalid path (%s)", ctx.Request.URL) + } + if ctx.Query != "key=val" { + return &base.Response{ + StatusCode: base.StatusBadRequest, + }, nil, fmt.Errorf("invalid query (%s)", ctx.Query) + } + + mutex.Lock() + defer mutex.Unlock() + + if stream == nil { + return &base.Response{ + StatusCode: base.StatusNotFound, + }, nil, nil + } + + return &base.Response{ + StatusCode: base.StatusOK, + }, stream, nil + }, + onAnnounce: func(ctx *gortsplib.ServerHandlerOnAnnounceCtx) (*base.Response, error) { + if ctx.Path != "test/stream" { + return &base.Response{ + StatusCode: base.StatusBadRequest, + }, fmt.Errorf("invalid path (%s)", ctx.Request.URL) + } + if ctx.Query != "key=val" { + return &base.Response{ + StatusCode: base.StatusBadRequest, + }, fmt.Errorf("invalid query (%s)", ctx.Query) + } + + mutex.Lock() + defer mutex.Unlock() + + if stream != nil { + return &base.Response{ + StatusCode: base.StatusBadRequest, + }, fmt.Errorf("someone is already publishing") + } + + stream = gortsplib.NewServerStream(ctx.Tracks) + publisher = ctx.Session + + return &base.Response{ + StatusCode: base.StatusOK, + }, nil + }, + onSetup: func(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) { + if ctx.Path != "test/stream" { + return &base.Response{ + StatusCode: base.StatusBadRequest, + }, nil, fmt.Errorf("invalid path (%s)", ctx.Request.URL) + } + if ctx.Query != "key=val" { + return &base.Response{ + StatusCode: base.StatusBadRequest, + }, nil, fmt.Errorf("invalid query (%s)", ctx.Query) + } + + if stream == nil { + return &base.Response{ + StatusCode: base.StatusNotFound, + }, nil, nil + } + + return &base.Response{ + StatusCode: base.StatusOK, + }, stream, nil + }, + onPlay: func(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) { + if ctx.Path != "test/stream" { + return &base.Response{ + StatusCode: base.StatusBadRequest, + }, fmt.Errorf("invalid path (%s)", ctx.Request.URL) + } + if ctx.Query != "key=val" { + return &base.Response{ + StatusCode: base.StatusBadRequest, + }, fmt.Errorf("invalid query (%s)", ctx.Query) + } + + return &base.Response{ + StatusCode: base.StatusOK, + }, nil + }, + onRecord: func(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.Response, error) { + if ctx.Path != "test/stream" { + return &base.Response{ + StatusCode: base.StatusBadRequest, + }, fmt.Errorf("invalid path (%s)", ctx.Request.URL) + } + if ctx.Query != "key=val" { + return &base.Response{ + StatusCode: base.StatusBadRequest, + }, fmt.Errorf("invalid query (%s)", ctx.Query) + } + + return &base.Response{ + StatusCode: base.StatusOK, + }, nil + }, + onPacketRTP: func(ctx *gortsplib.ServerHandlerOnPacketRTPCtx) { + mutex.Lock() + defer mutex.Unlock() + + if ctx.Session == publisher { + stream.WritePacketRTP(ctx.TrackID, ctx.Packet, true) + } + }, + }, + RTSPAddress: "localhost:8554", + } + + var proto string + if ca.publisherProto == "tls" { + proto = "rtsps" + cert, err := tls.X509KeyPair(serverCert, serverKey) + require.NoError(t, err) + s.TLSConfig = &tls.Config{Certificates: []tls.Certificate{cert}} + } else { + proto = "rtsp" + s.UDPRTPAddress = "127.0.0.1:8000" + s.UDPRTCPAddress = "127.0.0.1:8001" + s.MulticastIPRange = "224.1.0.0/16" + s.MulticastRTPPort = 8002 + s.MulticastRTCPPort = 8003 + } + + err := s.Start() + require.NoError(t, err) + defer s.Close() + + switch ca.publisherSoft { + case "ffmpeg": + ts := func() string { + switch ca.publisherProto { + case "udp", "tcp": + return ca.publisherProto + } + return "tcp" + }() + + cnt1, err := newContainer("ffmpeg", "publish", []string{ + "-re", + "-stream_loop", "-1", + "-i", "emptyvideo.mkv", + "-c", "copy", + "-f", "rtsp", + "-rtsp_transport", ts, + proto + "://localhost:8554/test/stream?key=val", + }) + require.NoError(t, err) + defer cnt1.close() + + case "gstreamer": + ts := func() string { + switch ca.publisherProto { + case "udp", "tcp": + return ca.publisherProto + } + return "tcp" + }() + + cnt1, err := newContainer("gstreamer", "publish", []string{ + "filesrc location=emptyvideo.mkv ! matroskademux ! video/x-h264 ! rtspclientsink " + + "location=" + proto + "://127.0.0.1:8554/test/stream?key=val protocols=" + ts + + " tls-validation-flags=0 latency=0 timeout=0 rtx-time=0", + }) + require.NoError(t, err) + defer cnt1.close() + + time.Sleep(1 * time.Second) + } + + time.Sleep(1 * time.Second) + + switch ca.readerSoft { + case "ffmpeg": + ts := func() string { + switch ca.readerProto { + case "udp", "tcp": + return ca.readerProto + case "multicast": + return "udp_multicast" + } + return "tcp" + }() + + cnt2, err := newContainer("ffmpeg", "read", []string{ + "-rtsp_transport", ts, + "-i", proto + "://localhost:8554/test/stream?key=val", + "-vframes", "1", + "-f", "image2", + "-y", "/dev/null", + }) + require.NoError(t, err) + defer cnt2.close() + require.Equal(t, 0, cnt2.wait()) + + case "gstreamer": + ts := func() string { + switch ca.readerProto { + case "udp", "tcp": + return ca.readerProto + case "multicast": + return "udp-mcast" + } + return "tcp" + }() + + cnt2, err := newContainer("gstreamer", "read", []string{ + "rtspsrc location=" + proto + "://127.0.0.1:8554/test/stream?key=val protocols=" + ts + + " tls-validation-flags=0 latency=0 " + + "! application/x-rtp,media=video ! decodebin ! exitafterframe ! fakesink", + }) + require.NoError(t, err) + defer cnt2.close() + require.Equal(t, 0, cnt2.wait()) + } + }) + } +} diff --git a/server_test.go b/server_test.go index 402d9f8d..4803dc75 100644 --- a/server_test.go +++ b/server_test.go @@ -2,15 +2,9 @@ package gortsplib import ( "bufio" - "crypto/tls" "fmt" "net" - "os" - "os/exec" - "strconv" - "sync" "testing" - "time" "github.com/stretchr/testify/require" @@ -19,6 +13,60 @@ import ( "github.com/aler9/gortsplib/pkg/headers" ) +var serverCert = []byte(`-----BEGIN CERTIFICATE----- +MIIDkzCCAnugAwIBAgIUHFnymlrkEnz3ThpFvSrqybBepn4wDQYJKoZIhvcNAQEL +BQAwWTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM +GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDESMBAGA1UEAwwJbG9jYWxob3N0MB4X +DTIxMTIwMzIxNDg0MFoXDTMxMTIwMTIxNDg0MFowWTELMAkGA1UEBhMCQVUxEzAR +BgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5 +IEx0ZDESMBAGA1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A +MIIBCgKCAQEAv8h21YDIAYNzewrfQqQTlODJjuUZKxMCO7z1wIapem5I+1I8n+vD +v8qvuyZk1m9CKQPfXxhJz0TT5kECoUY0KaDtykSzfaUK34F9J1d5snDkaOtN48W+ +8l39Wtcvc5JW17jNwabppAkHHYAMQryO8urKLWKbZmLhYCJdYgNqb8ciWPsnYNA0 +zcnKML9zQphh7dxPq1wCsy/c/XZUzxTLAe8hsCKuqpESEX3MMJA9gOLmiOF0JgpT +9h6eqvJU8IK0QMIv3tekJWSBvTLyz4ghENs10sMKKNqR6NWt2SsOloeBkOhIDLOk +byLaPEvugrQsga99uhANRpXp+CHnVeAH8QIDAQABo1MwUTAdBgNVHQ4EFgQUwyEH +cMynEoy1/TnbIhgpEAs038gwHwYDVR0jBBgwFoAUwyEHcMynEoy1/TnbIhgpEAs0 +38gwDwYDVR0TAQH/BAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEAiV56KhDoUVzW +qV1X0QbfLaifimsN3Na3lUgmjcgyUe8rHj09pXuAD/AcQw/zwKzZ6dPtizBeNLN8 +jV1dbJmR7DE3MDlndgMKTOKFsqzHjG9UTXkBGFUEM1shn2GE8XcvDF0AzKU82YjP +B0KswA1NoYTNP2PW4IhZRzv2M+fnmkvc8DSEZ+dxEMg3aJfe/WLPvYjDpFXLvuxl +YnerRQ04hFysh5eogPFpB4KyyPs6jGnQFmZCbFyk9pjKRbDPJc6FkDglkzTB6j3Q +TSfgNJswOiap13vQQKf5Vu7LTuyO4Wjfjr74QNqMLLNIgcC7n2jfQj1g5Xa0bnF5 +G4tLrCLUUw== +-----END CERTIFICATE----- +`) + +var serverKey = []byte(`-----BEGIN PRIVATE KEY----- +MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQC/yHbVgMgBg3N7 +Ct9CpBOU4MmO5RkrEwI7vPXAhql6bkj7Ujyf68O/yq+7JmTWb0IpA99fGEnPRNPm +QQKhRjQpoO3KRLN9pQrfgX0nV3mycORo603jxb7yXf1a1y9zklbXuM3BpumkCQcd +gAxCvI7y6sotYptmYuFgIl1iA2pvxyJY+ydg0DTNycowv3NCmGHt3E+rXAKzL9z9 +dlTPFMsB7yGwIq6qkRIRfcwwkD2A4uaI4XQmClP2Hp6q8lTwgrRAwi/e16QlZIG9 +MvLPiCEQ2zXSwwoo2pHo1a3ZKw6Wh4GQ6EgMs6RvIto8S+6CtCyBr326EA1Glen4 +IedV4AfxAgMBAAECggEAOqcJSNSA1o2oJKo3i374iiCRJAWGw/ilRzXMBtxoOow9 +/7av2czV6fMH+XmNf1M5bafEiaW49Q28rH+XWVFKJK0V7DVEm5l9EMveRcjn7B3A +jSHhiVZxxlfeYwjKd1L7AjB/pMjyTXuBVJFTrplSMpKB0I2GrzJwcOExpAcdZx98 +K0s5pauJH9bE0kI3p585SGQaIjrz0LvAmf6cQ5HhKfahJdWNnKZ/S4Kdqe+JCgyd +NawREHhf3tU01Cd3DOgXn4+5V/Ts6XtqY1RuSvonNv3nyeiOpX8C4cHKD5u2sNOC +3J4xWrrs0W3e8IATgAys56teKbEufHTUx52wNhAbzQKBgQD56W0tPCuaKrsjxsvE +dNHdm/9aQrN1jCJxUcGaxCIioXSyDvpSKcgxQbEqHXRTtJt5/Kadz9omq4vFTVtl +5Gf+3Lrf3ZT82SvYHtlIMdBZLlKwk6MolEa0KGAuJBNJVRIOkm5YjV/3bJebeTIb +WrLEyNCOXFAh3KVzBPU8nJ1aTwKBgQDEdISg3UsSOLBa0BfoJ5FlqGepZSufYgqh +xAJn8EbopnlzfmHBZAhE2+Igh0xcHhQqHThc3OuLtAkWu6fUSLiSA+XjU9TWPpA1 +C/325rhT23fxzYIlYFegR9BToxYhv14ufkcTXRfHRAhffk7K5A2nlJfldDZRmUh2 +5KIjXQ0pvwKBgQCa7S6VgFu3cw4Ym8DuxUzlCTRADGGcWYdwoLJY84YF2fmx+L8N ++ID2qDbgWOooiipocUwJQTWIC4jWg6JJhFNEGCpxZbhbF3aqwFULAHadEq6IcL4R +Bfre7LjTYeHi8C4FgpmNo/b+N/+0jmmVs6BnheZkmq3CkDqxFz3AmYai2QKBgQC1 +kzAmcoJ5U/YD6YO/Khsjx3QQSBb6mCZVf5HtuVIApCVqzuvRUACojEbDY+n61j4y +8pDum64FkKA557Xl6lTVeE7ZPtlgL7EfpnbT5kmGEDobPqPEofg7h0SQmRLSnEqT +VFmjFw7sOQA4Ksjuk7vfIOMHy9KMts0YPpdxcgbBhwKBgQCP8MeRPuhZ26/oIESr +I8ArLEaPebYmLXCT2ZTudGztoyYFxinRGHA4PdamSOKfB1li52wAaqgRA3cSqkUi +kabimVOvrOAWlnvznqXEHPNx6mbbKs08jh+uRRmrOmMrxAobpTqarL2Sdxb6afID +NkxNic7oHgsZpIkZ8HK+QjAAWA== +-----END PRIVATE KEY----- +`) + func writeReqReadRes(conn net.Conn, br *bufio.Reader, req base.Request, @@ -149,383 +197,6 @@ func (sh *testServerHandler) OnGetParameter(ctx *ServerHandlerOnGetParameterCtx) return nil, fmt.Errorf("unimplemented") } -type container struct { - name string -} - -func newContainer(image string, name string, args []string) (*container, error) { - c := &container{ - name: name, - } - - exec.Command("docker", "kill", "gortsplib-test-"+name).Run() - exec.Command("docker", "wait", "gortsplib-test-"+name).Run() - - cmd := []string{ - "docker", "run", - "--network=host", - "--name=gortsplib-test-" + name, - "gortsplib-test-" + image, - } - cmd = append(cmd, args...) - ecmd := exec.Command(cmd[0], cmd[1:]...) - ecmd.Stdout = nil - ecmd.Stderr = os.Stderr - - err := ecmd.Start() - if err != nil { - return nil, err - } - - time.Sleep(1 * time.Second) - - return c, nil -} - -func (c *container) close() { - exec.Command("docker", "kill", "gortsplib-test-"+c.name).Run() - exec.Command("docker", "wait", "gortsplib-test-"+c.name).Run() - exec.Command("docker", "rm", "gortsplib-test-"+c.name).Run() -} - -func (c *container) wait() int { - exec.Command("docker", "wait", "gortsplib-test-"+c.name).Run() - out, _ := exec.Command("docker", "inspect", "gortsplib-test-"+c.name, - "--format={{.State.ExitCode}}").Output() - code, _ := strconv.ParseInt(string(out[:len(out)-1]), 10, 64) - return int(code) -} - -var serverCert = []byte(`-----BEGIN CERTIFICATE----- -MIIDkzCCAnugAwIBAgIUHFnymlrkEnz3ThpFvSrqybBepn4wDQYJKoZIhvcNAQEL -BQAwWTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM -GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDESMBAGA1UEAwwJbG9jYWxob3N0MB4X -DTIxMTIwMzIxNDg0MFoXDTMxMTIwMTIxNDg0MFowWTELMAkGA1UEBhMCQVUxEzAR -BgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5 -IEx0ZDESMBAGA1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A -MIIBCgKCAQEAv8h21YDIAYNzewrfQqQTlODJjuUZKxMCO7z1wIapem5I+1I8n+vD -v8qvuyZk1m9CKQPfXxhJz0TT5kECoUY0KaDtykSzfaUK34F9J1d5snDkaOtN48W+ -8l39Wtcvc5JW17jNwabppAkHHYAMQryO8urKLWKbZmLhYCJdYgNqb8ciWPsnYNA0 -zcnKML9zQphh7dxPq1wCsy/c/XZUzxTLAe8hsCKuqpESEX3MMJA9gOLmiOF0JgpT -9h6eqvJU8IK0QMIv3tekJWSBvTLyz4ghENs10sMKKNqR6NWt2SsOloeBkOhIDLOk -byLaPEvugrQsga99uhANRpXp+CHnVeAH8QIDAQABo1MwUTAdBgNVHQ4EFgQUwyEH -cMynEoy1/TnbIhgpEAs038gwHwYDVR0jBBgwFoAUwyEHcMynEoy1/TnbIhgpEAs0 -38gwDwYDVR0TAQH/BAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEAiV56KhDoUVzW -qV1X0QbfLaifimsN3Na3lUgmjcgyUe8rHj09pXuAD/AcQw/zwKzZ6dPtizBeNLN8 -jV1dbJmR7DE3MDlndgMKTOKFsqzHjG9UTXkBGFUEM1shn2GE8XcvDF0AzKU82YjP -B0KswA1NoYTNP2PW4IhZRzv2M+fnmkvc8DSEZ+dxEMg3aJfe/WLPvYjDpFXLvuxl -YnerRQ04hFysh5eogPFpB4KyyPs6jGnQFmZCbFyk9pjKRbDPJc6FkDglkzTB6j3Q -TSfgNJswOiap13vQQKf5Vu7LTuyO4Wjfjr74QNqMLLNIgcC7n2jfQj1g5Xa0bnF5 -G4tLrCLUUw== ------END CERTIFICATE----- -`) - -var serverKey = []byte(`-----BEGIN PRIVATE KEY----- -MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQC/yHbVgMgBg3N7 -Ct9CpBOU4MmO5RkrEwI7vPXAhql6bkj7Ujyf68O/yq+7JmTWb0IpA99fGEnPRNPm -QQKhRjQpoO3KRLN9pQrfgX0nV3mycORo603jxb7yXf1a1y9zklbXuM3BpumkCQcd -gAxCvI7y6sotYptmYuFgIl1iA2pvxyJY+ydg0DTNycowv3NCmGHt3E+rXAKzL9z9 -dlTPFMsB7yGwIq6qkRIRfcwwkD2A4uaI4XQmClP2Hp6q8lTwgrRAwi/e16QlZIG9 -MvLPiCEQ2zXSwwoo2pHo1a3ZKw6Wh4GQ6EgMs6RvIto8S+6CtCyBr326EA1Glen4 -IedV4AfxAgMBAAECggEAOqcJSNSA1o2oJKo3i374iiCRJAWGw/ilRzXMBtxoOow9 -/7av2czV6fMH+XmNf1M5bafEiaW49Q28rH+XWVFKJK0V7DVEm5l9EMveRcjn7B3A -jSHhiVZxxlfeYwjKd1L7AjB/pMjyTXuBVJFTrplSMpKB0I2GrzJwcOExpAcdZx98 -K0s5pauJH9bE0kI3p585SGQaIjrz0LvAmf6cQ5HhKfahJdWNnKZ/S4Kdqe+JCgyd -NawREHhf3tU01Cd3DOgXn4+5V/Ts6XtqY1RuSvonNv3nyeiOpX8C4cHKD5u2sNOC -3J4xWrrs0W3e8IATgAys56teKbEufHTUx52wNhAbzQKBgQD56W0tPCuaKrsjxsvE -dNHdm/9aQrN1jCJxUcGaxCIioXSyDvpSKcgxQbEqHXRTtJt5/Kadz9omq4vFTVtl -5Gf+3Lrf3ZT82SvYHtlIMdBZLlKwk6MolEa0KGAuJBNJVRIOkm5YjV/3bJebeTIb -WrLEyNCOXFAh3KVzBPU8nJ1aTwKBgQDEdISg3UsSOLBa0BfoJ5FlqGepZSufYgqh -xAJn8EbopnlzfmHBZAhE2+Igh0xcHhQqHThc3OuLtAkWu6fUSLiSA+XjU9TWPpA1 -C/325rhT23fxzYIlYFegR9BToxYhv14ufkcTXRfHRAhffk7K5A2nlJfldDZRmUh2 -5KIjXQ0pvwKBgQCa7S6VgFu3cw4Ym8DuxUzlCTRADGGcWYdwoLJY84YF2fmx+L8N -+ID2qDbgWOooiipocUwJQTWIC4jWg6JJhFNEGCpxZbhbF3aqwFULAHadEq6IcL4R -Bfre7LjTYeHi8C4FgpmNo/b+N/+0jmmVs6BnheZkmq3CkDqxFz3AmYai2QKBgQC1 -kzAmcoJ5U/YD6YO/Khsjx3QQSBb6mCZVf5HtuVIApCVqzuvRUACojEbDY+n61j4y -8pDum64FkKA557Xl6lTVeE7ZPtlgL7EfpnbT5kmGEDobPqPEofg7h0SQmRLSnEqT -VFmjFw7sOQA4Ksjuk7vfIOMHy9KMts0YPpdxcgbBhwKBgQCP8MeRPuhZ26/oIESr -I8ArLEaPebYmLXCT2ZTudGztoyYFxinRGHA4PdamSOKfB1li52wAaqgRA3cSqkUi -kabimVOvrOAWlnvznqXEHPNx6mbbKs08jh+uRRmrOmMrxAobpTqarL2Sdxb6afID -NkxNic7oHgsZpIkZ8HK+QjAAWA== ------END PRIVATE KEY----- -`) - -func TestServerHighLevelPublishRead(t *testing.T) { - for _, ca := range []struct { - publisherSoft string - publisherProto string - readerSoft string - readerProto string - }{ - {"ffmpeg", "udp", "ffmpeg", "udp"}, - {"ffmpeg", "udp", "gstreamer", "udp"}, - {"gstreamer", "udp", "ffmpeg", "udp"}, - {"gstreamer", "udp", "gstreamer", "udp"}, - - {"ffmpeg", "tcp", "ffmpeg", "tcp"}, - {"ffmpeg", "tcp", "gstreamer", "tcp"}, - {"gstreamer", "tcp", "ffmpeg", "tcp"}, - {"gstreamer", "tcp", "gstreamer", "tcp"}, - - {"ffmpeg", "tcp", "ffmpeg", "udp"}, - {"ffmpeg", "udp", "ffmpeg", "tcp"}, - - {"ffmpeg", "tls", "ffmpeg", "tls"}, - {"ffmpeg", "tls", "gstreamer", "tls"}, - {"gstreamer", "tls", "ffmpeg", "tls"}, - {"gstreamer", "tls", "gstreamer", "tls"}, - - {"ffmpeg", "udp", "ffmpeg", "multicast"}, - {"ffmpeg", "udp", "gstreamer", "multicast"}, - } { - t.Run(ca.publisherSoft+"_"+ca.publisherProto+"_"+ - ca.readerSoft+"_"+ca.readerProto, func(t *testing.T) { - var mutex sync.Mutex - var stream *ServerStream - var publisher *ServerSession - - s := &Server{ - Handler: &testServerHandler{ - onSessionClose: func(ctx *ServerHandlerOnSessionCloseCtx) { - mutex.Lock() - defer mutex.Unlock() - - if stream != nil { - if ctx.Session == publisher { - stream.Close() - stream = nil - } - } - }, - onDescribe: func(ctx *ServerHandlerOnDescribeCtx) (*base.Response, *ServerStream, error) { - if ctx.Path != "test/stream" { - return &base.Response{ - StatusCode: base.StatusBadRequest, - }, nil, fmt.Errorf("invalid path (%s)", ctx.Request.URL) - } - if ctx.Query != "key=val" { - return &base.Response{ - StatusCode: base.StatusBadRequest, - }, nil, fmt.Errorf("invalid query (%s)", ctx.Query) - } - - mutex.Lock() - defer mutex.Unlock() - - if stream == nil { - return &base.Response{ - StatusCode: base.StatusNotFound, - }, nil, nil - } - - return &base.Response{ - StatusCode: base.StatusOK, - }, stream, nil - }, - onAnnounce: func(ctx *ServerHandlerOnAnnounceCtx) (*base.Response, error) { - if ctx.Path != "test/stream" { - return &base.Response{ - StatusCode: base.StatusBadRequest, - }, fmt.Errorf("invalid path (%s)", ctx.Request.URL) - } - if ctx.Query != "key=val" { - return &base.Response{ - StatusCode: base.StatusBadRequest, - }, fmt.Errorf("invalid query (%s)", ctx.Query) - } - - mutex.Lock() - defer mutex.Unlock() - - if stream != nil { - return &base.Response{ - StatusCode: base.StatusBadRequest, - }, fmt.Errorf("someone is already publishing") - } - - stream = NewServerStream(ctx.Tracks) - publisher = ctx.Session - - return &base.Response{ - StatusCode: base.StatusOK, - }, nil - }, - onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) { - if ctx.Path != "test/stream" { - return &base.Response{ - StatusCode: base.StatusBadRequest, - }, nil, fmt.Errorf("invalid path (%s)", ctx.Request.URL) - } - if ctx.Query != "key=val" { - return &base.Response{ - StatusCode: base.StatusBadRequest, - }, nil, fmt.Errorf("invalid query (%s)", ctx.Query) - } - - if stream == nil { - return &base.Response{ - StatusCode: base.StatusNotFound, - }, nil, nil - } - - return &base.Response{ - StatusCode: base.StatusOK, - }, stream, nil - }, - onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) { - if ctx.Path != "test/stream" { - return &base.Response{ - StatusCode: base.StatusBadRequest, - }, fmt.Errorf("invalid path (%s)", ctx.Request.URL) - } - if ctx.Query != "key=val" { - return &base.Response{ - StatusCode: base.StatusBadRequest, - }, fmt.Errorf("invalid query (%s)", ctx.Query) - } - - return &base.Response{ - StatusCode: base.StatusOK, - }, nil - }, - onRecord: func(ctx *ServerHandlerOnRecordCtx) (*base.Response, error) { - if ctx.Path != "test/stream" { - return &base.Response{ - StatusCode: base.StatusBadRequest, - }, fmt.Errorf("invalid path (%s)", ctx.Request.URL) - } - if ctx.Query != "key=val" { - return &base.Response{ - StatusCode: base.StatusBadRequest, - }, fmt.Errorf("invalid query (%s)", ctx.Query) - } - - return &base.Response{ - StatusCode: base.StatusOK, - }, nil - }, - onPacketRTP: func(ctx *ServerHandlerOnPacketRTPCtx) { - mutex.Lock() - defer mutex.Unlock() - - if ctx.Session == publisher { - stream.WritePacketRTP(ctx.TrackID, ctx.Packet, true) - } - }, - }, - RTSPAddress: "localhost:8554", - } - - var proto string - if ca.publisherProto == "tls" { - proto = "rtsps" - cert, err := tls.X509KeyPair(serverCert, serverKey) - require.NoError(t, err) - s.TLSConfig = &tls.Config{Certificates: []tls.Certificate{cert}} - } else { - proto = "rtsp" - s.UDPRTPAddress = "127.0.0.1:8000" - s.UDPRTCPAddress = "127.0.0.1:8001" - s.MulticastIPRange = "224.1.0.0/16" - s.MulticastRTPPort = 8002 - s.MulticastRTCPPort = 8003 - } - - err := s.Start() - require.NoError(t, err) - defer s.Close() - - switch ca.publisherSoft { - case "ffmpeg": - ts := func() string { - switch ca.publisherProto { - case "udp", "tcp": - return ca.publisherProto - } - return "tcp" - }() - - cnt1, err := newContainer("ffmpeg", "publish", []string{ - "-re", - "-stream_loop", "-1", - "-i", "emptyvideo.mkv", - "-c", "copy", - "-f", "rtsp", - "-rtsp_transport", ts, - proto + "://localhost:8554/test/stream?key=val", - }) - require.NoError(t, err) - defer cnt1.close() - - case "gstreamer": - ts := func() string { - switch ca.publisherProto { - case "udp", "tcp": - return ca.publisherProto - } - return "tcp" - }() - - cnt1, err := newContainer("gstreamer", "publish", []string{ - "filesrc location=emptyvideo.mkv ! matroskademux ! video/x-h264 ! rtspclientsink " + - "location=" + proto + "://127.0.0.1:8554/test/stream?key=val protocols=" + ts + - " tls-validation-flags=0 latency=0 timeout=0 rtx-time=0", - }) - require.NoError(t, err) - defer cnt1.close() - - time.Sleep(1 * time.Second) - } - - time.Sleep(1 * time.Second) - - switch ca.readerSoft { - case "ffmpeg": - ts := func() string { - switch ca.readerProto { - case "udp", "tcp": - return ca.readerProto - case "multicast": - return "udp_multicast" - } - return "tcp" - }() - - cnt2, err := newContainer("ffmpeg", "read", []string{ - "-rtsp_transport", ts, - "-i", proto + "://localhost:8554/test/stream?key=val", - "-vframes", "1", - "-f", "image2", - "-y", "/dev/null", - }) - require.NoError(t, err) - defer cnt2.close() - require.Equal(t, 0, cnt2.wait()) - - case "gstreamer": - ts := func() string { - switch ca.readerProto { - case "udp", "tcp": - return ca.readerProto - case "multicast": - return "udp-mcast" - } - return "tcp" - }() - - cnt2, err := newContainer("gstreamer", "read", []string{ - "rtspsrc location=" + proto + "://127.0.0.1:8554/test/stream?key=val protocols=" + ts + - " tls-validation-flags=0 latency=0 " + - "! application/x-rtp,media=video ! decodebin ! exitafterframe ! fakesink", - }) - require.NoError(t, err) - defer cnt2.close() - require.Equal(t, 0, cnt2.wait()) - } - }) - } -} - func TestServerClose(t *testing.T) { s := &Server{ Handler: &testServerHandler{},