server: if TCP frames are written before the PLAY response, queue and send them anyway, like UDP already does

This commit is contained in:
aler9
2021-03-30 22:06:58 +02:00
parent d39996c824
commit d3561d9b26
4 changed files with 44 additions and 10 deletions

View File

@@ -425,6 +425,7 @@ func (sc *ServerConn) frameModeDisable() {
sc.tcpFrameEnabled = false sc.tcpFrameEnabled = false
sc.tcpFrameWriteBuffer.Close() sc.tcpFrameWriteBuffer.Close()
<-sc.tcpBackgroundWriteDone <-sc.tcpBackgroundWriteDone
sc.tcpFrameWriteBuffer.Reset()
} else { } else {
for _, track := range sc.setuppedTracks { for _, track := range sc.setuppedTracks {
@@ -443,6 +444,7 @@ func (sc *ServerConn) frameModeDisable() {
sc.tcpFrameEnabled = false sc.tcpFrameEnabled = false
sc.tcpFrameWriteBuffer.Close() sc.tcpFrameWriteBuffer.Close()
<-sc.tcpBackgroundWriteDone <-sc.tcpBackgroundWriteDone
sc.tcpFrameWriteBuffer.Reset()
} else { } else {
for _, track := range sc.setuppedTracks { for _, track := range sc.setuppedTracks {
@@ -1065,7 +1067,6 @@ func (sc *ServerConn) handleRequestOuter(req *base.Request) error {
res.Write(sc.bw) res.Write(sc.bw)
// start background write // start background write
sc.tcpFrameWriteBuffer.Reset()
sc.tcpBackgroundWriteDone = make(chan struct{}) sc.tcpBackgroundWriteDone = make(chan struct{})
go sc.tcpBackgroundWrite() go sc.tcpBackgroundWrite()

View File

@@ -272,7 +272,7 @@ y++U32uuSFiXDcSLarfIsE992MEJLSAynbF1Rsgsr3gXbGiuToJRyxbIeVy7gwzD
-----END RSA PRIVATE KEY----- -----END RSA PRIVATE KEY-----
`) `)
func TestServerPublishReadHighLevel(t *testing.T) { func TestServerHighLevelPublishRead(t *testing.T) {
for _, ca := range []struct { for _, ca := range []struct {
encrypted bool encrypted bool
publisherSoft string publisherSoft string

View File

@@ -518,7 +518,7 @@ func TestServerPublishRecordErrorPartialTracks(t *testing.T) {
require.Equal(t, "not all announced tracks have been setup", err.Error()) require.Equal(t, "not all announced tracks have been setup", err.Error())
} }
func TestServerPublishFrames(t *testing.T) { func TestServerPublish(t *testing.T) {
for _, proto := range []string{ for _, proto := range []string{
"udp", "udp",
"tcp", "tcp",
@@ -711,7 +711,7 @@ func TestServerPublishFrames(t *testing.T) {
} }
} }
func TestServerPublishFramesErrorWrongProtocol(t *testing.T) { func TestServerPublishErrorWrongProtocol(t *testing.T) {
conf := ServerConf{ conf := ServerConf{
UDPRTPAddress: "127.0.0.1:8000", UDPRTPAddress: "127.0.0.1:8000",
UDPRTCPAddress: "127.0.0.1:8001", UDPRTCPAddress: "127.0.0.1:8001",

View File

@@ -291,7 +291,7 @@ func TestServerReadSetupErrorTrackTwice(t *testing.T) {
require.Equal(t, "track 0 has already been setup", err.Error()) require.Equal(t, "track 0 has already been setup", err.Error())
} }
func TestServerReadFrames(t *testing.T) { func TestServerRead(t *testing.T) {
for _, proto := range []string{ for _, proto := range []string{
"udp", "udp",
"tcp", "tcp",
@@ -324,6 +324,9 @@ func TestServerReadFrames(t *testing.T) {
} }
onPlay := func(ctx *ServerConnPlayCtx) (*base.Response, error) { onPlay := func(ctx *ServerConnPlayCtx) (*base.Response, error) {
conn.WriteFrame(0, StreamTypeRTP, []byte{0x01, 0x02, 0x03, 0x04})
conn.WriteFrame(0, StreamTypeRTCP, []byte{0x05, 0x06, 0x07, 0x08})
return &base.Response{ return &base.Response{
StatusCode: base.StatusOK, StatusCode: base.StatusOK,
}, nil }, nil
@@ -386,6 +389,14 @@ func TestServerReadFrames(t *testing.T) {
err = th.Read(res.Header["Transport"]) err = th.Read(res.Header["Transport"])
require.NoError(t, err) require.NoError(t, err)
l1, err := net.ListenPacket("udp", "localhost:35466")
require.NoError(t, err)
defer l1.Close()
l2, err := net.ListenPacket("udp", "localhost:35467")
require.NoError(t, err)
defer l2.Close()
err = base.Request{ err = base.Request{
Method: base.Play, Method: base.Play,
URL: base.MustParseURL("rtsp://localhost:8554/teststream"), URL: base.MustParseURL("rtsp://localhost:8554/teststream"),
@@ -399,14 +410,36 @@ func TestServerReadFrames(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode) require.Equal(t, base.StatusOK, res.StatusCode)
// server -> client
if proto == "udp" { if proto == "udp" {
time.Sleep(1 * time.Second) buf := make([]byte, 2048)
n, _, err := l1.ReadFrom(buf)
l1, err := net.ListenPacket("udp", "localhost:35467")
require.NoError(t, err) require.NoError(t, err)
defer l1.Close() require.Equal(t, []byte{0x01, 0x02, 0x03, 0x04}, buf[:n])
l1.WriteTo([]byte{0x01, 0x02, 0x03, 0x04}, &net.UDPAddr{ buf = make([]byte, 2048)
n, _, err = l2.ReadFrom(buf)
require.NoError(t, err)
require.Equal(t, []byte{0x05, 0x06, 0x07, 0x08}, buf[:n])
} else {
var f base.InterleavedFrame
f.Payload = make([]byte, 2048)
err := f.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, StreamTypeRTP, f.StreamType)
require.Equal(t, []byte{0x01, 0x02, 0x03, 0x04}, f.Payload)
f.Payload = make([]byte, 2048)
err = f.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, StreamTypeRTCP, f.StreamType)
require.Equal(t, []byte{0x05, 0x06, 0x07, 0x08}, f.Payload)
}
// client -> server (RTCP)
if proto == "udp" {
l2.WriteTo([]byte{0x01, 0x02, 0x03, 0x04}, &net.UDPAddr{
IP: net.ParseIP("127.0.0.1"), IP: net.ParseIP("127.0.0.1"),
Port: th.ServerPorts[1], Port: th.ServerPorts[1],
}) })