From f6a86b8789fc64ea78be7eb51ddaa4b39ab93cd3 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Wed, 19 Jan 2022 23:07:53 +0100 Subject: [PATCH] server: send session timeout to clients through the session header (https://github.com/aler9/rtsp-simple-server/issues/702) --- server.go | 10 +-- server_publish_test.go | 89 +++++++++++++++++------ server_read_test.go | 156 ++++++++++++++++++++++++++--------------- server_test.go | 28 ++++++-- serversession.go | 10 ++- 5 files changed, 201 insertions(+), 92 deletions(-) diff --git a/server.go b/server.go index 7c29e2f6..621a3002 100644 --- a/server.go +++ b/server.go @@ -134,9 +134,9 @@ type Server struct { // private // - udpReceiverReportPeriod time.Duration - closeSessionAfterNoRequestsFor time.Duration - checkStreamPeriod time.Duration + udpReceiverReportPeriod time.Duration + sessionTimeout time.Duration + checkStreamPeriod time.Duration ctx context.Context ctxCancel func() @@ -188,8 +188,8 @@ func (s *Server) Start() error { if s.udpReceiverReportPeriod == 0 { s.udpReceiverReportPeriod = 10 * time.Second } - if s.closeSessionAfterNoRequestsFor == 0 { - s.closeSessionAfterNoRequestsFor = 1 * 60 * time.Second + if s.sessionTimeout == 0 { + s.sessionTimeout = 1 * 60 * time.Second } if s.checkStreamPeriod == 0 { s.checkStreamPeriod = 1 * time.Second diff --git a/server_publish_test.go b/server_publish_test.go index 20eb40d5..a53b4e22 100644 --- a/server_publish_test.go +++ b/server_publish_test.go @@ -312,13 +312,17 @@ func TestServerPublishSetupPath(t *testing.T) { InterleavedIDs: &[2]int{0, 1}, } + var sx headers.Session + err = sx.Read(res.Header["Session"]) + require.NoError(t, err) + res, err = writeReqReadRes(conn, br, base.Request{ Method: base.Setup, URL: mustParseURL(ca.url), Header: base.Header{ "CSeq": base.HeaderValue{"2"}, "Transport": th.Write(), - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }) require.NoError(t, err) @@ -396,13 +400,17 @@ func TestServerPublishErrorSetupDifferentPaths(t *testing.T) { InterleavedIDs: &[2]int{0, 1}, } + var sx headers.Session + err = sx.Read(res.Header["Session"]) + require.NoError(t, err) + res, err = writeReqReadRes(conn, br, base.Request{ Method: base.Setup, URL: mustParseURL("rtsp://localhost:8554/test2stream/trackID=0"), Header: base.Header{ "CSeq": base.HeaderValue{"2"}, "Transport": th.Write(), - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }) require.NoError(t, err) @@ -481,13 +489,17 @@ func TestServerPublishErrorSetupTrackTwice(t *testing.T) { InterleavedIDs: &[2]int{0, 1}, } + var sx headers.Session + err = sx.Read(res.Header["Session"]) + require.NoError(t, err) + res, err = writeReqReadRes(conn, br, base.Request{ Method: base.Setup, URL: mustParseURL("rtsp://localhost:8554/teststream/trackID=0"), Header: base.Header{ "CSeq": base.HeaderValue{"2"}, "Transport": th.Write(), - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }) require.NoError(t, err) @@ -499,7 +511,7 @@ func TestServerPublishErrorSetupTrackTwice(t *testing.T) { Header: base.Header{ "CSeq": base.HeaderValue{"3"}, "Transport": th.Write(), - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }) require.NoError(t, err) @@ -588,13 +600,17 @@ func TestServerPublishErrorRecordPartialTracks(t *testing.T) { InterleavedIDs: &[2]int{0, 1}, } + var sx headers.Session + err = sx.Read(res.Header["Session"]) + require.NoError(t, err) + res, err = writeReqReadRes(conn, br, base.Request{ Method: base.Setup, URL: mustParseURL("rtsp://localhost:8554/teststream/trackID=0"), Header: base.Header{ "CSeq": base.HeaderValue{"2"}, "Transport": th.Write(), - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }) require.NoError(t, err) @@ -605,7 +621,7 @@ func TestServerPublishErrorRecordPartialTracks(t *testing.T) { URL: mustParseURL("rtsp://localhost:8554/teststream"), Header: base.Header{ "CSeq": base.HeaderValue{"3"}, - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }) require.NoError(t, err) @@ -756,13 +772,17 @@ func TestServerPublish(t *testing.T) { inTH.InterleavedIDs = &[2]int{0, 1} } + var sx headers.Session + err = sx.Read(res.Header["Session"]) + require.NoError(t, err) + res, err = writeReqReadRes(conn, br, base.Request{ Method: base.Setup, URL: mustParseURL("rtsp://localhost:8554/teststream/trackID=0"), Header: base.Header{ "CSeq": base.HeaderValue{"2"}, "Transport": inTH.Write(), - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }) require.NoError(t, err) @@ -777,7 +797,7 @@ func TestServerPublish(t *testing.T) { URL: mustParseURL("rtsp://localhost:8554/teststream"), Header: base.Header{ "CSeq": base.HeaderValue{"3"}, - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }) require.NoError(t, err) @@ -839,7 +859,7 @@ func TestServerPublish(t *testing.T) { URL: mustParseURL("rtsp://localhost:8554/teststream"), Header: base.Header{ "CSeq": base.HeaderValue{"3"}, - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }) require.NoError(t, err) @@ -929,13 +949,17 @@ func TestServerPublishNonStandardFrameSize(t *testing.T) { InterleavedIDs: &[2]int{0, 1}, } + var sx headers.Session + err = sx.Read(res.Header["Session"]) + require.NoError(t, err) + res, err = writeReqReadRes(conn, br, base.Request{ Method: base.Setup, URL: mustParseURL("rtsp://localhost:8554/teststream/trackID=0"), Header: base.Header{ "CSeq": base.HeaderValue{"2"}, "Transport": inTH.Write(), - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }) require.NoError(t, err) @@ -946,7 +970,7 @@ func TestServerPublishNonStandardFrameSize(t *testing.T) { URL: mustParseURL("rtsp://localhost:8554/teststream"), Header: base.Header{ "CSeq": base.HeaderValue{"3"}, - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }) require.NoError(t, err) @@ -1037,13 +1061,17 @@ func TestServerPublishErrorInvalidProtocol(t *testing.T) { ClientPorts: &[2]int{35466, 35467}, } + var sx headers.Session + err = sx.Read(res.Header["Session"]) + require.NoError(t, err) + res, err = writeReqReadRes(conn, br, base.Request{ Method: base.Setup, URL: mustParseURL("rtsp://localhost:8554/teststream/trackID=0"), Header: base.Header{ "CSeq": base.HeaderValue{"2"}, "Transport": inTH.Write(), - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }) require.NoError(t, err) @@ -1058,7 +1086,7 @@ func TestServerPublishErrorInvalidProtocol(t *testing.T) { URL: mustParseURL("rtsp://localhost:8554/teststream"), Header: base.Header{ "CSeq": base.HeaderValue{"3"}, - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }) require.NoError(t, err) @@ -1139,6 +1167,10 @@ func TestServerPublishRTCPReport(t *testing.T) { require.NoError(t, err) defer l2.Close() + var sx headers.Session + err = sx.Read(res.Header["Session"]) + require.NoError(t, err) + res, err = writeReqReadRes(conn, br, base.Request{ Method: base.Setup, URL: mustParseURL("rtsp://localhost:8554/teststream/trackID=0"), @@ -1156,7 +1188,7 @@ func TestServerPublishRTCPReport(t *testing.T) { Protocol: headers.TransportProtocolUDP, ClientPorts: &[2]int{34556, 34557}, }.Write(), - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }) require.NoError(t, err) @@ -1171,7 +1203,7 @@ func TestServerPublishRTCPReport(t *testing.T) { URL: mustParseURL("rtsp://localhost:8554/teststream"), Header: base.Header{ "CSeq": base.HeaderValue{"3"}, - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }) require.NoError(t, err) @@ -1328,13 +1360,17 @@ func TestServerPublishTimeout(t *testing.T) { inTH.InterleavedIDs = &[2]int{0, 1} } + var sx headers.Session + err = sx.Read(res.Header["Session"]) + require.NoError(t, err) + res, err = writeReqReadRes(conn, br, base.Request{ Method: base.Setup, URL: mustParseURL("rtsp://localhost:8554/teststream/trackID=0"), Header: base.Header{ "CSeq": base.HeaderValue{"2"}, "Transport": inTH.Write(), - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }) require.NoError(t, err) @@ -1349,7 +1385,7 @@ func TestServerPublishTimeout(t *testing.T) { URL: mustParseURL("rtsp://localhost:8554/teststream"), Header: base.Header{ "CSeq": base.HeaderValue{"3"}, - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }) require.NoError(t, err) @@ -1458,13 +1494,17 @@ func TestServerPublishWithoutTeardown(t *testing.T) { inTH.InterleavedIDs = &[2]int{0, 1} } + var sx headers.Session + err = sx.Read(res.Header["Session"]) + require.NoError(t, err) + res, err = writeReqReadRes(conn, br, base.Request{ Method: base.Setup, URL: mustParseURL("rtsp://localhost:8554/teststream/trackID=0"), Header: base.Header{ "CSeq": base.HeaderValue{"2"}, "Transport": inTH.Write(), - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }) require.NoError(t, err) @@ -1479,7 +1519,7 @@ func TestServerPublishWithoutTeardown(t *testing.T) { URL: mustParseURL("rtsp://localhost:8554/teststream"), Header: base.Header{ "CSeq": base.HeaderValue{"3"}, - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }) require.NoError(t, err) @@ -1574,13 +1614,17 @@ func TestServerPublishUDPChangeConn(t *testing.T) { ClientPorts: &[2]int{35466, 35467}, } + var sx headers.Session + err = sx.Read(res.Header["Session"]) + require.NoError(t, err) + res, err = writeReqReadRes(conn, br, base.Request{ Method: base.Setup, URL: mustParseURL("rtsp://localhost:8554/teststream/trackID=0"), Header: base.Header{ "CSeq": base.HeaderValue{"2"}, "Transport": inTH.Write(), - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }) require.NoError(t, err) @@ -1591,12 +1635,13 @@ func TestServerPublishUDPChangeConn(t *testing.T) { URL: mustParseURL("rtsp://localhost:8554/teststream"), Header: base.Header{ "CSeq": base.HeaderValue{"3"}, - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }) require.NoError(t, err) require.Equal(t, base.StatusOK, res.StatusCode) - sxID = res.Header["Session"][0] + + sxID = sx.Session }() func() { diff --git a/server_read_test.go b/server_read_test.go index c8e7b2f1..acad7c0d 100644 --- a/server_read_test.go +++ b/server_read_test.go @@ -200,52 +200,43 @@ func TestServerReadSetupErrors(t *testing.T) { InterleavedIDs: &[2]int{0, 1}, } + res, err := writeReqReadRes(conn, br, base.Request{ + Method: base.Setup, + URL: mustParseURL("rtsp://localhost:8554/teststream/trackID=0"), + Header: base.Header{ + "CSeq": base.HeaderValue{"1"}, + "Transport": th.Write(), + }, + }) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + + th.InterleavedIDs = &[2]int{2, 3} + + var sx headers.Session + err = sx.Read(res.Header["Session"]) + require.NoError(t, err) + if ca == "different paths" { - res, err := writeReqReadRes(conn, br, base.Request{ - Method: base.Setup, - URL: mustParseURL("rtsp://localhost:8554/teststream/trackID=0"), - Header: base.Header{ - "CSeq": base.HeaderValue{"1"}, - "Transport": th.Write(), - }, - }) - require.NoError(t, err) - require.Equal(t, base.StatusOK, res.StatusCode) - - th.InterleavedIDs = &[2]int{2, 3} - res, err = writeReqReadRes(conn, br, base.Request{ Method: base.Setup, URL: mustParseURL("rtsp://localhost:8554/test12stream/trackID=1"), Header: base.Header{ "CSeq": base.HeaderValue{"2"}, "Transport": th.Write(), - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }) require.NoError(t, err) require.Equal(t, base.StatusBadRequest, res.StatusCode) } else { - res, err := writeReqReadRes(conn, br, base.Request{ - Method: base.Setup, - URL: mustParseURL("rtsp://localhost:8554/teststream/trackID=0"), - Header: base.Header{ - "CSeq": base.HeaderValue{"1"}, - "Transport": th.Write(), - }, - }) - require.NoError(t, err) - require.Equal(t, base.StatusOK, res.StatusCode) - - th.InterleavedIDs = &[2]int{2, 3} - res, err = writeReqReadRes(conn, br, base.Request{ Method: base.Setup, URL: mustParseURL("rtsp://localhost:8554/teststream/trackID=0"), Header: base.Header{ "CSeq": base.HeaderValue{"2"}, "Transport": th.Write(), - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }) require.NoError(t, err) @@ -461,12 +452,16 @@ func TestServerRead(t *testing.T) { <-sessionOpened + var sx headers.Session + err = sx.Read(res.Header["Session"]) + require.NoError(t, err) + res, err = writeReqReadRes(conn, br, base.Request{ Method: base.Play, URL: mustParseURL("rtsp://" + listenIP + ":8554/teststream"), Header: base.Header{ "CSeq": base.HeaderValue{"2"}, - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }) require.NoError(t, err) @@ -538,7 +533,7 @@ func TestServerRead(t *testing.T) { URL: mustParseURL("rtsp://" + listenIP + ":8554/teststream"), Header: base.Header{ "CSeq": base.HeaderValue{"4"}, - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }) require.NoError(t, err) @@ -550,7 +545,7 @@ func TestServerRead(t *testing.T) { URL: mustParseURL("rtsp://" + listenIP + ":8554/teststream"), Header: base.Header{ "CSeq": base.HeaderValue{"5"}, - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }) require.NoError(t, err) @@ -562,7 +557,7 @@ func TestServerRead(t *testing.T) { URL: mustParseURL("rtsp://" + listenIP + ":8554/teststream"), Header: base.Header{ "CSeq": base.HeaderValue{"6"}, - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }) require.NoError(t, err) @@ -689,12 +684,16 @@ func TestServerReadNonStandardFrameSize(t *testing.T) { require.NoError(t, err) require.Equal(t, base.StatusOK, res.StatusCode) + var sx headers.Session + err = sx.Read(res.Header["Session"]) + require.NoError(t, err) + res, err = writeReqReadRes(conn, br, base.Request{ Method: base.Play, URL: mustParseURL("rtsp://localhost:8554/teststream"), Header: base.Header{ "CSeq": base.HeaderValue{"2"}, - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }) require.NoError(t, err) @@ -790,12 +789,16 @@ func TestServerReadTCPResponseBeforeFrames(t *testing.T) { require.NoError(t, err) require.Equal(t, base.StatusOK, res.StatusCode) + var sx headers.Session + err = sx.Read(res.Header["Session"]) + require.NoError(t, err) + res, err = writeReqReadRes(conn, br, base.Request{ Method: base.Play, URL: mustParseURL("rtsp://localhost:8554/teststream"), Header: base.Header{ "CSeq": base.HeaderValue{"2"}, - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }) require.NoError(t, err) @@ -864,12 +867,16 @@ func TestServerReadPlayPlay(t *testing.T) { require.NoError(t, err) require.Equal(t, base.StatusOK, res.StatusCode) + var sx headers.Session + err = sx.Read(res.Header["Session"]) + require.NoError(t, err) + res, err = writeReqReadRes(conn, br, base.Request{ Method: base.Play, URL: mustParseURL("rtsp://localhost:8554/teststream"), Header: base.Header{ "CSeq": base.HeaderValue{"2"}, - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }) require.NoError(t, err) @@ -880,7 +887,7 @@ func TestServerReadPlayPlay(t *testing.T) { URL: mustParseURL("rtsp://localhost:8554/teststream"), Header: base.Header{ "CSeq": base.HeaderValue{"3"}, - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }) require.NoError(t, err) @@ -974,12 +981,16 @@ func TestServerReadPlayPausePlay(t *testing.T) { require.NoError(t, err) require.Equal(t, base.StatusOK, res.StatusCode) + var sx headers.Session + err = sx.Read(res.Header["Session"]) + require.NoError(t, err) + res, err = writeReqReadRes(conn, br, base.Request{ Method: base.Play, URL: mustParseURL("rtsp://localhost:8554/teststream"), Header: base.Header{ "CSeq": base.HeaderValue{"2"}, - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }) require.NoError(t, err) @@ -990,7 +1001,7 @@ func TestServerReadPlayPausePlay(t *testing.T) { URL: mustParseURL("rtsp://localhost:8554/teststream"), Header: base.Header{ "CSeq": base.HeaderValue{"2"}, - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }) require.NoError(t, err) @@ -1001,7 +1012,7 @@ func TestServerReadPlayPausePlay(t *testing.T) { URL: mustParseURL("rtsp://localhost:8554/teststream"), Header: base.Header{ "CSeq": base.HeaderValue{"2"}, - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }) require.NoError(t, err) @@ -1092,12 +1103,16 @@ func TestServerReadPlayPausePause(t *testing.T) { require.NoError(t, err) require.Equal(t, base.StatusOK, res.StatusCode) + var sx headers.Session + err = sx.Read(res.Header["Session"]) + require.NoError(t, err) + res, err = writeReqReadRes(conn, br, base.Request{ Method: base.Play, URL: mustParseURL("rtsp://localhost:8554/teststream"), Header: base.Header{ "CSeq": base.HeaderValue{"2"}, - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }) require.NoError(t, err) @@ -1108,7 +1123,7 @@ func TestServerReadPlayPausePause(t *testing.T) { URL: mustParseURL("rtsp://localhost:8554/teststream"), Header: base.Header{ "CSeq": base.HeaderValue{"2"}, - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }.Write(&bb) _, err = conn.Write(bb.Bytes()) @@ -1123,7 +1138,7 @@ func TestServerReadPlayPausePause(t *testing.T) { URL: mustParseURL("rtsp://localhost:8554/teststream"), Header: base.Header{ "CSeq": base.HeaderValue{"2"}, - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }.Write(&bb) _, err = conn.Write(bb.Bytes()) @@ -1170,11 +1185,11 @@ func TestServerReadTimeout(t *testing.T) { }, nil }, }, - ReadTimeout: 1 * time.Second, - closeSessionAfterNoRequestsFor: 1 * time.Second, - UDPRTPAddress: "127.0.0.1:8000", - UDPRTCPAddress: "127.0.0.1:8001", - RTSPAddress: "localhost:8554", + ReadTimeout: 1 * time.Second, + sessionTimeout: 1 * time.Second, + UDPRTPAddress: "127.0.0.1:8000", + UDPRTCPAddress: "127.0.0.1:8001", + RTSPAddress: "localhost:8554", } err = s.Start() @@ -1211,12 +1226,16 @@ func TestServerReadTimeout(t *testing.T) { require.NoError(t, err) require.Equal(t, base.StatusOK, res.StatusCode) + var sx headers.Session + err = sx.Read(res.Header["Session"]) + require.NoError(t, err) + res, err = writeReqReadRes(conn, br, base.Request{ Method: base.Play, URL: mustParseURL("rtsp://localhost:8554/teststream"), Header: base.Header{ "CSeq": base.HeaderValue{"2"}, - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }) require.NoError(t, err) @@ -1267,9 +1286,9 @@ func TestServerReadWithoutTeardown(t *testing.T) { }, nil }, }, - ReadTimeout: 1 * time.Second, - closeSessionAfterNoRequestsFor: 1 * time.Second, - RTSPAddress: "localhost:8554", + ReadTimeout: 1 * time.Second, + sessionTimeout: 1 * time.Second, + RTSPAddress: "localhost:8554", } if transport == "udp" { @@ -1316,12 +1335,16 @@ func TestServerReadWithoutTeardown(t *testing.T) { require.NoError(t, err) require.Equal(t, base.StatusOK, res.StatusCode) + var sx headers.Session + err = sx.Read(res.Header["Session"]) + require.NoError(t, err) + res, err = writeReqReadRes(conn, br, base.Request{ Method: base.Play, URL: mustParseURL("rtsp://localhost:8554/teststream"), Header: base.Header{ "CSeq": base.HeaderValue{"2"}, - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }) require.NoError(t, err) @@ -1402,17 +1425,22 @@ func TestServerReadUDPChangeConn(t *testing.T) { require.NoError(t, err) require.Equal(t, base.StatusOK, res.StatusCode) + var sx headers.Session + err = sx.Read(res.Header["Session"]) + require.NoError(t, err) + res, err = writeReqReadRes(conn, br, base.Request{ Method: base.Play, URL: mustParseURL("rtsp://localhost:8554/teststream"), Header: base.Header{ "CSeq": base.HeaderValue{"2"}, - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }) require.NoError(t, err) require.Equal(t, base.StatusOK, res.StatusCode) - sxID = res.Header["Session"][0] + + sxID = sx.Session }() func() { @@ -1502,12 +1530,16 @@ func TestServerReadPartialTracks(t *testing.T) { require.NoError(t, err) require.Equal(t, base.StatusOK, res.StatusCode) + var sx headers.Session + err = sx.Read(res.Header["Session"]) + require.NoError(t, err) + res, err = writeReqReadRes(conn, br, base.Request{ Method: base.Play, URL: mustParseURL("rtsp://localhost:8554/teststream"), Header: base.Header{ "CSeq": base.HeaderValue{"2"}, - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }) require.NoError(t, err) @@ -1572,13 +1604,17 @@ func TestServerReadAdditionalInfos(t *testing.T) { InterleavedIDs: &[2]int{2, 3}, } + var sx headers.Session + err = sx.Read(res.Header["Session"]) + require.NoError(t, err) + res, err = writeReqReadRes(conn, br, base.Request{ Method: base.Setup, URL: mustParseURL("rtsp://localhost:8554/teststream/trackID=1"), Header: base.Header{ "CSeq": base.HeaderValue{"2"}, "Transport": inTH.Write(), - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }) require.NoError(t, err) @@ -1594,7 +1630,7 @@ func TestServerReadAdditionalInfos(t *testing.T) { URL: mustParseURL("rtsp://localhost:8554/teststream"), Header: base.Header{ "CSeq": base.HeaderValue{"3"}, - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }) require.NoError(t, err) @@ -1788,12 +1824,16 @@ func TestServerReadErrorUDPSamePorts(t *testing.T) { require.NoError(t, err) require.Equal(t, base.StatusOK, res.StatusCode) + var sx headers.Session + err = sx.Read(res.Header["Session"]) + require.NoError(t, err) + res, err = writeReqReadRes(conn, br, base.Request{ Method: base.Play, URL: mustParseURL("rtsp://localhost:8554/teststream"), Header: base.Header{ "CSeq": base.HeaderValue{"2"}, - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }) require.NoError(t, err) diff --git a/server_test.go b/server_test.go index 164f9b3e..49c31f99 100644 --- a/server_test.go +++ b/server_test.go @@ -749,12 +749,16 @@ func TestServerErrorTCPTwoConnOneSession(t *testing.T) { require.NoError(t, err) require.Equal(t, base.StatusOK, res.StatusCode) + var sx headers.Session + err = sx.Read(res.Header["Session"]) + require.NoError(t, err) + res, err = writeReqReadRes(conn1, br1, base.Request{ Method: base.Play, URL: mustParseURL("rtsp://localhost:8554/teststream"), Header: base.Header{ "CSeq": base.HeaderValue{"2"}, - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }) require.NoError(t, err) @@ -782,7 +786,7 @@ func TestServerErrorTCPTwoConnOneSession(t *testing.T) { }(), InterleavedIDs: &[2]int{0, 1}, }.Write(), - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }) require.NoError(t, err) @@ -849,12 +853,16 @@ func TestServerErrorTCPOneConnTwoSessions(t *testing.T) { require.NoError(t, err) require.Equal(t, base.StatusOK, res.StatusCode) + var sx headers.Session + err = sx.Read(res.Header["Session"]) + require.NoError(t, err) + res, err = writeReqReadRes(conn, br, base.Request{ Method: base.Play, URL: mustParseURL("rtsp://localhost:8554/teststream"), Header: base.Header{ "CSeq": base.HeaderValue{"2"}, - "Session": res.Header["Session"], + "Session": base.HeaderValue{sx.Session}, }, }) require.NoError(t, err) @@ -1193,7 +1201,12 @@ func TestServerErrorInvalidPath(t *testing.T) { }) require.NoError(t, err) require.Equal(t, base.StatusOK, res.StatusCode) - sxID = res.Header["Session"][0] + + var sx headers.Session + err = sx.Read(res.Header["Session"]) + require.NoError(t, err) + + sxID = sx.Session } if method == base.Play || method == base.Record || method == base.Pause { @@ -1223,7 +1236,12 @@ func TestServerErrorInvalidPath(t *testing.T) { }) require.NoError(t, err) require.Equal(t, base.StatusOK, res.StatusCode) - sxID = res.Header["Session"][0] + + var sx headers.Session + err = sx.Read(res.Header["Session"]) + require.NoError(t, err) + + sxID = sx.Session } if method == base.Pause { diff --git a/serversession.go b/serversession.go index e2f97805..7244a550 100644 --- a/serversession.go +++ b/serversession.go @@ -283,7 +283,13 @@ func (ss *ServerSession) run() { if res.Header == nil { res.Header = make(base.Header) } - res.Header["Session"] = base.HeaderValue{ss.secretID} + res.Header["Session"] = headers.Session{ + Session: ss.secretID, + Timeout: func() *uint { + v := uint(ss.s.sessionTimeout / time.Second) + return &v + }(), + }.Write() } if _, ok := err.(liberrors.ErrServerSessionTeardown); ok { @@ -337,7 +343,7 @@ func (ss *ServerSession) run() { } // in case of PLAY and UDP, timeout happens when no RTSP request arrives - } else if now.Sub(ss.lastRequestTime) >= ss.s.closeSessionAfterNoRequestsFor { + } else if now.Sub(ss.lastRequestTime) >= ss.s.sessionTimeout { return liberrors.ErrServerNoRTSPRequestsInAWhile{} }