diff --git a/client.go b/client.go index 192cf9a6..576e8d5e 100644 --- a/client.go +++ b/client.go @@ -1883,6 +1883,7 @@ func (c *Client) runWriter() { if isRTP { f := rtpFrames[trackID] f.Payload = payload + buf.Reset() f.Write(&buf) c.conn.SetWriteDeadline(time.Now().Add(c.WriteTimeout)) @@ -1890,6 +1891,7 @@ func (c *Client) runWriter() { } else { f := rtcpFrames[trackID] f.Payload = payload + buf.Reset() f.Write(&buf) c.conn.SetWriteDeadline(time.Now().Add(c.WriteTimeout)) diff --git a/client_publish_test.go b/client_publish_test.go index 388da87d..9c6fe0a8 100644 --- a/client_publish_test.go +++ b/client_publish_test.go @@ -90,6 +90,7 @@ func TestClientPublishSerial(t *testing.T) { require.Equal(t, base.Options, req.Method) require.Equal(t, mustParseURL(scheme+"://localhost:8554/teststream"), req.URL) + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -108,6 +109,7 @@ func TestClientPublishSerial(t *testing.T) { require.Equal(t, base.Announce, req.Method) require.Equal(t, mustParseURL(scheme+"://localhost:8554/teststream"), req.URL) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -151,6 +153,7 @@ func TestClientPublishSerial(t *testing.T) { th.InterleavedIDs = inTH.InterleavedIDs } + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -165,6 +168,7 @@ func TestClientPublishSerial(t *testing.T) { require.Equal(t, base.Record, req.Method) require.Equal(t, mustParseURL(scheme+"://localhost:8554/teststream"), req.URL) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -198,6 +202,7 @@ func TestClientPublishSerial(t *testing.T) { Port: th.ClientPorts[1], }) } else { + bb.Reset() base.InterleavedFrame{ Channel: 1, Payload: testRTCPPacketMarshaled, @@ -211,6 +216,7 @@ func TestClientPublishSerial(t *testing.T) { require.Equal(t, base.Teardown, req.Method) require.Equal(t, mustParseURL(scheme+"://localhost:8554/teststream"), req.URL) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -303,6 +309,7 @@ func TestClientPublishParallel(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Options, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -320,6 +327,7 @@ func TestClientPublishParallel(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Announce, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -350,6 +358,7 @@ func TestClientPublishParallel(t *testing.T) { th.InterleavedIDs = inTH.InterleavedIDs } + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -363,6 +372,7 @@ func TestClientPublishParallel(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Record, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -373,6 +383,7 @@ func TestClientPublishParallel(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Teardown, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -449,6 +460,7 @@ func TestClientPublishPauseSerial(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Options, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -467,6 +479,7 @@ func TestClientPublishPauseSerial(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Announce, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -497,6 +510,7 @@ func TestClientPublishPauseSerial(t *testing.T) { th.InterleavedIDs = inTH.InterleavedIDs } + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -510,6 +524,7 @@ func TestClientPublishPauseSerial(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Record, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -520,6 +535,7 @@ func TestClientPublishPauseSerial(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Pause, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -530,6 +546,7 @@ func TestClientPublishPauseSerial(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Record, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -540,6 +557,7 @@ func TestClientPublishPauseSerial(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Teardown, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -606,6 +624,7 @@ func TestClientPublishPauseParallel(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Options, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -624,6 +643,7 @@ func TestClientPublishPauseParallel(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Announce, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -654,6 +674,7 @@ func TestClientPublishPauseParallel(t *testing.T) { th.InterleavedIDs = inTH.InterleavedIDs } + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -667,6 +688,7 @@ func TestClientPublishPauseParallel(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Record, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -677,6 +699,7 @@ func TestClientPublishPauseParallel(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Pause, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -749,6 +772,7 @@ func TestClientPublishAutomaticProtocol(t *testing.T) { require.Equal(t, base.Options, req.Method) require.Equal(t, mustParseURL("rtsp://localhost:8554/teststream"), req.URL) + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -767,6 +791,7 @@ func TestClientPublishAutomaticProtocol(t *testing.T) { require.Equal(t, base.Announce, req.Method) require.Equal(t, mustParseURL("rtsp://localhost:8554/teststream"), req.URL) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -777,6 +802,7 @@ func TestClientPublishAutomaticProtocol(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Setup, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusUnsupportedTransport, }.Write(&bb) @@ -801,6 +827,7 @@ func TestClientPublishAutomaticProtocol(t *testing.T) { InterleavedIDs: &[2]int{0, 1}, } + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -815,6 +842,7 @@ func TestClientPublishAutomaticProtocol(t *testing.T) { require.Equal(t, base.Record, req.Method) require.Equal(t, mustParseURL("rtsp://localhost:8554/teststream"), req.URL) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -834,6 +862,7 @@ func TestClientPublishAutomaticProtocol(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Teardown, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -877,6 +906,7 @@ func TestClientPublishRTCPReport(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Options, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -894,6 +924,7 @@ func TestClientPublishRTCPReport(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Announce, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -916,6 +947,7 @@ func TestClientPublishRTCPReport(t *testing.T) { require.NoError(t, err) defer l2.Close() + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -937,6 +969,7 @@ func TestClientPublishRTCPReport(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Record, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -971,6 +1004,7 @@ func TestClientPublishRTCPReport(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Teardown, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -1026,6 +1060,7 @@ func TestClientPublishIgnoreTCPRTPPackets(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Options, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -1043,6 +1078,7 @@ func TestClientPublishIgnoreTCPRTPPackets(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Announce, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -1066,6 +1102,7 @@ func TestClientPublishIgnoreTCPRTPPackets(t *testing.T) { InterleavedIDs: inTH.InterleavedIDs, } + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -1079,12 +1116,14 @@ func TestClientPublishIgnoreTCPRTPPackets(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Record, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) _, err = conn.Write(bb.Bytes()) require.NoError(t, err) + bb.Reset() base.InterleavedFrame{ Channel: 0, Payload: testRTPPacketMarshaled, @@ -1092,6 +1131,7 @@ func TestClientPublishIgnoreTCPRTPPackets(t *testing.T) { _, err = conn.Write(bb.Bytes()) require.NoError(t, err) + bb.Reset() base.InterleavedFrame{ Channel: 1, Payload: testRTCPPacketMarshaled, @@ -1103,6 +1143,7 @@ func TestClientPublishIgnoreTCPRTPPackets(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Teardown, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) diff --git a/client_read_test.go b/client_read_test.go index 7403539d..efc1d4b2 100644 --- a/client_read_test.go +++ b/client_read_test.go @@ -66,6 +66,7 @@ func TestClientReadTracks(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Options, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -87,6 +88,7 @@ func TestClientReadTracks(t *testing.T) { tracks := Tracks{track1, track2, track3} tracks.setControls() + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -118,6 +120,7 @@ func TestClientReadTracks(t *testing.T) { ServerPorts: &[2]int{34556 + i*2, 34557 + i*2}, } + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -133,6 +136,7 @@ func TestClientReadTracks(t *testing.T) { require.Equal(t, base.Play, req.Method) require.Equal(t, mustParseURL("rtsp://localhost:8554/teststream/"), req.URL) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -144,6 +148,7 @@ func TestClientReadTracks(t *testing.T) { require.Equal(t, base.Teardown, req.Method) require.Equal(t, mustParseURL("rtsp://localhost:8554/teststream/"), req.URL) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -203,6 +208,7 @@ func TestClientRead(t *testing.T) { require.Equal(t, base.Options, req.Method) require.Equal(t, mustParseURL(scheme+"://"+listenIP+":8554/test/stream?param=value"), req.URL) + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -227,6 +233,7 @@ func TestClientRead(t *testing.T) { tracks := Tracks{track} tracks.setControls() + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -311,6 +318,7 @@ func TestClientRead(t *testing.T) { th.InterleavedIDs = &[2]int{0, 1} } + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -326,6 +334,7 @@ func TestClientRead(t *testing.T) { require.Equal(t, mustParseURL(scheme+"://"+listenIP+":8554/test/stream?param=value/"), req.URL) require.Equal(t, base.HeaderValue{"npt=0-"}, req.Header["Range"]) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -349,6 +358,7 @@ func TestClientRead(t *testing.T) { }) case "tcp", "tls": + bb.Reset() base.InterleavedFrame{ Channel: 0, Payload: testRTPPacketMarshaled, @@ -389,6 +399,7 @@ func TestClientRead(t *testing.T) { require.Equal(t, base.Teardown, req.Method) require.Equal(t, mustParseURL(scheme+"://"+listenIP+":8554/test/stream?param=value/"), req.URL) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -518,6 +529,7 @@ func TestClientReadOversizedPacket(t *testing.T) { require.Equal(t, base.Options, req.Method) require.Equal(t, mustParseURL("rtsp://localhost:8554/teststream"), req.URL) + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -542,6 +554,7 @@ func TestClientReadOversizedPacket(t *testing.T) { tracks := Tracks{track} tracks.setControls() + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -567,6 +580,7 @@ func TestClientReadOversizedPacket(t *testing.T) { InterleavedIDs: &[2]int{0, 1}, } + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -582,6 +596,7 @@ func TestClientReadOversizedPacket(t *testing.T) { require.Equal(t, mustParseURL("rtsp://localhost:8554/teststream/"), req.URL) require.Equal(t, base.HeaderValue{"npt=0-"}, req.Header["Range"]) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -589,6 +604,7 @@ func TestClientReadOversizedPacket(t *testing.T) { require.NoError(t, err) byts, _ := oversizedPacketRTPIn.Marshal() + bb.Reset() base.InterleavedFrame{ Channel: 0, Payload: byts, @@ -643,6 +659,7 @@ func TestClientReadPartial(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Options, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -670,6 +687,7 @@ func TestClientReadPartial(t *testing.T) { tracks := Tracks{track1, track2} tracks.setControls() + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -700,6 +718,7 @@ func TestClientReadPartial(t *testing.T) { InterleavedIDs: inTH.InterleavedIDs, } + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -714,12 +733,14 @@ func TestClientReadPartial(t *testing.T) { require.Equal(t, base.Play, req.Method) require.Equal(t, mustParseURL("rtsp://"+listenIP+":8554/teststream/"), req.URL) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) _, err = conn.Write(bb.Bytes()) require.NoError(t, err) + bb.Reset() base.InterleavedFrame{ Channel: 0, Payload: testRTPPacketMarshaled, @@ -732,6 +753,7 @@ func TestClientReadPartial(t *testing.T) { require.Equal(t, base.Teardown, req.Method) require.Equal(t, mustParseURL("rtsp://"+listenIP+":8554/teststream/"), req.URL) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -792,6 +814,7 @@ func TestClientReadNoContentBase(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Options, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -816,6 +839,7 @@ func TestClientReadNoContentBase(t *testing.T) { tracks := Tracks{track} tracks.setControls() + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -845,6 +869,7 @@ func TestClientReadNoContentBase(t *testing.T) { ServerPorts: &[2]int{34556, 34557}, } + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -859,6 +884,7 @@ func TestClientReadNoContentBase(t *testing.T) { require.Equal(t, base.Play, req.Method) require.Equal(t, mustParseURL("rtsp://localhost:8554/teststream"), req.URL) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -870,6 +896,7 @@ func TestClientReadNoContentBase(t *testing.T) { require.Equal(t, base.Teardown, req.Method) require.Equal(t, mustParseURL("rtsp://localhost:8554/teststream"), req.URL) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -913,6 +940,7 @@ func TestClientReadAnyPort(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Options, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -936,6 +964,7 @@ func TestClientReadAnyPort(t *testing.T) { tracks := Tracks{track} tracks.setControls() + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -963,6 +992,7 @@ func TestClientReadAnyPort(t *testing.T) { require.NoError(t, err) defer l1b.Close() + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -998,6 +1028,7 @@ func TestClientReadAnyPort(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Play, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -1073,6 +1104,7 @@ func TestClientReadAutomaticProtocol(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Options, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -1096,6 +1128,7 @@ func TestClientReadAutomaticProtocol(t *testing.T) { tracks := Tracks{track} tracks.setControls() + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -1111,6 +1144,7 @@ func TestClientReadAutomaticProtocol(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Setup, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusUnsupportedTransport, }.Write(&bb) @@ -1126,6 +1160,7 @@ func TestClientReadAutomaticProtocol(t *testing.T) { require.NoError(t, err) require.Equal(t, headers.TransportProtocolTCP, inTH.Protocol) + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -1146,12 +1181,14 @@ func TestClientReadAutomaticProtocol(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Play, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) _, err = conn.Write(bb.Bytes()) require.NoError(t, err) + bb.Reset() base.InterleavedFrame{ Channel: 0, Payload: testRTPPacketMarshaled, @@ -1194,6 +1231,7 @@ func TestClientReadAutomaticProtocol(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Options, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -1213,6 +1251,7 @@ func TestClientReadAutomaticProtocol(t *testing.T) { v := auth.NewValidator("myuser", "mypass", nil) + bb.Reset() base.Response{ StatusCode: base.StatusUnauthorized, Header: base.Header{ @@ -1235,6 +1274,7 @@ func TestClientReadAutomaticProtocol(t *testing.T) { tracks := Tracks{track} tracks.setControls() + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -1265,6 +1305,7 @@ func TestClientReadAutomaticProtocol(t *testing.T) { ClientPorts: inTH.ClientPorts, } + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -1278,6 +1319,7 @@ func TestClientReadAutomaticProtocol(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Play, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -1288,6 +1330,7 @@ func TestClientReadAutomaticProtocol(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Teardown, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -1304,6 +1347,7 @@ func TestClientReadAutomaticProtocol(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Options, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -1321,6 +1365,7 @@ func TestClientReadAutomaticProtocol(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Describe, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -1338,6 +1383,7 @@ func TestClientReadAutomaticProtocol(t *testing.T) { v = auth.NewValidator("myuser", "mypass", nil) + bb.Reset() base.Response{ StatusCode: base.StatusUnauthorized, Header: base.Header{ @@ -1368,6 +1414,7 @@ func TestClientReadAutomaticProtocol(t *testing.T) { InterleavedIDs: inTH.InterleavedIDs, } + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -1381,12 +1428,14 @@ func TestClientReadAutomaticProtocol(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Play, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) _, err = conn.Write(bb.Bytes()) require.NoError(t, err) + bb.Reset() base.InterleavedFrame{ Channel: 0, Payload: testRTPPacketMarshaled, @@ -1398,6 +1447,7 @@ func TestClientReadAutomaticProtocol(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Teardown, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -1444,6 +1494,7 @@ func TestClientReadDifferentInterleavedIDs(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Options, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -1468,6 +1519,7 @@ func TestClientReadDifferentInterleavedIDs(t *testing.T) { tracks := Tracks{track1} tracks.setControls() + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -1497,6 +1549,7 @@ func TestClientReadDifferentInterleavedIDs(t *testing.T) { InterleavedIDs: &[2]int{2, 3}, } + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -1511,12 +1564,14 @@ func TestClientReadDifferentInterleavedIDs(t *testing.T) { require.Equal(t, base.Play, req.Method) require.Equal(t, mustParseURL("rtsp://localhost:8554/teststream/"), req.URL) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) _, err = conn.Write(bb.Bytes()) require.NoError(t, err) + bb.Reset() base.InterleavedFrame{ Channel: 2, Payload: testRTPPacketMarshaled, @@ -1529,6 +1584,7 @@ func TestClientReadDifferentInterleavedIDs(t *testing.T) { require.Equal(t, base.Teardown, req.Method) require.Equal(t, mustParseURL("rtsp://localhost:8554/teststream/"), req.URL) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -1575,6 +1631,7 @@ func TestClientReadRedirect(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Options, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -1592,6 +1649,7 @@ func TestClientReadRedirect(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Describe, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusMovedPermanently, Header: base.Header{ @@ -1612,6 +1670,7 @@ func TestClientReadRedirect(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Options, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -1635,6 +1694,7 @@ func TestClientReadRedirect(t *testing.T) { tracks := Tracks{track} tracks.setControls() + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -1654,6 +1714,7 @@ func TestClientReadRedirect(t *testing.T) { err = th.Read(req.Header["Transport"]) require.NoError(t, err) + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -1675,6 +1736,7 @@ func TestClientReadRedirect(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Play, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -1737,6 +1799,7 @@ func TestClientReadPause(t *testing.T) { Port: inTH.ClientPorts[0], }) } else { + bb.Reset() base.InterleavedFrame{ Channel: 0, Payload: testRTPPacketMarshaled, @@ -1777,6 +1840,7 @@ func TestClientReadPause(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Options, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -1800,6 +1864,7 @@ func TestClientReadPause(t *testing.T) { tracks := Tracks{track} tracks.setControls() + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -1835,6 +1900,7 @@ func TestClientReadPause(t *testing.T) { th.InterleavedIDs = inTH.InterleavedIDs } + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -1848,6 +1914,7 @@ func TestClientReadPause(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Play, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -1863,6 +1930,7 @@ func TestClientReadPause(t *testing.T) { close(writerTerminate) <-writerDone + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -1873,6 +1941,7 @@ func TestClientReadPause(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Play, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -1888,6 +1957,7 @@ func TestClientReadPause(t *testing.T) { close(writerTerminate) <-writerDone + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -1956,6 +2026,7 @@ func TestClientReadRTCPReport(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Options, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -1980,6 +2051,7 @@ func TestClientReadRTCPReport(t *testing.T) { tracks := Tracks{track} tracks.setControls() + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -2007,6 +2079,7 @@ func TestClientReadRTCPReport(t *testing.T) { require.NoError(t, err) defer l2.Close() + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -2028,6 +2101,7 @@ func TestClientReadRTCPReport(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Play, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -2097,6 +2171,7 @@ func TestClientReadRTCPReport(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Teardown, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -2141,6 +2216,7 @@ func TestClientReadErrorTimeout(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Options, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -2164,6 +2240,7 @@ func TestClientReadErrorTimeout(t *testing.T) { tracks := Tracks{track} tracks.setControls() + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -2205,6 +2282,7 @@ func TestClientReadErrorTimeout(t *testing.T) { th.InterleavedIDs = inTH.InterleavedIDs } + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -2218,6 +2296,7 @@ func TestClientReadErrorTimeout(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Play, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -2236,6 +2315,7 @@ func TestClientReadErrorTimeout(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Teardown, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -2296,6 +2376,7 @@ func TestClientReadIgnoreTCPInvalidTrack(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Options, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -2319,6 +2400,7 @@ func TestClientReadIgnoreTCPInvalidTrack(t *testing.T) { tracks := Tracks{track} tracks.setControls() + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -2347,6 +2429,7 @@ func TestClientReadIgnoreTCPInvalidTrack(t *testing.T) { th.Protocol = headers.TransportProtocolTCP th.InterleavedIDs = inTH.InterleavedIDs + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -2360,12 +2443,14 @@ func TestClientReadIgnoreTCPInvalidTrack(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Play, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) _, err = conn.Write(bb.Bytes()) require.NoError(t, err) + bb.Reset() base.InterleavedFrame{ Channel: 6, Payload: testRTPPacketMarshaled, @@ -2373,6 +2458,7 @@ func TestClientReadIgnoreTCPInvalidTrack(t *testing.T) { _, err = conn.Write(bb.Bytes()) require.NoError(t, err) + bb.Reset() base.InterleavedFrame{ Channel: 0, Payload: testRTPPacketMarshaled, @@ -2384,6 +2470,7 @@ func TestClientReadIgnoreTCPInvalidTrack(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Teardown, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -2430,6 +2517,7 @@ func TestClientReadSeek(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Options, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -2453,6 +2541,7 @@ func TestClientReadSeek(t *testing.T) { tracks := Tracks{track} tracks.setControls() + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -2481,6 +2570,7 @@ func TestClientReadSeek(t *testing.T) { InterleavedIDs: inTH.InterleavedIDs, } + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -2503,6 +2593,7 @@ func TestClientReadSeek(t *testing.T) { }, }, ra) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -2513,6 +2604,7 @@ func TestClientReadSeek(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Pause, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -2531,6 +2623,7 @@ func TestClientReadSeek(t *testing.T) { }, }, ra) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -2541,6 +2634,7 @@ func TestClientReadSeek(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Teardown, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -2607,6 +2701,7 @@ func TestClientReadKeepaliveFromSession(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Options, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -2630,6 +2725,7 @@ func TestClientReadKeepaliveFromSession(t *testing.T) { tracks := Tracks{track} tracks.setControls() + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -2649,6 +2745,7 @@ func TestClientReadKeepaliveFromSession(t *testing.T) { err = inTH.Read(req.Header["Transport"]) require.NoError(t, err) + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -2677,6 +2774,7 @@ func TestClientReadKeepaliveFromSession(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Play, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -2690,6 +2788,7 @@ func TestClientReadKeepaliveFromSession(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Options, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -2738,6 +2837,7 @@ func TestClientReadDifferentSource(t *testing.T) { require.Equal(t, base.Options, req.Method) require.Equal(t, mustParseURL("rtsp://localhost:8554/test/stream?param=value"), req.URL) + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -2762,6 +2862,7 @@ func TestClientReadDifferentSource(t *testing.T) { tracks := Tracks{track} tracks.setControls() + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -2804,6 +2905,7 @@ func TestClientReadDifferentSource(t *testing.T) { require.NoError(t, err) defer l2.Close() + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -2819,6 +2921,7 @@ func TestClientReadDifferentSource(t *testing.T) { require.Equal(t, mustParseURL("rtsp://localhost:8554/test/stream?param=value/"), req.URL) require.Equal(t, base.HeaderValue{"npt=0-"}, req.Header["Range"]) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) @@ -2837,6 +2940,7 @@ func TestClientReadDifferentSource(t *testing.T) { require.Equal(t, base.Teardown, req.Method) require.Equal(t, mustParseURL("rtsp://localhost:8554/test/stream?param=value/"), req.URL) + bb.Reset() base.Response{ StatusCode: base.StatusOK, }.Write(&bb) diff --git a/client_test.go b/client_test.go index 9c4403bf..2cfa8950 100644 --- a/client_test.go +++ b/client_test.go @@ -101,6 +101,7 @@ func TestClientSession(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Options, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -125,6 +126,7 @@ func TestClientSession(t *testing.T) { tracks := Tracks{track} tracks.setControls() + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -170,6 +172,7 @@ func TestClientAuth(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Options, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -187,6 +190,7 @@ func TestClientAuth(t *testing.T) { v := auth.NewValidator("myuser", "mypass", nil) + bb.Reset() base.Response{ StatusCode: base.StatusUnauthorized, Header: base.Header{ @@ -209,6 +213,7 @@ func TestClientAuth(t *testing.T) { tracks := Tracks{track} tracks.setControls() + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -253,6 +258,7 @@ func TestClientDescribeCharset(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Options, req.Method) + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ @@ -272,6 +278,7 @@ func TestClientDescribeCharset(t *testing.T) { track1, err := NewTrackH264(96, []byte{0x01, 0x02, 0x03, 0x04}, []byte{0x01, 0x02, 0x03, 0x04}, nil) require.NoError(t, err) + bb.Reset() base.Response{ StatusCode: base.StatusOK, Header: base.Header{ diff --git a/pkg/base/body.go b/pkg/base/body.go index 33a39bd6..f91fc44d 100644 --- a/pkg/base/body.go +++ b/pkg/base/body.go @@ -2,7 +2,6 @@ package base import ( "bufio" - "bytes" "fmt" "io" "strconv" @@ -36,10 +35,10 @@ func (b *body) read(header Header, rb *bufio.Reader) error { return nil } -func (b body) write(bb *bytes.Buffer) { +func (b body) write(w io.Writer) { if len(b) == 0 { return } - bb.Write(b) + w.Write(b) } diff --git a/pkg/base/header.go b/pkg/base/header.go index d3b58f23..7526f08d 100644 --- a/pkg/base/header.go +++ b/pkg/base/header.go @@ -2,8 +2,8 @@ package base import ( "bufio" - "bytes" "fmt" + "io" "net/http" "sort" "strings" @@ -98,7 +98,7 @@ func (h *Header) read(rb *bufio.Reader) error { return nil } -func (h Header) write(bb *bytes.Buffer) { +func (h Header) write(w io.Writer) { // sort headers by key // in order to obtain deterministic results keys := make([]string, len(h)) @@ -109,9 +109,9 @@ func (h Header) write(bb *bytes.Buffer) { for _, key := range keys { for _, val := range h[key] { - bb.Write([]byte(key + ": " + val + "\r\n")) + w.Write([]byte(key + ": " + val + "\r\n")) } } - bb.Write([]byte("\r\n")) + w.Write([]byte("\r\n")) } diff --git a/pkg/base/interleavedframe.go b/pkg/base/interleavedframe.go index 4cecb08a..cc9d47df 100644 --- a/pkg/base/interleavedframe.go +++ b/pkg/base/interleavedframe.go @@ -2,7 +2,6 @@ package base import ( "bufio" - "bytes" "encoding/binary" "fmt" "io" @@ -107,13 +106,11 @@ func (f *InterleavedFrame) Read(maxPayloadSize int, br *bufio.Reader) error { } // Write writes an InterleavedFrame into a buffered writer. -func (f InterleavedFrame) Write(bb *bytes.Buffer) { - bb.Reset() - +func (f InterleavedFrame) Write(w io.Writer) { buf := []byte{0x24, byte(f.Channel), 0x00, 0x00} binary.BigEndian.PutUint16(buf[2:], uint16(len(f.Payload))) - bb.Write(buf) + w.Write(buf) - bb.Write(f.Payload) + w.Write(f.Payload) } diff --git a/pkg/base/request.go b/pkg/base/request.go index 926c1721..fb58176e 100644 --- a/pkg/base/request.go +++ b/pkg/base/request.go @@ -5,6 +5,7 @@ import ( "bufio" "bytes" "fmt" + "io" "strconv" ) @@ -117,19 +118,17 @@ func (req *Request) ReadIgnoreFrames(maxPayloadSize int, rb *bufio.Reader) error } // Write writes a request. -func (req Request) Write(bb *bytes.Buffer) { - bb.Reset() - +func (req Request) Write(w io.Writer) { urStr := req.URL.CloneWithoutCredentials().String() - bb.Write([]byte(string(req.Method) + " " + urStr + " " + rtspProtocol10 + "\r\n")) + w.Write([]byte(string(req.Method) + " " + urStr + " " + rtspProtocol10 + "\r\n")) if len(req.Body) != 0 { req.Header["Content-Length"] = HeaderValue{strconv.FormatInt(int64(len(req.Body)), 10)} } - req.Header.write(bb) + req.Header.write(w) - body(req.Body).write(bb) + body(req.Body).write(w) } // String implements fmt.Stringer. diff --git a/pkg/base/response.go b/pkg/base/response.go index 9884705c..ad70eeba 100644 --- a/pkg/base/response.go +++ b/pkg/base/response.go @@ -4,6 +4,7 @@ import ( "bufio" "bytes" "fmt" + "io" "strconv" ) @@ -203,16 +204,14 @@ func (res *Response) ReadIgnoreFrames(maxPayloadSize int, rb *bufio.Reader) erro } // Write writes a Response. -func (res Response) Write(bb *bytes.Buffer) { - bb.Reset() - +func (res Response) Write(w io.Writer) { if res.StatusMessage == "" { if status, ok := statusMessages[res.StatusCode]; ok { res.StatusMessage = status } } - bb.Write([]byte(rtspProtocol10 + " " + + w.Write([]byte(rtspProtocol10 + " " + strconv.FormatInt(int64(res.StatusCode), 10) + " " + res.StatusMessage + "\r\n")) @@ -220,9 +219,9 @@ func (res Response) Write(bb *bytes.Buffer) { res.Header["Content-Length"] = HeaderValue{strconv.FormatInt(int64(len(res.Body)), 10)} } - res.Header.write(bb) + res.Header.write(w) - body(res.Body).write(bb) + body(res.Body).write(w) } // String implements fmt.Stringer. diff --git a/server_publish_test.go b/server_publish_test.go index d7957b80..bb488ba8 100644 --- a/server_publish_test.go +++ b/server_publish_test.go @@ -765,6 +765,7 @@ func TestServerPublish(t *testing.T) { Port: th.ServerPorts[1], }) } else { + bb.Reset() base.InterleavedFrame{ Channel: 0, Payload: testRTPPacketMarshaled, @@ -772,6 +773,7 @@ func TestServerPublish(t *testing.T) { _, err = conn.Write(bb.Bytes()) require.NoError(t, err) + bb.Reset() base.InterleavedFrame{ Channel: 1, Payload: testRTCPPacketMarshaled, @@ -915,6 +917,7 @@ func TestServerPublishOversizedPacket(t *testing.T) { require.Equal(t, base.StatusOK, res.StatusCode) byts, _ := oversizedPacketRTPIn.Marshal() + bb.Reset() base.InterleavedFrame{ Channel: 0, Payload: byts, @@ -1023,6 +1026,7 @@ func TestServerPublishErrorInvalidProtocol(t *testing.T) { require.NoError(t, err) require.Equal(t, base.StatusOK, res.StatusCode) + bb.Reset() base.InterleavedFrame{ Channel: 0, Payload: []byte{0x01, 0x02, 0x03, 0x04}, diff --git a/server_read_test.go b/server_read_test.go index 18bcb93b..9065e4db 100644 --- a/server_read_test.go +++ b/server_read_test.go @@ -547,6 +547,7 @@ func TestServerRead(t *testing.T) { <-framesReceived default: + bb.Reset() base.InterleavedFrame{ Channel: 5, Payload: testRTCPPacketMarshaled, @@ -1166,6 +1167,7 @@ func TestServerReadPlayPausePause(t *testing.T) { require.NoError(t, err) require.Equal(t, base.StatusOK, res.StatusCode) + bb.Reset() base.Request{ Method: base.Pause, URL: mustParseURL("rtsp://localhost:8554/teststream"), @@ -1181,6 +1183,7 @@ func TestServerReadPlayPausePause(t *testing.T) { require.NoError(t, err) require.Equal(t, base.StatusOK, res.StatusCode) + bb.Reset() base.Request{ Method: base.Pause, URL: mustParseURL("rtsp://localhost:8554/teststream"), diff --git a/server_test.go b/server_test.go index 5ab9a157..6047f561 100644 --- a/server_test.go +++ b/server_test.go @@ -1025,8 +1025,8 @@ func TestServerSessionClose(t *testing.T) { conn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer conn.Close() - var bb bytes.Buffer + var bb bytes.Buffer base.Request{ Method: base.Setup, URL: mustParseURL("rtsp://localhost:8554/teststream/trackID=0"), diff --git a/serversession.go b/serversession.go index d8fd6093..6a57d5b0 100644 --- a/serversession.go +++ b/serversession.go @@ -1188,6 +1188,7 @@ func (ss *ServerSession) runWriter() { if isRTP { f := rtpFrames[trackID] f.Payload = payload + buf.Reset() f.Write(&buf) ss.tcpConn.conn.SetWriteDeadline(time.Now().Add(ss.s.WriteTimeout)) @@ -1195,6 +1196,7 @@ func (ss *ServerSession) runWriter() { } else { f := rtcpFrames[trackID] f.Payload = payload + buf.Reset() f.Write(&buf) ss.tcpConn.conn.SetWriteDeadline(time.Now().Add(ss.s.WriteTimeout))