From 58bc92133a51e862d5c87c62b1d6ec213d129587 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Wed, 2 Nov 2022 12:26:28 +0100 Subject: [PATCH] client: generate RTCP sender reports when publishing with TCP too --- README.md | 4 +- client.go | 28 ++--- client_publish_test.go | 276 ++++++++++++++++++++++------------------- server_read_test.go | 247 ++++++++++++++++++++---------------- 4 files changed, 300 insertions(+), 255 deletions(-) diff --git a/README.md b/README.md index 0d3b2586..fe94cdf9 100644 --- a/README.md +++ b/README.md @@ -29,7 +29,7 @@ Features: * Publish TLS-encrypted streams (TCP only) * Switch transport protocol automatically * Pause without disconnecting from the server - * Generate RTCP sender reports (UDP only) + * Generate RTCP sender reports * Server * Handle requests from clients * Sessions and connections are independent @@ -43,7 +43,7 @@ Features: * Write streams to clients with the UDP, UDP-multicast or TCP transport protocol * Write TLS-encrypted streams * Compute and provide SSRC, RTP-Info to clients - * Generate RTCP sender reports (UDP only) + * Generate RTCP sender reports * Utilities * Parse RTSP elements: requests, responses, SDP * Parse H264 elements and formats: RTP/H264, Annex-B, AVCC, anti-competition, DTS diff --git a/client.go b/client.go index 2300be06..3f9443b6 100644 --- a/client.go +++ b/client.go @@ -97,7 +97,7 @@ type clientTrack struct { cleaner *rtpcleaner.Cleaner // record - udpRTCPSender *rtcpsender.RTCPSender + rtcpSender *rtcpsender.RTCPSender } func (s clientState) String() string { @@ -751,19 +751,20 @@ func (c *Client) playRecordStart() { v := time.Now().Unix() c.tcpLastFrameTime = &v } - } else if *c.effectiveTransport == TransportUDP { + } else { for trackID, ct := range c.tracks { ctrackID := trackID - - ct.udpRTCPSender = rtcpsender.New(c.udpSenderReportPeriod, + ct.rtcpSender = rtcpsender.New(c.udpSenderReportPeriod, ct.track.ClockRate(), func(pkt rtcp.Packet) { c.WritePacketRTCP(ctrackID, pkt) }) } - for _, ct := range c.tracks { - ct.udpRTPListener.start(false) - ct.udpRTCPListener.start(false) + if *c.effectiveTransport == TransportUDP { + for _, ct := range c.tracks { + ct.udpRTPListener.start(false) + ct.udpRTCPListener.start(false) + } } } @@ -920,15 +921,14 @@ func (c *Client) playRecordStop(isClosing bool) { ct.udpRTCPReceiver.Close() ct.udpRTCPReceiver = nil } - } else { - for _, ct := range c.tracks { - ct.udpRTCPSender.Close() - ct.udpRTCPSender = nil - } } } for _, ct := range c.tracks { + if ct.rtcpSender != nil { + ct.rtcpSender.Close() + ct.rtcpSender = nil + } ct.cleaner = nil ct.reorderer = nil } @@ -1891,9 +1891,7 @@ func (c *Client) WritePacketRTP(trackID int, pkt *rtp.Packet, ptsEqualsDTS bool) } byts = byts[:n] - if c.tracks[trackID].udpRTCPSender != nil { - c.tracks[trackID].udpRTCPSender.ProcessPacketRTP(time.Now(), pkt, ptsEqualsDTS) - } + c.tracks[trackID].rtcpSender.ProcessPacketRTP(time.Now(), pkt, ptsEqualsDTS) c.writeBuffer.Push(trackTypePayload{ trackID: trackID, diff --git a/client_publish_test.go b/client_publish_test.go index eb369960..32fea9f8 100644 --- a/client_publish_test.go +++ b/client_publish_test.go @@ -834,151 +834,169 @@ func TestClientPublishAutomaticProtocol(t *testing.T) { } func TestClientPublishRTCPReport(t *testing.T) { - reportReceived := make(chan struct{}) + for _, ca := range []string{"udp", "tcp"} { + t.Run(ca, func(t *testing.T) { + reportReceived := make(chan struct{}) - l, err := net.Listen("tcp", "localhost:8554") - require.NoError(t, err) - defer l.Close() + l, err := net.Listen("tcp", "localhost:8554") + require.NoError(t, err) + defer l.Close() - serverDone := make(chan struct{}) - defer func() { <-serverDone }() - go func() { - defer close(serverDone) + serverDone := make(chan struct{}) + defer func() { <-serverDone }() + go func() { + defer close(serverDone) - nconn, err := l.Accept() - require.NoError(t, err) - defer nconn.Close() - conn := conn.NewConn(nconn) + nconn, err := l.Accept() + require.NoError(t, err) + defer nconn.Close() + conn := conn.NewConn(nconn) - req, err := conn.ReadRequest() - require.NoError(t, err) - require.Equal(t, base.Options, req.Method) + req, err := conn.ReadRequest() + require.NoError(t, err) + require.Equal(t, base.Options, req.Method) - err = conn.WriteResponse(&base.Response{ - StatusCode: base.StatusOK, - Header: base.Header{ - "Public": base.HeaderValue{strings.Join([]string{ - string(base.Announce), - string(base.Setup), - string(base.Record), - }, ", ")}, - }, - }) - require.NoError(t, err) + err = conn.WriteResponse(&base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ + "Public": base.HeaderValue{strings.Join([]string{ + string(base.Announce), + string(base.Setup), + string(base.Record), + }, ", ")}, + }, + }) + require.NoError(t, err) - req, err = conn.ReadRequest() - require.NoError(t, err) - require.Equal(t, base.Announce, req.Method) + req, err = conn.ReadRequest() + require.NoError(t, err) + require.Equal(t, base.Announce, req.Method) - err = conn.WriteResponse(&base.Response{ - StatusCode: base.StatusOK, - }) - require.NoError(t, err) + err = conn.WriteResponse(&base.Response{ + StatusCode: base.StatusOK, + }) + require.NoError(t, err) - req, err = conn.ReadRequest() - require.NoError(t, err) - require.Equal(t, base.Setup, req.Method) + req, err = conn.ReadRequest() + require.NoError(t, err) + require.Equal(t, base.Setup, req.Method) - var inTH headers.Transport - err = inTH.Unmarshal(req.Header["Transport"]) - require.NoError(t, err) + var inTH headers.Transport + err = inTH.Unmarshal(req.Header["Transport"]) + require.NoError(t, err) - l1, err := net.ListenPacket("udp", "localhost:34556") - require.NoError(t, err) - defer l1.Close() - - l2, err := net.ListenPacket("udp", "localhost:34557") - require.NoError(t, err) - defer l2.Close() - - err = conn.WriteResponse(&base.Response{ - StatusCode: base.StatusOK, - Header: base.Header{ - "Transport": headers.Transport{ + th := headers.Transport{ Delivery: func() *headers.TransportDelivery { v := headers.TransportDeliveryUnicast return &v }(), - Protocol: headers.TransportProtocolUDP, - ClientPorts: inTH.ClientPorts, - ServerPorts: &[2]int{34556, 34557}, - }.Marshal(), - }, + } + + if ca == "udp" { + th.Protocol = headers.TransportProtocolUDP + th.ClientPorts = inTH.ClientPorts + th.ServerPorts = &[2]int{34556, 34557} + } else { + th.Protocol = headers.TransportProtocolTCP + th.InterleavedIDs = inTH.InterleavedIDs + } + + l1, err := net.ListenPacket("udp", "localhost:34556") + require.NoError(t, err) + defer l1.Close() + + l2, err := net.ListenPacket("udp", "localhost:34557") + require.NoError(t, err) + defer l2.Close() + + err = conn.WriteResponse(&base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ + "Transport": th.Marshal(), + }, + }) + require.NoError(t, err) + + req, err = conn.ReadRequest() + require.NoError(t, err) + require.Equal(t, base.Record, req.Method) + + err = conn.WriteResponse(&base.Response{ + StatusCode: base.StatusOK, + }) + require.NoError(t, err) + + var buf []byte + + if ca == "udp" { + buf = make([]byte, 2048) + n, _, err := l2.ReadFrom(buf) + require.NoError(t, err) + buf = buf[:n] + } else { + for i := 0; i < 2; i++ { + _, err := conn.ReadInterleavedFrame() + require.NoError(t, err) + } + + f, err := conn.ReadInterleavedFrame() + require.NoError(t, err) + require.Equal(t, 1, f.Channel) + buf = f.Payload + } + + packets, err := rtcp.Unmarshal(buf) + require.NoError(t, err) + require.Equal(t, &rtcp.SenderReport{ + SSRC: 0x38F27A2F, + NTPTime: packets[0].(*rtcp.SenderReport).NTPTime, + RTPTime: packets[0].(*rtcp.SenderReport).RTPTime, + PacketCount: 2, + OctetCount: 8, + }, packets[0]) + + close(reportReceived) + + req, err = conn.ReadRequest() + require.NoError(t, err) + require.Equal(t, base.Teardown, req.Method) + + err = conn.WriteResponse(&base.Response{ + StatusCode: base.StatusOK, + }) + require.NoError(t, err) + }() + + c := Client{ + Transport: func() *Transport { + if ca == "udp" { + v := TransportUDP + return &v + } + v := TransportTCP + return &v + }(), + udpSenderReportPeriod: 500 * time.Millisecond, + } + + err = c.StartPublishing("rtsp://localhost:8554/teststream", + Tracks{&TrackH264{ + PayloadType: 96, + SPS: []byte{0x01, 0x02, 0x03, 0x04}, + PPS: []byte{0x01, 0x02, 0x03, 0x04}, + }}) + require.NoError(t, err) + defer c.Close() + + err = c.WritePacketRTP(0, &testRTPPacket, true) + require.NoError(t, err) + + err = c.WritePacketRTP(0, &testRTPPacket, true) + require.NoError(t, err) + + <-reportReceived }) - require.NoError(t, err) - - req, err = conn.ReadRequest() - require.NoError(t, err) - require.Equal(t, base.Record, req.Method) - - err = conn.WriteResponse(&base.Response{ - StatusCode: base.StatusOK, - }) - require.NoError(t, err) - - buf := make([]byte, 2048) - n, _, err := l1.ReadFrom(buf) - require.NoError(t, err) - var pkt rtp.Packet - err = pkt.Unmarshal(buf[:n]) - require.NoError(t, err) - - buf = make([]byte, 2048) - n, _, err = l2.ReadFrom(buf) - require.NoError(t, err) - packets, err := rtcp.Unmarshal(buf[:n]) - require.NoError(t, err) - sr, ok := packets[0].(*rtcp.SenderReport) - require.True(t, ok) - require.Equal(t, &rtcp.SenderReport{ - SSRC: 753621, - NTPTime: sr.NTPTime, - RTPTime: sr.RTPTime, - PacketCount: 1, - OctetCount: 4, - }, sr) - - close(reportReceived) - - req, err = conn.ReadRequest() - require.NoError(t, err) - require.Equal(t, base.Teardown, req.Method) - - err = conn.WriteResponse(&base.Response{ - StatusCode: base.StatusOK, - }) - require.NoError(t, err) - }() - - c := Client{ - udpSenderReportPeriod: 1 * time.Second, } - - track := &TrackH264{ - PayloadType: 96, - SPS: []byte{0x01, 0x02, 0x03, 0x04}, - PPS: []byte{0x01, 0x02, 0x03, 0x04}, - } - - err = c.StartPublishing("rtsp://localhost:8554/teststream", - Tracks{track}) - require.NoError(t, err) - defer c.Close() - - err = c.WritePacketRTP(0, &rtp.Packet{ - Header: rtp.Header{ - Version: 2, - Marker: true, - PayloadType: 96, - SequenceNumber: 946, - Timestamp: 54352, - SSRC: 753621, - }, - Payload: []byte{0x01, 0x02, 0x03, 0x04}, - }, true) - require.NoError(t, err) - - <-reportReceived } func TestClientPublishIgnoreTCPRTPPackets(t *testing.T) { diff --git a/server_read_test.go b/server_read_test.go index 19931baf..1bfe1b3e 100644 --- a/server_read_test.go +++ b/server_read_test.go @@ -708,116 +708,145 @@ func TestServerRead(t *testing.T) { } func TestServerReadRTCPReport(t *testing.T) { - track := &TrackH264{ - PayloadType: 96, - SPS: []byte{0x01, 0x02, 0x03, 0x04}, - PPS: []byte{0x01, 0x02, 0x03, 0x04}, + for _, ca := range []string{"udp", "tcp"} { + t.Run(ca, func(t *testing.T) { + stream := NewServerStream(Tracks{&TrackH264{ + PayloadType: 96, + SPS: []byte{0x01, 0x02, 0x03, 0x04}, + PPS: []byte{0x01, 0x02, 0x03, 0x04}, + }}) + defer stream.Close() + + s := &Server{ + Handler: &testServerHandler{ + onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) { + return &base.Response{ + StatusCode: base.StatusOK, + }, stream, nil + }, + onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) { + return &base.Response{ + StatusCode: base.StatusOK, + }, nil + }, + }, + udpSenderReportPeriod: 500 * time.Millisecond, + RTSPAddress: "localhost:8554", + UDPRTPAddress: "127.0.0.1:8000", + UDPRTCPAddress: "127.0.0.1:8001", + } + + err := s.Start() + require.NoError(t, err) + defer s.Close() + + nconn, err := net.Dial("tcp", "localhost:8554") + require.NoError(t, err) + defer nconn.Close() + conn := conn.NewConn(nconn) + + inTH := &headers.Transport{ + Mode: func() *headers.TransportMode { + v := headers.TransportModePlay + return &v + }(), + Delivery: func() *headers.TransportDelivery { + v := headers.TransportDeliveryUnicast + return &v + }(), + } + + if ca == "udp" { + inTH.Protocol = headers.TransportProtocolUDP + inTH.ClientPorts = &[2]int{35466, 35467} + } else { + inTH.Protocol = headers.TransportProtocolTCP + inTH.InterleavedIDs = &[2]int{0, 1} + } + + res, err := writeReqReadRes(conn, base.Request{ + Method: base.Setup, + URL: mustParseURL("rtsp://localhost:8554/teststream/trackID=0"), + Header: base.Header{ + "CSeq": base.HeaderValue{"1"}, + "Transport": inTH.Marshal(), + }, + }) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + + var l1 net.PacketConn + var l2 net.PacketConn + if ca == "udp" { + l1, err = net.ListenPacket("udp", "localhost:35466") + require.NoError(t, err) + defer l1.Close() + + l2, err = net.ListenPacket("udp", "localhost:35467") + require.NoError(t, err) + defer l2.Close() + } + + var sx headers.Session + err = sx.Unmarshal(res.Header["Session"]) + require.NoError(t, err) + + res, err = writeReqReadRes(conn, base.Request{ + Method: base.Play, + URL: mustParseURL("rtsp://localhost:8554/teststream"), + Header: base.Header{ + "CSeq": base.HeaderValue{"2"}, + "Session": base.HeaderValue{sx.Session}, + }, + }) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + + stream.WritePacketRTP(0, &testRTPPacket, true) + stream.WritePacketRTP(0, &testRTPPacket, true) + + var buf []byte + + if ca == "udp" { + buf = make([]byte, 2048) + var n int + n, _, err = l2.ReadFrom(buf) + require.NoError(t, err) + buf = buf[:n] + } else { + for i := 0; i < 2; i++ { + _, err := conn.ReadInterleavedFrame() + require.NoError(t, err) + } + + f, err := conn.ReadInterleavedFrame() + require.NoError(t, err) + require.Equal(t, 1, f.Channel) + buf = f.Payload + } + + packets, err := rtcp.Unmarshal(buf) + require.NoError(t, err) + require.Equal(t, &rtcp.SenderReport{ + SSRC: 0x38F27A2F, + NTPTime: packets[0].(*rtcp.SenderReport).NTPTime, + RTPTime: packets[0].(*rtcp.SenderReport).RTPTime, + PacketCount: 2, + OctetCount: 8, + }, packets[0]) + + res, err = writeReqReadRes(conn, base.Request{ + Method: base.Teardown, + URL: mustParseURL("rtsp://localhost:8554/teststream"), + Header: base.Header{ + "CSeq": base.HeaderValue{"3"}, + "Session": base.HeaderValue{sx.Session}, + }, + }) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + }) } - - stream := NewServerStream(Tracks{track}) - defer stream.Close() - - s := &Server{ - Handler: &testServerHandler{ - onSetup: func(ctx *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) { - return &base.Response{ - StatusCode: base.StatusOK, - }, stream, nil - }, - onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) { - return &base.Response{ - StatusCode: base.StatusOK, - }, nil - }, - }, - udpSenderReportPeriod: 1 * time.Second, - RTSPAddress: "localhost:8554", - UDPRTPAddress: "127.0.0.1:8000", - UDPRTCPAddress: "127.0.0.1:8001", - } - - err := s.Start() - require.NoError(t, err) - defer s.Close() - - nconn, err := net.Dial("tcp", "localhost:8554") - require.NoError(t, err) - defer nconn.Close() - conn := conn.NewConn(nconn) - - inTH := &headers.Transport{ - Mode: func() *headers.TransportMode { - v := headers.TransportModePlay - return &v - }(), - Delivery: func() *headers.TransportDelivery { - v := headers.TransportDeliveryUnicast - return &v - }(), - Protocol: headers.TransportProtocolUDP, - ClientPorts: &[2]int{35466, 35467}, - } - - res, err := writeReqReadRes(conn, base.Request{ - Method: base.Setup, - URL: mustParseURL("rtsp://localhost:8554/teststream/trackID=0"), - Header: base.Header{ - "CSeq": base.HeaderValue{"1"}, - "Transport": inTH.Marshal(), - }, - }) - require.NoError(t, err) - require.Equal(t, base.StatusOK, res.StatusCode) - - l1, err := net.ListenPacket("udp", "localhost:35466") - require.NoError(t, err) - defer l1.Close() - - l2, err := net.ListenPacket("udp", "localhost:35467") - require.NoError(t, err) - defer l2.Close() - - var sx headers.Session - err = sx.Unmarshal(res.Header["Session"]) - require.NoError(t, err) - - res, err = writeReqReadRes(conn, base.Request{ - Method: base.Play, - URL: mustParseURL("rtsp://localhost:8554/teststream"), - Header: base.Header{ - "CSeq": base.HeaderValue{"2"}, - "Session": base.HeaderValue{sx.Session}, - }, - }) - require.NoError(t, err) - require.Equal(t, base.StatusOK, res.StatusCode) - - stream.WritePacketRTP(0, &testRTPPacket, true) - stream.WritePacketRTP(0, &testRTPPacket, true) - - buf := make([]byte, 2048) - n, _, err := l2.ReadFrom(buf) - require.NoError(t, err) - packets, err := rtcp.Unmarshal(buf[:n]) - require.NoError(t, err) - require.Equal(t, &rtcp.SenderReport{ - SSRC: 0x38F27A2F, - NTPTime: packets[0].(*rtcp.SenderReport).NTPTime, - RTPTime: packets[0].(*rtcp.SenderReport).RTPTime, - PacketCount: 2, - OctetCount: 8, - }, packets[0]) - - res, err = writeReqReadRes(conn, base.Request{ - Method: base.Teardown, - URL: mustParseURL("rtsp://localhost:8554/teststream"), - Header: base.Header{ - "CSeq": base.HeaderValue{"3"}, - "Session": base.HeaderValue{sx.Session}, - }, - }) - require.NoError(t, err) - require.Equal(t, base.StatusOK, res.StatusCode) } func TestServerReadVLCMulticast(t *testing.T) {