diff --git a/client_read_test.go b/client_read_test.go index 353be336..ec4e2552 100644 --- a/client_read_test.go +++ b/client_read_test.go @@ -2,7 +2,6 @@ package gortsplib import ( "bufio" - "bytes" "crypto/tls" "fmt" "net" @@ -22,22 +21,6 @@ import ( "github.com/aler9/gortsplib/pkg/url" ) -func mergeBytes(vals ...[]byte) []byte { - size := 0 - for _, v := range vals { - size += len(v) - } - res := make([]byte, size) - - pos := 0 - for _, v := range vals { - n := copy(res[pos:], v) - pos += n - } - - return res -} - func TestClientReadTracks(t *testing.T) { track1, err := NewTrackH264(96, []byte{0x01, 0x02, 0x03, 0x04}, []byte{0x01, 0x02, 0x03, 0x04}, nil) require.NoError(t, err) @@ -443,183 +426,6 @@ func TestClientRead(t *testing.T) { } } -var oversizedPacketRTPIn = rtp.Packet{ - Header: rtp.Header{ - Version: 2, - PayloadType: 96, - Marker: true, - SequenceNumber: 34572, - }, - Payload: bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04, 0x05}, 4096/5), -} - -var oversizedPacketsRTPOut = []rtp.Packet{ - { - Header: rtp.Header{ - Version: 2, - PayloadType: 96, - Marker: false, - SequenceNumber: 34572, - }, - Payload: mergeBytes( - []byte{0x1c, 0x81, 0x02, 0x03, 0x04, 0x05}, - bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04, 0x05}, 290), - []byte{0x01, 0x02, 0x03, 0x04}, - ), - }, - { - Header: rtp.Header{ - Version: 2, - PayloadType: 96, - Marker: false, - SequenceNumber: 34573, - }, - Payload: mergeBytes( - []byte{0x1c, 0x01, 0x05}, - bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04, 0x05}, 291), - []byte{0x01, 0x02}, - ), - }, - { - Header: rtp.Header{ - Version: 2, - PayloadType: 96, - Marker: true, - SequenceNumber: 34574, - }, - Payload: mergeBytes( - []byte{0x1c, 0x41, 0x03, 0x04, 0x05}, - bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04, 0x05}, 235), - ), - }, -} - -func TestClientReadOversizedPacket(t *testing.T) { - oversizedPacketsRTPOut := append([]rtp.Packet(nil), oversizedPacketsRTPOut...) - - l, err := net.Listen("tcp", "localhost:8554") - require.NoError(t, err) - defer l.Close() - - serverDone := make(chan struct{}) - defer func() { <-serverDone }() - go func() { - defer close(serverDone) - - conn, err := l.Accept() - require.NoError(t, err) - defer conn.Close() - br := bufio.NewReader(conn) - - req, err := readRequest(br) - require.NoError(t, err) - require.Equal(t, base.Options, req.Method) - require.Equal(t, mustParseURL("rtsp://localhost:8554/teststream"), req.URL) - - byts, _ := base.Response{ - StatusCode: base.StatusOK, - Header: base.Header{ - "Public": base.HeaderValue{strings.Join([]string{ - string(base.Describe), - string(base.Setup), - string(base.Play), - }, ", ")}, - }, - }.Write() - _, err = conn.Write(byts) - require.NoError(t, err) - - req, err = readRequest(br) - require.NoError(t, err) - require.Equal(t, base.Describe, req.Method) - require.Equal(t, mustParseURL("rtsp://localhost:8554/teststream"), req.URL) - - track, err := NewTrackH264(96, []byte{0x01, 0x02, 0x03, 0x04}, []byte{0x01, 0x02, 0x03, 0x04}, nil) - require.NoError(t, err) - - tracks := Tracks{track} - tracks.setControls() - - byts, _ = base.Response{ - StatusCode: base.StatusOK, - Header: base.Header{ - "Content-Type": base.HeaderValue{"application/sdp"}, - "Content-Base": base.HeaderValue{"rtsp://localhost:8554/teststream/"}, - }, - Body: tracks.Write(false), - }.Write() - _, err = conn.Write(byts) - require.NoError(t, err) - - req, err = readRequest(br) - require.NoError(t, err) - require.Equal(t, base.Setup, req.Method) - require.Equal(t, mustParseURL("rtsp://localhost:8554/teststream/trackID=0"), req.URL) - - th := headers.Transport{ - Delivery: func() *headers.TransportDelivery { - v := headers.TransportDeliveryUnicast - return &v - }(), - Protocol: headers.TransportProtocolTCP, - InterleavedIDs: &[2]int{0, 1}, - } - - byts, _ = base.Response{ - StatusCode: base.StatusOK, - Header: base.Header{ - "Transport": th.Write(), - }, - }.Write() - _, err = conn.Write(byts) - require.NoError(t, err) - - req, err = readRequest(br) - require.NoError(t, err) - require.Equal(t, base.Play, req.Method) - require.Equal(t, mustParseURL("rtsp://localhost:8554/teststream/"), req.URL) - require.Equal(t, base.HeaderValue{"npt=0-"}, req.Header["Range"]) - - byts, _ = base.Response{ - StatusCode: base.StatusOK, - }.Write() - _, err = conn.Write(byts) - require.NoError(t, err) - - byts, _ = oversizedPacketRTPIn.Marshal() - byts, _ = base.InterleavedFrame{ - Channel: 0, - Payload: byts, - }.Write() - _, err = conn.Write(byts) - require.NoError(t, err) - }() - - packetRecv := make(chan struct{}) - - c := &Client{ - Transport: func() *Transport { - v := TransportTCP - return &v - }(), - OnPacketRTP: func(ctx *ClientOnPacketRTPCtx) { - require.Equal(t, 0, ctx.TrackID) - cmp := oversizedPacketsRTPOut[0] - oversizedPacketsRTPOut = oversizedPacketsRTPOut[1:] - require.Equal(t, &cmp, ctx.Packet) - if len(oversizedPacketsRTPOut) == 0 { - close(packetRecv) - } - }, - } - - err = c.StartReading("rtsp://localhost:8554/teststream") - require.NoError(t, err) - defer c.Close() - - <-packetRecv -} - func TestClientReadPartial(t *testing.T) { listenIP := multicastCapableIP(t) l, err := net.Listen("tcp", listenIP+":8554") diff --git a/pkg/rtpproc/processor_test.go b/pkg/rtpproc/processor_test.go new file mode 100644 index 00000000..57b0f2ca --- /dev/null +++ b/pkg/rtpproc/processor_test.go @@ -0,0 +1,128 @@ +package rtpproc + +import ( + "bytes" + "testing" + + "github.com/pion/rtp" + "github.com/stretchr/testify/require" +) + +func TestProcessRemovePadding(t *testing.T) { + proc := NewProcessor(false, false) + + out, err := proc.Process(&rtp.Packet{ + Header: rtp.Header{ + Version: 2, + PayloadType: 96, + Marker: true, + SequenceNumber: 34572, + Padding: true, + }, + Payload: bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 64/4), + PaddingSize: 64, + }) + require.NoError(t, err) + require.Equal(t, []*ProcessorOutput{{ + Packet: &rtp.Packet{ + Header: rtp.Header{ + Version: 2, + PayloadType: 96, + Marker: true, + SequenceNumber: 34572, + }, + Payload: bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 64/4), + }, + PTSEqualsDTS: true, + }}, out) +} + +func TestProcessH264Oversized(t *testing.T) { + proc := NewProcessor(true, true) + + out, err := proc.Process(&rtp.Packet{ + Header: rtp.Header{ + Version: 2, + PayloadType: 96, + Marker: false, + SequenceNumber: 34572, + }, + Payload: append( + []byte{0x1C, 1<<7 | 0x05}, + bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04, 0x05}, 2050/5)..., + ), + }) + require.NoError(t, err) + require.Equal(t, []*ProcessorOutput(nil), out) + + out, err = proc.Process(&rtp.Packet{ + Header: rtp.Header{ + Version: 2, + PayloadType: 96, + Marker: true, + SequenceNumber: 34573, + }, + Payload: append( + []byte{0x1C, 1 << 6}, + bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04, 0x05}, 2050/5)..., + ), + }) + require.NoError(t, err) + require.Equal(t, []*ProcessorOutput{ + { + Packet: &rtp.Packet{ + Header: rtp.Header{ + Version: 2, + PayloadType: 96, + Marker: false, + SequenceNumber: 34572, + }, + Payload: append( + append( + []byte{0x1c, 0x85}, + bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04, 0x05}, 291)..., + ), + []byte{0x01, 0x02, 0x03}..., + ), + }, + }, + { + Packet: &rtp.Packet{ + Header: rtp.Header{ + Version: 2, + PayloadType: 96, + Marker: false, + SequenceNumber: 34573, + }, + Payload: append( + append( + []byte{0x1c, 0x05, 0x04, 0x05}, + bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04, 0x05}, 291)..., + ), + []byte{0x01}..., + ), + }, + }, + { + Packet: &rtp.Packet{ + Header: rtp.Header{ + Version: 2, + PayloadType: 96, + Marker: true, + SequenceNumber: 34574, + }, + Payload: append( + []byte{0x1c, 0x45, 0x02, 0x03, 0x04, 0x05}, + bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04, 0x05}, 236)..., + ), + }, + H264NALUs: [][]byte{ + append( + []byte{0x05}, + bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04, 0x05}, 4100/5)..., + ), + }, + PTSEqualsDTS: true, + }, + }, out) +} diff --git a/server_publish_test.go b/server_publish_test.go index b361dcc2..f3d38045 100644 --- a/server_publish_test.go +++ b/server_publish_test.go @@ -811,117 +811,6 @@ func TestServerPublish(t *testing.T) { } } -func TestServerPublishOversizedPacket(t *testing.T) { - oversizedPacketsRTPOut := append([]rtp.Packet(nil), oversizedPacketsRTPOut...) - packetRecv := make(chan struct{}) - - s := &Server{ - RTSPAddress: "localhost:8554", - Handler: &testServerHandler{ - onAnnounce: func(ctx *ServerHandlerOnAnnounceCtx) (*base.Response, error) { - return &base.Response{ - StatusCode: base.StatusOK, - }, nil - }, - onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) { - return &base.Response{ - StatusCode: base.StatusOK, - }, nil, nil - }, - onRecord: func(ctx *ServerHandlerOnRecordCtx) (*base.Response, error) { - return &base.Response{ - StatusCode: base.StatusOK, - }, nil - }, - onPacketRTP: func(ctx *ServerHandlerOnPacketRTPCtx) { - require.Equal(t, 0, ctx.TrackID) - cmp := oversizedPacketsRTPOut[0] - oversizedPacketsRTPOut = oversizedPacketsRTPOut[1:] - require.Equal(t, &cmp, ctx.Packet) - if len(oversizedPacketsRTPOut) == 0 { - close(packetRecv) - } - }, - }, - } - - err := s.Start() - require.NoError(t, err) - defer s.Close() - - conn, err := net.Dial("tcp", "localhost:8554") - require.NoError(t, err) - defer conn.Close() - br := bufio.NewReader(conn) - - track, err := NewTrackH264(96, []byte{0x01, 0x02, 0x03, 0x04}, []byte{0x01, 0x02, 0x03, 0x04}, nil) - require.NoError(t, err) - - tracks := Tracks{track} - tracks.setControls() - - res, err := writeReqReadRes(conn, br, base.Request{ - Method: base.Announce, - URL: mustParseURL("rtsp://localhost:8554/teststream"), - Header: base.Header{ - "CSeq": base.HeaderValue{"1"}, - "Content-Type": base.HeaderValue{"application/sdp"}, - }, - Body: tracks.Write(false), - }) - require.NoError(t, err) - require.Equal(t, base.StatusOK, res.StatusCode) - - inTH := &headers.Transport{ - Delivery: func() *headers.TransportDelivery { - v := headers.TransportDeliveryUnicast - return &v - }(), - Mode: func() *headers.TransportMode { - v := headers.TransportModeRecord - return &v - }(), - Protocol: headers.TransportProtocolTCP, - InterleavedIDs: &[2]int{0, 1}, - } - - res, err = writeReqReadRes(conn, br, base.Request{ - Method: base.Setup, - URL: mustParseURL("rtsp://localhost:8554/teststream/trackID=0"), - Header: base.Header{ - "CSeq": base.HeaderValue{"2"}, - "Transport": inTH.Write(), - }, - }) - require.NoError(t, err) - require.Equal(t, base.StatusOK, res.StatusCode) - - var sx headers.Session - err = sx.Read(res.Header["Session"]) - require.NoError(t, err) - - res, err = writeReqReadRes(conn, br, base.Request{ - Method: base.Record, - URL: mustParseURL("rtsp://localhost:8554/teststream"), - Header: base.Header{ - "CSeq": base.HeaderValue{"3"}, - "Session": base.HeaderValue{sx.Session}, - }, - }) - require.NoError(t, err) - require.Equal(t, base.StatusOK, res.StatusCode) - - byts, _ := oversizedPacketRTPIn.Marshal() - byts, _ = base.InterleavedFrame{ - Channel: 0, - Payload: byts, - }.Write() - _, err = conn.Write(byts) - require.NoError(t, err) - - <-packetRecv -} - func TestServerPublishErrorInvalidProtocol(t *testing.T) { s := &Server{ Handler: &testServerHandler{