From d3561d9b26af6aaea7f2817ea3fda7ba8f1272c0 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Tue, 30 Mar 2021 22:06:58 +0200 Subject: [PATCH] server: if TCP frames are written before the PLAY response, queue and send them anyway, like UDP already does --- serverconn.go | 3 ++- serverconn_test.go | 2 +- serverconnpublish_test.go | 4 ++-- serverconnread_test.go | 45 +++++++++++++++++++++++++++++++++------ 4 files changed, 44 insertions(+), 10 deletions(-) diff --git a/serverconn.go b/serverconn.go index 603cb27f..a965e7be 100644 --- a/serverconn.go +++ b/serverconn.go @@ -425,6 +425,7 @@ func (sc *ServerConn) frameModeDisable() { sc.tcpFrameEnabled = false sc.tcpFrameWriteBuffer.Close() <-sc.tcpBackgroundWriteDone + sc.tcpFrameWriteBuffer.Reset() } else { for _, track := range sc.setuppedTracks { @@ -443,6 +444,7 @@ func (sc *ServerConn) frameModeDisable() { sc.tcpFrameEnabled = false sc.tcpFrameWriteBuffer.Close() <-sc.tcpBackgroundWriteDone + sc.tcpFrameWriteBuffer.Reset() } else { for _, track := range sc.setuppedTracks { @@ -1065,7 +1067,6 @@ func (sc *ServerConn) handleRequestOuter(req *base.Request) error { res.Write(sc.bw) // start background write - sc.tcpFrameWriteBuffer.Reset() sc.tcpBackgroundWriteDone = make(chan struct{}) go sc.tcpBackgroundWrite() diff --git a/serverconn_test.go b/serverconn_test.go index 404e8fc3..c4cd3a9c 100644 --- a/serverconn_test.go +++ b/serverconn_test.go @@ -272,7 +272,7 @@ y++U32uuSFiXDcSLarfIsE992MEJLSAynbF1Rsgsr3gXbGiuToJRyxbIeVy7gwzD -----END RSA PRIVATE KEY----- `) -func TestServerPublishReadHighLevel(t *testing.T) { +func TestServerHighLevelPublishRead(t *testing.T) { for _, ca := range []struct { encrypted bool publisherSoft string diff --git a/serverconnpublish_test.go b/serverconnpublish_test.go index ef4da20a..52c3b7b0 100644 --- a/serverconnpublish_test.go +++ b/serverconnpublish_test.go @@ -518,7 +518,7 @@ func TestServerPublishRecordErrorPartialTracks(t *testing.T) { require.Equal(t, "not all announced tracks have been setup", err.Error()) } -func TestServerPublishFrames(t *testing.T) { +func TestServerPublish(t *testing.T) { for _, proto := range []string{ "udp", "tcp", @@ -711,7 +711,7 @@ func TestServerPublishFrames(t *testing.T) { } } -func TestServerPublishFramesErrorWrongProtocol(t *testing.T) { +func TestServerPublishErrorWrongProtocol(t *testing.T) { conf := ServerConf{ UDPRTPAddress: "127.0.0.1:8000", UDPRTCPAddress: "127.0.0.1:8001", diff --git a/serverconnread_test.go b/serverconnread_test.go index d5065afd..6b52c18c 100644 --- a/serverconnread_test.go +++ b/serverconnread_test.go @@ -291,7 +291,7 @@ func TestServerReadSetupErrorTrackTwice(t *testing.T) { require.Equal(t, "track 0 has already been setup", err.Error()) } -func TestServerReadFrames(t *testing.T) { +func TestServerRead(t *testing.T) { for _, proto := range []string{ "udp", "tcp", @@ -324,6 +324,9 @@ func TestServerReadFrames(t *testing.T) { } onPlay := func(ctx *ServerConnPlayCtx) (*base.Response, error) { + conn.WriteFrame(0, StreamTypeRTP, []byte{0x01, 0x02, 0x03, 0x04}) + conn.WriteFrame(0, StreamTypeRTCP, []byte{0x05, 0x06, 0x07, 0x08}) + return &base.Response{ StatusCode: base.StatusOK, }, nil @@ -386,6 +389,14 @@ func TestServerReadFrames(t *testing.T) { err = th.Read(res.Header["Transport"]) require.NoError(t, err) + l1, err := net.ListenPacket("udp", "localhost:35466") + require.NoError(t, err) + defer l1.Close() + + l2, err := net.ListenPacket("udp", "localhost:35467") + require.NoError(t, err) + defer l2.Close() + err = base.Request{ Method: base.Play, URL: base.MustParseURL("rtsp://localhost:8554/teststream"), @@ -399,14 +410,36 @@ func TestServerReadFrames(t *testing.T) { require.NoError(t, err) require.Equal(t, base.StatusOK, res.StatusCode) + // server -> client if proto == "udp" { - time.Sleep(1 * time.Second) - - l1, err := net.ListenPacket("udp", "localhost:35467") + buf := make([]byte, 2048) + n, _, err := l1.ReadFrom(buf) require.NoError(t, err) - defer l1.Close() + require.Equal(t, []byte{0x01, 0x02, 0x03, 0x04}, buf[:n]) - l1.WriteTo([]byte{0x01, 0x02, 0x03, 0x04}, &net.UDPAddr{ + buf = make([]byte, 2048) + n, _, err = l2.ReadFrom(buf) + require.NoError(t, err) + require.Equal(t, []byte{0x05, 0x06, 0x07, 0x08}, buf[:n]) + + } else { + var f base.InterleavedFrame + f.Payload = make([]byte, 2048) + err := f.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, StreamTypeRTP, f.StreamType) + require.Equal(t, []byte{0x01, 0x02, 0x03, 0x04}, f.Payload) + + f.Payload = make([]byte, 2048) + err = f.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, StreamTypeRTCP, f.StreamType) + require.Equal(t, []byte{0x05, 0x06, 0x07, 0x08}, f.Payload) + } + + // client -> server (RTCP) + if proto == "udp" { + l2.WriteTo([]byte{0x01, 0x02, 0x03, 0x04}, &net.UDPAddr{ IP: net.ParseIP("127.0.0.1"), Port: th.ServerPorts[1], })