From f74be9a72f33ecad766ad11d25aeb2823bb2e2e3 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Tue, 4 May 2021 16:42:56 +0200 Subject: [PATCH] server: add WithoutTeardown tests --- server_publish_test.go | 138 ++++++++++++++++++++++++++++++++++++++++- server_read_test.go | 104 ++++++++++++++++++++++++++++++- 2 files changed, 240 insertions(+), 2 deletions(-) diff --git a/server_publish_test.go b/server_publish_test.go index f1370846..ef6cfcba 100644 --- a/server_publish_test.go +++ b/server_publish_test.go @@ -1000,7 +1000,7 @@ func TestServerPublishRTCPReport(t *testing.T) { require.NoError(t, err) } -func TestServerPublishErrorTimeout(t *testing.T) { +func TestServerPublishTimeout(t *testing.T) { for _, proto := range []string{ "udp", "tcp", @@ -1137,3 +1137,139 @@ func TestServerPublishErrorTimeout(t *testing.T) { }) } } + +func TestServerPublishWithoutTeardown(t *testing.T) { + for _, proto := range []string{ + "udp", + "tcp", + } { + t.Run(proto, func(t *testing.T) { + connClosed := make(chan struct{}) + sessionClosed := make(chan struct{}) + + s := &Server{ + Handler: &testServerHandler{ + onConnClose: func(sc *ServerConn, err error) { + close(connClosed) + }, + onSessionClose: func(ss *ServerSession, err error) { + close(sessionClosed) + }, + onAnnounce: func(ctx *ServerHandlerOnAnnounceCtx) (*base.Response, error) { + return &base.Response{ + StatusCode: base.StatusOK, + }, nil + }, + onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, error) { + return &base.Response{ + StatusCode: base.StatusOK, + }, nil + }, + onRecord: func(ctx *ServerHandlerOnRecordCtx) (*base.Response, error) { + return &base.Response{ + StatusCode: base.StatusOK, + }, nil + }, + }, + ReadTimeout: 1 * time.Second, + } + + if proto == "udp" { + s.UDPRTPAddress = "127.0.0.1:8000" + s.UDPRTCPAddress = "127.0.0.1:8001" + } + + err := s.Start("127.0.0.1:8554") + require.NoError(t, err) + defer s.Close() + + nconn, err := net.Dial("tcp", "localhost:8554") + require.NoError(t, err) + bconn := bufio.NewReadWriter(bufio.NewReader(nconn), bufio.NewWriter(nconn)) + + track, err := NewTrackH264(96, []byte("123456"), []byte("123456")) + require.NoError(t, err) + + tracks := Tracks{track} + for i, t := range tracks { + t.Media.Attributes = append(t.Media.Attributes, psdp.Attribute{ + Key: "control", + Value: "trackID=" + strconv.FormatInt(int64(i), 10), + }) + } + + err = base.Request{ + Method: base.Announce, + URL: base.MustParseURL("rtsp://localhost:8554/teststream"), + Header: base.Header{ + "CSeq": base.HeaderValue{"1"}, + "Content-Type": base.HeaderValue{"application/sdp"}, + }, + Body: tracks.Write(), + }.Write(bconn.Writer) + require.NoError(t, err) + + var res base.Response + err = res.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + + inTH := &headers.Transport{ + Delivery: func() *base.StreamDelivery { + v := base.StreamDeliveryUnicast + return &v + }(), + Mode: func() *headers.TransportMode { + v := headers.TransportModeRecord + return &v + }(), + } + + if proto == "udp" { + inTH.Protocol = StreamProtocolUDP + inTH.ClientPorts = &[2]int{35466, 35467} + } else { + inTH.Protocol = StreamProtocolTCP + inTH.InterleavedIDs = &[2]int{0, 1} + } + + err = base.Request{ + Method: base.Setup, + URL: base.MustParseURL("rtsp://localhost:8554/teststream/trackID=0"), + Header: base.Header{ + "CSeq": base.HeaderValue{"2"}, + "Transport": inTH.Write(), + "Session": res.Header["Session"], + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + err = res.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + + var th headers.Transport + err = th.Read(res.Header["Transport"]) + require.NoError(t, err) + + err = base.Request{ + Method: base.Record, + URL: base.MustParseURL("rtsp://localhost:8554/teststream"), + Header: base.Header{ + "CSeq": base.HeaderValue{"3"}, + "Session": res.Header["Session"], + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + err = res.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + + nconn.Close() + + <-sessionClosed + <-connClosed + }) + } +} diff --git a/server_read_test.go b/server_read_test.go index 6c1ae06c..9848ab9c 100644 --- a/server_read_test.go +++ b/server_read_test.go @@ -905,7 +905,7 @@ func TestServerReadPlayPausePause(t *testing.T) { require.Equal(t, base.StatusOK, res.StatusCode) } -func TestServerReadErrorTimeout(t *testing.T) { +func TestServerReadTimeout(t *testing.T) { for _, proto := range []string{ "udp", // checking TCP is useless, since there's no timeout when reading with TCP @@ -961,6 +961,105 @@ func TestServerReadErrorTimeout(t *testing.T) { }(), } + inTH.Protocol = StreamProtocolUDP + inTH.ClientPorts = &[2]int{35466, 35467} + + err = base.Request{ + Method: base.Setup, + URL: base.MustParseURL("rtsp://localhost:8554/teststream/trackID=0"), + Header: base.Header{ + "CSeq": base.HeaderValue{"1"}, + "Transport": inTH.Write(), + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + var res base.Response + err = res.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + + err = base.Request{ + Method: base.Play, + URL: base.MustParseURL("rtsp://localhost:8554/teststream"), + Header: base.Header{ + "CSeq": base.HeaderValue{"2"}, + "Session": res.Header["Session"], + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + err = res.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + + <-sessionClosed + }) + } +} + +func TestServerReadWithoutTeardown(t *testing.T) { + for _, proto := range []string{ + "udp", + "tcp", + } { + t.Run(proto, func(t *testing.T) { + connClosed := make(chan struct{}) + sessionClosed := make(chan struct{}) + + s := &Server{ + Handler: &testServerHandler{ + onConnClose: func(sc *ServerConn, err error) { + close(connClosed) + }, + onSessionClose: func(ss *ServerSession, err error) { + close(sessionClosed) + }, + onAnnounce: func(ctx *ServerHandlerOnAnnounceCtx) (*base.Response, error) { + return &base.Response{ + StatusCode: base.StatusOK, + }, nil + }, + onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, error) { + return &base.Response{ + StatusCode: base.StatusOK, + }, nil + }, + onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) { + return &base.Response{ + StatusCode: base.StatusOK, + }, nil + }, + }, + ReadTimeout: 1 * time.Second, + closeSessionAfterNoRequestsFor: 1 * time.Second, + } + + if proto == "udp" { + s.UDPRTPAddress = "127.0.0.1:8000" + s.UDPRTCPAddress = "127.0.0.1:8001" + } + + err := s.Start("127.0.0.1:8554") + require.NoError(t, err) + defer s.Close() + + nconn, err := net.Dial("tcp", "localhost:8554") + require.NoError(t, err) + defer nconn.Close() + bconn := bufio.NewReadWriter(bufio.NewReader(nconn), bufio.NewWriter(nconn)) + + inTH := &headers.Transport{ + Delivery: func() *base.StreamDelivery { + v := base.StreamDeliveryUnicast + return &v + }(), + Mode: func() *headers.TransportMode { + v := headers.TransportModePlay + return &v + }(), + } + if proto == "udp" { inTH.Protocol = StreamProtocolUDP inTH.ClientPorts = &[2]int{35466, 35467} @@ -998,7 +1097,10 @@ func TestServerReadErrorTimeout(t *testing.T) { require.NoError(t, err) require.Equal(t, base.StatusOK, res.StatusCode) + nconn.Close() + <-sessionClosed + <-connClosed }) } }