diff --git a/client.go b/client.go index ff858a18..005b723e 100644 --- a/client.go +++ b/client.go @@ -980,7 +980,6 @@ func (c *Client) do(req *base.Request, skipResponse bool) (*base.Response, error if sx.Timeout != nil && *sx.Timeout > 0 { c.keepalivePeriod = time.Duration(float64(*sx.Timeout)*0.8) * time.Second - c.keepaliveTimer.Reset(c.keepalivePeriod) } } diff --git a/client_read_test.go b/client_read_test.go index 427c3fa4..9dadb8c3 100644 --- a/client_read_test.go +++ b/client_read_test.go @@ -2519,3 +2519,143 @@ func TestClientReadSeek(t *testing.T) { }) require.NoError(t, err) } + +func TestClientReadKeepaliveFromSession(t *testing.T) { + l, err := net.Listen("tcp", "localhost:8554") + require.NoError(t, err) + defer l.Close() + + keepaliveOk := make(chan struct{}) + + serverDone := make(chan struct{}) + defer func() { <-serverDone }() + go func() { + defer close(serverDone) + + conn, err := l.Accept() + require.NoError(t, err) + defer conn.Close() + br := bufio.NewReader(conn) + var bb bytes.Buffer + + req, err := readRequest(br) + require.NoError(t, err) + require.Equal(t, base.Options, req.Method) + + base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ + "Public": base.HeaderValue{strings.Join([]string{ + string(base.Describe), + string(base.Setup), + string(base.Play), + }, ", ")}, + }, + }.Write(&bb) + _, err = conn.Write(bb.Bytes()) + require.NoError(t, err) + + req, err = readRequest(br) + require.NoError(t, err) + require.Equal(t, base.Describe, req.Method) + + track, err := NewTrackH264(96, &TrackConfigH264{ + []byte{0x01, 0x02, 0x03, 0x04}, []byte{0x01, 0x02, 0x03, 0x04}, + }) + require.NoError(t, err) + + tracks := cloneAndClearTracks(Tracks{track}) + + base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ + "Content-Type": base.HeaderValue{"application/sdp"}, + "Content-Base": base.HeaderValue{"rtsp://localhost:8554/teststream/"}, + }, + Body: tracks.Write(false), + }.Write(&bb) + _, err = conn.Write(bb.Bytes()) + require.NoError(t, err) + + req, err = readRequest(br) + require.NoError(t, err) + require.Equal(t, base.Setup, req.Method) + + var inTH headers.Transport + err = inTH.Read(req.Header["Transport"]) + require.NoError(t, err) + + base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ + "Transport": headers.Transport{ + Protocol: headers.TransportProtocolUDP, + Delivery: func() *headers.TransportDelivery { + v := headers.TransportDeliveryUnicast + return &v + }(), + ClientPorts: inTH.ClientPorts, + ServerPorts: &[2]int{34556, 34557}, + }.Write(), + "Session": headers.Session{ + Session: "ABCDE", + Timeout: func() *uint { + v := uint(1) + return &v + }(), + }.Write(), + }, + }.Write(&bb) + _, err = conn.Write(bb.Bytes()) + require.NoError(t, err) + + req, err = readRequest(br) + require.NoError(t, err) + require.Equal(t, base.Play, req.Method) + + base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ + "Session": headers.Session{ + Session: "ABCDE", + Timeout: func() *uint { + v := uint(1) + return &v + }(), + }.Write(), + }, + }.Write(&bb) + _, err = conn.Write(bb.Bytes()) + require.NoError(t, err) + + recv := make(chan struct{}) + go func() { + defer close(recv) + req, err = readRequest(br) + require.NoError(t, err) + require.Equal(t, base.Options, req.Method) + + base.Response{ + StatusCode: base.StatusOK, + }.Write(&bb) + _, err = conn.Write(bb.Bytes()) + require.NoError(t, err) + }() + + select { + case <-recv: + case <-time.After(3 * time.Second): + t.Errorf("should not happen") + } + + close(keepaliveOk) + }() + + c := &Client{} + + err = c.StartReading("rtsp://localhost:8554/teststream") + require.NoError(t, err) + defer c.Close() + + <-keepaliveOk +}