add TestServerPauseMultiple, TestServerPlayMultiple

This commit is contained in:
aler9
2021-01-06 19:19:39 +01:00
parent d0834e7446
commit 453dd19f90
3 changed files with 211 additions and 23 deletions

View File

@@ -208,15 +208,6 @@ func (c *ClientConn) Tracks() Tracks {
return c.tracks return c.tracks
} }
func (c *ClientConn) readFrameTCPOrResponse() (interface{}, error) {
c.nconn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout))
f := base.InterleavedFrame{
Payload: c.tcpFrameBuffer.Next(),
}
r := base.Response{}
return base.ReadInterleavedFrameOrResponse(&f, &r, c.br)
}
// Do writes a Request and reads a Response. // Do writes a Request and reads a Response.
// Interleaved frames received before the response are ignored. // Interleaved frames received before the response are ignored.
func (c *ClientConn) Do(req *base.Request) (*base.Response, error) { func (c *ClientConn) Do(req *base.Request) (*base.Response, error) {
@@ -259,24 +250,15 @@ func (c *ClientConn) Do(req *base.Request) (*base.Response, error) {
// interleaved frames are sent in two situations: // interleaved frames are sent in two situations:
// * when the server is v4lrtspserver, before the PLAY response // * when the server is v4lrtspserver, before the PLAY response
// * when the stream is already playing // * when the stream is already playing
res, err := func() (*base.Response, error) { var res base.Response
for { c.nconn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout))
recv, err := c.readFrameTCPOrResponse() err = res.ReadIgnoreFrames(c.br, c.tcpFrameBuffer.Next())
if err != nil {
return nil, err
}
if res, ok := recv.(*base.Response); ok {
return res, nil
}
}
}()
if err != nil { if err != nil {
return nil, err return nil, err
} }
if c.conf.OnResponse != nil { if c.conf.OnResponse != nil {
c.conf.OnResponse(res) c.conf.OnResponse(&res)
} }
// get session from response // get session from response
@@ -303,7 +285,7 @@ func (c *ClientConn) Do(req *base.Request) (*base.Response, error) {
return c.Do(req) return c.Do(req)
} }
return res, nil return &res, nil
} }
// Options writes an OPTIONS request and reads a response. // Options writes an OPTIONS request and reads a response.

View File

@@ -185,6 +185,27 @@ func (res *Response) Read(rb *bufio.Reader) error {
return nil return nil
} }
// ReadIgnoreFrames reads a response and ignores any interleaved frame sent
// before the response.
func (res *Response) ReadIgnoreFrames(rb *bufio.Reader, buf []byte) error {
buflen := len(buf)
f := InterleavedFrame{
Payload: buf,
}
for {
f.Payload = f.Payload[:buflen]
recv, err := ReadInterleavedFrameOrResponse(&f, res, rb)
if err != nil {
return err
}
if _, ok := recv.(*Response); ok {
return nil
}
}
}
// Write writes a Response. // Write writes a Response.
func (res Response) Write(bw *bufio.Writer) error { func (res Response) Write(bw *bufio.Writer) error {
if res.StatusMessage == "" { if res.StatusMessage == "" {

View File

@@ -183,6 +183,20 @@ func (ts *testServ) handleConn(conn *ServerConn) {
}, nil }, nil
} }
onPause := func(req *base.Request) (*base.Response, error) {
ts.mutex.Lock()
defer ts.mutex.Unlock()
delete(ts.readers, conn)
return &base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Session": base.HeaderValue{"12345678"},
},
}, nil
}
onFrame := func(trackID int, typ StreamType, buf []byte) { onFrame := func(trackID int, typ StreamType, buf []byte) {
ts.mutex.Lock() ts.mutex.Lock()
defer ts.mutex.Unlock() defer ts.mutex.Unlock()
@@ -200,6 +214,7 @@ func (ts *testServ) handleConn(conn *ServerConn) {
OnSetup: onSetup, OnSetup: onSetup,
OnPlay: onPlay, OnPlay: onPlay,
OnRecord: onRecord, OnRecord: onRecord,
OnPause: onPause,
OnFrame: onFrame, OnFrame: onFrame,
}) })
@@ -466,3 +481,173 @@ func TestServerResponseBeforeFrames(t *testing.T) {
err = fr.Read(bconn.Reader) err = fr.Read(bconn.Reader)
require.NoError(t, err) require.NoError(t, err)
} }
func TestServerPlayMultiple(t *testing.T) {
ts, err := newTestServ(nil)
require.NoError(t, err)
defer ts.close()
cnt1, err := newContainer("ffmpeg", "publish", []string{
"-re",
"-stream_loop", "-1",
"-i", "emptyvideo.ts",
"-c", "copy",
"-f", "rtsp",
"-rtsp_transport", "tcp",
"rtsp://localhost:8554/teststream",
})
require.NoError(t, err)
defer cnt1.close()
time.Sleep(1 * time.Second)
conn, err := net.Dial("tcp", "localhost:8554")
require.NoError(t, err)
defer conn.Close()
bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
err = base.Request{
Method: base.Setup,
URL: base.MustParseURL("rtsp://localhost:8554/teststream/trackID=0"),
Header: base.Header{
"CSeq": base.HeaderValue{"1"},
"Transport": headers.Transport{
Protocol: StreamProtocolTCP,
Delivery: func() *base.StreamDelivery {
v := base.StreamDeliveryUnicast
return &v
}(),
Mode: func() *headers.TransportMode {
v := headers.TransportModePlay
return &v
}(),
InterleavedIds: &[2]int{0, 1},
}.Write(),
},
}.Write(bconn.Writer)
require.NoError(t, err)
var res base.Response
err = res.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode)
err = base.Request{
Method: base.Play,
URL: base.MustParseURL("rtsp://localhost:8554/teststream"),
Header: base.Header{
"CSeq": base.HeaderValue{"2"},
},
}.Write(bconn.Writer)
require.NoError(t, err)
err = res.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode)
err = base.Request{
Method: base.Play,
URL: base.MustParseURL("rtsp://localhost:8554/teststream"),
Header: base.Header{
"CSeq": base.HeaderValue{"2"},
},
}.Write(bconn.Writer)
require.NoError(t, err)
buf := make([]byte, 2048)
err = res.ReadIgnoreFrames(bconn.Reader, buf)
require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode)
}
func TestServerPauseMultiple(t *testing.T) {
ts, err := newTestServ(nil)
require.NoError(t, err)
defer ts.close()
cnt1, err := newContainer("ffmpeg", "publish", []string{
"-re",
"-stream_loop", "-1",
"-i", "emptyvideo.ts",
"-c", "copy",
"-f", "rtsp",
"-rtsp_transport", "tcp",
"rtsp://localhost:8554/teststream",
})
require.NoError(t, err)
defer cnt1.close()
time.Sleep(1 * time.Second)
conn, err := net.Dial("tcp", "localhost:8554")
require.NoError(t, err)
defer conn.Close()
bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
err = base.Request{
Method: base.Setup,
URL: base.MustParseURL("rtsp://localhost:8554/teststream/trackID=0"),
Header: base.Header{
"CSeq": base.HeaderValue{"1"},
"Transport": headers.Transport{
Protocol: StreamProtocolTCP,
Delivery: func() *base.StreamDelivery {
v := base.StreamDeliveryUnicast
return &v
}(),
Mode: func() *headers.TransportMode {
v := headers.TransportModePlay
return &v
}(),
InterleavedIds: &[2]int{0, 1},
}.Write(),
},
}.Write(bconn.Writer)
require.NoError(t, err)
var res base.Response
err = res.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode)
err = base.Request{
Method: base.Play,
URL: base.MustParseURL("rtsp://localhost:8554/teststream"),
Header: base.Header{
"CSeq": base.HeaderValue{"2"},
},
}.Write(bconn.Writer)
require.NoError(t, err)
err = res.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode)
err = base.Request{
Method: base.Pause,
URL: base.MustParseURL("rtsp://localhost:8554/teststream"),
Header: base.Header{
"CSeq": base.HeaderValue{"2"},
},
}.Write(bconn.Writer)
require.NoError(t, err)
buf := make([]byte, 2048)
err = res.ReadIgnoreFrames(bconn.Reader, buf)
require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode)
err = base.Request{
Method: base.Pause,
URL: base.MustParseURL("rtsp://localhost:8554/teststream"),
Header: base.Header{
"CSeq": base.HeaderValue{"2"},
},
}.Write(bconn.Writer)
require.NoError(t, err)
buf = make([]byte, 2048)
err = res.ReadIgnoreFrames(bconn.Reader, buf)
require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode)
}