accept io.Writer in Write() methods

This commit is contained in:
aler9
2022-05-08 14:33:29 +02:00
parent bdbce3c370
commit edeef85e9e
13 changed files with 183 additions and 26 deletions

View File

@@ -1883,6 +1883,7 @@ func (c *Client) runWriter() {
if isRTP {
f := rtpFrames[trackID]
f.Payload = payload
buf.Reset()
f.Write(&buf)
c.conn.SetWriteDeadline(time.Now().Add(c.WriteTimeout))
@@ -1890,6 +1891,7 @@ func (c *Client) runWriter() {
} else {
f := rtcpFrames[trackID]
f.Payload = payload
buf.Reset()
f.Write(&buf)
c.conn.SetWriteDeadline(time.Now().Add(c.WriteTimeout))

View File

@@ -90,6 +90,7 @@ func TestClientPublishSerial(t *testing.T) {
require.Equal(t, base.Options, req.Method)
require.Equal(t, mustParseURL(scheme+"://localhost:8554/teststream"), req.URL)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -108,6 +109,7 @@ func TestClientPublishSerial(t *testing.T) {
require.Equal(t, base.Announce, req.Method)
require.Equal(t, mustParseURL(scheme+"://localhost:8554/teststream"), req.URL)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -151,6 +153,7 @@ func TestClientPublishSerial(t *testing.T) {
th.InterleavedIDs = inTH.InterleavedIDs
}
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -165,6 +168,7 @@ func TestClientPublishSerial(t *testing.T) {
require.Equal(t, base.Record, req.Method)
require.Equal(t, mustParseURL(scheme+"://localhost:8554/teststream"), req.URL)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -198,6 +202,7 @@ func TestClientPublishSerial(t *testing.T) {
Port: th.ClientPorts[1],
})
} else {
bb.Reset()
base.InterleavedFrame{
Channel: 1,
Payload: testRTCPPacketMarshaled,
@@ -211,6 +216,7 @@ func TestClientPublishSerial(t *testing.T) {
require.Equal(t, base.Teardown, req.Method)
require.Equal(t, mustParseURL(scheme+"://localhost:8554/teststream"), req.URL)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -303,6 +309,7 @@ func TestClientPublishParallel(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Options, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -320,6 +327,7 @@ func TestClientPublishParallel(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Announce, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -350,6 +358,7 @@ func TestClientPublishParallel(t *testing.T) {
th.InterleavedIDs = inTH.InterleavedIDs
}
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -363,6 +372,7 @@ func TestClientPublishParallel(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Record, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -373,6 +383,7 @@ func TestClientPublishParallel(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Teardown, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -449,6 +460,7 @@ func TestClientPublishPauseSerial(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Options, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -467,6 +479,7 @@ func TestClientPublishPauseSerial(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Announce, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -497,6 +510,7 @@ func TestClientPublishPauseSerial(t *testing.T) {
th.InterleavedIDs = inTH.InterleavedIDs
}
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -510,6 +524,7 @@ func TestClientPublishPauseSerial(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Record, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -520,6 +535,7 @@ func TestClientPublishPauseSerial(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Pause, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -530,6 +546,7 @@ func TestClientPublishPauseSerial(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Record, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -540,6 +557,7 @@ func TestClientPublishPauseSerial(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Teardown, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -606,6 +624,7 @@ func TestClientPublishPauseParallel(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Options, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -624,6 +643,7 @@ func TestClientPublishPauseParallel(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Announce, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -654,6 +674,7 @@ func TestClientPublishPauseParallel(t *testing.T) {
th.InterleavedIDs = inTH.InterleavedIDs
}
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -667,6 +688,7 @@ func TestClientPublishPauseParallel(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Record, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -677,6 +699,7 @@ func TestClientPublishPauseParallel(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Pause, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -749,6 +772,7 @@ func TestClientPublishAutomaticProtocol(t *testing.T) {
require.Equal(t, base.Options, req.Method)
require.Equal(t, mustParseURL("rtsp://localhost:8554/teststream"), req.URL)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -767,6 +791,7 @@ func TestClientPublishAutomaticProtocol(t *testing.T) {
require.Equal(t, base.Announce, req.Method)
require.Equal(t, mustParseURL("rtsp://localhost:8554/teststream"), req.URL)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -777,6 +802,7 @@ func TestClientPublishAutomaticProtocol(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Setup, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusUnsupportedTransport,
}.Write(&bb)
@@ -801,6 +827,7 @@ func TestClientPublishAutomaticProtocol(t *testing.T) {
InterleavedIDs: &[2]int{0, 1},
}
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -815,6 +842,7 @@ func TestClientPublishAutomaticProtocol(t *testing.T) {
require.Equal(t, base.Record, req.Method)
require.Equal(t, mustParseURL("rtsp://localhost:8554/teststream"), req.URL)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -834,6 +862,7 @@ func TestClientPublishAutomaticProtocol(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Teardown, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -877,6 +906,7 @@ func TestClientPublishRTCPReport(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Options, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -894,6 +924,7 @@ func TestClientPublishRTCPReport(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Announce, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -916,6 +947,7 @@ func TestClientPublishRTCPReport(t *testing.T) {
require.NoError(t, err)
defer l2.Close()
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -937,6 +969,7 @@ func TestClientPublishRTCPReport(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Record, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -971,6 +1004,7 @@ func TestClientPublishRTCPReport(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Teardown, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -1026,6 +1060,7 @@ func TestClientPublishIgnoreTCPRTPPackets(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Options, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -1043,6 +1078,7 @@ func TestClientPublishIgnoreTCPRTPPackets(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Announce, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -1066,6 +1102,7 @@ func TestClientPublishIgnoreTCPRTPPackets(t *testing.T) {
InterleavedIDs: inTH.InterleavedIDs,
}
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -1079,12 +1116,14 @@ func TestClientPublishIgnoreTCPRTPPackets(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Record, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
_, err = conn.Write(bb.Bytes())
require.NoError(t, err)
bb.Reset()
base.InterleavedFrame{
Channel: 0,
Payload: testRTPPacketMarshaled,
@@ -1092,6 +1131,7 @@ func TestClientPublishIgnoreTCPRTPPackets(t *testing.T) {
_, err = conn.Write(bb.Bytes())
require.NoError(t, err)
bb.Reset()
base.InterleavedFrame{
Channel: 1,
Payload: testRTCPPacketMarshaled,
@@ -1103,6 +1143,7 @@ func TestClientPublishIgnoreTCPRTPPackets(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Teardown, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)

View File

@@ -66,6 +66,7 @@ func TestClientReadTracks(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Options, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -87,6 +88,7 @@ func TestClientReadTracks(t *testing.T) {
tracks := Tracks{track1, track2, track3}
tracks.setControls()
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -118,6 +120,7 @@ func TestClientReadTracks(t *testing.T) {
ServerPorts: &[2]int{34556 + i*2, 34557 + i*2},
}
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -133,6 +136,7 @@ func TestClientReadTracks(t *testing.T) {
require.Equal(t, base.Play, req.Method)
require.Equal(t, mustParseURL("rtsp://localhost:8554/teststream/"), req.URL)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -144,6 +148,7 @@ func TestClientReadTracks(t *testing.T) {
require.Equal(t, base.Teardown, req.Method)
require.Equal(t, mustParseURL("rtsp://localhost:8554/teststream/"), req.URL)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -203,6 +208,7 @@ func TestClientRead(t *testing.T) {
require.Equal(t, base.Options, req.Method)
require.Equal(t, mustParseURL(scheme+"://"+listenIP+":8554/test/stream?param=value"), req.URL)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -227,6 +233,7 @@ func TestClientRead(t *testing.T) {
tracks := Tracks{track}
tracks.setControls()
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -311,6 +318,7 @@ func TestClientRead(t *testing.T) {
th.InterleavedIDs = &[2]int{0, 1}
}
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -326,6 +334,7 @@ func TestClientRead(t *testing.T) {
require.Equal(t, mustParseURL(scheme+"://"+listenIP+":8554/test/stream?param=value/"), req.URL)
require.Equal(t, base.HeaderValue{"npt=0-"}, req.Header["Range"])
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -349,6 +358,7 @@ func TestClientRead(t *testing.T) {
})
case "tcp", "tls":
bb.Reset()
base.InterleavedFrame{
Channel: 0,
Payload: testRTPPacketMarshaled,
@@ -389,6 +399,7 @@ func TestClientRead(t *testing.T) {
require.Equal(t, base.Teardown, req.Method)
require.Equal(t, mustParseURL(scheme+"://"+listenIP+":8554/test/stream?param=value/"), req.URL)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -518,6 +529,7 @@ func TestClientReadOversizedPacket(t *testing.T) {
require.Equal(t, base.Options, req.Method)
require.Equal(t, mustParseURL("rtsp://localhost:8554/teststream"), req.URL)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -542,6 +554,7 @@ func TestClientReadOversizedPacket(t *testing.T) {
tracks := Tracks{track}
tracks.setControls()
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -567,6 +580,7 @@ func TestClientReadOversizedPacket(t *testing.T) {
InterleavedIDs: &[2]int{0, 1},
}
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -582,6 +596,7 @@ func TestClientReadOversizedPacket(t *testing.T) {
require.Equal(t, mustParseURL("rtsp://localhost:8554/teststream/"), req.URL)
require.Equal(t, base.HeaderValue{"npt=0-"}, req.Header["Range"])
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -589,6 +604,7 @@ func TestClientReadOversizedPacket(t *testing.T) {
require.NoError(t, err)
byts, _ := oversizedPacketRTPIn.Marshal()
bb.Reset()
base.InterleavedFrame{
Channel: 0,
Payload: byts,
@@ -643,6 +659,7 @@ func TestClientReadPartial(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Options, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -670,6 +687,7 @@ func TestClientReadPartial(t *testing.T) {
tracks := Tracks{track1, track2}
tracks.setControls()
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -700,6 +718,7 @@ func TestClientReadPartial(t *testing.T) {
InterleavedIDs: inTH.InterleavedIDs,
}
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -714,12 +733,14 @@ func TestClientReadPartial(t *testing.T) {
require.Equal(t, base.Play, req.Method)
require.Equal(t, mustParseURL("rtsp://"+listenIP+":8554/teststream/"), req.URL)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
_, err = conn.Write(bb.Bytes())
require.NoError(t, err)
bb.Reset()
base.InterleavedFrame{
Channel: 0,
Payload: testRTPPacketMarshaled,
@@ -732,6 +753,7 @@ func TestClientReadPartial(t *testing.T) {
require.Equal(t, base.Teardown, req.Method)
require.Equal(t, mustParseURL("rtsp://"+listenIP+":8554/teststream/"), req.URL)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -792,6 +814,7 @@ func TestClientReadNoContentBase(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Options, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -816,6 +839,7 @@ func TestClientReadNoContentBase(t *testing.T) {
tracks := Tracks{track}
tracks.setControls()
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -845,6 +869,7 @@ func TestClientReadNoContentBase(t *testing.T) {
ServerPorts: &[2]int{34556, 34557},
}
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -859,6 +884,7 @@ func TestClientReadNoContentBase(t *testing.T) {
require.Equal(t, base.Play, req.Method)
require.Equal(t, mustParseURL("rtsp://localhost:8554/teststream"), req.URL)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -870,6 +896,7 @@ func TestClientReadNoContentBase(t *testing.T) {
require.Equal(t, base.Teardown, req.Method)
require.Equal(t, mustParseURL("rtsp://localhost:8554/teststream"), req.URL)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -913,6 +940,7 @@ func TestClientReadAnyPort(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Options, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -936,6 +964,7 @@ func TestClientReadAnyPort(t *testing.T) {
tracks := Tracks{track}
tracks.setControls()
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -963,6 +992,7 @@ func TestClientReadAnyPort(t *testing.T) {
require.NoError(t, err)
defer l1b.Close()
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -998,6 +1028,7 @@ func TestClientReadAnyPort(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Play, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -1073,6 +1104,7 @@ func TestClientReadAutomaticProtocol(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Options, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -1096,6 +1128,7 @@ func TestClientReadAutomaticProtocol(t *testing.T) {
tracks := Tracks{track}
tracks.setControls()
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -1111,6 +1144,7 @@ func TestClientReadAutomaticProtocol(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Setup, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusUnsupportedTransport,
}.Write(&bb)
@@ -1126,6 +1160,7 @@ func TestClientReadAutomaticProtocol(t *testing.T) {
require.NoError(t, err)
require.Equal(t, headers.TransportProtocolTCP, inTH.Protocol)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -1146,12 +1181,14 @@ func TestClientReadAutomaticProtocol(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Play, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
_, err = conn.Write(bb.Bytes())
require.NoError(t, err)
bb.Reset()
base.InterleavedFrame{
Channel: 0,
Payload: testRTPPacketMarshaled,
@@ -1194,6 +1231,7 @@ func TestClientReadAutomaticProtocol(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Options, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -1213,6 +1251,7 @@ func TestClientReadAutomaticProtocol(t *testing.T) {
v := auth.NewValidator("myuser", "mypass", nil)
bb.Reset()
base.Response{
StatusCode: base.StatusUnauthorized,
Header: base.Header{
@@ -1235,6 +1274,7 @@ func TestClientReadAutomaticProtocol(t *testing.T) {
tracks := Tracks{track}
tracks.setControls()
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -1265,6 +1305,7 @@ func TestClientReadAutomaticProtocol(t *testing.T) {
ClientPorts: inTH.ClientPorts,
}
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -1278,6 +1319,7 @@ func TestClientReadAutomaticProtocol(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Play, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -1288,6 +1330,7 @@ func TestClientReadAutomaticProtocol(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Teardown, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -1304,6 +1347,7 @@ func TestClientReadAutomaticProtocol(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Options, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -1321,6 +1365,7 @@ func TestClientReadAutomaticProtocol(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Describe, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -1338,6 +1383,7 @@ func TestClientReadAutomaticProtocol(t *testing.T) {
v = auth.NewValidator("myuser", "mypass", nil)
bb.Reset()
base.Response{
StatusCode: base.StatusUnauthorized,
Header: base.Header{
@@ -1368,6 +1414,7 @@ func TestClientReadAutomaticProtocol(t *testing.T) {
InterleavedIDs: inTH.InterleavedIDs,
}
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -1381,12 +1428,14 @@ func TestClientReadAutomaticProtocol(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Play, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
_, err = conn.Write(bb.Bytes())
require.NoError(t, err)
bb.Reset()
base.InterleavedFrame{
Channel: 0,
Payload: testRTPPacketMarshaled,
@@ -1398,6 +1447,7 @@ func TestClientReadAutomaticProtocol(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Teardown, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -1444,6 +1494,7 @@ func TestClientReadDifferentInterleavedIDs(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Options, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -1468,6 +1519,7 @@ func TestClientReadDifferentInterleavedIDs(t *testing.T) {
tracks := Tracks{track1}
tracks.setControls()
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -1497,6 +1549,7 @@ func TestClientReadDifferentInterleavedIDs(t *testing.T) {
InterleavedIDs: &[2]int{2, 3},
}
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -1511,12 +1564,14 @@ func TestClientReadDifferentInterleavedIDs(t *testing.T) {
require.Equal(t, base.Play, req.Method)
require.Equal(t, mustParseURL("rtsp://localhost:8554/teststream/"), req.URL)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
_, err = conn.Write(bb.Bytes())
require.NoError(t, err)
bb.Reset()
base.InterleavedFrame{
Channel: 2,
Payload: testRTPPacketMarshaled,
@@ -1529,6 +1584,7 @@ func TestClientReadDifferentInterleavedIDs(t *testing.T) {
require.Equal(t, base.Teardown, req.Method)
require.Equal(t, mustParseURL("rtsp://localhost:8554/teststream/"), req.URL)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -1575,6 +1631,7 @@ func TestClientReadRedirect(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Options, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -1592,6 +1649,7 @@ func TestClientReadRedirect(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Describe, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusMovedPermanently,
Header: base.Header{
@@ -1612,6 +1670,7 @@ func TestClientReadRedirect(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Options, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -1635,6 +1694,7 @@ func TestClientReadRedirect(t *testing.T) {
tracks := Tracks{track}
tracks.setControls()
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -1654,6 +1714,7 @@ func TestClientReadRedirect(t *testing.T) {
err = th.Read(req.Header["Transport"])
require.NoError(t, err)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -1675,6 +1736,7 @@ func TestClientReadRedirect(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Play, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -1737,6 +1799,7 @@ func TestClientReadPause(t *testing.T) {
Port: inTH.ClientPorts[0],
})
} else {
bb.Reset()
base.InterleavedFrame{
Channel: 0,
Payload: testRTPPacketMarshaled,
@@ -1777,6 +1840,7 @@ func TestClientReadPause(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Options, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -1800,6 +1864,7 @@ func TestClientReadPause(t *testing.T) {
tracks := Tracks{track}
tracks.setControls()
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -1835,6 +1900,7 @@ func TestClientReadPause(t *testing.T) {
th.InterleavedIDs = inTH.InterleavedIDs
}
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -1848,6 +1914,7 @@ func TestClientReadPause(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Play, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -1863,6 +1930,7 @@ func TestClientReadPause(t *testing.T) {
close(writerTerminate)
<-writerDone
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -1873,6 +1941,7 @@ func TestClientReadPause(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Play, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -1888,6 +1957,7 @@ func TestClientReadPause(t *testing.T) {
close(writerTerminate)
<-writerDone
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -1956,6 +2026,7 @@ func TestClientReadRTCPReport(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Options, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -1980,6 +2051,7 @@ func TestClientReadRTCPReport(t *testing.T) {
tracks := Tracks{track}
tracks.setControls()
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -2007,6 +2079,7 @@ func TestClientReadRTCPReport(t *testing.T) {
require.NoError(t, err)
defer l2.Close()
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -2028,6 +2101,7 @@ func TestClientReadRTCPReport(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Play, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -2097,6 +2171,7 @@ func TestClientReadRTCPReport(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Teardown, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -2141,6 +2216,7 @@ func TestClientReadErrorTimeout(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Options, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -2164,6 +2240,7 @@ func TestClientReadErrorTimeout(t *testing.T) {
tracks := Tracks{track}
tracks.setControls()
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -2205,6 +2282,7 @@ func TestClientReadErrorTimeout(t *testing.T) {
th.InterleavedIDs = inTH.InterleavedIDs
}
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -2218,6 +2296,7 @@ func TestClientReadErrorTimeout(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Play, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -2236,6 +2315,7 @@ func TestClientReadErrorTimeout(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Teardown, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -2296,6 +2376,7 @@ func TestClientReadIgnoreTCPInvalidTrack(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Options, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -2319,6 +2400,7 @@ func TestClientReadIgnoreTCPInvalidTrack(t *testing.T) {
tracks := Tracks{track}
tracks.setControls()
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -2347,6 +2429,7 @@ func TestClientReadIgnoreTCPInvalidTrack(t *testing.T) {
th.Protocol = headers.TransportProtocolTCP
th.InterleavedIDs = inTH.InterleavedIDs
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -2360,12 +2443,14 @@ func TestClientReadIgnoreTCPInvalidTrack(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Play, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
_, err = conn.Write(bb.Bytes())
require.NoError(t, err)
bb.Reset()
base.InterleavedFrame{
Channel: 6,
Payload: testRTPPacketMarshaled,
@@ -2373,6 +2458,7 @@ func TestClientReadIgnoreTCPInvalidTrack(t *testing.T) {
_, err = conn.Write(bb.Bytes())
require.NoError(t, err)
bb.Reset()
base.InterleavedFrame{
Channel: 0,
Payload: testRTPPacketMarshaled,
@@ -2384,6 +2470,7 @@ func TestClientReadIgnoreTCPInvalidTrack(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Teardown, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -2430,6 +2517,7 @@ func TestClientReadSeek(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Options, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -2453,6 +2541,7 @@ func TestClientReadSeek(t *testing.T) {
tracks := Tracks{track}
tracks.setControls()
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -2481,6 +2570,7 @@ func TestClientReadSeek(t *testing.T) {
InterleavedIDs: inTH.InterleavedIDs,
}
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -2503,6 +2593,7 @@ func TestClientReadSeek(t *testing.T) {
},
}, ra)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -2513,6 +2604,7 @@ func TestClientReadSeek(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Pause, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -2531,6 +2623,7 @@ func TestClientReadSeek(t *testing.T) {
},
}, ra)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -2541,6 +2634,7 @@ func TestClientReadSeek(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Teardown, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -2607,6 +2701,7 @@ func TestClientReadKeepaliveFromSession(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Options, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -2630,6 +2725,7 @@ func TestClientReadKeepaliveFromSession(t *testing.T) {
tracks := Tracks{track}
tracks.setControls()
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -2649,6 +2745,7 @@ func TestClientReadKeepaliveFromSession(t *testing.T) {
err = inTH.Read(req.Header["Transport"])
require.NoError(t, err)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -2677,6 +2774,7 @@ func TestClientReadKeepaliveFromSession(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Play, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -2690,6 +2788,7 @@ func TestClientReadKeepaliveFromSession(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Options, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -2738,6 +2837,7 @@ func TestClientReadDifferentSource(t *testing.T) {
require.Equal(t, base.Options, req.Method)
require.Equal(t, mustParseURL("rtsp://localhost:8554/test/stream?param=value"), req.URL)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -2762,6 +2862,7 @@ func TestClientReadDifferentSource(t *testing.T) {
tracks := Tracks{track}
tracks.setControls()
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -2804,6 +2905,7 @@ func TestClientReadDifferentSource(t *testing.T) {
require.NoError(t, err)
defer l2.Close()
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -2819,6 +2921,7 @@ func TestClientReadDifferentSource(t *testing.T) {
require.Equal(t, mustParseURL("rtsp://localhost:8554/test/stream?param=value/"), req.URL)
require.Equal(t, base.HeaderValue{"npt=0-"}, req.Header["Range"])
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)
@@ -2837,6 +2940,7 @@ func TestClientReadDifferentSource(t *testing.T) {
require.Equal(t, base.Teardown, req.Method)
require.Equal(t, mustParseURL("rtsp://localhost:8554/test/stream?param=value/"), req.URL)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
}.Write(&bb)

View File

@@ -101,6 +101,7 @@ func TestClientSession(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Options, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -125,6 +126,7 @@ func TestClientSession(t *testing.T) {
tracks := Tracks{track}
tracks.setControls()
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -170,6 +172,7 @@ func TestClientAuth(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Options, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -187,6 +190,7 @@ func TestClientAuth(t *testing.T) {
v := auth.NewValidator("myuser", "mypass", nil)
bb.Reset()
base.Response{
StatusCode: base.StatusUnauthorized,
Header: base.Header{
@@ -209,6 +213,7 @@ func TestClientAuth(t *testing.T) {
tracks := Tracks{track}
tracks.setControls()
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -253,6 +258,7 @@ func TestClientDescribeCharset(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Options, req.Method)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@@ -272,6 +278,7 @@ func TestClientDescribeCharset(t *testing.T) {
track1, err := NewTrackH264(96, []byte{0x01, 0x02, 0x03, 0x04}, []byte{0x01, 0x02, 0x03, 0x04}, nil)
require.NoError(t, err)
bb.Reset()
base.Response{
StatusCode: base.StatusOK,
Header: base.Header{

View File

@@ -2,7 +2,6 @@ package base
import (
"bufio"
"bytes"
"fmt"
"io"
"strconv"
@@ -36,10 +35,10 @@ func (b *body) read(header Header, rb *bufio.Reader) error {
return nil
}
func (b body) write(bb *bytes.Buffer) {
func (b body) write(w io.Writer) {
if len(b) == 0 {
return
}
bb.Write(b)
w.Write(b)
}

View File

@@ -2,8 +2,8 @@ package base
import (
"bufio"
"bytes"
"fmt"
"io"
"net/http"
"sort"
"strings"
@@ -98,7 +98,7 @@ func (h *Header) read(rb *bufio.Reader) error {
return nil
}
func (h Header) write(bb *bytes.Buffer) {
func (h Header) write(w io.Writer) {
// sort headers by key
// in order to obtain deterministic results
keys := make([]string, len(h))
@@ -109,9 +109,9 @@ func (h Header) write(bb *bytes.Buffer) {
for _, key := range keys {
for _, val := range h[key] {
bb.Write([]byte(key + ": " + val + "\r\n"))
w.Write([]byte(key + ": " + val + "\r\n"))
}
}
bb.Write([]byte("\r\n"))
w.Write([]byte("\r\n"))
}

View File

@@ -2,7 +2,6 @@ package base
import (
"bufio"
"bytes"
"encoding/binary"
"fmt"
"io"
@@ -107,13 +106,11 @@ func (f *InterleavedFrame) Read(maxPayloadSize int, br *bufio.Reader) error {
}
// Write writes an InterleavedFrame into a buffered writer.
func (f InterleavedFrame) Write(bb *bytes.Buffer) {
bb.Reset()
func (f InterleavedFrame) Write(w io.Writer) {
buf := []byte{0x24, byte(f.Channel), 0x00, 0x00}
binary.BigEndian.PutUint16(buf[2:], uint16(len(f.Payload)))
bb.Write(buf)
w.Write(buf)
bb.Write(f.Payload)
w.Write(f.Payload)
}

View File

@@ -5,6 +5,7 @@ import (
"bufio"
"bytes"
"fmt"
"io"
"strconv"
)
@@ -117,19 +118,17 @@ func (req *Request) ReadIgnoreFrames(maxPayloadSize int, rb *bufio.Reader) error
}
// Write writes a request.
func (req Request) Write(bb *bytes.Buffer) {
bb.Reset()
func (req Request) Write(w io.Writer) {
urStr := req.URL.CloneWithoutCredentials().String()
bb.Write([]byte(string(req.Method) + " " + urStr + " " + rtspProtocol10 + "\r\n"))
w.Write([]byte(string(req.Method) + " " + urStr + " " + rtspProtocol10 + "\r\n"))
if len(req.Body) != 0 {
req.Header["Content-Length"] = HeaderValue{strconv.FormatInt(int64(len(req.Body)), 10)}
}
req.Header.write(bb)
req.Header.write(w)
body(req.Body).write(bb)
body(req.Body).write(w)
}
// String implements fmt.Stringer.

View File

@@ -4,6 +4,7 @@ import (
"bufio"
"bytes"
"fmt"
"io"
"strconv"
)
@@ -203,16 +204,14 @@ func (res *Response) ReadIgnoreFrames(maxPayloadSize int, rb *bufio.Reader) erro
}
// Write writes a Response.
func (res Response) Write(bb *bytes.Buffer) {
bb.Reset()
func (res Response) Write(w io.Writer) {
if res.StatusMessage == "" {
if status, ok := statusMessages[res.StatusCode]; ok {
res.StatusMessage = status
}
}
bb.Write([]byte(rtspProtocol10 + " " +
w.Write([]byte(rtspProtocol10 + " " +
strconv.FormatInt(int64(res.StatusCode), 10) + " " +
res.StatusMessage + "\r\n"))
@@ -220,9 +219,9 @@ func (res Response) Write(bb *bytes.Buffer) {
res.Header["Content-Length"] = HeaderValue{strconv.FormatInt(int64(len(res.Body)), 10)}
}
res.Header.write(bb)
res.Header.write(w)
body(res.Body).write(bb)
body(res.Body).write(w)
}
// String implements fmt.Stringer.

View File

@@ -765,6 +765,7 @@ func TestServerPublish(t *testing.T) {
Port: th.ServerPorts[1],
})
} else {
bb.Reset()
base.InterleavedFrame{
Channel: 0,
Payload: testRTPPacketMarshaled,
@@ -772,6 +773,7 @@ func TestServerPublish(t *testing.T) {
_, err = conn.Write(bb.Bytes())
require.NoError(t, err)
bb.Reset()
base.InterleavedFrame{
Channel: 1,
Payload: testRTCPPacketMarshaled,
@@ -915,6 +917,7 @@ func TestServerPublishOversizedPacket(t *testing.T) {
require.Equal(t, base.StatusOK, res.StatusCode)
byts, _ := oversizedPacketRTPIn.Marshal()
bb.Reset()
base.InterleavedFrame{
Channel: 0,
Payload: byts,
@@ -1023,6 +1026,7 @@ func TestServerPublishErrorInvalidProtocol(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode)
bb.Reset()
base.InterleavedFrame{
Channel: 0,
Payload: []byte{0x01, 0x02, 0x03, 0x04},

View File

@@ -547,6 +547,7 @@ func TestServerRead(t *testing.T) {
<-framesReceived
default:
bb.Reset()
base.InterleavedFrame{
Channel: 5,
Payload: testRTCPPacketMarshaled,
@@ -1166,6 +1167,7 @@ func TestServerReadPlayPausePause(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode)
bb.Reset()
base.Request{
Method: base.Pause,
URL: mustParseURL("rtsp://localhost:8554/teststream"),
@@ -1181,6 +1183,7 @@ func TestServerReadPlayPausePause(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode)
bb.Reset()
base.Request{
Method: base.Pause,
URL: mustParseURL("rtsp://localhost:8554/teststream"),

View File

@@ -1025,8 +1025,8 @@ func TestServerSessionClose(t *testing.T) {
conn, err := net.Dial("tcp", "localhost:8554")
require.NoError(t, err)
defer conn.Close()
var bb bytes.Buffer
var bb bytes.Buffer
base.Request{
Method: base.Setup,
URL: mustParseURL("rtsp://localhost:8554/teststream/trackID=0"),

View File

@@ -1188,6 +1188,7 @@ func (ss *ServerSession) runWriter() {
if isRTP {
f := rtpFrames[trackID]
f.Payload = payload
buf.Reset()
f.Write(&buf)
ss.tcpConn.conn.SetWriteDeadline(time.Now().Add(ss.s.WriteTimeout))
@@ -1195,6 +1196,7 @@ func (ss *ServerSession) runWriter() {
} else {
f := rtcpFrames[trackID]
f.Payload = payload
buf.Reset()
f.Write(&buf)
ss.tcpConn.conn.SetWriteDeadline(time.Now().Add(ss.s.WriteTimeout))