From cb6684d3cf237541f2d9f51e6899541f96c8f81c Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Sat, 13 Mar 2021 16:57:37 +0100 Subject: [PATCH] rewrite tests --- clientconnpublish_test.go | 330 +++++++++++ clientconf_test.go => clientconnread_test.go | 377 +----------- gortsplib_test.go | 53 ++ serverconf_test.go => serverconn_test.go | 348 +---------- serverconnpublish_test.go | 211 +++++++ serverconnread_test.go | 592 +++++++++++++++++++ 6 files changed, 1210 insertions(+), 701 deletions(-) create mode 100644 clientconnpublish_test.go rename clientconf_test.go => clientconnread_test.go (61%) create mode 100644 gortsplib_test.go rename serverconf_test.go => serverconn_test.go (60%) create mode 100644 serverconnpublish_test.go create mode 100644 serverconnread_test.go diff --git a/clientconnpublish_test.go b/clientconnpublish_test.go new file mode 100644 index 00000000..6c2c8383 --- /dev/null +++ b/clientconnpublish_test.go @@ -0,0 +1,330 @@ +package gortsplib + +import ( + "net" + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/aler9/gortsplib/pkg/rtph264" +) + +func TestClientConnPublishSerial(t *testing.T) { + for _, proto := range []string{ + "udp", + "tcp", + } { + t.Run(proto, func(t *testing.T) { + cnt1, err := newContainer("rtsp-simple-server", "server", []string{"{}"}) + require.NoError(t, err) + defer cnt1.close() + + time.Sleep(1 * time.Second) + + pc, err := net.ListenPacket("udp4", "127.0.0.1:0") + require.NoError(t, err) + defer pc.Close() + + cnt2, err := newContainer("gstreamer", "source", []string{ + "filesrc location=emptyvideo.ts ! tsdemux ! video/x-h264" + + " ! h264parse config-interval=1 ! rtph264pay ! udpsink host=127.0.0.1 port=" + strconv.FormatInt(int64(pc.LocalAddr().(*net.UDPAddr).Port), 10), + }) + require.NoError(t, err) + defer cnt2.close() + + decoder := rtph264.NewDecoder() + sps, pps, err := decoder.ReadSPSPPS(rtph264.PacketConnReader{pc}) //nolint:govet + require.NoError(t, err) + + track, err := NewTrackH264(96, sps, pps) + require.NoError(t, err) + + conf := ClientConf{ + StreamProtocol: func() *StreamProtocol { + if proto == "udp" { + v := StreamProtocolUDP + return &v + } + v := StreamProtocolTCP + return &v + }(), + } + + conn, err := conf.DialPublish("rtsp://localhost:8554/teststream", + Tracks{track}) + require.NoError(t, err) + + buf := make([]byte, 2048) + n, _, err := pc.ReadFrom(buf) + require.NoError(t, err) + err = conn.WriteFrame(track.ID, StreamTypeRTP, buf[:n]) + require.NoError(t, err) + + conn.Close() + + n, _, err = pc.ReadFrom(buf) + require.NoError(t, err) + err = conn.WriteFrame(track.ID, StreamTypeRTP, buf[:n]) + require.Error(t, err) + }) + } +} + +func TestClientConnPublishParallel(t *testing.T) { + for _, ca := range []struct { + proto string + server string + }{ + {"udp", "rtsp-simple-server"}, + {"udp", "ffmpeg"}, + {"tcp", "rtsp-simple-server"}, + {"tcp", "ffmpeg"}, + } { + t.Run(ca.proto+"_"+ca.server, func(t *testing.T) { + switch ca.server { + case "rtsp-simple-server": + cnt1, err := newContainer("rtsp-simple-server", "server", []string{"{}"}) + require.NoError(t, err) + defer cnt1.close() + + default: + cnt0, err := newContainer("rtsp-simple-server", "server0", []string{"{}"}) + require.NoError(t, err) + defer cnt0.close() + + cnt1, err := newContainer("ffmpeg", "server", []string{ + "-fflags nobuffer -re -rtsp_flags listen -i rtsp://localhost:8555/teststream -c copy -f rtsp rtsp://localhost:8554/teststream", + }) + require.NoError(t, err) + defer cnt1.close() + } + + time.Sleep(1 * time.Second) + + pc, err := net.ListenPacket("udp4", "127.0.0.1:0") + require.NoError(t, err) + defer pc.Close() + + cnt2, err := newContainer("gstreamer", "source", []string{ + "filesrc location=emptyvideo.ts ! tsdemux ! video/x-h264" + + " ! h264parse config-interval=1 ! rtph264pay ! udpsink host=127.0.0.1 port=" + strconv.FormatInt(int64(pc.LocalAddr().(*net.UDPAddr).Port), 10), + }) + require.NoError(t, err) + defer cnt2.close() + + decoder := rtph264.NewDecoder() + sps, pps, err := decoder.ReadSPSPPS(rtph264.PacketConnReader{pc}) //nolint:govet + require.NoError(t, err) + + track, err := NewTrackH264(96, sps, pps) + require.NoError(t, err) + + writerDone := make(chan struct{}) + defer func() { <-writerDone }() + + var conn *ClientConn + defer func() { conn.Close() }() + + conf := ClientConf{ + StreamProtocol: func() *StreamProtocol { + if ca.proto == "udp" { + v := StreamProtocolUDP + return &v + } + v := StreamProtocolTCP + return &v + }(), + } + + go func() { + defer close(writerDone) + + port := "8554" + if ca.server == "ffmpeg" { + port = "8555" + } + var err error + conn, err = conf.DialPublish("rtsp://localhost:"+port+"/teststream", + Tracks{track}) + require.NoError(t, err) + + buf := make([]byte, 2048) + for { + n, _, err := pc.ReadFrom(buf) + if err != nil { + break + } + + err = conn.WriteFrame(track.ID, StreamTypeRTP, buf[:n]) + if err != nil { + break + } + } + }() + + if ca.server == "ffmpeg" { + time.Sleep(5 * time.Second) + } + time.Sleep(1 * time.Second) + + cnt3, err := newContainer("ffmpeg", "read", []string{ + "-rtsp_transport", "udp", + "-i", "rtsp://localhost:8554/teststream", + "-vframes", "1", + "-f", "image2", + "-y", "/dev/null", + }) + require.NoError(t, err) + defer cnt3.close() + + code := cnt3.wait() + require.Equal(t, 0, code) + }) + } +} + +func TestClientConnPublishPauseSerial(t *testing.T) { + for _, proto := range []string{ + "udp", + "tcp", + } { + t.Run(proto, func(t *testing.T) { + cnt1, err := newContainer("rtsp-simple-server", "server", []string{"{}"}) + require.NoError(t, err) + defer cnt1.close() + + time.Sleep(1 * time.Second) + + pc, err := net.ListenPacket("udp4", "127.0.0.1:0") + require.NoError(t, err) + defer pc.Close() + + cnt2, err := newContainer("gstreamer", "source", []string{ + "filesrc location=emptyvideo.ts ! tsdemux ! video/x-h264" + + " ! h264parse config-interval=1 ! rtph264pay ! udpsink host=127.0.0.1 port=" + strconv.FormatInt(int64(pc.LocalAddr().(*net.UDPAddr).Port), 10), + }) + require.NoError(t, err) + defer cnt2.close() + + decoder := rtph264.NewDecoder() + sps, pps, err := decoder.ReadSPSPPS(rtph264.PacketConnReader{pc}) //nolint:govet + require.NoError(t, err) + + track, err := NewTrackH264(96, sps, pps) + require.NoError(t, err) + + conf := ClientConf{ + StreamProtocol: func() *StreamProtocol { + if proto == "udp" { + v := StreamProtocolUDP + return &v + } + v := StreamProtocolTCP + return &v + }(), + } + + conn, err := conf.DialPublish("rtsp://localhost:8554/teststream", + Tracks{track}) + require.NoError(t, err) + defer conn.Close() + + buf := make([]byte, 2048) + + n, _, err := pc.ReadFrom(buf) + require.NoError(t, err) + err = conn.WriteFrame(track.ID, StreamTypeRTP, buf[:n]) + require.NoError(t, err) + + _, err = conn.Pause() + require.NoError(t, err) + + n, _, err = pc.ReadFrom(buf) + require.NoError(t, err) + err = conn.WriteFrame(track.ID, StreamTypeRTP, buf[:n]) + require.Error(t, err) + + _, err = conn.Record() + require.NoError(t, err) + + n, _, err = pc.ReadFrom(buf) + require.NoError(t, err) + err = conn.WriteFrame(track.ID, StreamTypeRTP, buf[:n]) + require.NoError(t, err) + }) + } +} + +func TestClientConnPublishPauseParallel(t *testing.T) { + for _, proto := range []string{ + "udp", + "tcp", + } { + t.Run(proto, func(t *testing.T) { + cnt1, err := newContainer("rtsp-simple-server", "server", []string{"{}"}) + require.NoError(t, err) + defer cnt1.close() + + time.Sleep(1 * time.Second) + + pc, err := net.ListenPacket("udp4", "127.0.0.1:0") + require.NoError(t, err) + defer pc.Close() + + cnt2, err := newContainer("gstreamer", "source", []string{ + "filesrc location=emptyvideo.ts ! tsdemux ! video/x-h264" + + " ! h264parse config-interval=1 ! rtph264pay ! udpsink host=127.0.0.1 port=" + strconv.FormatInt(int64(pc.LocalAddr().(*net.UDPAddr).Port), 10), + }) + require.NoError(t, err) + defer cnt2.close() + + decoder := rtph264.NewDecoder() + sps, pps, err := decoder.ReadSPSPPS(rtph264.PacketConnReader{pc}) //nolint:govet + require.NoError(t, err) + + track, err := NewTrackH264(96, sps, pps) + require.NoError(t, err) + + conf := ClientConf{ + StreamProtocol: func() *StreamProtocol { + if proto == "udp" { + v := StreamProtocolUDP + return &v + } + v := StreamProtocolTCP + return &v + }(), + } + + conn, err := conf.DialPublish("rtsp://localhost:8554/teststream", + Tracks{track}) + require.NoError(t, err) + + writerDone := make(chan struct{}) + go func() { + defer close(writerDone) + + buf := make([]byte, 2048) + for { + n, _, err := pc.ReadFrom(buf) + require.NoError(t, err) + + err = conn.WriteFrame(track.ID, StreamTypeRTP, buf[:n]) + if err != nil { + break + } + } + }() + + time.Sleep(1 * time.Second) + + _, err = conn.Pause() + require.NoError(t, err) + <-writerDone + + conn.Close() + }) + } +} diff --git a/clientconf_test.go b/clientconnread_test.go similarity index 61% rename from clientconf_test.go rename to clientconnread_test.go index 40e12023..2738c39d 100644 --- a/clientconf_test.go +++ b/clientconnread_test.go @@ -4,9 +4,6 @@ import ( "bufio" "crypto/tls" "net" - "os" - "os/exec" - "strconv" "strings" "sync/atomic" "testing" @@ -16,55 +13,9 @@ import ( "github.com/aler9/gortsplib/pkg/base" "github.com/aler9/gortsplib/pkg/headers" - "github.com/aler9/gortsplib/pkg/rtph264" ) -type container struct { - name string -} - -func newContainer(image string, name string, args []string) (*container, error) { - c := &container{ - name: name, - } - - exec.Command("docker", "kill", "gortsplib-test-"+name).Run() - exec.Command("docker", "wait", "gortsplib-test-"+name).Run() - - cmd := []string{"docker", "run", - "--network=host", - "--name=gortsplib-test-" + name, - "gortsplib-test-" + image} - cmd = append(cmd, args...) - ecmd := exec.Command(cmd[0], cmd[1:]...) - ecmd.Stdout = nil - ecmd.Stderr = os.Stderr - - err := ecmd.Start() - if err != nil { - return nil, err - } - - time.Sleep(1 * time.Second) - - return c, nil -} - -func (c *container) close() { - exec.Command("docker", "kill", "gortsplib-test-"+c.name).Run() - exec.Command("docker", "wait", "gortsplib-test-"+c.name).Run() - exec.Command("docker", "rm", "gortsplib-test-"+c.name).Run() -} - -func (c *container) wait() int { - exec.Command("docker", "wait", "gortsplib-test-"+c.name).Run() - out, _ := exec.Command("docker", "inspect", "gortsplib-test-"+c.name, - "--format={{.State.ExitCode}}").Output() - code, _ := strconv.ParseInt(string(out[:len(out)-1]), 10, 64) - return int(code) -} - -func TestClientRead(t *testing.T) { +func TestClientConnRead(t *testing.T) { for _, ca := range []struct { encrypted bool proto string @@ -250,7 +201,7 @@ func TestClientRead(t *testing.T) { } } -func TestClientReadNoServerPorts(t *testing.T) { +func TestClientConnReadNoServerPorts(t *testing.T) { for _, ca := range []string{ "zero", "no", @@ -370,7 +321,7 @@ func TestClientReadNoServerPorts(t *testing.T) { } } -func TestClientReadAutomaticProtocol(t *testing.T) { +func TestClientConnReadAutomaticProtocol(t *testing.T) { l, err := net.Listen("tcp", "localhost:8554") require.NoError(t, err) defer l.Close() @@ -479,7 +430,7 @@ func TestClientReadAutomaticProtocol(t *testing.T) { <-done } -func TestClientReadRedirect(t *testing.T) { +func TestClientConnReadRedirect(t *testing.T) { l, err := net.Listen("tcp", "localhost:8554") require.NoError(t, err) defer l.Close() @@ -617,7 +568,7 @@ func TestClientReadRedirect(t *testing.T) { <-done } -func TestClientReadPause(t *testing.T) { +func TestClientConnReadPause(t *testing.T) { for _, proto := range []string{ "udp", "tcp", @@ -687,321 +638,3 @@ func TestClientReadPause(t *testing.T) { }) } } - -func TestClientPublishSerial(t *testing.T) { - for _, proto := range []string{ - "udp", - "tcp", - } { - t.Run(proto, func(t *testing.T) { - cnt1, err := newContainer("rtsp-simple-server", "server", []string{"{}"}) - require.NoError(t, err) - defer cnt1.close() - - time.Sleep(1 * time.Second) - - pc, err := net.ListenPacket("udp4", "127.0.0.1:0") - require.NoError(t, err) - defer pc.Close() - - cnt2, err := newContainer("gstreamer", "source", []string{ - "filesrc location=emptyvideo.ts ! tsdemux ! video/x-h264" + - " ! h264parse config-interval=1 ! rtph264pay ! udpsink host=127.0.0.1 port=" + strconv.FormatInt(int64(pc.LocalAddr().(*net.UDPAddr).Port), 10), - }) - require.NoError(t, err) - defer cnt2.close() - - decoder := rtph264.NewDecoder() - sps, pps, err := decoder.ReadSPSPPS(rtph264.PacketConnReader{pc}) //nolint:govet - require.NoError(t, err) - - track, err := NewTrackH264(96, sps, pps) - require.NoError(t, err) - - conf := ClientConf{ - StreamProtocol: func() *StreamProtocol { - if proto == "udp" { - v := StreamProtocolUDP - return &v - } - v := StreamProtocolTCP - return &v - }(), - } - - conn, err := conf.DialPublish("rtsp://localhost:8554/teststream", - Tracks{track}) - require.NoError(t, err) - - buf := make([]byte, 2048) - n, _, err := pc.ReadFrom(buf) - require.NoError(t, err) - err = conn.WriteFrame(track.ID, StreamTypeRTP, buf[:n]) - require.NoError(t, err) - - conn.Close() - - n, _, err = pc.ReadFrom(buf) - require.NoError(t, err) - err = conn.WriteFrame(track.ID, StreamTypeRTP, buf[:n]) - require.Error(t, err) - }) - } -} - -func TestClientPublishParallel(t *testing.T) { - for _, ca := range []struct { - proto string - server string - }{ - {"udp", "rtsp-simple-server"}, - {"udp", "ffmpeg"}, - {"tcp", "rtsp-simple-server"}, - {"tcp", "ffmpeg"}, - } { - t.Run(ca.proto+"_"+ca.server, func(t *testing.T) { - switch ca.server { - case "rtsp-simple-server": - cnt1, err := newContainer("rtsp-simple-server", "server", []string{"{}"}) - require.NoError(t, err) - defer cnt1.close() - - default: - cnt0, err := newContainer("rtsp-simple-server", "server0", []string{"{}"}) - require.NoError(t, err) - defer cnt0.close() - - cnt1, err := newContainer("ffmpeg", "server", []string{ - "-fflags nobuffer -re -rtsp_flags listen -i rtsp://localhost:8555/teststream -c copy -f rtsp rtsp://localhost:8554/teststream", - }) - require.NoError(t, err) - defer cnt1.close() - } - - time.Sleep(1 * time.Second) - - pc, err := net.ListenPacket("udp4", "127.0.0.1:0") - require.NoError(t, err) - defer pc.Close() - - cnt2, err := newContainer("gstreamer", "source", []string{ - "filesrc location=emptyvideo.ts ! tsdemux ! video/x-h264" + - " ! h264parse config-interval=1 ! rtph264pay ! udpsink host=127.0.0.1 port=" + strconv.FormatInt(int64(pc.LocalAddr().(*net.UDPAddr).Port), 10), - }) - require.NoError(t, err) - defer cnt2.close() - - decoder := rtph264.NewDecoder() - sps, pps, err := decoder.ReadSPSPPS(rtph264.PacketConnReader{pc}) //nolint:govet - require.NoError(t, err) - - track, err := NewTrackH264(96, sps, pps) - require.NoError(t, err) - - writerDone := make(chan struct{}) - defer func() { <-writerDone }() - - var conn *ClientConn - defer func() { conn.Close() }() - - conf := ClientConf{ - StreamProtocol: func() *StreamProtocol { - if ca.proto == "udp" { - v := StreamProtocolUDP - return &v - } - v := StreamProtocolTCP - return &v - }(), - } - - go func() { - defer close(writerDone) - - port := "8554" - if ca.server == "ffmpeg" { - port = "8555" - } - var err error - conn, err = conf.DialPublish("rtsp://localhost:"+port+"/teststream", - Tracks{track}) - require.NoError(t, err) - - buf := make([]byte, 2048) - for { - n, _, err := pc.ReadFrom(buf) - if err != nil { - break - } - - err = conn.WriteFrame(track.ID, StreamTypeRTP, buf[:n]) - if err != nil { - break - } - } - }() - - if ca.server == "ffmpeg" { - time.Sleep(5 * time.Second) - } - time.Sleep(1 * time.Second) - - cnt3, err := newContainer("ffmpeg", "read", []string{ - "-rtsp_transport", "udp", - "-i", "rtsp://localhost:8554/teststream", - "-vframes", "1", - "-f", "image2", - "-y", "/dev/null", - }) - require.NoError(t, err) - defer cnt3.close() - - code := cnt3.wait() - require.Equal(t, 0, code) - }) - } -} - -func TestClientPublishPauseSerial(t *testing.T) { - for _, proto := range []string{ - "udp", - "tcp", - } { - t.Run(proto, func(t *testing.T) { - cnt1, err := newContainer("rtsp-simple-server", "server", []string{"{}"}) - require.NoError(t, err) - defer cnt1.close() - - time.Sleep(1 * time.Second) - - pc, err := net.ListenPacket("udp4", "127.0.0.1:0") - require.NoError(t, err) - defer pc.Close() - - cnt2, err := newContainer("gstreamer", "source", []string{ - "filesrc location=emptyvideo.ts ! tsdemux ! video/x-h264" + - " ! h264parse config-interval=1 ! rtph264pay ! udpsink host=127.0.0.1 port=" + strconv.FormatInt(int64(pc.LocalAddr().(*net.UDPAddr).Port), 10), - }) - require.NoError(t, err) - defer cnt2.close() - - decoder := rtph264.NewDecoder() - sps, pps, err := decoder.ReadSPSPPS(rtph264.PacketConnReader{pc}) //nolint:govet - require.NoError(t, err) - - track, err := NewTrackH264(96, sps, pps) - require.NoError(t, err) - - conf := ClientConf{ - StreamProtocol: func() *StreamProtocol { - if proto == "udp" { - v := StreamProtocolUDP - return &v - } - v := StreamProtocolTCP - return &v - }(), - } - - conn, err := conf.DialPublish("rtsp://localhost:8554/teststream", - Tracks{track}) - require.NoError(t, err) - defer conn.Close() - - buf := make([]byte, 2048) - - n, _, err := pc.ReadFrom(buf) - require.NoError(t, err) - err = conn.WriteFrame(track.ID, StreamTypeRTP, buf[:n]) - require.NoError(t, err) - - _, err = conn.Pause() - require.NoError(t, err) - - n, _, err = pc.ReadFrom(buf) - require.NoError(t, err) - err = conn.WriteFrame(track.ID, StreamTypeRTP, buf[:n]) - require.Error(t, err) - - _, err = conn.Record() - require.NoError(t, err) - - n, _, err = pc.ReadFrom(buf) - require.NoError(t, err) - err = conn.WriteFrame(track.ID, StreamTypeRTP, buf[:n]) - require.NoError(t, err) - }) - } -} - -func TestClientPublishPauseParallel(t *testing.T) { - for _, proto := range []string{ - "udp", - "tcp", - } { - t.Run(proto, func(t *testing.T) { - cnt1, err := newContainer("rtsp-simple-server", "server", []string{"{}"}) - require.NoError(t, err) - defer cnt1.close() - - time.Sleep(1 * time.Second) - - pc, err := net.ListenPacket("udp4", "127.0.0.1:0") - require.NoError(t, err) - defer pc.Close() - - cnt2, err := newContainer("gstreamer", "source", []string{ - "filesrc location=emptyvideo.ts ! tsdemux ! video/x-h264" + - " ! h264parse config-interval=1 ! rtph264pay ! udpsink host=127.0.0.1 port=" + strconv.FormatInt(int64(pc.LocalAddr().(*net.UDPAddr).Port), 10), - }) - require.NoError(t, err) - defer cnt2.close() - - decoder := rtph264.NewDecoder() - sps, pps, err := decoder.ReadSPSPPS(rtph264.PacketConnReader{pc}) //nolint:govet - require.NoError(t, err) - - track, err := NewTrackH264(96, sps, pps) - require.NoError(t, err) - - conf := ClientConf{ - StreamProtocol: func() *StreamProtocol { - if proto == "udp" { - v := StreamProtocolUDP - return &v - } - v := StreamProtocolTCP - return &v - }(), - } - - conn, err := conf.DialPublish("rtsp://localhost:8554/teststream", - Tracks{track}) - require.NoError(t, err) - - writerDone := make(chan struct{}) - go func() { - defer close(writerDone) - - buf := make([]byte, 2048) - for { - n, _, err := pc.ReadFrom(buf) - require.NoError(t, err) - - err = conn.WriteFrame(track.ID, StreamTypeRTP, buf[:n]) - if err != nil { - break - } - } - }() - - time.Sleep(1 * time.Second) - - _, err = conn.Pause() - require.NoError(t, err) - <-writerDone - - conn.Close() - }) - } -} diff --git a/gortsplib_test.go b/gortsplib_test.go new file mode 100644 index 00000000..e927c9dd --- /dev/null +++ b/gortsplib_test.go @@ -0,0 +1,53 @@ +package gortsplib + +import ( + "os" + "os/exec" + "strconv" + "time" +) + +type container struct { + name string +} + +func newContainer(image string, name string, args []string) (*container, error) { + c := &container{ + name: name, + } + + exec.Command("docker", "kill", "gortsplib-test-"+name).Run() + exec.Command("docker", "wait", "gortsplib-test-"+name).Run() + + cmd := []string{"docker", "run", + "--network=host", + "--name=gortsplib-test-" + name, + "gortsplib-test-" + image} + cmd = append(cmd, args...) + ecmd := exec.Command(cmd[0], cmd[1:]...) + ecmd.Stdout = nil + ecmd.Stderr = os.Stderr + + err := ecmd.Start() + if err != nil { + return nil, err + } + + time.Sleep(1 * time.Second) + + return c, nil +} + +func (c *container) close() { + exec.Command("docker", "kill", "gortsplib-test-"+c.name).Run() + exec.Command("docker", "wait", "gortsplib-test-"+c.name).Run() + exec.Command("docker", "rm", "gortsplib-test-"+c.name).Run() +} + +func (c *container) wait() int { + exec.Command("docker", "wait", "gortsplib-test-"+c.name).Run() + out, _ := exec.Command("docker", "inspect", "gortsplib-test-"+c.name, + "--format={{.State.ExitCode}}").Output() + code, _ := strconv.ParseInt(string(out[:len(out)-1]), 10, 64) + return int(code) +} diff --git a/serverconf_test.go b/serverconn_test.go similarity index 60% rename from serverconf_test.go rename to serverconn_test.go index 04388054..01041b63 100644 --- a/serverconf_test.go +++ b/serverconn_test.go @@ -243,20 +243,6 @@ func (ts *testServ) handleConn(conn *ServerConn) { }, nil } - onPause := func(req *base.Request) (*base.Response, error) { - ts.mutex.Lock() - defer ts.mutex.Unlock() - - delete(ts.readers, conn) - - return &base.Response{ - StatusCode: base.StatusOK, - Header: base.Header{ - "Session": base.HeaderValue{"12345678"}, - }, - }, nil - } - onFrame := func(trackID int, typ StreamType, buf []byte) { ts.mutex.Lock() defer ts.mutex.Unlock() @@ -274,7 +260,6 @@ func (ts *testServ) handleConn(conn *ServerConn) { OnSetup: onSetup, OnPlay: onPlay, OnRecord: onRecord, - OnPause: onPause, OnFrame: onFrame, }) if err != io.EOF && err != ErrServerTeardown { @@ -344,36 +329,7 @@ y++U32uuSFiXDcSLarfIsE992MEJLSAynbF1Rsgsr3gXbGiuToJRyxbIeVy7gwzD -----END RSA PRIVATE KEY----- `) -func TestServerTeardownResponse(t *testing.T) { - ts, err := newTestServ(nil) - require.NoError(t, err) - defer ts.close() - - conn, err := net.Dial("tcp", "localhost:8554") - require.NoError(t, err) - defer conn.Close() - bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) - - err = base.Request{ - Method: base.Teardown, - URL: base.MustParseURL("rtsp://localhost:8554/"), - Header: base.Header{ - "CSeq": base.HeaderValue{"1"}, - }, - }.Write(bconn.Writer) - require.NoError(t, err) - - var res base.Response - err = res.Read(bconn.Reader) - require.NoError(t, err) - require.Equal(t, base.StatusOK, res.StatusCode) - - buf := make([]byte, 2048) - _, err = bconn.Read(buf) - require.Equal(t, io.EOF, err) -} - -func TestServerPublishRead(t *testing.T) { +func TestServerConnPublishReadHighLevel(t *testing.T) { for _, ca := range []struct { encrypted bool publisherSoft string @@ -478,24 +434,23 @@ func TestServerPublishRead(t *testing.T) { } } -func TestServerReadWithoutSetupTrackID(t *testing.T) { - ts, err := newTestServ(nil) +func TestServerConnTeardownResponse(t *testing.T) { + s, err := Serve(":8554") require.NoError(t, err) - defer ts.close() + defer s.Close() - cnt1, err := newContainer("ffmpeg", "publish", []string{ - "-re", - "-stream_loop", "-1", - "-i", "emptyvideo.ts", - "-c", "copy", - "-f", "rtsp", - "-rtsp_transport", "tcp", - "rtsp://localhost:8554/teststream", - }) - require.NoError(t, err) - defer cnt1.close() + serverDone := make(chan struct{}) + defer func() { <-serverDone }() + go func() { + defer close(serverDone) - time.Sleep(1 * time.Second) + conn, err := s.Accept() + require.NoError(t, err) + defer conn.Close() + + err = <-conn.Read(ServerConnReadHandlers{}) + require.Equal(t, ErrServerTeardown, err) + }() conn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) @@ -503,22 +458,10 @@ func TestServerReadWithoutSetupTrackID(t *testing.T) { bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) err = base.Request{ - Method: base.Setup, - URL: base.MustParseURL("rtsp://localhost:8554/teststream"), + Method: base.Teardown, + URL: base.MustParseURL("rtsp://localhost:8554/"), Header: base.Header{ "CSeq": base.HeaderValue{"1"}, - "Transport": headers.Transport{ - Protocol: StreamProtocolTCP, - Delivery: func() *base.StreamDelivery { - v := base.StreamDeliveryUnicast - return &v - }(), - Mode: func() *headers.TransportMode { - v := headers.TransportModePlay - return &v - }(), - InterleavedIds: &[2]int{0, 1}, - }.Write(), }, }.Write(bconn.Writer) require.NoError(t, err) @@ -528,260 +471,7 @@ func TestServerReadWithoutSetupTrackID(t *testing.T) { require.NoError(t, err) require.Equal(t, base.StatusOK, res.StatusCode) - err = base.Request{ - Method: base.Play, - URL: base.MustParseURL("rtsp://localhost:8554/teststream"), - Header: base.Header{ - "CSeq": base.HeaderValue{"2"}, - }, - }.Write(bconn.Writer) - require.NoError(t, err) - - err = res.Read(bconn.Reader) - require.NoError(t, err) - require.Equal(t, base.StatusOK, res.StatusCode) - - var fr base.InterleavedFrame - fr.Payload = make([]byte, 2048) - err = fr.Read(bconn.Reader) - require.NoError(t, err) -} - -func TestServerReadResponseBeforeFrames(t *testing.T) { - ts, err := newTestServ(nil) - require.NoError(t, err) - defer ts.close() - - cnt1, err := newContainer("ffmpeg", "publish", []string{ - "-re", - "-stream_loop", "-1", - "-i", "emptyvideo.ts", - "-c", "copy", - "-f", "rtsp", - "-rtsp_transport", "tcp", - "rtsp://localhost:8554/teststream", - }) - require.NoError(t, err) - defer cnt1.close() - - time.Sleep(1 * time.Second) - - conn, err := net.Dial("tcp", "localhost:8554") - require.NoError(t, err) - defer conn.Close() - bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) - - err = base.Request{ - Method: base.Setup, - URL: base.MustParseURL("rtsp://localhost:8554/teststream/trackID=0"), - Header: base.Header{ - "CSeq": base.HeaderValue{"1"}, - "Transport": headers.Transport{ - Protocol: StreamProtocolTCP, - Delivery: func() *base.StreamDelivery { - v := base.StreamDeliveryUnicast - return &v - }(), - Mode: func() *headers.TransportMode { - v := headers.TransportModePlay - return &v - }(), - InterleavedIds: &[2]int{0, 1}, - }.Write(), - }, - }.Write(bconn.Writer) - require.NoError(t, err) - - var res base.Response - err = res.Read(bconn.Reader) - require.NoError(t, err) - require.Equal(t, base.StatusOK, res.StatusCode) - - err = base.Request{ - Method: base.Play, - URL: base.MustParseURL("rtsp://localhost:8554/teststream"), - Header: base.Header{ - "CSeq": base.HeaderValue{"2"}, - }, - }.Write(bconn.Writer) - require.NoError(t, err) - - err = res.Read(bconn.Reader) - require.NoError(t, err) - require.Equal(t, base.StatusOK, res.StatusCode) - - var fr base.InterleavedFrame - fr.Payload = make([]byte, 2048) - err = fr.Read(bconn.Reader) - require.NoError(t, err) -} - -func TestServerReadPlayMultiple(t *testing.T) { - ts, err := newTestServ(nil) - require.NoError(t, err) - defer ts.close() - - cnt1, err := newContainer("ffmpeg", "publish", []string{ - "-re", - "-stream_loop", "-1", - "-i", "emptyvideo.ts", - "-c", "copy", - "-f", "rtsp", - "-rtsp_transport", "tcp", - "rtsp://localhost:8554/teststream", - }) - require.NoError(t, err) - defer cnt1.close() - - time.Sleep(1 * time.Second) - - conn, err := net.Dial("tcp", "localhost:8554") - require.NoError(t, err) - defer conn.Close() - bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) - - err = base.Request{ - Method: base.Setup, - URL: base.MustParseURL("rtsp://localhost:8554/teststream/trackID=0"), - Header: base.Header{ - "CSeq": base.HeaderValue{"1"}, - "Transport": headers.Transport{ - Protocol: StreamProtocolTCP, - Delivery: func() *base.StreamDelivery { - v := base.StreamDeliveryUnicast - return &v - }(), - Mode: func() *headers.TransportMode { - v := headers.TransportModePlay - return &v - }(), - InterleavedIds: &[2]int{0, 1}, - }.Write(), - }, - }.Write(bconn.Writer) - require.NoError(t, err) - - var res base.Response - err = res.Read(bconn.Reader) - require.NoError(t, err) - require.Equal(t, base.StatusOK, res.StatusCode) - - err = base.Request{ - Method: base.Play, - URL: base.MustParseURL("rtsp://localhost:8554/teststream"), - Header: base.Header{ - "CSeq": base.HeaderValue{"2"}, - }, - }.Write(bconn.Writer) - require.NoError(t, err) - - err = res.Read(bconn.Reader) - require.NoError(t, err) - require.Equal(t, base.StatusOK, res.StatusCode) - - err = base.Request{ - Method: base.Play, - URL: base.MustParseURL("rtsp://localhost:8554/teststream"), - Header: base.Header{ - "CSeq": base.HeaderValue{"2"}, - }, - }.Write(bconn.Writer) - require.NoError(t, err) - buf := make([]byte, 2048) - err = res.ReadIgnoreFrames(bconn.Reader, buf) - require.NoError(t, err) - require.Equal(t, base.StatusOK, res.StatusCode) -} - -func TestServerReadPauseMultiple(t *testing.T) { - ts, err := newTestServ(nil) - require.NoError(t, err) - defer ts.close() - - cnt1, err := newContainer("ffmpeg", "publish", []string{ - "-re", - "-stream_loop", "-1", - "-i", "emptyvideo.ts", - "-c", "copy", - "-f", "rtsp", - "-rtsp_transport", "tcp", - "rtsp://localhost:8554/teststream", - }) - require.NoError(t, err) - defer cnt1.close() - - time.Sleep(1 * time.Second) - - conn, err := net.Dial("tcp", "localhost:8554") - require.NoError(t, err) - defer conn.Close() - bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) - - err = base.Request{ - Method: base.Setup, - URL: base.MustParseURL("rtsp://localhost:8554/teststream/trackID=0"), - Header: base.Header{ - "CSeq": base.HeaderValue{"1"}, - "Transport": headers.Transport{ - Protocol: StreamProtocolTCP, - Delivery: func() *base.StreamDelivery { - v := base.StreamDeliveryUnicast - return &v - }(), - Mode: func() *headers.TransportMode { - v := headers.TransportModePlay - return &v - }(), - InterleavedIds: &[2]int{0, 1}, - }.Write(), - }, - }.Write(bconn.Writer) - require.NoError(t, err) - - var res base.Response - err = res.Read(bconn.Reader) - require.NoError(t, err) - require.Equal(t, base.StatusOK, res.StatusCode) - - err = base.Request{ - Method: base.Play, - URL: base.MustParseURL("rtsp://localhost:8554/teststream"), - Header: base.Header{ - "CSeq": base.HeaderValue{"2"}, - }, - }.Write(bconn.Writer) - require.NoError(t, err) - - err = res.Read(bconn.Reader) - require.NoError(t, err) - require.Equal(t, base.StatusOK, res.StatusCode) - - err = base.Request{ - Method: base.Pause, - URL: base.MustParseURL("rtsp://localhost:8554/teststream"), - Header: base.Header{ - "CSeq": base.HeaderValue{"2"}, - }, - }.Write(bconn.Writer) - require.NoError(t, err) - - buf := make([]byte, 2048) - err = res.ReadIgnoreFrames(bconn.Reader, buf) - require.NoError(t, err) - require.Equal(t, base.StatusOK, res.StatusCode) - - err = base.Request{ - Method: base.Pause, - URL: base.MustParseURL("rtsp://localhost:8554/teststream"), - Header: base.Header{ - "CSeq": base.HeaderValue{"2"}, - }, - }.Write(bconn.Writer) - require.NoError(t, err) - - buf = make([]byte, 2048) - err = res.ReadIgnoreFrames(bconn.Reader, buf) - require.NoError(t, err) - require.Equal(t, base.StatusOK, res.StatusCode) + _, err = bconn.Read(buf) + require.Equal(t, io.EOF, err) } diff --git a/serverconnpublish_test.go b/serverconnpublish_test.go new file mode 100644 index 00000000..38c16e12 --- /dev/null +++ b/serverconnpublish_test.go @@ -0,0 +1,211 @@ +package gortsplib + +import ( + "bufio" + "io" + "net" + "strconv" + "sync/atomic" + "testing" + "time" + + psdp "github.com/pion/sdp/v3" + "github.com/stretchr/testify/require" + + "github.com/aler9/gortsplib/pkg/base" + "github.com/aler9/gortsplib/pkg/headers" +) + +func TestServerConnPublishReceivePackets(t *testing.T) { + for _, proto := range []string{ + "udp", + "tcp", + } { + t.Run(proto, func(t *testing.T) { + packetsReceived := make(chan struct{}) + + conf := ServerConf{ + UDPRTPAddress: ":8000", + UDPRTCPAddress: ":8001", + } + + s, err := conf.Serve(":8554") + require.NoError(t, err) + defer s.Close() + + serverDone := make(chan struct{}) + defer func() { <-serverDone }() + go func() { + defer close(serverDone) + + conn, err := s.Accept() + require.NoError(t, err) + defer conn.Close() + + onAnnounce := func(req *base.Request, tracks Tracks) (*base.Response, error) { + return &base.Response{ + StatusCode: base.StatusOK, + }, nil + } + + onSetup := func(req *base.Request, th *headers.Transport, trackID int) (*base.Response, error) { + return &base.Response{ + StatusCode: base.StatusOK, + }, nil + } + + onRecord := func(req *base.Request) (*base.Response, error) { + return &base.Response{ + StatusCode: base.StatusOK, + }, nil + } + + rtpReceived := uint64(0) + onFrame := func(trackID int, typ StreamType, buf []byte) { + if atomic.SwapUint64(&rtpReceived, 1) == 0 { + require.Equal(t, 0, trackID) + require.Equal(t, StreamTypeRTP, typ) + require.Equal(t, []byte("\x01\x02\x03\x04"), buf) + } else { + require.Equal(t, 0, trackID) + require.Equal(t, StreamTypeRTCP, typ) + require.Equal(t, []byte("\x05\x06\x07\x08"), buf) + close(packetsReceived) + } + } + + err = <-conn.Read(ServerConnReadHandlers{ + OnAnnounce: onAnnounce, + OnSetup: onSetup, + OnRecord: onRecord, + OnFrame: onFrame, + }) + require.Equal(t, io.EOF, err) + }() + + conn, err := net.Dial("tcp", "localhost:8554") + require.NoError(t, err) + defer conn.Close() + bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) + + track, err := NewTrackH264(96, []byte("123456"), []byte("123456")) + require.NoError(t, err) + + tracks := Tracks{track} + + for i, t := range tracks { + t.ID = i + t.BaseURL = base.MustParseURL("rtsp://localhost:8554/teststream") + t.Media.Attributes = append(t.Media.Attributes, psdp.Attribute{ + Key: "control", + Value: "trackID=" + strconv.FormatInt(int64(i), 10), + }) + } + + err = base.Request{ + Method: base.Announce, + URL: base.MustParseURL("rtsp://localhost:8554/teststream"), + Header: base.Header{ + "CSeq": base.HeaderValue{"1"}, + "Content-Type": base.HeaderValue{"application/sdp"}, + }, + Body: tracks.Write(), + }.Write(bconn.Writer) + require.NoError(t, err) + + var res base.Response + err = res.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + + th := &headers.Transport{ + Delivery: func() *base.StreamDelivery { + v := base.StreamDeliveryUnicast + return &v + }(), + Mode: func() *headers.TransportMode { + v := headers.TransportModeRecord + return &v + }(), + } + + if proto == "udp" { + th.Protocol = StreamProtocolUDP + th.ClientPorts = &[2]int{35466, 35467} + } else { + th.Protocol = StreamProtocolTCP + th.InterleavedIds = &[2]int{0, 1} + } + + err = base.Request{ + Method: base.Setup, + URL: base.MustParseURL("rtsp://localhost:8554/teststream/trackID=0"), + Header: base.Header{ + "CSeq": base.HeaderValue{"2"}, + "Transport": th.Write(), + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + err = res.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + + th, err = headers.ReadTransport(res.Header["Transport"]) + require.NoError(t, err) + + err = base.Request{ + Method: base.Record, + URL: base.MustParseURL("rtsp://localhost:8554/teststream"), + Header: base.Header{ + "CSeq": base.HeaderValue{"3"}, + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + err = res.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + + if proto == "udp" { + time.Sleep(500 * time.Millisecond) + + l1, err := net.ListenPacket("udp", "localhost:35466") + require.NoError(t, err) + defer l1.Close() + + l1.WriteTo([]byte("\x01\x02\x03\x04"), &net.UDPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: th.ServerPorts[0], + }) + + time.Sleep(100 * time.Millisecond) + + l1, err = net.ListenPacket("udp", "localhost:35467") + require.NoError(t, err) + defer l1.Close() + + l1.WriteTo([]byte("\x05\x06\x07\x08"), &net.UDPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: th.ServerPorts[1], + }) + } else { + err = base.InterleavedFrame{ + TrackID: 0, + StreamType: StreamTypeRTP, + Payload: []byte("\x01\x02\x03\x04"), + }.Write(bconn.Writer) + require.NoError(t, err) + + err = base.InterleavedFrame{ + TrackID: 0, + StreamType: StreamTypeRTCP, + Payload: []byte("\x05\x06\x07\x08"), + }.Write(bconn.Writer) + require.NoError(t, err) + } + + <-packetsReceived + }) + } +} diff --git a/serverconnread_test.go b/serverconnread_test.go new file mode 100644 index 00000000..8d0640ef --- /dev/null +++ b/serverconnread_test.go @@ -0,0 +1,592 @@ +package gortsplib + +import ( + "bufio" + "io" + "net" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/aler9/gortsplib/pkg/base" + "github.com/aler9/gortsplib/pkg/headers" +) + +func TestServerConnReadReceivePackets(t *testing.T) { + for _, proto := range []string{ + "udp", + "tcp", + } { + t.Run(proto, func(t *testing.T) { + packetsReceived := make(chan struct{}) + + conf := ServerConf{ + UDPRTPAddress: ":8000", + UDPRTCPAddress: ":8001", + } + + s, err := conf.Serve(":8554") + require.NoError(t, err) + defer s.Close() + + serverDone := make(chan struct{}) + defer func() { <-serverDone }() + go func() { + defer close(serverDone) + + conn, err := s.Accept() + require.NoError(t, err) + defer conn.Close() + + onSetup := func(req *base.Request, th *headers.Transport, trackID int) (*base.Response, error) { + return &base.Response{ + StatusCode: base.StatusOK, + }, nil + } + + onPlay := func(req *base.Request) (*base.Response, error) { + return &base.Response{ + StatusCode: base.StatusOK, + }, nil + } + + onFrame := func(trackID int, typ StreamType, buf []byte) { + require.Equal(t, 0, trackID) + require.Equal(t, StreamTypeRTCP, typ) + require.Equal(t, []byte("\x01\x02\x03\x04"), buf) + close(packetsReceived) + } + + err = <-conn.Read(ServerConnReadHandlers{ + OnSetup: onSetup, + OnPlay: onPlay, + OnFrame: onFrame, + }) + require.Equal(t, io.EOF, err) + }() + + conn, err := net.Dial("tcp", "localhost:8554") + require.NoError(t, err) + defer conn.Close() + bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) + + th := &headers.Transport{ + Delivery: func() *base.StreamDelivery { + v := base.StreamDeliveryUnicast + return &v + }(), + Mode: func() *headers.TransportMode { + v := headers.TransportModePlay + return &v + }(), + } + + if proto == "udp" { + th.Protocol = StreamProtocolUDP + th.ClientPorts = &[2]int{35466, 35467} + } else { + th.Protocol = StreamProtocolTCP + th.InterleavedIds = &[2]int{0, 1} + } + + err = base.Request{ + Method: base.Setup, + URL: base.MustParseURL("rtsp://localhost:8554/teststream/trackID=0"), + Header: base.Header{ + "CSeq": base.HeaderValue{"1"}, + "Transport": th.Write(), + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + var res base.Response + err = res.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + + th, err = headers.ReadTransport(res.Header["Transport"]) + require.NoError(t, err) + + err = base.Request{ + Method: base.Play, + URL: base.MustParseURL("rtsp://localhost:8554/teststream"), + Header: base.Header{ + "CSeq": base.HeaderValue{"2"}, + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + err = res.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + + if proto == "udp" { + time.Sleep(500 * time.Millisecond) + + l1, err := net.ListenPacket("udp", "localhost:35467") + require.NoError(t, err) + defer l1.Close() + + l1.WriteTo([]byte("\x01\x02\x03\x04"), &net.UDPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: th.ServerPorts[1], + }) + } else { + err = base.InterleavedFrame{ + TrackID: 0, + StreamType: StreamTypeRTCP, + Payload: []byte("\x01\x02\x03\x04"), + }.Write(bconn.Writer) + require.NoError(t, err) + } + + <-packetsReceived + }) + } +} + +func TestServerConnReadWithoutSetupTrackID(t *testing.T) { + s, err := Serve(":8554") + require.NoError(t, err) + defer s.Close() + + serverDone := make(chan struct{}) + defer func() { <-serverDone }() + go func() { + defer close(serverDone) + + conn, err := s.Accept() + require.NoError(t, err) + defer conn.Close() + + onSetup := func(req *base.Request, th *headers.Transport, trackID int) (*base.Response, error) { + return &base.Response{ + StatusCode: base.StatusOK, + }, nil + } + + onPlay := func(req *base.Request) (*base.Response, error) { + go func() { + time.Sleep(100 * time.Millisecond) + conn.WriteFrame(0, StreamTypeRTP, []byte("\x00\x00\x00\x00")) + }() + + return &base.Response{ + StatusCode: base.StatusOK, + }, nil + } + + err = <-conn.Read(ServerConnReadHandlers{ + OnSetup: onSetup, + OnPlay: onPlay, + }) + require.Equal(t, io.EOF, err) + }() + + conn, err := net.Dial("tcp", "localhost:8554") + require.NoError(t, err) + defer conn.Close() + bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) + + err = base.Request{ + Method: base.Setup, + URL: base.MustParseURL("rtsp://localhost:8554/teststream"), + Header: base.Header{ + "CSeq": base.HeaderValue{"1"}, + "Transport": headers.Transport{ + Protocol: StreamProtocolTCP, + Delivery: func() *base.StreamDelivery { + v := base.StreamDeliveryUnicast + return &v + }(), + Mode: func() *headers.TransportMode { + v := headers.TransportModePlay + return &v + }(), + InterleavedIds: &[2]int{0, 1}, + }.Write(), + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + var res base.Response + err = res.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + + err = base.Request{ + Method: base.Play, + URL: base.MustParseURL("rtsp://localhost:8554/teststream"), + Header: base.Header{ + "CSeq": base.HeaderValue{"2"}, + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + err = res.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + + var fr base.InterleavedFrame + fr.Payload = make([]byte, 2048) + err = fr.Read(bconn.Reader) + require.NoError(t, err) +} + +func TestServerConnReadTCPResponseBeforeFrames(t *testing.T) { + s, err := Serve(":8554") + require.NoError(t, err) + defer s.Close() + + serverDone := make(chan struct{}) + defer func() { <-serverDone }() + go func() { + defer close(serverDone) + + conn, err := s.Accept() + require.NoError(t, err) + defer conn.Close() + + writerDone := make(chan struct{}) + defer func() { <-writerDone }() + writerTerminate := make(chan struct{}) + defer close(writerTerminate) + + onSetup := func(req *base.Request, th *headers.Transport, trackID int) (*base.Response, error) { + return &base.Response{ + StatusCode: base.StatusOK, + }, nil + } + + onPlay := func(req *base.Request) (*base.Response, error) { + go func() { + defer close(writerDone) + + conn.WriteFrame(0, StreamTypeRTP, []byte("\x00\x00\x00\x00")) + + t := time.NewTicker(50 * time.Millisecond) + defer t.Stop() + + for { + select { + case <-t.C: + conn.WriteFrame(0, StreamTypeRTP, []byte("\x00\x00\x00\x00")) + case <-writerTerminate: + return + } + } + }() + + time.Sleep(50 * time.Millisecond) + + return &base.Response{ + StatusCode: base.StatusOK, + }, nil + } + + err = <-conn.Read(ServerConnReadHandlers{ + OnSetup: onSetup, + OnPlay: onPlay, + }) + require.Equal(t, io.EOF, err) + }() + + conn, err := net.Dial("tcp", "localhost:8554") + require.NoError(t, err) + defer conn.Close() + bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) + + err = base.Request{ + Method: base.Setup, + URL: base.MustParseURL("rtsp://localhost:8554/teststream/trackID=0"), + Header: base.Header{ + "CSeq": base.HeaderValue{"1"}, + "Transport": headers.Transport{ + Protocol: StreamProtocolTCP, + Delivery: func() *base.StreamDelivery { + v := base.StreamDeliveryUnicast + return &v + }(), + Mode: func() *headers.TransportMode { + v := headers.TransportModePlay + return &v + }(), + InterleavedIds: &[2]int{0, 1}, + }.Write(), + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + var res base.Response + err = res.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + + err = base.Request{ + Method: base.Play, + URL: base.MustParseURL("rtsp://localhost:8554/teststream"), + Header: base.Header{ + "CSeq": base.HeaderValue{"2"}, + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + err = res.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + + var fr base.InterleavedFrame + fr.Payload = make([]byte, 2048) + err = fr.Read(bconn.Reader) + require.NoError(t, err) +} + +func TestServerConnReadPlayMultiple(t *testing.T) { + s, err := Serve(":8554") + require.NoError(t, err) + defer s.Close() + + serverDone := make(chan struct{}) + defer func() { <-serverDone }() + go func() { + defer close(serverDone) + + conn, err := s.Accept() + require.NoError(t, err) + defer conn.Close() + + writerDone := make(chan struct{}) + defer func() { <-writerDone }() + writerTerminate := make(chan struct{}) + defer close(writerTerminate) + + onSetup := func(req *base.Request, th *headers.Transport, trackID int) (*base.Response, error) { + return &base.Response{ + StatusCode: base.StatusOK, + }, nil + } + + onPlay := func(req *base.Request) (*base.Response, error) { + if conn.State() != ServerConnStatePlay { + go func() { + defer close(writerDone) + + t := time.NewTicker(50 * time.Millisecond) + defer t.Stop() + + for { + select { + case <-t.C: + conn.WriteFrame(0, StreamTypeRTP, []byte("\x00\x00\x00\x00")) + case <-writerTerminate: + return + } + } + }() + } + + return &base.Response{ + StatusCode: base.StatusOK, + }, nil + } + + err = <-conn.Read(ServerConnReadHandlers{ + OnSetup: onSetup, + OnPlay: onPlay, + }) + require.Equal(t, io.EOF, err) + }() + + conn, err := net.Dial("tcp", "localhost:8554") + require.NoError(t, err) + defer conn.Close() + bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) + + err = base.Request{ + Method: base.Setup, + URL: base.MustParseURL("rtsp://localhost:8554/teststream/trackID=0"), + Header: base.Header{ + "CSeq": base.HeaderValue{"1"}, + "Transport": headers.Transport{ + Protocol: StreamProtocolTCP, + Delivery: func() *base.StreamDelivery { + v := base.StreamDeliveryUnicast + return &v + }(), + Mode: func() *headers.TransportMode { + v := headers.TransportModePlay + return &v + }(), + InterleavedIds: &[2]int{0, 1}, + }.Write(), + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + var res base.Response + err = res.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + + err = base.Request{ + Method: base.Play, + URL: base.MustParseURL("rtsp://localhost:8554/teststream"), + Header: base.Header{ + "CSeq": base.HeaderValue{"2"}, + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + err = res.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + + err = base.Request{ + Method: base.Play, + URL: base.MustParseURL("rtsp://localhost:8554/teststream"), + Header: base.Header{ + "CSeq": base.HeaderValue{"2"}, + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + buf := make([]byte, 2048) + err = res.ReadIgnoreFrames(bconn.Reader, buf) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) +} + +func TestServerConnReadPauseMultiple(t *testing.T) { + s, err := Serve(":8554") + require.NoError(t, err) + defer s.Close() + + serverDone := make(chan struct{}) + defer func() { <-serverDone }() + go func() { + defer close(serverDone) + + conn, err := s.Accept() + require.NoError(t, err) + defer conn.Close() + + writerDone := make(chan struct{}) + defer func() { <-writerDone }() + writerTerminate := make(chan struct{}) + defer close(writerTerminate) + + onSetup := func(req *base.Request, th *headers.Transport, trackID int) (*base.Response, error) { + return &base.Response{ + StatusCode: base.StatusOK, + }, nil + } + + onPlay := func(req *base.Request) (*base.Response, error) { + go func() { + defer close(writerDone) + + t := time.NewTicker(50 * time.Millisecond) + defer t.Stop() + + for { + select { + case <-t.C: + conn.WriteFrame(0, StreamTypeRTP, []byte("\x00\x00\x00\x00")) + case <-writerTerminate: + return + } + } + }() + + return &base.Response{ + StatusCode: base.StatusOK, + }, nil + } + + onPause := func(req *base.Request) (*base.Response, error) { + return &base.Response{ + StatusCode: base.StatusOK, + }, nil + } + + err = <-conn.Read(ServerConnReadHandlers{ + OnSetup: onSetup, + OnPlay: onPlay, + OnPause: onPause, + }) + require.Equal(t, io.EOF, err) + }() + + conn, err := net.Dial("tcp", "localhost:8554") + require.NoError(t, err) + defer conn.Close() + bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) + + err = base.Request{ + Method: base.Setup, + URL: base.MustParseURL("rtsp://localhost:8554/teststream/trackID=0"), + Header: base.Header{ + "CSeq": base.HeaderValue{"1"}, + "Transport": headers.Transport{ + Protocol: StreamProtocolTCP, + Delivery: func() *base.StreamDelivery { + v := base.StreamDeliveryUnicast + return &v + }(), + Mode: func() *headers.TransportMode { + v := headers.TransportModePlay + return &v + }(), + InterleavedIds: &[2]int{0, 1}, + }.Write(), + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + var res base.Response + err = res.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + + err = base.Request{ + Method: base.Play, + URL: base.MustParseURL("rtsp://localhost:8554/teststream"), + Header: base.Header{ + "CSeq": base.HeaderValue{"2"}, + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + err = res.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + + err = base.Request{ + Method: base.Pause, + URL: base.MustParseURL("rtsp://localhost:8554/teststream"), + Header: base.Header{ + "CSeq": base.HeaderValue{"2"}, + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + buf := make([]byte, 2048) + err = res.ReadIgnoreFrames(bconn.Reader, buf) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + + err = base.Request{ + Method: base.Pause, + URL: base.MustParseURL("rtsp://localhost:8554/teststream"), + Header: base.Header{ + "CSeq": base.HeaderValue{"2"}, + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + buf = make([]byte, 2048) + err = res.ReadIgnoreFrames(bconn.Reader, buf) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) +}