diff --git a/client.go b/client.go index 5ccf445b..32439ee8 100644 --- a/client.go +++ b/client.go @@ -1781,11 +1781,13 @@ func (c *Client) doPlay(ra *headers.Range) (*base.Response, error) { // to all listeners, including us, messing up the stream. if *c.effectiveTransport == TransportUDP { for _, cm := range c.setuppedMedias { - byts, _ := (&rtp.Packet{Header: rtp.Header{Version: 2}}).Marshal() - cm.udpRTPListener.write(byts) //nolint:errcheck + if !cm.media.IsBackChannel { + byts, _ := (&rtp.Packet{Header: rtp.Header{Version: 2}}).Marshal() + cm.udpRTPListener.write(byts) //nolint:errcheck - byts, _ = (&rtcp.ReceiverReport{}).Marshal() - cm.udpRTCPListener.write(byts) //nolint:errcheck + byts, _ = (&rtcp.ReceiverReport{}).Marshal() + cm.udpRTCPListener.write(byts) //nolint:errcheck + } } } diff --git a/client_play_test.go b/client_play_test.go index 97a1ff41..1cd4ee86 100644 --- a/client_play_test.go +++ b/client_play_test.go @@ -3360,214 +3360,306 @@ func TestClientPlayPacketNTP(t *testing.T) { } func TestClientPlayBackChannel(t *testing.T) { - l, err := net.Listen("tcp", "localhost:8554") - require.NoError(t, err) - defer l.Close() + for _, transport := range []string{ + "udp", + "tcp", + } { + t.Run(transport, func(t *testing.T) { + serverOk := make(chan struct{}) - serverDone := make(chan struct{}) - defer func() { <-serverDone }() + l, err := net.Listen("tcp", "localhost:8554") + require.NoError(t, err) + defer l.Close() - go func() { - defer close(serverDone) + serverDone := make(chan struct{}) + defer func() { <-serverDone }() - nconn, err2 := l.Accept() - require.NoError(t, err2) - defer nconn.Close() - conn := conn.NewConn(nconn) + go func() { + defer close(serverDone) - req, err2 := conn.ReadRequest() - require.NoError(t, err2) - require.Equal(t, base.Options, req.Method) + nconn, err2 := l.Accept() + require.NoError(t, err2) + defer nconn.Close() + conn := conn.NewConn(nconn) - err2 = conn.WriteResponse(&base.Response{ - StatusCode: base.StatusOK, - Header: base.Header{ - "Public": base.HeaderValue{strings.Join([]string{ - string(base.Describe), - string(base.Setup), - string(base.Play), - }, ", ")}, - }, - }) - require.NoError(t, err2) + req, err2 := conn.ReadRequest() + require.NoError(t, err2) + require.Equal(t, base.Options, req.Method) - req, err2 = conn.ReadRequest() - require.NoError(t, err2) - require.Equal(t, base.Describe, req.Method) - require.Equal(t, base.HeaderValue{"www.onvif.org/ver20/backchannel"}, req.Header["Require"]) + err2 = conn.WriteResponse(&base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ + "Public": base.HeaderValue{strings.Join([]string{ + string(base.Describe), + string(base.Setup), + string(base.Play), + }, ", ")}, + }, + }) + require.NoError(t, err2) - err2 = conn.WriteResponse(&base.Response{ - StatusCode: base.StatusOK, - Header: base.Header{ - "Content-Type": base.HeaderValue{"application/sdp"}, - "Content-Base": base.HeaderValue{"rtsp://localhost:8554/teststream/"}, - }, - Body: mediasToSDP([]*description.Media{ - testH264Media, - { - Type: description.MediaTypeAudio, - Formats: []format.Format{&format.G711{ - PayloadTyp: 8, - MULaw: false, - SampleRate: 8000, - ChannelCount: 1, - }}, - IsBackChannel: true, + req, err2 = conn.ReadRequest() + require.NoError(t, err2) + require.Equal(t, base.Describe, req.Method) + require.Equal(t, base.HeaderValue{"www.onvif.org/ver20/backchannel"}, req.Header["Require"]) + + err2 = conn.WriteResponse(&base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ + "Content-Type": base.HeaderValue{"application/sdp"}, + "Content-Base": base.HeaderValue{"rtsp://localhost:8554/teststream/"}, + }, + Body: mediasToSDP([]*description.Media{ + testH264Media, + { + Type: description.MediaTypeAudio, + Formats: []format.Format{&format.G711{ + PayloadTyp: 8, + MULaw: false, + SampleRate: 8000, + ChannelCount: 1, + }}, + IsBackChannel: true, + }, + }), + }) + require.NoError(t, err2) + + var l1s [2]net.PacketConn + var l2s [2]net.PacketConn + var clientPorts [2]*[2]int + + for i := 0; i < 2; i++ { + req, err2 = conn.ReadRequest() + require.NoError(t, err2) + require.Equal(t, base.Setup, req.Method) + + if i == 0 { + require.Equal(t, base.HeaderValue(nil), req.Header["Require"]) + } else { + require.Equal(t, base.HeaderValue{"www.onvif.org/ver20/backchannel"}, req.Header["Require"]) + } + + var inTH headers.Transport + err2 = inTH.Unmarshal(req.Header["Transport"]) + require.NoError(t, err2) + + var th headers.Transport + v := headers.TransportDeliveryUnicast + th.Delivery = &v + + switch transport { + case "udp": + th.Protocol = headers.TransportProtocolUDP + th.ClientPorts = inTH.ClientPorts + clientPorts[i] = inTH.ClientPorts + th.ServerPorts = &[2]int{34556 + i*2, 34557 + i*2} + + l1s[i], err2 = net.ListenPacket( + "udp", net.JoinHostPort("127.0.0.1", strconv.FormatInt(int64(th.ServerPorts[0]), 10))) + require.NoError(t, err2) + defer l1s[i].Close() + + l2s[i], err2 = net.ListenPacket( + "udp", net.JoinHostPort("127.0.0.1", strconv.FormatInt(int64(th.ServerPorts[1]), 10))) + require.NoError(t, err2) + defer l2s[i].Close() + + case "tcp": + th.Protocol = headers.TransportProtocolTCP + th.InterleavedIDs = &[2]int{0 + i*2, 1 + i*2} + } + + err2 = conn.WriteResponse(&base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ + "Transport": th.Marshal(), + "Session": headers.Session{ + Session: "ABCDE", + Timeout: uintPtr(1), + }.Marshal(), + }, + }) + require.NoError(t, err2) + } + + req, err2 = conn.ReadRequest() + require.NoError(t, err2) + require.Equal(t, base.Play, req.Method) + require.Equal(t, base.HeaderValue{"www.onvif.org/ver20/backchannel"}, req.Header["Require"]) + + err2 = conn.WriteResponse(&base.Response{ + StatusCode: base.StatusOK, + }) + require.NoError(t, err2) + + var pl []byte + + // client -> server RTP packet + + if transport == "udp" { + buf := make([]byte, 2048) + var n int + n, _, err2 = l1s[1].ReadFrom(buf) + require.NoError(t, err2) + pl = buf[:n] + } else { + var f *base.InterleavedFrame + f, err2 = conn.ReadInterleavedFrame() + require.NoError(t, err2) + require.Equal(t, 2, f.Channel) + pl = f.Payload + } + + var pkt rtp.Packet + err2 = pkt.Unmarshal(pl) + require.NoError(t, err2) + require.Equal(t, rtp.Packet{ + Header: rtp.Header{ + Version: 2, + CSRC: []uint32{}, + PayloadType: 8, + SSRC: pkt.SSRC, + }, + Payload: []byte{1, 2, 3, 4}, + }, pkt) + + // server -> client RTP packet + + if transport == "udp" { + _, err = l1s[0].WriteTo(testRTPPacketMarshaled, &net.UDPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: clientPorts[0][0], + }) + require.NoError(t, err2) + } else { + err2 = conn.WriteInterleavedFrame(&base.InterleavedFrame{ + Channel: 0, + Payload: testRTPPacketMarshaled, + }, make([]byte, 1024)) + require.NoError(t, err2) + } + + // client -> server RCTP sender report + + if transport == "udp" { + buf := make([]byte, 2048) + var n int + n, _, err2 = l2s[1].ReadFrom(buf) + require.NoError(t, err2) + pl = buf[:n] + } else { + var f *base.InterleavedFrame + f, err2 = conn.ReadInterleavedFrame() + require.NoError(t, err2) + require.Equal(t, 3, f.Channel) + pl = f.Payload + } + + packets, err2 := rtcp.Unmarshal(pl) + require.NoError(t, err2) + + require.Equal(t, []rtcp.Packet{&rtcp.SenderReport{ + SSRC: packets[0].(*rtcp.SenderReport).SSRC, + NTPTime: packets[0].(*rtcp.SenderReport).NTPTime, + RTPTime: packets[0].(*rtcp.SenderReport).RTPTime, + PacketCount: 1, + OctetCount: 4, + }}, packets) + + // client -> server RTCP receiver report (UDP only) + + if transport == "udp" { + buf := make([]byte, 2048) + var n int + n, _, err2 = l2s[0].ReadFrom(buf) + require.NoError(t, err2) + pl = buf[:n] + + packets, err2 = rtcp.Unmarshal(pl) + require.NoError(t, err2) + + require.Equal(t, []rtcp.Packet{&rtcp.ReceiverReport{ + ProfileExtensions: []uint8{}, + }}, packets) + } + + // client -> server Keepalive + + recv := make(chan struct{}) + go func() { + defer close(recv) + req, err2 = conn.ReadRequest() + require.NoError(t, err2) + require.Equal(t, base.Options, req.Method) + }() + + select { + case <-recv: + case <-time.After(2 * time.Second): + t.Errorf("should not happen") + } + + close(serverOk) + + req, err2 = conn.ReadRequest() + require.NoError(t, err2) + require.Equal(t, base.Teardown, req.Method) + require.Equal(t, base.HeaderValue{"www.onvif.org/ver20/backchannel"}, req.Header["Require"]) + + err2 = conn.WriteResponse(&base.Response{ + StatusCode: base.StatusOK, + }) + require.NoError(t, err2) + }() + + c := Client{ + RequestBackChannels: true, + Transport: func() *Transport { + if transport == "tcp" { + return transportPtr(TransportTCP) + } + return transportPtr(TransportUDP) + }(), + senderReportPeriod: 500 * time.Millisecond, + receiverReportPeriod: 750 * time.Millisecond, + } + + u, err := base.ParseURL("rtsp://localhost:8554/teststream") + require.NoError(t, err) + + err = c.Start(u.Scheme, u.Host) + require.NoError(t, err) + defer c.Close() + + sd, _, err := c.Describe(u) + require.NoError(t, err) + + err = c.SetupAll(sd.BaseURL, sd.Medias) + require.NoError(t, err) + + recv := make(chan struct{}) + + c.OnPacketRTP(sd.Medias[0], sd.Medias[0].Formats[0], func(_ *rtp.Packet) { + close(recv) + }) + + _, err = c.Play(nil) + require.NoError(t, err) + + err = c.WritePacketRTP(sd.Medias[1], &rtp.Packet{ + Header: rtp.Header{ + Version: 2, + PayloadType: 8, + CSRC: []uint32{}, + SSRC: 0x38F27A2F, }, - }), + Payload: []byte{1, 2, 3, 4}, + }) + require.NoError(t, err) + + <-recv + <-serverOk }) - require.NoError(t, err2) - - req, err2 = conn.ReadRequest() - require.NoError(t, err2) - require.Equal(t, base.Setup, req.Method) - require.Equal(t, base.HeaderValue(nil), req.Header["Require"]) - - var inTH headers.Transport - err2 = inTH.Unmarshal(req.Header["Transport"]) - require.NoError(t, err2) - - th := headers.Transport{ - Delivery: deliveryPtr(headers.TransportDeliveryUnicast), - Protocol: headers.TransportProtocolTCP, - InterleavedIDs: inTH.InterleavedIDs, - } - - err2 = conn.WriteResponse(&base.Response{ - StatusCode: base.StatusOK, - Header: base.Header{ - "Transport": th.Marshal(), - "Session": headers.Session{ - Session: "ABCDE", - Timeout: uintPtr(1), - }.Marshal(), - }, - }) - require.NoError(t, err2) - - req, err2 = conn.ReadRequest() - require.NoError(t, err2) - require.Equal(t, base.Setup, req.Method) - require.Equal(t, base.HeaderValue{"www.onvif.org/ver20/backchannel"}, req.Header["Require"]) - - err2 = inTH.Unmarshal(req.Header["Transport"]) - require.NoError(t, err2) - - th = headers.Transport{ - Delivery: deliveryPtr(headers.TransportDeliveryUnicast), - Protocol: headers.TransportProtocolTCP, - InterleavedIDs: inTH.InterleavedIDs, - } - - err2 = conn.WriteResponse(&base.Response{ - StatusCode: base.StatusOK, - Header: base.Header{ - "Transport": th.Marshal(), - "Session": headers.Session{ - Session: "ABCDE", - Timeout: uintPtr(1), - }.Marshal(), - }, - }) - require.NoError(t, err2) - - req, err2 = conn.ReadRequest() - require.NoError(t, err2) - require.Equal(t, base.Play, req.Method) - require.Equal(t, base.HeaderValue{"www.onvif.org/ver20/backchannel"}, req.Header["Require"]) - - err2 = conn.WriteResponse(&base.Response{ - StatusCode: base.StatusOK, - }) - require.NoError(t, err2) - - f, err2 := conn.ReadInterleavedFrame() - require.NoError(t, err2) - require.Equal(t, 2, f.Channel) - - f, err2 = conn.ReadInterleavedFrame() - require.NoError(t, err2) - require.Equal(t, 3, f.Channel) - - packets, err2 := rtcp.Unmarshal(f.Payload) - require.NoError(t, err2) - - sr, ok := packets[0].(*rtcp.SenderReport) - require.Equal(t, true, ok) - require.Equal(t, uint32(0x38F27A2F), sr.SSRC) - require.Equal(t, uint32(1), sr.PacketCount) - require.Equal(t, uint32(4), sr.OctetCount) - - recv := make(chan struct{}) - go func() { - defer close(recv) - req, err2 = conn.ReadRequest() - require.NoError(t, err2) - require.Equal(t, base.Options, req.Method) - }() - - select { - case <-recv: - case <-time.After(2 * time.Second): - t.Errorf("should not happen") - } - - err2 = conn.WriteInterleavedFrame(&base.InterleavedFrame{ - Channel: 0, - Payload: testRTPPacketMarshaled, - }, make([]byte, 1024)) - require.NoError(t, err2) - - req, err2 = conn.ReadRequest() - require.NoError(t, err2) - require.Equal(t, base.Teardown, req.Method) - require.Equal(t, base.HeaderValue{"www.onvif.org/ver20/backchannel"}, req.Header["Require"]) - - err2 = conn.WriteResponse(&base.Response{ - StatusCode: base.StatusOK, - }) - require.NoError(t, err2) - }() - - c := Client{ - RequestBackChannels: true, - Transport: transportPtr(TransportTCP), - senderReportPeriod: 500 * time.Millisecond, - receiverReportPeriod: 750 * time.Millisecond, } - - u, err := base.ParseURL("rtsp://localhost:8554/teststream") - require.NoError(t, err) - - err = c.Start(u.Scheme, u.Host) - require.NoError(t, err) - defer c.Close() - - sd, _, err := c.Describe(u) - require.NoError(t, err) - - err = c.SetupAll(sd.BaseURL, sd.Medias) - require.NoError(t, err) - - recv := make(chan struct{}) - - c.OnPacketRTP(sd.Medias[0], sd.Medias[0].Formats[0], func(_ *rtp.Packet) { - close(recv) - }) - - _, err = c.Play(nil) - require.NoError(t, err) - - err = c.WritePacketRTP(sd.Medias[1], &rtp.Packet{ - Header: rtp.Header{ - Version: 2, - PayloadType: 8, - CSRC: []uint32{}, - SSRC: 0x38F27A2F, - }, - Payload: []byte{1, 2, 3, 4}, - }) - require.NoError(t, err) - - <-recv }