From a9995fb22858083ad87bd3ee6d0573da55f5da12 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Fri, 28 May 2021 13:57:31 +0200 Subject: [PATCH] client: add range argument to clientconn.Play() --- client.go | 2 +- client_read_test.go | 138 ++++++++++++++++++++++++++- clientconn.go | 20 +++- examples/client-read-partial/main.go | 2 +- examples/client-read-pause/main.go | 2 +- 5 files changed, 155 insertions(+), 9 deletions(-) diff --git a/client.go b/client.go index f76df95d..bde71405 100644 --- a/client.go +++ b/client.go @@ -165,7 +165,7 @@ func (c *Client) DialReadContext(ctx context.Context, address string) (*ClientCo } } - _, err = conn.Play() + _, err = conn.Play(nil) if err != nil { conn.Close() return nil, err diff --git a/client_read_test.go b/client_read_test.go index 106742fb..f3487c2e 100644 --- a/client_read_test.go +++ b/client_read_test.go @@ -1252,7 +1252,7 @@ func TestClientReadPause(t *testing.T) { conn.ReadFrames(func(id int, typ StreamType, payload []byte) { }) - _, err = conn.Play() + _, err = conn.Play(nil) require.NoError(t, err) firstFrame = int32(0) @@ -1734,3 +1734,139 @@ func TestClientReadIgnoreTCPInvalidTrack(t *testing.T) { conn.Close() <-done } + +func TestClientReadSeek(t *testing.T) { + l, err := net.Listen("tcp", "localhost:8554") + require.NoError(t, err) + defer l.Close() + + serverDone := make(chan struct{}) + defer func() { <-serverDone }() + go func() { + defer close(serverDone) + + conn, err := l.Accept() + require.NoError(t, err) + defer conn.Close() + bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) + + req, err := readRequest(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.Options, req.Method) + + err = base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ + "Public": base.HeaderValue{strings.Join([]string{ + string(base.Describe), + string(base.Setup), + string(base.Play), + }, ", ")}, + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + req, err = readRequest(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.Describe, req.Method) + + track, err := NewTrackH264(96, []byte("123456"), []byte("123456")) + require.NoError(t, err) + + err = base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ + "Content-Type": base.HeaderValue{"application/sdp"}, + "Content-Base": base.HeaderValue{"rtsp://localhost:8554/teststream/"}, + }, + Body: Tracks{track}.Write(), + }.Write(bconn.Writer) + require.NoError(t, err) + + req, err = readRequest(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.Setup, req.Method) + + var inTH headers.Transport + err = inTH.Read(req.Header["Transport"]) + require.NoError(t, err) + + th := headers.Transport{ + Delivery: func() *base.StreamDelivery { + v := base.StreamDeliveryUnicast + return &v + }(), + Protocol: StreamProtocolTCP, + InterleavedIDs: inTH.InterleavedIDs, + } + + err = base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ + "Transport": th.Write(), + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + req, err = readRequest(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.Play, req.Method) + + var ra headers.Range + err = ra.Read(req.Header["Range"]) + require.NoError(t, err) + require.Equal(t, headers.Range{ + Value: &headers.RangeNPT{ + Start: headers.RangeNPTTime(5500 * time.Millisecond), + }, + }, ra) + + err = base.Response{ + StatusCode: base.StatusOK, + }.Write(bconn.Writer) + require.NoError(t, err) + + req, err = readRequest(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.Teardown, req.Method) + + err = base.Response{ + StatusCode: base.StatusOK, + }.Write(bconn.Writer) + require.NoError(t, err) + }() + + c := &Client{ + StreamProtocol: func() *StreamProtocol { + v := StreamProtocolTCP + return &v + }(), + } + + u, err := base.ParseURL("rtsp://localhost:8554/teststream") + require.NoError(t, err) + + conn, err := c.Dial(u.Scheme, u.Host) + require.NoError(t, err) + defer conn.Close() + + _, err = conn.Options(u) + require.NoError(t, err) + + tracks, _, err := conn.Describe(u) + require.NoError(t, err) + + for _, track := range tracks { + _, err := conn.Setup(headers.TransportModePlay, track, 0, 0) + require.NoError(t, err) + } + + _, err = conn.Play(&headers.Range{ + Value: &headers.RangeNPT{ + Start: headers.RangeNPTTime(5500 * time.Millisecond), + }, + }) + require.NoError(t, err) + + // asdasdasd +} diff --git a/clientconn.go b/clientconn.go index 8d79dc74..50cb1857 100644 --- a/clientconn.go +++ b/clientconn.go @@ -96,6 +96,7 @@ type setupReq struct { } type playReq struct { + ra *headers.Range res chan clientRes } @@ -131,6 +132,7 @@ type ClientConn struct { streamBaseURL *base.URL streamProtocol *StreamProtocol tracks map[int]clientConnTrack + lastRange *headers.Range backgroundRunning bool backgroundErr error tcpFrameBuffer *multibuffer.MultiBuffer // tcp @@ -267,7 +269,7 @@ outer: req.res <- clientRes{res: res, err: err} case req := <-cc.play: - res, err := cc.doPlay(false) + res, err := cc.doPlay(req.ra, false) req.res <- clientRes{res: res, err: err} case req := <-cc.record: @@ -389,7 +391,7 @@ func (cc *ClientConn) switchProtocolIfTimeout(err error) error { } } - _, err = cc.doPlay(true) + _, err = cc.doPlay(cc.lastRange, true) if err != nil { return err } @@ -1409,7 +1411,7 @@ func (cc *ClientConn) Setup( } } -func (cc *ClientConn) doPlay(isSwitchingProtocol bool) (*base.Response, error) { +func (cc *ClientConn) doPlay(ra *headers.Range, isSwitchingProtocol bool) (*base.Response, error) { err := cc.checkState(map[clientConnState]struct{}{ clientConnStatePrePlay: {}, }) @@ -1417,9 +1419,16 @@ func (cc *ClientConn) doPlay(isSwitchingProtocol bool) (*base.Response, error) { return nil, err } + header := make(base.Header) + + if ra != nil { + header["Range"] = ra.Write() + } + res, err := cc.do(&base.Request{ Method: base.Play, URL: cc.streamBaseURL, + Header: header, }, false) if err != nil { return nil, err @@ -1432,6 +1441,7 @@ func (cc *ClientConn) doPlay(isSwitchingProtocol bool) (*base.Response, error) { } cc.state = clientConnStatePlay + cc.lastRange = ra if !isSwitchingProtocol { // use a temporary callback that is replaces as soon as @@ -1455,10 +1465,10 @@ func (cc *ClientConn) doPlay(isSwitchingProtocol bool) (*base.Response, error) { // Play writes a PLAY request and reads a Response. // This can be called only after Setup(). -func (cc *ClientConn) Play() (*base.Response, error) { +func (cc *ClientConn) Play(ra *headers.Range) (*base.Response, error) { cres := make(chan clientRes) select { - case cc.play <- playReq{res: cres}: + case cc.play <- playReq{ra: ra, res: cres}: res := <-cres return res.res, res.err diff --git a/examples/client-read-partial/main.go b/examples/client-read-partial/main.go index 856ee0f9..3795ea4a 100644 --- a/examples/client-read-partial/main.go +++ b/examples/client-read-partial/main.go @@ -46,7 +46,7 @@ func main() { } // play setupped tracks - _, err = conn.Play() + _, err = conn.Play(nil) if err != nil { panic(err) } diff --git a/examples/client-read-pause/main.go b/examples/client-read-pause/main.go index e5bb96e5..d5427317 100644 --- a/examples/client-read-pause/main.go +++ b/examples/client-read-pause/main.go @@ -47,7 +47,7 @@ func main() { time.Sleep(5 * time.Second) // play again - _, err = conn.Play() + _, err = conn.Play(nil) if err != nil { panic(err) }