From 453dd19f909efc599b3a7f68f9a207598c18bc55 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Wed, 6 Jan 2021 19:19:39 +0100 Subject: [PATCH] add TestServerPauseMultiple, TestServerPlayMultiple --- clientconn.go | 28 ++----- pkg/base/response.go | 21 +++++ serverconf_test.go | 185 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 211 insertions(+), 23 deletions(-) diff --git a/clientconn.go b/clientconn.go index 75cfaa61..d9b98e4a 100644 --- a/clientconn.go +++ b/clientconn.go @@ -208,15 +208,6 @@ func (c *ClientConn) Tracks() Tracks { return c.tracks } -func (c *ClientConn) readFrameTCPOrResponse() (interface{}, error) { - c.nconn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout)) - f := base.InterleavedFrame{ - Payload: c.tcpFrameBuffer.Next(), - } - r := base.Response{} - return base.ReadInterleavedFrameOrResponse(&f, &r, c.br) -} - // Do writes a Request and reads a Response. // Interleaved frames received before the response are ignored. func (c *ClientConn) Do(req *base.Request) (*base.Response, error) { @@ -259,24 +250,15 @@ func (c *ClientConn) Do(req *base.Request) (*base.Response, error) { // interleaved frames are sent in two situations: // * when the server is v4lrtspserver, before the PLAY response // * when the stream is already playing - res, err := func() (*base.Response, error) { - for { - recv, err := c.readFrameTCPOrResponse() - if err != nil { - return nil, err - } - - if res, ok := recv.(*base.Response); ok { - return res, nil - } - } - }() + var res base.Response + c.nconn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout)) + err = res.ReadIgnoreFrames(c.br, c.tcpFrameBuffer.Next()) if err != nil { return nil, err } if c.conf.OnResponse != nil { - c.conf.OnResponse(res) + c.conf.OnResponse(&res) } // get session from response @@ -303,7 +285,7 @@ func (c *ClientConn) Do(req *base.Request) (*base.Response, error) { return c.Do(req) } - return res, nil + return &res, nil } // Options writes an OPTIONS request and reads a response. diff --git a/pkg/base/response.go b/pkg/base/response.go index d1b22559..e3a3da04 100644 --- a/pkg/base/response.go +++ b/pkg/base/response.go @@ -185,6 +185,27 @@ func (res *Response) Read(rb *bufio.Reader) error { return nil } +// ReadIgnoreFrames reads a response and ignores any interleaved frame sent +// before the response. +func (res *Response) ReadIgnoreFrames(rb *bufio.Reader, buf []byte) error { + buflen := len(buf) + f := InterleavedFrame{ + Payload: buf, + } + + for { + f.Payload = f.Payload[:buflen] + recv, err := ReadInterleavedFrameOrResponse(&f, res, rb) + if err != nil { + return err + } + + if _, ok := recv.(*Response); ok { + return nil + } + } +} + // Write writes a Response. func (res Response) Write(bw *bufio.Writer) error { if res.StatusMessage == "" { diff --git a/serverconf_test.go b/serverconf_test.go index 308ec081..96143adb 100644 --- a/serverconf_test.go +++ b/serverconf_test.go @@ -183,6 +183,20 @@ func (ts *testServ) handleConn(conn *ServerConn) { }, nil } + onPause := func(req *base.Request) (*base.Response, error) { + ts.mutex.Lock() + defer ts.mutex.Unlock() + + delete(ts.readers, conn) + + return &base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ + "Session": base.HeaderValue{"12345678"}, + }, + }, nil + } + onFrame := func(trackID int, typ StreamType, buf []byte) { ts.mutex.Lock() defer ts.mutex.Unlock() @@ -200,6 +214,7 @@ func (ts *testServ) handleConn(conn *ServerConn) { OnSetup: onSetup, OnPlay: onPlay, OnRecord: onRecord, + OnPause: onPause, OnFrame: onFrame, }) @@ -466,3 +481,173 @@ func TestServerResponseBeforeFrames(t *testing.T) { err = fr.Read(bconn.Reader) require.NoError(t, err) } + +func TestServerPlayMultiple(t *testing.T) { + ts, err := newTestServ(nil) + require.NoError(t, err) + defer ts.close() + + cnt1, err := newContainer("ffmpeg", "publish", []string{ + "-re", + "-stream_loop", "-1", + "-i", "emptyvideo.ts", + "-c", "copy", + "-f", "rtsp", + "-rtsp_transport", "tcp", + "rtsp://localhost:8554/teststream", + }) + require.NoError(t, err) + defer cnt1.close() + + time.Sleep(1 * time.Second) + + conn, err := net.Dial("tcp", "localhost:8554") + require.NoError(t, err) + defer conn.Close() + bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) + + err = base.Request{ + Method: base.Setup, + URL: base.MustParseURL("rtsp://localhost:8554/teststream/trackID=0"), + Header: base.Header{ + "CSeq": base.HeaderValue{"1"}, + "Transport": headers.Transport{ + Protocol: StreamProtocolTCP, + Delivery: func() *base.StreamDelivery { + v := base.StreamDeliveryUnicast + return &v + }(), + Mode: func() *headers.TransportMode { + v := headers.TransportModePlay + return &v + }(), + InterleavedIds: &[2]int{0, 1}, + }.Write(), + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + var res base.Response + err = res.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + + err = base.Request{ + Method: base.Play, + URL: base.MustParseURL("rtsp://localhost:8554/teststream"), + Header: base.Header{ + "CSeq": base.HeaderValue{"2"}, + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + err = res.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + + err = base.Request{ + Method: base.Play, + URL: base.MustParseURL("rtsp://localhost:8554/teststream"), + Header: base.Header{ + "CSeq": base.HeaderValue{"2"}, + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + buf := make([]byte, 2048) + err = res.ReadIgnoreFrames(bconn.Reader, buf) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) +} + +func TestServerPauseMultiple(t *testing.T) { + ts, err := newTestServ(nil) + require.NoError(t, err) + defer ts.close() + + cnt1, err := newContainer("ffmpeg", "publish", []string{ + "-re", + "-stream_loop", "-1", + "-i", "emptyvideo.ts", + "-c", "copy", + "-f", "rtsp", + "-rtsp_transport", "tcp", + "rtsp://localhost:8554/teststream", + }) + require.NoError(t, err) + defer cnt1.close() + + time.Sleep(1 * time.Second) + + conn, err := net.Dial("tcp", "localhost:8554") + require.NoError(t, err) + defer conn.Close() + bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) + + err = base.Request{ + Method: base.Setup, + URL: base.MustParseURL("rtsp://localhost:8554/teststream/trackID=0"), + Header: base.Header{ + "CSeq": base.HeaderValue{"1"}, + "Transport": headers.Transport{ + Protocol: StreamProtocolTCP, + Delivery: func() *base.StreamDelivery { + v := base.StreamDeliveryUnicast + return &v + }(), + Mode: func() *headers.TransportMode { + v := headers.TransportModePlay + return &v + }(), + InterleavedIds: &[2]int{0, 1}, + }.Write(), + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + var res base.Response + err = res.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + + err = base.Request{ + Method: base.Play, + URL: base.MustParseURL("rtsp://localhost:8554/teststream"), + Header: base.Header{ + "CSeq": base.HeaderValue{"2"}, + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + err = res.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + + err = base.Request{ + Method: base.Pause, + URL: base.MustParseURL("rtsp://localhost:8554/teststream"), + Header: base.Header{ + "CSeq": base.HeaderValue{"2"}, + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + buf := make([]byte, 2048) + err = res.ReadIgnoreFrames(bconn.Reader, buf) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + + err = base.Request{ + Method: base.Pause, + URL: base.MustParseURL("rtsp://localhost:8554/teststream"), + Header: base.Header{ + "CSeq": base.HeaderValue{"2"}, + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + buf = make([]byte, 2048) + err = res.ReadIgnoreFrames(bconn.Reader, buf) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) +}