diff --git a/clientconnread_test.go b/clientconnread_test.go index 813b5669..0be8d710 100644 --- a/clientconnread_test.go +++ b/clientconnread_test.go @@ -1164,3 +1164,164 @@ func TestClientReadRTCPReport(t *testing.T) { conn.Close() <-done } + +func TestClientReadErrorTimeout(t *testing.T) { + for _, proto := range []string{ + "udp", + "tcp", + "auto", + } { + t.Run(proto, func(t *testing.T) { + l, err := net.Listen("tcp", "localhost:8554") + require.NoError(t, err) + defer l.Close() + + serverDone := make(chan struct{}) + defer func() { <-serverDone }() + go func() { + defer close(serverDone) + + conn, err := l.Accept() + require.NoError(t, err) + defer conn.Close() + bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) + + var req base.Request + err = req.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.Options, req.Method) + + err = base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ + "Public": base.HeaderValue{strings.Join([]string{ + string(base.Describe), + string(base.Setup), + string(base.Play), + }, ", ")}, + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + err = req.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.Describe, req.Method) + + track, err := NewTrackH264(96, []byte("123456"), []byte("123456")) + require.NoError(t, err) + + err = base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ + "Content-Type": base.HeaderValue{"application/sdp"}, + }, + Body: Tracks{track}.Write(), + }.Write(bconn.Writer) + require.NoError(t, err) + + err = req.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.Setup, req.Method) + + var inTH headers.Transport + err = inTH.Read(req.Header["Transport"]) + require.NoError(t, err) + + th := headers.Transport{ + Delivery: func() *base.StreamDelivery { + v := base.StreamDeliveryUnicast + return &v + }(), + } + + var l1 net.PacketConn + if proto == "udp" || proto == "auto" { + var err error + l1, err = net.ListenPacket("udp", "localhost:34557") + require.NoError(t, err) + defer l1.Close() + + th.Protocol = StreamProtocolUDP + th.ServerPorts = &[2]int{34556, 34557} + th.ClientPorts = inTH.ClientPorts + + } else { + th.Protocol = StreamProtocolTCP + th.InterleavedIDs = inTH.InterleavedIDs + } + + err = base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ + "Transport": th.Write(), + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + err = req.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.Play, req.Method) + + err = base.Response{ + StatusCode: base.StatusOK, + }.Write(bconn.Writer) + require.NoError(t, err) + + if proto == "udp" || proto == "auto" { + time.Sleep(500 * time.Millisecond) + + l1, err := net.ListenPacket("udp", "localhost:34556") + require.NoError(t, err) + defer l1.Close() + + // write a packet to skip the protocol autodetection feature + l1.WriteTo([]byte("\x01\x02\x03\x04"), &net.UDPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: th.ClientPorts[0], + }) + } + + err = req.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.Teardown, req.Method) + + err = base.Response{ + StatusCode: base.StatusOK, + }.Write(bconn.Writer) + require.NoError(t, err) + }() + + conf := ClientConf{ + StreamProtocol: func() *StreamProtocol { + switch proto { + case "udp": + v := StreamProtocolUDP + return &v + + case "tcp": + v := StreamProtocolTCP + return &v + } + return nil + }(), + InitialUDPReadTimeout: 1 * time.Second, + ReadTimeout: 1 * time.Second, + } + + conn, err := conf.DialRead("rtsp://localhost:8554/teststream") + require.NoError(t, err) + defer conn.Close() + + err = <-conn.ReadFrames(func(trackID int, streamType StreamType, payload []byte) { + }) + + switch proto { + case "udp", "auto": + require.Equal(t, "UDP timeout", err.Error()) + + case "tcp": + require.True(t, strings.HasSuffix(err.Error(), "i/o timeout")) + } + }) + } +} diff --git a/serverconnpublish_test.go b/serverconnpublish_test.go index f35e30f6..df6294a6 100644 --- a/serverconnpublish_test.go +++ b/serverconnpublish_test.go @@ -4,6 +4,7 @@ import ( "bufio" "net" "strconv" + "strings" "sync/atomic" "testing" "time" @@ -688,6 +689,8 @@ func TestServerPublish(t *testing.T) { Port: th.ServerPorts[0], }) + time.Sleep(500 * time.Millisecond) + l2.WriteTo([]byte{0x05, 0x06, 0x07, 0x08}, &net.UDPAddr{ IP: net.ParseIP("127.0.0.1"), Port: th.ServerPorts[1], @@ -1036,3 +1039,157 @@ func TestServerPublishRTCPReport(t *testing.T) { }.Write(bconn.Writer) require.NoError(t, err) } + +func TestServerPublishErrorTimeout(t *testing.T) { + for _, proto := range []string{ + "udp", + "tcp", + } { + t.Run(proto, func(t *testing.T) { + errDone := make(chan struct{}) + + conf := ServerConf{ + ReadTimeout: 1 * time.Second, + } + + if proto == "udp" { + conf.UDPRTPAddress = "127.0.0.1:8000" + conf.UDPRTCPAddress = "127.0.0.1:8001" + } + + s, err := conf.Serve("127.0.0.1:8554") + require.NoError(t, err) + defer s.Close() + + serverDone := make(chan struct{}) + defer func() { <-serverDone }() + go func() { + defer close(serverDone) + + conn, err := s.Accept() + require.NoError(t, err) + defer conn.Close() + + onAnnounce := func(ctx *ServerConnAnnounceCtx) (*base.Response, error) { + return &base.Response{ + StatusCode: base.StatusOK, + }, nil + } + + onSetup := func(ctx *ServerConnSetupCtx) (*base.Response, error) { + return &base.Response{ + StatusCode: base.StatusOK, + }, nil + } + + onRecord := func(ctx *ServerConnRecordCtx) (*base.Response, error) { + return &base.Response{ + StatusCode: base.StatusOK, + }, nil + } + + onFrame := func(trackID int, typ StreamType, buf []byte) { + } + + err = <-conn.Read(ServerConnReadHandlers{ + OnAnnounce: onAnnounce, + OnSetup: onSetup, + OnRecord: onRecord, + OnFrame: onFrame, + }) + + if proto == "udp" { + require.Equal(t, "no UDP packets received (maybe there's a firewall/NAT in between)", err.Error()) + } else { + require.True(t, strings.HasSuffix(err.Error(), "i/o timeout")) + } + + close(errDone) + }() + + conn, err := net.Dial("tcp", "localhost:8554") + require.NoError(t, err) + defer conn.Close() + bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) + + track, err := NewTrackH264(96, []byte("123456"), []byte("123456")) + require.NoError(t, err) + + tracks := Tracks{track} + for i, t := range tracks { + t.Media.Attributes = append(t.Media.Attributes, psdp.Attribute{ + Key: "control", + Value: "trackID=" + strconv.FormatInt(int64(i), 10), + }) + } + + err = base.Request{ + Method: base.Announce, + URL: base.MustParseURL("rtsp://localhost:8554/teststream"), + Header: base.Header{ + "CSeq": base.HeaderValue{"1"}, + "Content-Type": base.HeaderValue{"application/sdp"}, + }, + Body: tracks.Write(), + }.Write(bconn.Writer) + require.NoError(t, err) + + var res base.Response + err = res.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + + inTH := &headers.Transport{ + Delivery: func() *base.StreamDelivery { + v := base.StreamDeliveryUnicast + return &v + }(), + Mode: func() *headers.TransportMode { + v := headers.TransportModeRecord + return &v + }(), + } + + if proto == "udp" { + inTH.Protocol = StreamProtocolUDP + inTH.ClientPorts = &[2]int{35466, 35467} + } else { + inTH.Protocol = StreamProtocolTCP + inTH.InterleavedIDs = &[2]int{0, 1} + } + + err = base.Request{ + Method: base.Setup, + URL: base.MustParseURL("rtsp://localhost:8554/teststream/trackID=0"), + Header: base.Header{ + "CSeq": base.HeaderValue{"2"}, + "Transport": inTH.Write(), + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + err = res.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + + var th headers.Transport + err = th.Read(res.Header["Transport"]) + require.NoError(t, err) + + err = base.Request{ + Method: base.Record, + URL: base.MustParseURL("rtsp://localhost:8554/teststream"), + Header: base.Header{ + "CSeq": base.HeaderValue{"3"}, + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + err = res.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + + <-errDone + }) + } +}