client: generate RTCP sender reports when publishing with TCP too

This commit is contained in:
aler9
2022-11-02 12:26:28 +01:00
parent 89aa62d01b
commit 58bc92133a
4 changed files with 300 additions and 255 deletions

View File

@@ -29,7 +29,7 @@ Features:
* Publish TLS-encrypted streams (TCP only) * Publish TLS-encrypted streams (TCP only)
* Switch transport protocol automatically * Switch transport protocol automatically
* Pause without disconnecting from the server * Pause without disconnecting from the server
* Generate RTCP sender reports (UDP only) * Generate RTCP sender reports
* Server * Server
* Handle requests from clients * Handle requests from clients
* Sessions and connections are independent * Sessions and connections are independent
@@ -43,7 +43,7 @@ Features:
* Write streams to clients with the UDP, UDP-multicast or TCP transport protocol * Write streams to clients with the UDP, UDP-multicast or TCP transport protocol
* Write TLS-encrypted streams * Write TLS-encrypted streams
* Compute and provide SSRC, RTP-Info to clients * Compute and provide SSRC, RTP-Info to clients
* Generate RTCP sender reports (UDP only) * Generate RTCP sender reports
* Utilities * Utilities
* Parse RTSP elements: requests, responses, SDP * Parse RTSP elements: requests, responses, SDP
* Parse H264 elements and formats: RTP/H264, Annex-B, AVCC, anti-competition, DTS * Parse H264 elements and formats: RTP/H264, Annex-B, AVCC, anti-competition, DTS

View File

@@ -97,7 +97,7 @@ type clientTrack struct {
cleaner *rtpcleaner.Cleaner cleaner *rtpcleaner.Cleaner
// record // record
udpRTCPSender *rtcpsender.RTCPSender rtcpSender *rtcpsender.RTCPSender
} }
func (s clientState) String() string { func (s clientState) String() string {
@@ -751,21 +751,22 @@ func (c *Client) playRecordStart() {
v := time.Now().Unix() v := time.Now().Unix()
c.tcpLastFrameTime = &v c.tcpLastFrameTime = &v
} }
} else if *c.effectiveTransport == TransportUDP { } else {
for trackID, ct := range c.tracks { for trackID, ct := range c.tracks {
ctrackID := trackID ctrackID := trackID
ct.rtcpSender = rtcpsender.New(c.udpSenderReportPeriod,
ct.udpRTCPSender = rtcpsender.New(c.udpSenderReportPeriod,
ct.track.ClockRate(), func(pkt rtcp.Packet) { ct.track.ClockRate(), func(pkt rtcp.Packet) {
c.WritePacketRTCP(ctrackID, pkt) c.WritePacketRTCP(ctrackID, pkt)
}) })
} }
if *c.effectiveTransport == TransportUDP {
for _, ct := range c.tracks { for _, ct := range c.tracks {
ct.udpRTPListener.start(false) ct.udpRTPListener.start(false)
ct.udpRTCPListener.start(false) ct.udpRTCPListener.start(false)
} }
} }
}
// for some reason, SetReadDeadline() must always be called in the same // for some reason, SetReadDeadline() must always be called in the same
// goroutine, otherwise Read() freezes. // goroutine, otherwise Read() freezes.
@@ -920,15 +921,14 @@ func (c *Client) playRecordStop(isClosing bool) {
ct.udpRTCPReceiver.Close() ct.udpRTCPReceiver.Close()
ct.udpRTCPReceiver = nil ct.udpRTCPReceiver = nil
} }
} else {
for _, ct := range c.tracks {
ct.udpRTCPSender.Close()
ct.udpRTCPSender = nil
}
} }
} }
for _, ct := range c.tracks { for _, ct := range c.tracks {
if ct.rtcpSender != nil {
ct.rtcpSender.Close()
ct.rtcpSender = nil
}
ct.cleaner = nil ct.cleaner = nil
ct.reorderer = nil ct.reorderer = nil
} }
@@ -1891,9 +1891,7 @@ func (c *Client) WritePacketRTP(trackID int, pkt *rtp.Packet, ptsEqualsDTS bool)
} }
byts = byts[:n] byts = byts[:n]
if c.tracks[trackID].udpRTCPSender != nil { c.tracks[trackID].rtcpSender.ProcessPacketRTP(time.Now(), pkt, ptsEqualsDTS)
c.tracks[trackID].udpRTCPSender.ProcessPacketRTP(time.Now(), pkt, ptsEqualsDTS)
}
c.writeBuffer.Push(trackTypePayload{ c.writeBuffer.Push(trackTypePayload{
trackID: trackID, trackID: trackID,

View File

@@ -834,6 +834,8 @@ func TestClientPublishAutomaticProtocol(t *testing.T) {
} }
func TestClientPublishRTCPReport(t *testing.T) { func TestClientPublishRTCPReport(t *testing.T) {
for _, ca := range []string{"udp", "tcp"} {
t.Run(ca, func(t *testing.T) {
reportReceived := make(chan struct{}) reportReceived := make(chan struct{})
l, err := net.Listen("tcp", "localhost:8554") l, err := net.Listen("tcp", "localhost:8554")
@@ -883,6 +885,22 @@ func TestClientPublishRTCPReport(t *testing.T) {
err = inTH.Unmarshal(req.Header["Transport"]) err = inTH.Unmarshal(req.Header["Transport"])
require.NoError(t, err) require.NoError(t, err)
th := headers.Transport{
Delivery: func() *headers.TransportDelivery {
v := headers.TransportDeliveryUnicast
return &v
}(),
}
if ca == "udp" {
th.Protocol = headers.TransportProtocolUDP
th.ClientPorts = inTH.ClientPorts
th.ServerPorts = &[2]int{34556, 34557}
} else {
th.Protocol = headers.TransportProtocolTCP
th.InterleavedIDs = inTH.InterleavedIDs
}
l1, err := net.ListenPacket("udp", "localhost:34556") l1, err := net.ListenPacket("udp", "localhost:34556")
require.NoError(t, err) require.NoError(t, err)
defer l1.Close() defer l1.Close()
@@ -894,15 +912,7 @@ func TestClientPublishRTCPReport(t *testing.T) {
err = conn.WriteResponse(&base.Response{ err = conn.WriteResponse(&base.Response{
StatusCode: base.StatusOK, StatusCode: base.StatusOK,
Header: base.Header{ Header: base.Header{
"Transport": headers.Transport{ "Transport": th.Marshal(),
Delivery: func() *headers.TransportDelivery {
v := headers.TransportDeliveryUnicast
return &v
}(),
Protocol: headers.TransportProtocolUDP,
ClientPorts: inTH.ClientPorts,
ServerPorts: &[2]int{34556, 34557},
}.Marshal(),
}, },
}) })
require.NoError(t, err) require.NoError(t, err)
@@ -916,27 +926,34 @@ func TestClientPublishRTCPReport(t *testing.T) {
}) })
require.NoError(t, err) require.NoError(t, err)
buf := make([]byte, 2048) var buf []byte
n, _, err := l1.ReadFrom(buf)
require.NoError(t, err)
var pkt rtp.Packet
err = pkt.Unmarshal(buf[:n])
require.NoError(t, err)
if ca == "udp" {
buf = make([]byte, 2048) buf = make([]byte, 2048)
n, _, err = l2.ReadFrom(buf) n, _, err := l2.ReadFrom(buf)
require.NoError(t, err) require.NoError(t, err)
packets, err := rtcp.Unmarshal(buf[:n]) buf = buf[:n]
} else {
for i := 0; i < 2; i++ {
_, err := conn.ReadInterleavedFrame()
require.NoError(t, err)
}
f, err := conn.ReadInterleavedFrame()
require.NoError(t, err)
require.Equal(t, 1, f.Channel)
buf = f.Payload
}
packets, err := rtcp.Unmarshal(buf)
require.NoError(t, err) require.NoError(t, err)
sr, ok := packets[0].(*rtcp.SenderReport)
require.True(t, ok)
require.Equal(t, &rtcp.SenderReport{ require.Equal(t, &rtcp.SenderReport{
SSRC: 753621, SSRC: 0x38F27A2F,
NTPTime: sr.NTPTime, NTPTime: packets[0].(*rtcp.SenderReport).NTPTime,
RTPTime: sr.RTPTime, RTPTime: packets[0].(*rtcp.SenderReport).RTPTime,
PacketCount: 1, PacketCount: 2,
OctetCount: 4, OctetCount: 8,
}, sr) }, packets[0])
close(reportReceived) close(reportReceived)
@@ -951,34 +968,35 @@ func TestClientPublishRTCPReport(t *testing.T) {
}() }()
c := Client{ c := Client{
udpSenderReportPeriod: 1 * time.Second, Transport: func() *Transport {
if ca == "udp" {
v := TransportUDP
return &v
} }
v := TransportTCP
track := &TrackH264{ return &v
PayloadType: 96, }(),
SPS: []byte{0x01, 0x02, 0x03, 0x04}, udpSenderReportPeriod: 500 * time.Millisecond,
PPS: []byte{0x01, 0x02, 0x03, 0x04},
} }
err = c.StartPublishing("rtsp://localhost:8554/teststream", err = c.StartPublishing("rtsp://localhost:8554/teststream",
Tracks{track}) Tracks{&TrackH264{
PayloadType: 96,
SPS: []byte{0x01, 0x02, 0x03, 0x04},
PPS: []byte{0x01, 0x02, 0x03, 0x04},
}})
require.NoError(t, err) require.NoError(t, err)
defer c.Close() defer c.Close()
err = c.WritePacketRTP(0, &rtp.Packet{ err = c.WritePacketRTP(0, &testRTPPacket, true)
Header: rtp.Header{ require.NoError(t, err)
Version: 2,
Marker: true, err = c.WritePacketRTP(0, &testRTPPacket, true)
PayloadType: 96,
SequenceNumber: 946,
Timestamp: 54352,
SSRC: 753621,
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
}, true)
require.NoError(t, err) require.NoError(t, err)
<-reportReceived <-reportReceived
})
}
} }
func TestClientPublishIgnoreTCPRTPPackets(t *testing.T) { func TestClientPublishIgnoreTCPRTPPackets(t *testing.T) {

View File

@@ -708,13 +708,13 @@ func TestServerRead(t *testing.T) {
} }
func TestServerReadRTCPReport(t *testing.T) { func TestServerReadRTCPReport(t *testing.T) {
track := &TrackH264{ for _, ca := range []string{"udp", "tcp"} {
t.Run(ca, func(t *testing.T) {
stream := NewServerStream(Tracks{&TrackH264{
PayloadType: 96, PayloadType: 96,
SPS: []byte{0x01, 0x02, 0x03, 0x04}, SPS: []byte{0x01, 0x02, 0x03, 0x04},
PPS: []byte{0x01, 0x02, 0x03, 0x04}, PPS: []byte{0x01, 0x02, 0x03, 0x04},
} }})
stream := NewServerStream(Tracks{track})
defer stream.Close() defer stream.Close()
s := &Server{ s := &Server{
@@ -730,7 +730,7 @@ func TestServerReadRTCPReport(t *testing.T) {
}, nil }, nil
}, },
}, },
udpSenderReportPeriod: 1 * time.Second, udpSenderReportPeriod: 500 * time.Millisecond,
RTSPAddress: "localhost:8554", RTSPAddress: "localhost:8554",
UDPRTPAddress: "127.0.0.1:8000", UDPRTPAddress: "127.0.0.1:8000",
UDPRTCPAddress: "127.0.0.1:8001", UDPRTCPAddress: "127.0.0.1:8001",
@@ -754,8 +754,14 @@ func TestServerReadRTCPReport(t *testing.T) {
v := headers.TransportDeliveryUnicast v := headers.TransportDeliveryUnicast
return &v return &v
}(), }(),
Protocol: headers.TransportProtocolUDP, }
ClientPorts: &[2]int{35466, 35467},
if ca == "udp" {
inTH.Protocol = headers.TransportProtocolUDP
inTH.ClientPorts = &[2]int{35466, 35467}
} else {
inTH.Protocol = headers.TransportProtocolTCP
inTH.InterleavedIDs = &[2]int{0, 1}
} }
res, err := writeReqReadRes(conn, base.Request{ res, err := writeReqReadRes(conn, base.Request{
@@ -769,13 +775,17 @@ func TestServerReadRTCPReport(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode) require.Equal(t, base.StatusOK, res.StatusCode)
l1, err := net.ListenPacket("udp", "localhost:35466") var l1 net.PacketConn
var l2 net.PacketConn
if ca == "udp" {
l1, err = net.ListenPacket("udp", "localhost:35466")
require.NoError(t, err) require.NoError(t, err)
defer l1.Close() defer l1.Close()
l2, err := net.ListenPacket("udp", "localhost:35467") l2, err = net.ListenPacket("udp", "localhost:35467")
require.NoError(t, err) require.NoError(t, err)
defer l2.Close() defer l2.Close()
}
var sx headers.Session var sx headers.Session
err = sx.Unmarshal(res.Header["Session"]) err = sx.Unmarshal(res.Header["Session"])
@@ -795,10 +805,27 @@ func TestServerReadRTCPReport(t *testing.T) {
stream.WritePacketRTP(0, &testRTPPacket, true) stream.WritePacketRTP(0, &testRTPPacket, true)
stream.WritePacketRTP(0, &testRTPPacket, true) stream.WritePacketRTP(0, &testRTPPacket, true)
buf := make([]byte, 2048) var buf []byte
n, _, err := l2.ReadFrom(buf)
if ca == "udp" {
buf = make([]byte, 2048)
var n int
n, _, err = l2.ReadFrom(buf)
require.NoError(t, err) require.NoError(t, err)
packets, err := rtcp.Unmarshal(buf[:n]) buf = buf[:n]
} else {
for i := 0; i < 2; i++ {
_, err := conn.ReadInterleavedFrame()
require.NoError(t, err)
}
f, err := conn.ReadInterleavedFrame()
require.NoError(t, err)
require.Equal(t, 1, f.Channel)
buf = f.Payload
}
packets, err := rtcp.Unmarshal(buf)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, &rtcp.SenderReport{ require.Equal(t, &rtcp.SenderReport{
SSRC: 0x38F27A2F, SSRC: 0x38F27A2F,
@@ -818,6 +845,8 @@ func TestServerReadRTCPReport(t *testing.T) {
}) })
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode) require.Equal(t, base.StatusOK, res.StatusCode)
})
}
} }
func TestServerReadVLCMulticast(t *testing.T) { func TestServerReadVLCMulticast(t *testing.T) {