diff --git a/clientconnpublish_test.go b/clientconnpublish_test.go index 534f8439..244d69cb 100644 --- a/clientconnpublish_test.go +++ b/clientconnpublish_test.go @@ -33,6 +33,7 @@ func TestClientPublishSerial(t *testing.T) { conn, err := l.Accept() require.NoError(t, err) + defer conn.Close() bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) var req base.Request @@ -94,6 +95,18 @@ func TestClientPublishSerial(t *testing.T) { }.Write(bconn.Writer) require.NoError(t, err) + var l1 net.PacketConn + var l2 net.PacketConn + if proto == "udp" { + l1, err = net.ListenPacket("udp", "localhost:34556") + require.NoError(t, err) + defer l1.Close() + + l2, err = net.ListenPacket("udp", "localhost:34557") + require.NoError(t, err) + defer l2.Close() + } + err = req.Read(bconn.Reader) require.NoError(t, err) require.Equal(t, base.Record, req.Method) @@ -103,8 +116,39 @@ func TestClientPublishSerial(t *testing.T) { }.Write(bconn.Writer) require.NoError(t, err) - buf := make([]byte, 2048) - err = req.ReadIgnoreFrames(bconn.Reader, buf) + // client -> server + if proto == "udp" { + buf := make([]byte, 2048) + n, _, err := l1.ReadFrom(buf) + require.NoError(t, err) + require.Equal(t, []byte{0x01, 0x02, 0x03, 0x04}, buf[:n]) + + } else { + var f base.InterleavedFrame + f.Payload = make([]byte, 2048) + err = f.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, StreamTypeRTP, f.StreamType) + require.Equal(t, []byte{0x01, 0x02, 0x03, 0x04}, f.Payload) + } + + // server -> client (RTCP) + if proto == "udp" { + l2.WriteTo([]byte{0x05, 0x06, 0x07, 0x08}, &net.UDPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: th.ClientPorts[1], + }) + + } else { + err = base.InterleavedFrame{ + TrackID: 0, + StreamType: StreamTypeRTCP, + Payload: []byte{0x05, 0x06, 0x07, 0x08}, + }.Write(bconn.Writer) + require.NoError(t, err) + } + + err = req.Read(bconn.Reader) require.NoError(t, err) require.Equal(t, base.Teardown, req.Method) @@ -112,8 +156,6 @@ func TestClientPublishSerial(t *testing.T) { StatusCode: base.StatusOK, }.Write(bconn.Writer) require.NoError(t, err) - - conn.Close() }() conf := ClientConf{ @@ -134,11 +176,21 @@ func TestClientPublishSerial(t *testing.T) { Tracks{track}) require.NoError(t, err) + recvDone := make(chan struct{}) + done := conn.ReadFrames(func(trackID int, streamType StreamType, payload []byte) { + require.Equal(t, 0, trackID) + require.Equal(t, StreamTypeRTCP, streamType) + require.Equal(t, []byte{0x05, 0x06, 0x07, 0x08}, payload) + close(recvDone) + }) + err = conn.WriteFrame(track.ID, StreamTypeRTP, []byte{0x01, 0x02, 0x03, 0x04}) require.NoError(t, err) + <-recvDone conn.Close() + <-done err = conn.WriteFrame(track.ID, StreamTypeRTP, []byte{0x01, 0x02, 0x03, 0x04}) @@ -164,6 +216,7 @@ func TestClientPublishParallel(t *testing.T) { conn, err := l.Accept() require.NoError(t, err) + defer conn.Close() bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) var req base.Request @@ -243,8 +296,6 @@ func TestClientPublishParallel(t *testing.T) { StatusCode: base.StatusOK, }.Write(bconn.Writer) require.NoError(t, err) - - conn.Close() }() conf := ClientConf{ @@ -306,6 +357,7 @@ func TestClientPublishPauseSerial(t *testing.T) { conn, err := l.Accept() require.NoError(t, err) + defer conn.Close() bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) var req base.Request @@ -405,8 +457,6 @@ func TestClientPublishPauseSerial(t *testing.T) { StatusCode: base.StatusOK, }.Write(bconn.Writer) require.NoError(t, err) - - conn.Close() }() conf := ClientConf{ @@ -466,6 +516,7 @@ func TestClientPublishPauseParallel(t *testing.T) { conn, err := l.Accept() require.NoError(t, err) + defer conn.Close() bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) var req base.Request @@ -546,8 +597,6 @@ func TestClientPublishPauseParallel(t *testing.T) { StatusCode: base.StatusOK, }.Write(bconn.Writer) require.NoError(t, err) - - conn.Close() }() conf := ClientConf{ @@ -595,7 +644,7 @@ func TestClientPublishPauseParallel(t *testing.T) { } } -func TestClientPublishRTCP(t *testing.T) { +func TestClientPublishRTCPReport(t *testing.T) { l, err := net.Listen("tcp", "localhost:8554") require.NoError(t, err) defer l.Close() @@ -607,6 +656,7 @@ func TestClientPublishRTCP(t *testing.T) { conn, err := l.Accept() require.NoError(t, err) + defer conn.Close() bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) var req base.Request @@ -714,8 +764,6 @@ func TestClientPublishRTCP(t *testing.T) { base.Response{ StatusCode: base.StatusOK, }.Write(bconn.Writer) - - conn.Close() }() conf := ClientConf{ @@ -753,174 +801,3 @@ func TestClientPublishRTCP(t *testing.T) { err = conn.WriteFrame(track.ID, StreamTypeRTP, byts) require.NoError(t, err) } - -func TestClientPublishReadManualRTCP(t *testing.T) { - for _, proto := range []string{ - "udp", - "tcp", - } { - t.Run(proto, func(t *testing.T) { - l, err := net.Listen("tcp", "localhost:8554") - require.NoError(t, err) - defer l.Close() - - serverDone := make(chan struct{}) - defer func() { <-serverDone }() - go func() { - defer close(serverDone) - - conn, err := l.Accept() - require.NoError(t, err) - bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) - - var req base.Request - err = req.Read(bconn.Reader) - require.NoError(t, err) - require.Equal(t, base.Options, req.Method) - - err = base.Response{ - StatusCode: base.StatusOK, - Header: base.Header{ - "Public": base.HeaderValue{strings.Join([]string{ - string(base.Announce), - string(base.Setup), - string(base.Record), - }, ", ")}, - }, - }.Write(bconn.Writer) - require.NoError(t, err) - - err = req.Read(bconn.Reader) - require.NoError(t, err) - require.Equal(t, base.Announce, req.Method) - - err = base.Response{ - StatusCode: base.StatusOK, - }.Write(bconn.Writer) - require.NoError(t, err) - - err = req.Read(bconn.Reader) - require.NoError(t, err) - require.Equal(t, base.Setup, req.Method) - - var inTH headers.Transport - err = inTH.Read(req.Header["Transport"]) - require.NoError(t, err) - - th := headers.Transport{ - Delivery: func() *base.StreamDelivery { - v := base.StreamDeliveryUnicast - return &v - }(), - } - - var l1 net.PacketConn - if proto == "udp" { - var err error - l1, err = net.ListenPacket("udp", "localhost:34557") - require.NoError(t, err) - defer l1.Close() - - th.Protocol = StreamProtocolUDP - th.ServerPorts = &[2]int{34556, 34557} - th.ClientPorts = inTH.ClientPorts - - } else { - th.Protocol = StreamProtocolTCP - th.InterleavedIDs = inTH.InterleavedIDs - } - - err = base.Response{ - StatusCode: base.StatusOK, - Header: base.Header{ - "Transport": th.Write(), - }, - }.Write(bconn.Writer) - require.NoError(t, err) - - err = req.Read(bconn.Reader) - require.NoError(t, err) - require.Equal(t, base.Record, req.Method) - - err = base.Response{ - StatusCode: base.StatusOK, - }.Write(bconn.Writer) - require.NoError(t, err) - - if proto == "udp" { - buf := make([]byte, 2048) - n, _, err := l1.ReadFrom(buf) - require.NoError(t, err) - require.Equal(t, []byte{0x05, 0x06, 0x07, 0x08}, buf[:n]) - - } else { - var f base.InterleavedFrame - f.Payload = make([]byte, 2048) - err = f.Read(bconn.Reader) - require.NoError(t, err) - require.Equal(t, StreamTypeRTCP, f.StreamType) - require.Equal(t, []byte{0x05, 0x06, 0x07, 0x08}, f.Payload) - } - - if proto == "udp" { - l1.WriteTo([]byte{0x01, 0x02, 0x03, 0x04}, &net.UDPAddr{ - IP: net.ParseIP("127.0.0.1"), - Port: th.ClientPorts[1], - }) - - } else { - err = base.InterleavedFrame{ - TrackID: 0, - StreamType: StreamTypeRTCP, - Payload: []byte{0x01, 0x02, 0x03, 0x04}, - }.Write(bconn.Writer) - require.NoError(t, err) - } - - err = req.Read(bconn.Reader) - require.NoError(t, err) - require.Equal(t, base.Teardown, req.Method) - - base.Response{ - StatusCode: base.StatusOK, - }.Write(bconn.Writer) - - conn.Close() - }() - - conf := ClientConf{ - StreamProtocol: func() *StreamProtocol { - if proto == "udp" { - v := StreamProtocolUDP - return &v - } - v := StreamProtocolTCP - return &v - }(), - } - - track, err := NewTrackH264(96, []byte("123456"), []byte("123456")) - require.NoError(t, err) - - conn, err := conf.DialPublish("rtsp://localhost:8554/teststream", - Tracks{track}) - require.NoError(t, err) - - recvDone := make(chan struct{}) - done := conn.ReadFrames(func(trackID int, streamType StreamType, payload []byte) { - require.Equal(t, 0, trackID) - require.Equal(t, StreamTypeRTCP, streamType) - require.Equal(t, []byte{0x01, 0x02, 0x03, 0x04}, payload) - close(recvDone) - }) - - err = conn.WriteFrame(track.ID, StreamTypeRTCP, - []byte{0x05, 0x06, 0x07, 0x08}) - require.NoError(t, err) - - <-recvDone - conn.Close() - <-done - }) - } -} diff --git a/clientconnread_test.go b/clientconnread_test.go index 4f7abc20..813b5669 100644 --- a/clientconnread_test.go +++ b/clientconnread_test.go @@ -35,6 +35,8 @@ func TestClientRead(t *testing.T) { }() t.Run(encryptedStr+"_"+ca.proto, func(t *testing.T) { + frameRecv := make(chan struct{}) + var scheme string var l net.Listener if ca.encrypted { @@ -105,43 +107,44 @@ func TestClientRead(t *testing.T) { require.NoError(t, err) require.Equal(t, base.Setup, req.Method) - var th headers.Transport - err = th.Read(req.Header["Transport"]) + var inTH headers.Transport + err = inTH.Read(req.Header["Transport"]) require.NoError(t, err) - if ca.proto == "udp" { - err = base.Response{ - StatusCode: base.StatusOK, - Header: base.Header{ - "Transport": headers.Transport{ - Protocol: StreamProtocolUDP, - Delivery: func() *base.StreamDelivery { - v := base.StreamDeliveryUnicast - return &v - }(), - ClientPorts: th.ClientPorts, - ServerPorts: &[2]int{34556, 34557}, - }.Write(), - }, - }.Write(bconn.Writer) - require.NoError(t, err) + th := headers.Transport{ + Delivery: func() *base.StreamDelivery { + v := base.StreamDeliveryUnicast + return &v + }(), + } + if ca.proto == "udp" { + th.Protocol = StreamProtocolUDP + th.ClientPorts = inTH.ClientPorts + th.ServerPorts = &[2]int{34556, 34557} } else { - err = base.Response{ - StatusCode: base.StatusOK, - Header: base.Header{ - "Transport": headers.Transport{ - Protocol: StreamProtocolTCP, - Delivery: func() *base.StreamDelivery { - v := base.StreamDeliveryUnicast - return &v - }(), - ClientPorts: th.ClientPorts, - InterleavedIDs: &[2]int{0, 1}, - }.Write(), - }, - }.Write(bconn.Writer) + th.Protocol = StreamProtocolTCP + th.InterleavedIDs = &[2]int{0, 1} + } + + err = base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ + "Transport": th.Write(), + }, + }.Write(bconn.Writer) + require.NoError(t, err) + + var l1 net.PacketConn + var l2 net.PacketConn + if ca.proto == "udp" { + l1, err = net.ListenPacket("udp", "localhost:34556") require.NoError(t, err) + defer l1.Close() + + l2, err = net.ListenPacket("udp", "localhost:34557") + require.NoError(t, err) + defer l2.Close() } err = req.Read(bconn.Reader) @@ -153,26 +156,44 @@ func TestClientRead(t *testing.T) { }.Write(bconn.Writer) require.NoError(t, err) + // server -> client if ca.proto == "udp" { time.Sleep(1 * time.Second) - - l1, err := net.ListenPacket("udp", "localhost:34556") - require.NoError(t, err) - defer l1.Close() - - l1.WriteTo([]byte("\x00\x00\x00\x00"), &net.UDPAddr{ + l1.WriteTo([]byte{0x01, 0x02, 0x03, 0x04}, &net.UDPAddr{ IP: net.ParseIP("127.0.0.1"), Port: th.ClientPorts[0], }) - } else { err = base.InterleavedFrame{ TrackID: 0, StreamType: StreamTypeRTP, - Payload: []byte("\x00\x00\x00\x00"), + Payload: []byte{0x01, 0x02, 0x03, 0x04}, }.Write(bconn.Writer) require.NoError(t, err) } + + // client -> server (RTCP) + if ca.proto == "udp" { + // skip firewall opening + buf := make([]byte, 2048) + _, _, err := l2.ReadFrom(buf) + require.NoError(t, err) + + buf = make([]byte, 2048) + n, _, err := l2.ReadFrom(buf) + require.NoError(t, err) + require.Equal(t, []byte{0x05, 0x06, 0x07, 0x08}, buf[:n]) + } else { + var f base.InterleavedFrame + f.Payload = make([]byte, 2048) + err := f.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, 0, f.TrackID) + require.Equal(t, StreamTypeRTCP, f.StreamType) + require.Equal(t, []byte{0x05, 0x06, 0x07, 0x08}, f.Payload) + } + + close(frameRecv) }() conf := ClientConf{ @@ -189,9 +210,13 @@ func TestClientRead(t *testing.T) { conn, err := conf.DialRead(scheme + "://localhost:8554/teststream") require.NoError(t, err) - frameRecv := make(chan struct{}) - done := conn.ReadFrames(func(id int, typ StreamType, payload []byte) { - close(frameRecv) + done := conn.ReadFrames(func(id int, streamType StreamType, payload []byte) { + require.Equal(t, 0, id) + require.Equal(t, StreamTypeRTP, streamType) + require.Equal(t, []byte{0x01, 0x02, 0x03, 0x04}, payload) + + err = conn.WriteFrame(0, StreamTypeRTCP, []byte{0x05, 0x06, 0x07, 0x08}) + require.NoError(t, err) }) <-frameRecv @@ -813,6 +838,7 @@ func TestClientReadPause(t *testing.T) { conn, err := l.Accept() require.NoError(t, err) + defer conn.Close() bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) var req base.Request @@ -926,8 +952,6 @@ func TestClientReadPause(t *testing.T) { StatusCode: base.StatusOK, }.Write(bconn.Writer) require.NoError(t, err) - - conn.Close() }() conf := ClientConf{ @@ -975,7 +999,7 @@ func TestClientReadPause(t *testing.T) { } } -func TestClientReadRTCP(t *testing.T) { +func TestClientReadRTCPReport(t *testing.T) { l, err := net.Listen("tcp", "localhost:8554") require.NoError(t, err) defer l.Close() @@ -1140,152 +1164,3 @@ func TestClientReadRTCP(t *testing.T) { conn.Close() <-done } - -func TestClientReadWriteManualRTCP(t *testing.T) { - for _, proto := range []string{ - "udp", - "tcp", - } { - t.Run(proto, func(t *testing.T) { - l, err := net.Listen("tcp", "localhost:8554") - require.NoError(t, err) - defer l.Close() - - serverDone := make(chan struct{}) - defer func() { <-serverDone }() - go func() { - defer close(serverDone) - - conn, err := l.Accept() - require.NoError(t, err) - defer conn.Close() - bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) - - var req base.Request - err = req.Read(bconn.Reader) - require.NoError(t, err) - require.Equal(t, base.Options, req.Method) - - err = base.Response{ - StatusCode: base.StatusOK, - Header: base.Header{ - "Public": base.HeaderValue{strings.Join([]string{ - string(base.Describe), - string(base.Setup), - string(base.Play), - }, ", ")}, - }, - }.Write(bconn.Writer) - require.NoError(t, err) - - err = req.Read(bconn.Reader) - require.NoError(t, err) - require.Equal(t, base.Describe, req.Method) - - track, err := NewTrackH264(96, []byte("123456"), []byte("123456")) - require.NoError(t, err) - - err = base.Response{ - StatusCode: base.StatusOK, - Header: base.Header{ - "Content-Type": base.HeaderValue{"application/sdp"}, - }, - Body: Tracks{track}.Write(), - }.Write(bconn.Writer) - require.NoError(t, err) - - err = req.Read(bconn.Reader) - require.NoError(t, err) - require.Equal(t, base.Setup, req.Method) - - var inTH headers.Transport - err = inTH.Read(req.Header["Transport"]) - require.NoError(t, err) - - th := headers.Transport{ - Delivery: func() *base.StreamDelivery { - v := base.StreamDeliveryUnicast - return &v - }(), - } - - var l1 net.PacketConn - if proto == "udp" { - var err error - l1, err = net.ListenPacket("udp", "localhost:34557") - require.NoError(t, err) - defer l1.Close() - - th.Protocol = StreamProtocolUDP - th.ServerPorts = &[2]int{34556, 34557} - th.ClientPorts = inTH.ClientPorts - - } else { - th.Protocol = StreamProtocolTCP - th.InterleavedIDs = inTH.InterleavedIDs - } - - err = base.Response{ - StatusCode: base.StatusOK, - Header: base.Header{ - "Transport": th.Write(), - }, - }.Write(bconn.Writer) - require.NoError(t, err) - - err = req.Read(bconn.Reader) - require.NoError(t, err) - require.Equal(t, base.Play, req.Method) - - err = base.Response{ - StatusCode: base.StatusOK, - }.Write(bconn.Writer) - require.NoError(t, err) - - if proto == "udp" { - buf := make([]byte, 2048) - - // skip firewall opening - _, _, err := l1.ReadFrom(buf) - require.NoError(t, err) - - n, _, err := l1.ReadFrom(buf) - require.NoError(t, err) - require.Equal(t, []byte{0x01, 0x02, 0x03, 0x04}, buf[:n]) - - } else { - var f base.InterleavedFrame - f.Payload = make([]byte, 2048) - err = f.Read(bconn.Reader) - require.NoError(t, err) - require.Equal(t, 0, f.TrackID) - require.Equal(t, StreamTypeRTCP, f.StreamType) - require.Equal(t, []byte{0x01, 0x02, 0x03, 0x04}, f.Payload) - } - }() - - conf := ClientConf{ - StreamProtocol: func() *StreamProtocol { - if proto == "udp" { - v := StreamProtocolUDP - return &v - } - v := StreamProtocolTCP - return &v - }(), - } - - conn, err := conf.DialRead("rtsp://localhost:8554/teststream") - require.NoError(t, err) - defer conn.Close() - - conn.ReadFrames(func(trackID int, streamType StreamType, payload []byte) { - }) - - time.Sleep(500 * time.Millisecond) - - err = conn.WriteFrame(0, StreamTypeRTCP, []byte{0x01, 0x02, 0x03, 0x04}) - require.NoError(t, err) - }) - } -} diff --git a/serverconnread_test.go b/serverconnread_test.go index ade2d6d6..65eefdec 100644 --- a/serverconnread_test.go +++ b/serverconnread_test.go @@ -431,12 +431,14 @@ func TestServerRead(t *testing.T) { f.Payload = make([]byte, 2048) err := f.Read(bconn.Reader) require.NoError(t, err) + require.Equal(t, 0, f.TrackID) require.Equal(t, StreamTypeRTP, f.StreamType) require.Equal(t, []byte{0x01, 0x02, 0x03, 0x04}, f.Payload) f.Payload = make([]byte, 2048) err = f.Read(bconn.Reader) require.NoError(t, err) + require.Equal(t, 0, f.TrackID) require.Equal(t, StreamTypeRTCP, f.StreamType) require.Equal(t, []byte{0x05, 0x06, 0x07, 0x08}, f.Payload) }