diff --git a/client_read_test.go b/client_read_test.go index 0194cf7c..07dcdbd7 100644 --- a/client_read_test.go +++ b/client_read_test.go @@ -2,6 +2,7 @@ package gortsplib import ( "bufio" + "bytes" "crypto/tls" "fmt" "net" @@ -444,6 +445,131 @@ func TestClientRead(t *testing.T) { } } +func TestClientReadNonStandardFrameSize(t *testing.T) { + refPayload := bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04, 0x05}, 4096/5) + + l, err := net.Listen("tcp", "localhost:8554") + require.NoError(t, err) + defer l.Close() + + serverDone := make(chan struct{}) + defer func() { <-serverDone }() + go func() { + defer close(serverDone) + + conn, err := l.Accept() + require.NoError(t, err) + defer conn.Close() + bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) + + req, err := readRequest(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.Options, req.Method) + require.Equal(t, mustParseURL("rtsp://localhost:8554/teststream"), req.URL) + + err = base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ + "Public": base.HeaderValue{strings.Join([]string{ + string(base.Describe), + string(base.Setup), + string(base.Play), + }, ", ")}, + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + req, err = readRequest(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.Describe, req.Method) + require.Equal(t, mustParseURL("rtsp://localhost:8554/teststream"), req.URL) + + track, err := NewTrackH264(96, &TrackConfigH264{[]byte{0x01, 0x02, 0x03, 0x04}, []byte{0x01, 0x02, 0x03, 0x04}}) + require.NoError(t, err) + + track.Media.Attributes = append(track.Media.Attributes, psdp.Attribute{ + Key: "control", + Value: "trackID=0", + }) + + err = base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ + "Content-Type": base.HeaderValue{"application/sdp"}, + "Content-Base": base.HeaderValue{"rtsp://localhost:8554/teststream/"}, + }, + Body: Tracks{track}.Write(), + }.Write(bconn.Writer) + require.NoError(t, err) + + req, err = readRequest(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.Setup, req.Method) + require.Equal(t, mustParseURL("rtsp://localhost:8554/teststream/trackID=0"), req.URL) + + th := headers.Transport{ + Delivery: func() *headers.TransportDelivery { + v := headers.TransportDeliveryUnicast + return &v + }(), + Protocol: headers.TransportProtocolTCP, + InterleavedIDs: &[2]int{0, 1}, + } + + err = base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ + "Transport": th.Write(), + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + req, err = readRequest(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.Play, req.Method) + require.Equal(t, mustParseURL("rtsp://localhost:8554/teststream/"), req.URL) + require.Equal(t, base.HeaderValue{"npt=0-"}, req.Header["Range"]) + + err = base.Response{ + StatusCode: base.StatusOK, + }.Write(bconn.Writer) + require.NoError(t, err) + + err = base.InterleavedFrame{ + Channel: 0, + Payload: refPayload, + }.Write(bconn.Writer) + require.NoError(t, err) + }() + + c := &Client{ + ReadBufferSize: 4500, + Transport: func() *Transport { + v := TransportTCP + return &v + }(), + } + + conn, err := c.DialRead("rtsp://localhost:8554/teststream") + require.NoError(t, err) + + frameRecv := make(chan struct{}) + done := make(chan struct{}) + go func() { + defer close(done) + conn.ReadFrames(func(id int, streamType StreamType, payload []byte) { + require.Equal(t, 0, id) + require.Equal(t, StreamTypeRTP, streamType) + require.Equal(t, refPayload, payload) + close(frameRecv) + }) + }() + + <-frameRecv + conn.Close() + <-done +} + func TestClientReadPartial(t *testing.T) { listenIP := multicastCapableIP(t) l, err := net.Listen("tcp", listenIP+":8554") diff --git a/server_publish_test.go b/server_publish_test.go index b5f70cea..c9f79b9d 100644 --- a/server_publish_test.go +++ b/server_publish_test.go @@ -2,6 +2,7 @@ package gortsplib import ( "bufio" + "bytes" "crypto/tls" "net" "strconv" @@ -842,6 +843,112 @@ func TestServerPublish(t *testing.T) { } } +func TestServerPublishNonStandardFrameSize(t *testing.T) { + payload := bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04, 0x05}, 4096/5) + frameReceived := make(chan struct{}) + + s := &Server{ + ReadBufferSize: 4500, + Handler: &testServerHandler{ + onAnnounce: func(ctx *ServerHandlerOnAnnounceCtx) (*base.Response, error) { + return &base.Response{ + StatusCode: base.StatusOK, + }, nil + }, + onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) { + return &base.Response{ + StatusCode: base.StatusOK, + }, nil, nil + }, + onRecord: func(ctx *ServerHandlerOnRecordCtx) (*base.Response, error) { + return &base.Response{ + StatusCode: base.StatusOK, + }, nil + }, + onFrame: func(ctx *ServerHandlerOnFrameCtx) { + require.Equal(t, 0, ctx.TrackID) + require.Equal(t, StreamTypeRTP, ctx.StreamType) + require.Equal(t, payload, ctx.Payload) + close(frameReceived) + }, + }, + } + + err := s.Start("localhost:8554") + require.NoError(t, err) + defer s.Wait() + defer s.Close() + + nconn, err := net.Dial("tcp", "localhost:8554") + require.NoError(t, err) + defer nconn.Close() + bconn := bufio.NewReadWriter(bufio.NewReader(nconn), bufio.NewWriter(nconn)) + + track, err := NewTrackH264(96, &TrackConfigH264{[]byte{0x01, 0x02, 0x03, 0x04}, []byte{0x01, 0x02, 0x03, 0x04}}) + require.NoError(t, err) + + track.Media.Attributes = append(track.Media.Attributes, psdp.Attribute{ + Key: "control", + Value: "trackID=0", + }) + + res, err := writeReqReadRes(bconn, base.Request{ + Method: base.Announce, + URL: mustParseURL("rtsp://localhost:8554/teststream"), + Header: base.Header{ + "CSeq": base.HeaderValue{"1"}, + "Content-Type": base.HeaderValue{"application/sdp"}, + }, + Body: Tracks{track}.Write(), + }) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + + inTH := &headers.Transport{ + Delivery: func() *headers.TransportDelivery { + v := headers.TransportDeliveryUnicast + return &v + }(), + Mode: func() *headers.TransportMode { + v := headers.TransportModeRecord + return &v + }(), + Protocol: headers.TransportProtocolTCP, + InterleavedIDs: &[2]int{0, 1}, + } + + res, err = writeReqReadRes(bconn, base.Request{ + Method: base.Setup, + URL: mustParseURL("rtsp://localhost:8554/teststream/trackID=0"), + Header: base.Header{ + "CSeq": base.HeaderValue{"2"}, + "Transport": inTH.Write(), + "Session": res.Header["Session"], + }, + }) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + + res, err = writeReqReadRes(bconn, base.Request{ + Method: base.Record, + URL: mustParseURL("rtsp://localhost:8554/teststream"), + Header: base.Header{ + "CSeq": base.HeaderValue{"3"}, + "Session": res.Header["Session"], + }, + }) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + + err = base.InterleavedFrame{ + Channel: 0, + Payload: payload, + }.Write(bconn.Writer) + require.NoError(t, err) + + <-frameReceived +} + func TestServerPublishErrorInvalidProtocol(t *testing.T) { s := &Server{ Handler: &testServerHandler{ diff --git a/server_read_test.go b/server_read_test.go index a45461ad..0f5870ca 100644 --- a/server_read_test.go +++ b/server_read_test.go @@ -2,6 +2,7 @@ package gortsplib import ( "bufio" + "bytes" "crypto/tls" "net" "strconv" @@ -569,6 +570,86 @@ func TestServerRead(t *testing.T) { } } +func TestServerReadNonStandardFrameSize(t *testing.T) { + track, err := NewTrackH264(96, &TrackConfigH264{[]byte{0x01, 0x02, 0x03, 0x04}, []byte{0x01, 0x02, 0x03, 0x04}}) + require.NoError(t, err) + + stream := NewServerStream(Tracks{track}) + + payload := bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04, 0x05}, 4096/5) + + s := &Server{ + Handler: &testServerHandler{ + onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) { + return &base.Response{ + StatusCode: base.StatusOK, + }, stream, nil + }, + onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) { + go func() { + time.Sleep(1 * time.Second) + stream.WriteFrame(0, StreamTypeRTP, payload) + }() + + return &base.Response{ + StatusCode: base.StatusOK, + }, nil + }, + }, + } + + err = s.Start("localhost:8554") + require.NoError(t, err) + defer s.Wait() + defer s.Close() + + nconn, err := net.Dial("tcp", "localhost:8554") + require.NoError(t, err) + bconn := bufio.NewReadWriter(bufio.NewReader(nconn), bufio.NewWriter(nconn)) + + inTH := &headers.Transport{ + Mode: func() *headers.TransportMode { + v := headers.TransportModePlay + return &v + }(), + Delivery: func() *headers.TransportDelivery { + v := headers.TransportDeliveryUnicast + return &v + }(), + Protocol: headers.TransportProtocolTCP, + InterleavedIDs: &[2]int{0, 1}, + } + + res, err := writeReqReadRes(bconn, base.Request{ + Method: base.Setup, + URL: mustParseURL("rtsp://localhost:8554/teststream/trackID=0"), + Header: base.Header{ + "CSeq": base.HeaderValue{"1"}, + "Transport": inTH.Write(), + }, + }) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + + res, err = writeReqReadRes(bconn, base.Request{ + Method: base.Play, + URL: mustParseURL("rtsp://localhost:8554/teststream"), + Header: base.Header{ + "CSeq": base.HeaderValue{"2"}, + "Session": res.Header["Session"], + }, + }) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + + var f base.InterleavedFrame + f.Payload = make([]byte, 4500) + err = f.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, 0, f.Channel) + require.Equal(t, payload, f.Payload) +} + func TestServerReadTCPResponseBeforeFrames(t *testing.T) { writerDone := make(chan struct{}) writerTerminate := make(chan struct{})