diff --git a/client.go b/client.go index 23534503..7e797db6 100644 --- a/client.go +++ b/client.go @@ -1423,7 +1423,10 @@ func (c *Client) doSetup( c: c, media: medi, } - cm.initialize() + err = cm.initialize() + if err != nil { + return nil, err + } if c.effectiveTransport == nil { if c.connURL.Scheme == "rtsps" { // always use TCP if encrypted @@ -1981,13 +1984,6 @@ func (c *Client) WritePacketRTP(medi *description.Media, pkt *rtp.Packet) error // WritePacketRTPWithNTP writes a RTP packet to the server. // ntp is the absolute time of the packet, and is sent with periodic RTCP sender reports. func (c *Client) WritePacketRTPWithNTP(medi *description.Media, pkt *rtp.Packet, ntp time.Time) error { - byts := make([]byte, c.MaxPacketSize) - n, err := pkt.MarshalTo(byts) - if err != nil { - return err - } - byts = byts[:n] - select { case <-c.done: return c.closeError @@ -2003,26 +1999,11 @@ func (c *Client) WritePacketRTPWithNTP(medi *description.Media, pkt *rtp.Packet, cm := c.setuppedMedias[medi] cf := cm.formats[pkt.PayloadType] - - cf.rtcpSender.ProcessPacket(pkt, ntp, cf.format.PTSEqualsDTS(pkt)) - - ok := c.writer.push(func() error { - return cf.writePacketRTPInQueue(byts) - }) - if !ok { - return liberrors.ErrClientWriteQueueFull{} - } - - return nil + return cf.writePacketRTP(pkt, ntp) } // WritePacketRTCP writes a RTCP packet to the server. func (c *Client) WritePacketRTCP(medi *description.Media, pkt rtcp.Packet) error { - byts, err := pkt.Marshal() - if err != nil { - return err - } - select { case <-c.done: return c.closeError @@ -2037,15 +2018,7 @@ func (c *Client) WritePacketRTCP(medi *description.Media, pkt rtcp.Packet) error } cm := c.setuppedMedias[medi] - - ok := c.writer.push(func() error { - return cm.writePacketRTCPInQueue(byts) - }) - if !ok { - return liberrors.ErrClientWriteQueueFull{} - } - - return nil + return cm.writePacketRTCP(pkt) } // PacketPTS returns the PTS of an incoming RTP packet. @@ -2208,18 +2181,10 @@ func (c *Client) Stats() *ClientStats { RTPPacketsReceived: atomic.LoadUint64(fo.rtpPacketsReceived), RTPPacketsSent: atomic.LoadUint64(fo.rtpPacketsSent), RTPPacketsLost: atomic.LoadUint64(fo.rtpPacketsLost), - LocalSSRC: func() uint32 { - if fo.rtcpReceiver != nil { - return *fo.rtcpReceiver.LocalSSRC - } - if sentStats != nil { - return sentStats.LocalSSRC - } - return 0 - }(), + LocalSSRC: fo.localSSRC, RemoteSSRC: func() uint32 { - if recvStats != nil { - return recvStats.RemoteSSRC + if v, ok := fo.remoteSSRC(); ok { + return v } return 0 }(), diff --git a/client_format.go b/client_format.go index 965d6dee..a6e41087 100644 --- a/client_format.go +++ b/client_format.go @@ -8,17 +8,43 @@ import ( "github.com/pion/rtp" "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/gortsplib/v4/pkg/liberrors" "github.com/bluenviron/gortsplib/v4/pkg/rtcpreceiver" "github.com/bluenviron/gortsplib/v4/pkg/rtcpsender" "github.com/bluenviron/gortsplib/v4/pkg/rtplossdetector" "github.com/bluenviron/gortsplib/v4/pkg/rtpreorderer" ) +func isClientLocalSSRCTaken(ssrc uint32, c *Client, exclude *clientFormat) bool { + for _, cm := range c.setuppedMedias { + for _, cf := range cm.formats { + if cf != exclude && cf.localSSRC == ssrc { + return true + } + } + } + return false +} + +func clientPickLocalSSRC(cf *clientFormat) (uint32, error) { + for { + ssrc, err := randUint32() + if err != nil { + return 0, err + } + + if ssrc != 0 && !isClientLocalSSRCTaken(ssrc, cf.cm.c, cf) { + return ssrc, nil + } + } +} + type clientFormat struct { cm *clientMedia format format.Format onPacketRTP OnPacketRTPFunc + localSSRC uint32 udpReorderer *rtpreorderer.Reorderer // play tcpLossDetector *rtplossdetector.LossDetector // play rtcpReceiver *rtcpreceiver.RTCPReceiver // play @@ -29,10 +55,18 @@ type clientFormat struct { rtpPacketsLost *uint64 } -func (cf *clientFormat) initialize() { +func (cf *clientFormat) initialize() error { + var err error + cf.localSSRC, err = clientPickLocalSSRC(cf) + if err != nil { + return err + } + cf.rtpPacketsReceived = new(uint64) cf.rtpPacketsSent = new(uint64) cf.rtpPacketsLost = new(uint64) + + return nil } func (cf *clientFormat) start() { @@ -64,6 +98,7 @@ func (cf *clientFormat) start() { cf.rtcpReceiver = &rtcpreceiver.RTCPReceiver{ ClockRate: cf.format.ClockRate(), + LocalSSRC: &cf.localSSRC, Period: cf.cm.c.receiverReportPeriod, TimeNow: cf.cm.c.timeNow, WritePacketRTCP: func(pkt rtcp.Packet) { @@ -90,6 +125,16 @@ func (cf *clientFormat) stop() { } } +func (cf *clientFormat) remoteSSRC() (uint32, bool) { + if cf.rtcpReceiver != nil { + stats := cf.rtcpReceiver.Stats() + if stats != nil { + return stats.RemoteSSRC, true + } + } + return 0, false +} + func (cf *clientFormat) readPacketRTPUDP(pkt *rtp.Packet) { packets, lost := cf.udpReorderer.Process(pkt) if lost != 0 { @@ -133,6 +178,28 @@ func (cf *clientFormat) handlePacketsLost(lost uint64) { cf.cm.c.OnPacketsLost(lost) } +func (cf *clientFormat) writePacketRTP(pkt *rtp.Packet, ntp time.Time) error { + pkt.SSRC = cf.localSSRC + + byts := make([]byte, cf.cm.c.MaxPacketSize) + n, err := pkt.MarshalTo(byts) + if err != nil { + return err + } + byts = byts[:n] + + cf.rtcpSender.ProcessPacket(pkt, ntp, cf.format.PTSEqualsDTS(pkt)) + + ok := cf.cm.c.writer.push(func() error { + return cf.writePacketRTPInQueue(byts) + }) + if !ok { + return liberrors.ErrClientWriteQueueFull{} + } + + return nil +} + func (cf *clientFormat) writePacketRTPInQueueUDP(payload []byte) error { err := cf.cm.udpRTPListener.write(payload) if err != nil { diff --git a/client_media.go b/client_media.go index 8615d957..ead508d7 100644 --- a/client_media.go +++ b/client_media.go @@ -30,7 +30,7 @@ type clientMedia struct { rtcpPacketsInError *uint64 } -func (cm *clientMedia) initialize() { +func (cm *clientMedia) initialize() error { cm.onPacketRTCP = func(rtcp.Packet) {} cm.bytesReceived = new(uint64) cm.bytesSent = new(uint64) @@ -47,9 +47,15 @@ func (cm *clientMedia) initialize() { format: forma, onPacketRTP: func(*rtp.Packet) {}, } - f.initialize() + err := f.initialize() + if err != nil { + return err + } + cm.formats[forma.PayloadType()] = f } + + return nil } func (cm *clientMedia) close() { @@ -146,41 +152,15 @@ func (cm *clientMedia) stop() { } } -func (cm *clientMedia) findFormatBySSRC(ssrc uint32) *clientFormat { - for _, format := range cm.formats { - stats := format.rtcpReceiver.Stats() - if stats != nil && stats.RemoteSSRC == ssrc { - return format +func (cm *clientMedia) findFormatByRemoteSSRC(ssrc uint32) *clientFormat { + for _, cf := range cm.formats { + if v, ok := cf.remoteSSRC(); ok && v == ssrc { + return cf } } return nil } -func (cm *clientMedia) writePacketRTCPInQueueUDP(payload []byte) error { - err := cm.udpRTCPListener.write(payload) - if err != nil { - return err - } - - atomic.AddUint64(cm.bytesSent, uint64(len(payload))) - atomic.AddUint64(cm.rtcpPacketsSent, 1) - return nil -} - -func (cm *clientMedia) writePacketRTCPInQueueTCP(payload []byte) error { - cm.c.tcpFrame.Channel = cm.tcpChannel + 1 - cm.c.tcpFrame.Payload = payload - cm.c.nconn.SetWriteDeadline(time.Now().Add(cm.c.WriteTimeout)) - err := cm.c.conn.WriteInterleavedFrame(cm.c.tcpFrame, cm.c.tcpBuffer) - if err != nil { - return err - } - - atomic.AddUint64(cm.bytesSent, uint64(len(payload))) - atomic.AddUint64(cm.rtcpPacketsSent, 1) - return nil -} - func (cm *clientMedia) readPacketRTPTCPPlay(payload []byte) bool { atomic.AddUint64(cm.bytesReceived, uint64(len(payload))) @@ -226,7 +206,7 @@ func (cm *clientMedia) readPacketRTCPTCPPlay(payload []byte) bool { for _, pkt := range packets { if sr, ok := pkt.(*rtcp.SenderReport); ok { - format := cm.findFormatBySSRC(sr.SSRC) + format := cm.findFormatByRemoteSSRC(sr.SSRC) if format != nil { format.rtcpReceiver.ProcessSenderReport(sr, now) } @@ -311,7 +291,7 @@ func (cm *clientMedia) readPacketRTCPUDPPlay(payload []byte) bool { for _, pkt := range packets { if sr, ok := pkt.(*rtcp.SenderReport); ok { - format := cm.findFormatBySSRC(sr.SSRC) + format := cm.findFormatByRemoteSSRC(sr.SSRC) if format != nil { format.rtcpReceiver.ProcessSenderReport(sr, now) } @@ -359,3 +339,44 @@ func (cm *clientMedia) onPacketRTCPDecodeError(err error) { atomic.AddUint64(cm.rtcpPacketsInError, 1) cm.c.OnDecodeError(err) } + +func (cm *clientMedia) writePacketRTCP(pkt rtcp.Packet) error { + byts, err := pkt.Marshal() + if err != nil { + return err + } + + ok := cm.c.writer.push(func() error { + return cm.writePacketRTCPInQueue(byts) + }) + if !ok { + return liberrors.ErrClientWriteQueueFull{} + } + + return nil +} + +func (cm *clientMedia) writePacketRTCPInQueueUDP(payload []byte) error { + err := cm.udpRTCPListener.write(payload) + if err != nil { + return err + } + + atomic.AddUint64(cm.bytesSent, uint64(len(payload))) + atomic.AddUint64(cm.rtcpPacketsSent, 1) + return nil +} + +func (cm *clientMedia) writePacketRTCPInQueueTCP(payload []byte) error { + cm.c.tcpFrame.Channel = cm.tcpChannel + 1 + cm.c.tcpFrame.Payload = payload + cm.c.nconn.SetWriteDeadline(time.Now().Add(cm.c.WriteTimeout)) + err := cm.c.conn.WriteInterleavedFrame(cm.c.tcpFrame, cm.c.tcpBuffer) + if err != nil { + return err + } + + atomic.AddUint64(cm.bytesSent, uint64(len(payload))) + atomic.AddUint64(cm.rtcpPacketsSent, 1) + return nil +} diff --git a/client_play_test.go b/client_play_test.go index e9100123..b9668fdb 100644 --- a/client_play_test.go +++ b/client_play_test.go @@ -558,6 +558,48 @@ func TestClientPlay(t *testing.T) { <-packetRecv s := c.Stats() + require.Equal(t, &ClientStats{ + Conn: StatsConn{ + BytesReceived: s.Conn.BytesReceived, + BytesSent: s.Conn.BytesSent, + }, + Session: StatsSession{ + BytesReceived: s.Session.BytesReceived, + BytesSent: s.Session.BytesSent, + RTPPacketsReceived: s.Session.RTPPacketsReceived, + RTCPPacketsReceived: s.Session.RTCPPacketsReceived, + RTCPPacketsSent: s.Session.RTCPPacketsSent, + Medias: map[*description.Media]StatsSessionMedia{ + sd.Medias[0]: { //nolint:dupl + BytesReceived: s.Session.Medias[sd.Medias[0]].BytesReceived, + BytesSent: s.Session.Medias[sd.Medias[0]].BytesSent, + RTCPPacketsReceived: s.Session.Medias[sd.Medias[0]].RTCPPacketsReceived, + RTCPPacketsSent: s.Session.Medias[sd.Medias[0]].RTCPPacketsSent, + Formats: map[format.Format]StatsSessionFormat{ + sd.Medias[0].Formats[0]: { + RTPPacketsReceived: s.Session.Medias[sd.Medias[0]].Formats[sd.Medias[0].Formats[0]].RTPPacketsReceived, + LocalSSRC: s.Session.Medias[sd.Medias[0]].Formats[sd.Medias[0].Formats[0]].LocalSSRC, + RemoteSSRC: s.Session.Medias[sd.Medias[0]].Formats[sd.Medias[0].Formats[0]].RemoteSSRC, + }, + }, + }, + sd.Medias[1]: { //nolint:dupl + BytesReceived: s.Session.Medias[sd.Medias[1]].BytesReceived, + BytesSent: s.Session.Medias[sd.Medias[1]].BytesSent, + RTCPPacketsReceived: s.Session.Medias[sd.Medias[1]].RTCPPacketsReceived, + RTCPPacketsSent: s.Session.Medias[sd.Medias[1]].RTCPPacketsSent, + Formats: map[format.Format]StatsSessionFormat{ + sd.Medias[1].Formats[0]: { + RTPPacketsReceived: s.Session.Medias[sd.Medias[1]].Formats[sd.Medias[1].Formats[0]].RTPPacketsReceived, + LocalSSRC: s.Session.Medias[sd.Medias[1]].Formats[sd.Medias[1].Formats[0]].LocalSSRC, + RemoteSSRC: s.Session.Medias[sd.Medias[1]].Formats[sd.Medias[1].Formats[0]].RemoteSSRC, + }, + }, + }, + }, + }, + }, s) + require.Greater(t, s.Session.BytesSent, uint64(19)) require.Less(t, s.Session.BytesSent, uint64(41)) require.Greater(t, s.Session.BytesReceived, uint64(31)) diff --git a/client_record_test.go b/client_record_test.go index be3476b0..5b0489ca 100644 --- a/client_record_test.go +++ b/client_record_test.go @@ -364,6 +364,38 @@ func TestClientRecord(t *testing.T) { <-recvDone s := c.Stats() + require.Equal(t, &ClientStats{ + Conn: StatsConn{ + BytesReceived: s.Conn.BytesReceived, + BytesSent: s.Conn.BytesSent, + }, + Session: StatsSession{ + BytesReceived: s.Session.BytesReceived, + BytesSent: s.Session.BytesSent, + RTPPacketsSent: s.Session.RTPPacketsSent, + RTPPacketsReceived: s.Session.RTPPacketsReceived, + RTCPPacketsReceived: s.Session.RTCPPacketsReceived, + RTCPPacketsSent: s.Session.RTCPPacketsSent, + Medias: map[*description.Media]StatsSessionMedia{ + medias[0]: { + BytesReceived: s.Session.Medias[medias[0]].BytesReceived, + BytesSent: s.Session.Medias[medias[0]].BytesSent, + RTCPPacketsReceived: s.Session.Medias[medias[0]].RTCPPacketsReceived, + RTCPPacketsSent: s.Session.Medias[medias[0]].RTCPPacketsSent, + Formats: map[format.Format]StatsSessionFormat{ + medias[0].Formats[0]: { + RTPPacketsSent: s.Session.Medias[medias[0]].Formats[medias[0].Formats[0]].RTPPacketsSent, + RTPPacketsReceived: s.Session.Medias[medias[0]].Formats[medias[0].Formats[0]].RTPPacketsReceived, + LocalSSRC: s.Session.Medias[medias[0]].Formats[medias[0].Formats[0]].LocalSSRC, + RemoteSSRC: s.Session.Medias[medias[0]].Formats[medias[0].Formats[0]].RemoteSSRC, + RTPPacketsLastNTP: s.Session.Medias[medias[0]].Formats[medias[0].Formats[0]].RTPPacketsLastNTP, + }, + }, + }, + }, + }, + }, s) + require.Greater(t, s.Session.BytesSent, uint64(15)) require.Less(t, s.Session.BytesSent, uint64(17)) require.Greater(t, s.Session.BytesReceived, uint64(19)) @@ -1256,13 +1288,15 @@ func TestClientRecordRTCPReport(t *testing.T) { packets, err2 := rtcp.Unmarshal(buf) require.NoError(t, err2) - require.Equal(t, &rtcp.SenderReport{ - SSRC: 0x38F27A2F, - NTPTime: ntpTimeGoToRTCP(time.Date(1996, 2, 13, 14, 33, 5, 0, time.UTC)), - RTPTime: 1300000 + 60*90000, - PacketCount: 1, - OctetCount: 1, - }, packets[0]) + require.Equal(t, []rtcp.Packet{ + &rtcp.SenderReport{ + SSRC: packets[0].(*rtcp.SenderReport).SSRC, + NTPTime: ntpTimeGoToRTCP(time.Date(1996, 2, 13, 14, 33, 5, 0, time.UTC)), + RTPTime: 1300000 + 60*90000, + PacketCount: 1, + OctetCount: 1, + }, + }, packets) close(reportReceived) diff --git a/pkg/rtcpreceiver/rtcpreceiver.go b/pkg/rtcpreceiver/rtcpreceiver.go index bdeb60e5..15893975 100644 --- a/pkg/rtcpreceiver/rtcpreceiver.go +++ b/pkg/rtcpreceiver/rtcpreceiver.go @@ -87,6 +87,8 @@ func New( // Initialize initializes RTCPReceiver. func (rr *RTCPReceiver) Initialize() error { + // Deprecated: passing a nil LocalSSRC will be deprecated from next version. + // Please use a fixed LocalSSRC. if rr.LocalSSRC == nil { v, err := randUint32() if err != nil { diff --git a/pkg/rtcpsender/rtcpsender.go b/pkg/rtcpsender/rtcpsender.go index 9cc64aee..14d7a020 100644 --- a/pkg/rtcpsender/rtcpsender.go +++ b/pkg/rtcpsender/rtcpsender.go @@ -163,7 +163,10 @@ func (rs *RTCPSender) LastPacketData() (uint16, uint32, time.Time, bool) { // Stats are statistics. type Stats struct { - LocalSSRC uint32 + // Deprecated: this is not a statistics anymore but a fixed parameter. + // it will be removed in next version. + LocalSSRC uint32 + LastSequenceNumber uint16 LastRTP uint32 LastNTP time.Time diff --git a/server_play_test.go b/server_play_test.go index 0b5e5de9..3bc77a89 100644 --- a/server_play_test.go +++ b/server_play_test.go @@ -32,10 +32,6 @@ func uint16Ptr(v uint16) *uint16 { return &v } -func uint32Ptr(v uint32) *uint32 { - return &v -} - func multicastCapableIP(t *testing.T) string { intfs, err := net.Interfaces() require.NoError(t, err) @@ -848,7 +844,12 @@ func TestServerPlay(t *testing.T) { var n int n, _, err = l1.ReadFrom(buf) require.NoError(t, err) - require.Equal(t, testRTPPacketMarshaled, buf[:n]) + + var pkt rtp.Packet + err = pkt.Unmarshal(buf[:n]) + require.NoError(t, err) + pkt.SSRC = testRTPPacket.SSRC + require.Equal(t, testRTPPacket, pkt) buf = make([]byte, 2048) n, _, err = l2.ReadFrom(buf) @@ -864,7 +865,11 @@ func TestServerPlay(t *testing.T) { f, err = conn.ReadInterleavedFrame() require.NoError(t, err) require.Equal(t, 5, f.Channel) - require.Equal(t, testRTPPacketMarshaled, f.Payload) + var pkt rtp.Packet + err = pkt.Unmarshal(f.Payload) + require.NoError(t, err) + pkt.SSRC = testRTPPacket.SSRC + require.Equal(t, testRTPPacket, pkt) } // client -> server @@ -1385,13 +1390,15 @@ func TestServerPlayRTCPReport(t *testing.T) { packets, err := rtcp.Unmarshal(buf) require.NoError(t, err) - require.Equal(t, &rtcp.SenderReport{ - SSRC: 0x38F27A2F, - NTPTime: ntpTimeGoToRTCP(time.Date(2017, 8, 10, 12, 22, 30, 0, time.UTC)), - RTPTime: 240000 + 90000*30, - PacketCount: 1, - OctetCount: 1, - }, packets[0]) + require.Equal(t, []rtcp.Packet{ + &rtcp.SenderReport{ + SSRC: packets[0].(*rtcp.SenderReport).SSRC, + NTPTime: ntpTimeGoToRTCP(time.Date(2017, 8, 10, 12, 22, 30, 0, time.UTC)), + RTPTime: 240000 + 90000*30, + PacketCount: 1, + OctetCount: 1, + }, + }, packets) doTeardown(t, conn, "rtsp://localhost:8554/teststream", session) }) @@ -2072,7 +2079,12 @@ func TestServerPlayPartialMedias(t *testing.T) { f, err := conn.ReadInterleavedFrame() require.NoError(t, err) require.Equal(t, 4, f.Channel) - require.Equal(t, testRTPPacketMarshaled, f.Payload) + + var pkt rtp.Packet + err = pkt.Unmarshal(f.Payload) + require.NoError(t, err) + pkt.SSRC = testRTPPacket.SSRC + require.Equal(t, testRTPPacket, pkt) } func TestServerPlayAdditionalInfos(t *testing.T) { @@ -2203,10 +2215,9 @@ func TestServerPlayAdditionalInfos(t *testing.T) { }).String(), }, }, rtpInfo) - require.Equal(t, []*uint32{ - uint32Ptr(96342362), - nil, - }, ssrcs) + require.Len(t, ssrcs, 2) + require.NotNil(t, ssrcs[0]) + require.NotNil(t, ssrcs[1]) err = stream.WritePacketRTP(stream.Description().Medias[1], &rtp.Packet{ Header: rtp.Header{ @@ -2242,10 +2253,9 @@ func TestServerPlayAdditionalInfos(t *testing.T) { Timestamp: (*rtpInfo)[1].Timestamp, }, }, rtpInfo) - require.Equal(t, []*uint32{ - uint32Ptr(96342362), - uint32Ptr(536474323), - }, ssrcs) + require.Len(t, ssrcs, 2) + require.NotNil(t, ssrcs[0]) + require.NotNil(t, ssrcs[1]) } func TestServerPlayNoInterleavedIDs(t *testing.T) { @@ -2327,14 +2337,19 @@ func TestServerPlayNoInterleavedIDs(t *testing.T) { doPlay(t, conn, "rtsp://localhost:8554/teststream", session) - for i := 0; i < 2; i++ { + for i := range 2 { err := stream.WritePacketRTP(stream.Description().Medias[i], &testRTPPacket) require.NoError(t, err) f, err := conn.ReadInterleavedFrame() require.NoError(t, err) require.Equal(t, i*2, f.Channel) - require.Equal(t, testRTPPacketMarshaled, f.Payload) + + var pkt rtp.Packet + err = pkt.Unmarshal(f.Payload) + require.NoError(t, err) + pkt.SSRC = testRTPPacket.SSRC + require.Equal(t, testRTPPacket, pkt) } } @@ -2422,7 +2437,8 @@ func TestServerPlayStreamStats(t *testing.T) { Formats: map[format.Format]ServerStreamStatsFormat{ stream.Description().Medias[0].Formats[0]: { RTPPacketsSent: 2, - LocalSSRC: 955415087, + LocalSSRC: st.Medias[stream.Description().Medias[0]]. + Formats[stream.Description().Medias[0].Formats[0]].LocalSSRC, }, }, }, diff --git a/server_record_test.go b/server_record_test.go index 14a1a2e0..c0b1a6be 100644 --- a/server_record_test.go +++ b/server_record_test.go @@ -606,6 +606,7 @@ func TestServerRecord(t *testing.T) { ctx.Session.AnnouncedDescription().Medias[i], ctx.Session.AnnouncedDescription().Medias[i].Formats[0], func(pkt *rtp.Packet) { + pkt.SSRC = testRTPPacket.SSRC require.Equal(t, &testRTPPacket, pkt) }) diff --git a/server_session.go b/server_session.go index 877664e1..c4d7a4dd 100644 --- a/server_session.go +++ b/server_session.go @@ -180,26 +180,65 @@ func findFirstSupportedTransportHeader(s *Server, tsh headers.Transports) *heade return nil } +func generateRTPInfoEntry(ssm *serverStreamMedia, now time.Time) *headers.RTPInfoEntry { + // do not generate a RTP-Info entry when + // there are multiple formats inside a single media stream, + // since RTP-Info does not support multiple sequence numbers / timestamps. + if len(ssm.media.Formats) > 1 { + return nil + } + + format := ssm.formats[ssm.media.Formats[0].PayloadType()] + + stats := format.rtcpSender.Stats() + if stats == nil { + return nil + } + + clockRate := format.format.ClockRate() + if clockRate == 0 { + return nil + } + + // sequence number of the first packet of the stream + seqNum := stats.LastSequenceNumber + 1 + + // RTP timestamp corresponding to the time value in + // the Range response header. + // remove a small quantity in order to avoid DTS > PTS + ts := uint32(uint64(stats.LastRTP) + + uint64(now.Sub(stats.LastNTP).Seconds()*float64(clockRate)) - + uint64(clockRate)/10) + + return &headers.RTPInfoEntry{ + SequenceNumber: &seqNum, + Timestamp: &ts, + } +} + func generateRTPInfo( now time.Time, - setuppedMediasOrdered []*serverSessionMedia, - setuppedStream *ServerStream, - setuppedPath string, + mediasOrdered []*serverSessionMedia, + stream *ServerStream, + path string, u *base.URL, ) (headers.RTPInfo, bool) { var ri headers.RTPInfo - for _, sm := range setuppedMediasOrdered { - entry := setuppedStream.rtpInfoEntry(sm.media, now) + for _, sm := range mediasOrdered { + ssm := stream.medias[sm.media] + entry := generateRTPInfoEntry(ssm, now) if entry == nil { entry = &headers.RTPInfoEntry{} } + entry.URL = (&base.URL{ Scheme: u.Scheme, Host: u.Host, - Path: setuppedPath + "/trackID=" + - strconv.FormatInt(int64(setuppedStream.medias[sm.media].trackID), 10), + Path: path + "/trackID=" + + strconv.FormatInt(int64(ssm.trackID), 10), }).String() + ri = append(ri, entry) } @@ -500,18 +539,10 @@ func (ss *ServerSession) Stats() *StatsSession { RTPPacketsReceived: atomic.LoadUint64(fo.rtpPacketsReceived), RTPPacketsSent: atomic.LoadUint64(fo.rtpPacketsSent), RTPPacketsLost: atomic.LoadUint64(fo.rtpPacketsLost), - LocalSSRC: func() uint32 { - if fo.rtcpReceiver != nil { - return *fo.rtcpReceiver.LocalSSRC - } - if sentStats != nil { - return sentStats.LocalSSRC - } - return 0 - }(), + LocalSSRC: fo.localSSRC, RemoteSSRC: func() uint32 { - if recvStats != nil { - return recvStats.RemoteSSRC + if v, ok := fo.remoteSSRC(); ok { + return v } return 0 }(), @@ -1089,10 +1120,7 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) ( // since the Transport header does not support multiple SSRCs. if len(stream.medias[medi].formats) == 1 { format := stream.medias[medi].formats[medi.Formats[0].PayloadType()] - ssrc, ok := format.localSSRC() - if ok { - th.SSRC = &ssrc - } + th.SSRC = &format.localSSRC } } @@ -1105,7 +1133,12 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) ( media: medi, onPacketRTCP: func(_ rtcp.Packet) {}, } - sm.initialize() + err = sm.initialize() + if err != nil { + return &base.Response{ + StatusCode: base.StatusInternalServerError, + }, err + } switch transport { case TransportUDP: @@ -1490,57 +1523,22 @@ func (ss *ServerSession) OnPacketRTCP(medi *description.Media, cb OnPacketRTCPFu sm.onPacketRTCP = cb } -func (ss *ServerSession) writePacketRTP(medi *description.Media, payloadType uint8, byts []byte) error { +func (ss *ServerSession) writePacketRTPEncoded(medi *description.Media, payloadType uint8, byts []byte) error { sm := ss.setuppedMedias[medi] sf := sm.formats[payloadType] - - ss.writerMutex.RLock() - defer ss.writerMutex.RUnlock() - - if ss.writer == nil { - return nil - } - - ok := ss.writer.push(func() error { - return sf.writePacketRTPInQueue(byts) - }) - if !ok { - return liberrors.ErrServerWriteQueueFull{} - } - - return nil + return sf.writePacketRTPEncoded(byts) } // WritePacketRTP writes a RTP packet to the session. func (ss *ServerSession) WritePacketRTP(medi *description.Media, pkt *rtp.Packet) error { - byts := make([]byte, ss.s.MaxPacketSize) - n, err := pkt.MarshalTo(byts) - if err != nil { - return err - } - byts = byts[:n] - - return ss.writePacketRTP(medi, pkt.PayloadType, byts) + sm := ss.setuppedMedias[medi] + sf := sm.formats[pkt.PayloadType] + return sf.writePacketRTP(pkt) } -func (ss *ServerSession) writePacketRTCP(medi *description.Media, byts []byte) error { +func (ss *ServerSession) writePacketRTCPEncoded(medi *description.Media, byts []byte) error { sm := ss.setuppedMedias[medi] - - ss.writerMutex.RLock() - defer ss.writerMutex.RUnlock() - - if ss.writer == nil { - return nil - } - - ok := ss.writer.push(func() error { - return sm.writePacketRTCPInQueue(byts) - }) - if !ok { - return liberrors.ErrServerWriteQueueFull{} - } - - return nil + return sm.writePacketRTCPEncoded(byts) } // WritePacketRTCP writes a RTCP packet to the session. @@ -1550,7 +1548,7 @@ func (ss *ServerSession) WritePacketRTCP(medi *description.Media, pkt rtcp.Packe return err } - return ss.writePacketRTCP(medi, byts) + return ss.writePacketRTCPEncoded(medi, byts) } // PacketPTS returns the PTS of an incoming RTP packet. diff --git a/server_session_format.go b/server_session_format.go index a686f84c..34046a10 100644 --- a/server_session_format.go +++ b/server_session_format.go @@ -15,11 +15,36 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/rtpreorderer" ) +func isServerSessionLocalSSRCTaken(ssrc uint32, ss *ServerSession, exclude *serverSessionFormat) bool { + for _, sm := range ss.setuppedMedias { + for _, sf := range sm.formats { + if sf != exclude && sf.localSSRC == ssrc { + return true + } + } + } + return false +} + +func serverSessionPickLocalSSRC(sf *serverSessionFormat) (uint32, error) { + for { + ssrc, err := randUint32() + if err != nil { + return 0, err + } + + if ssrc != 0 && !isServerSessionLocalSSRCTaken(ssrc, sf.sm.ss, sf) { + return ssrc, nil + } + } +} + type serverSessionFormat struct { sm *serverSessionMedia format format.Format onPacketRTP OnPacketRTPFunc + localSSRC uint32 udpReorderer *rtpreorderer.Reorderer // publish or back channel tcpLossDetector *rtplossdetector.LossDetector rtcpReceiver *rtcpreceiver.RTCPReceiver @@ -29,10 +54,22 @@ type serverSessionFormat struct { rtpPacketsLost *uint64 } -func (sf *serverSessionFormat) initialize() { +func (sf *serverSessionFormat) initialize() error { + if sf.sm.ss.state == ServerSessionStatePreRecord || sf.sm.media.IsBackChannel { + var err error + sf.localSSRC, err = serverSessionPickLocalSSRC(sf) + if err != nil { + return err + } + } else { + sf.localSSRC = sf.sm.ss.setuppedStream.medias[sf.sm.media].formats[sf.format.PayloadType()].localSSRC + } + sf.rtpPacketsReceived = new(uint64) sf.rtpPacketsSent = new(uint64) sf.rtpPacketsLost = new(uint64) + + return nil } func (sf *serverSessionFormat) start() { @@ -54,6 +91,7 @@ func (sf *serverSessionFormat) start() { sf.rtcpReceiver = &rtcpreceiver.RTCPReceiver{ ClockRate: sf.format.ClockRate(), + LocalSSRC: &sf.localSSRC, Period: sf.sm.ss.s.receiverReportPeriod, TimeNow: sf.sm.ss.s.timeNow, WritePacketRTCP: func(pkt rtcp.Packet) { @@ -76,6 +114,16 @@ func (sf *serverSessionFormat) stop() { } } +func (sf *serverSessionFormat) remoteSSRC() (uint32, bool) { + if sf.rtcpReceiver != nil { + stats := sf.rtcpReceiver.Stats() + if stats != nil { + return stats.RemoteSSRC, true + } + } + return 0, false +} + func (sf *serverSessionFormat) readPacketRTPUDP(pkt *rtp.Packet, now time.Time) { packets, lost := sf.udpReorderer.Process(pkt) if lost != 0 { @@ -137,6 +185,37 @@ func (sf *serverSessionFormat) onPacketRTPLost(lost uint64) { } } +func (sf *serverSessionFormat) writePacketRTP(pkt *rtp.Packet) error { + pkt.SSRC = sf.localSSRC + + byts := make([]byte, sf.sm.ss.s.MaxPacketSize) + n, err := pkt.MarshalTo(byts) + if err != nil { + return err + } + byts = byts[:n] + + return sf.writePacketRTPEncoded(byts) +} + +func (sf *serverSessionFormat) writePacketRTPEncoded(payload []byte) error { + sf.sm.ss.writerMutex.RLock() + defer sf.sm.ss.writerMutex.RUnlock() + + if sf.sm.ss.writer == nil { + return nil + } + + ok := sf.sm.ss.writer.push(func() error { + return sf.writePacketRTPInQueue(payload) + }) + if !ok { + return liberrors.ErrServerWriteQueueFull{} + } + + return nil +} + func (sf *serverSessionFormat) writePacketRTPInQueueUDP(payload []byte) error { err := sf.sm.ss.s.udpRTPListener.write(payload, sf.sm.udpRTPWriteAddr) if err != nil { diff --git a/server_session_media.go b/server_session_media.go index 23155600..8a7c5e36 100644 --- a/server_session_media.go +++ b/server_session_media.go @@ -33,7 +33,7 @@ type serverSessionMedia struct { rtcpPacketsInError *uint64 } -func (sm *serverSessionMedia) initialize() { +func (sm *serverSessionMedia) initialize() error { sm.bytesReceived = new(uint64) sm.bytesSent = new(uint64) sm.rtpPacketsInError = new(uint64) @@ -49,9 +49,14 @@ func (sm *serverSessionMedia) initialize() { format: forma, onPacketRTP: func(*rtp.Packet) {}, } - f.initialize() + err := f.initialize() + if err != nil { + return err + } sm.formats[forma.PayloadType()] = f } + + return nil } func (sm *serverSessionMedia) start() { @@ -114,7 +119,7 @@ func (sm *serverSessionMedia) stop() { } } -func (sm *serverSessionMedia) findFormatBySSRC(ssrc uint32) *serverSessionFormat { +func (sm *serverSessionMedia) findFormatByRemoteSSRC(ssrc uint32) *serverSessionFormat { for _, format := range sm.formats { stats := format.rtcpReceiver.Stats() if stats != nil && stats.RemoteSSRC == ssrc { @@ -124,31 +129,6 @@ func (sm *serverSessionMedia) findFormatBySSRC(ssrc uint32) *serverSessionFormat return nil } -func (sm *serverSessionMedia) writePacketRTCPInQueueUDP(payload []byte) error { - err := sm.ss.s.udpRTCPListener.write(payload, sm.udpRTCPWriteAddr) - if err != nil { - return err - } - - atomic.AddUint64(sm.bytesSent, uint64(len(payload))) - atomic.AddUint64(sm.rtcpPacketsSent, 1) - return nil -} - -func (sm *serverSessionMedia) writePacketRTCPInQueueTCP(payload []byte) error { - sm.ss.tcpFrame.Channel = sm.tcpChannel + 1 - sm.ss.tcpFrame.Payload = payload - sm.ss.tcpConn.nconn.SetWriteDeadline(time.Now().Add(sm.ss.s.WriteTimeout)) - err := sm.ss.tcpConn.conn.WriteInterleavedFrame(sm.ss.tcpFrame, sm.ss.tcpBuffer) - if err != nil { - return err - } - - atomic.AddUint64(sm.bytesSent, uint64(len(payload))) - atomic.AddUint64(sm.rtcpPacketsSent, 1) - return nil -} - func (sm *serverSessionMedia) readPacketRTPUDPPlay(payload []byte) bool { atomic.AddUint64(sm.bytesReceived, uint64(len(payload))) @@ -253,7 +233,7 @@ func (sm *serverSessionMedia) readPacketRTCPUDPRecord(payload []byte) bool { for _, pkt := range packets { if sr, ok := pkt.(*rtcp.SenderReport); ok { - format := sm.findFormatBySSRC(sr.SSRC) + format := sm.findFormatByRemoteSSRC(sr.SSRC) if format != nil { format.rtcpReceiver.ProcessSenderReport(sr, now) } @@ -354,7 +334,7 @@ func (sm *serverSessionMedia) readPacketRTCPTCPRecord(payload []byte) bool { for _, pkt := range packets { if sr, ok := pkt.(*rtcp.SenderReport); ok { - format := sm.findFormatBySSRC(sr.SSRC) + format := sm.findFormatByRemoteSSRC(sr.SSRC) if format != nil { format.rtcpReceiver.ProcessSenderReport(sr, now) } @@ -391,3 +371,46 @@ func (sm *serverSessionMedia) onPacketRTCPDecodeError(err error) { log.Println(err.Error()) } } + +func (sm *serverSessionMedia) writePacketRTCPEncoded(payload []byte) error { + sm.ss.writerMutex.RLock() + defer sm.ss.writerMutex.RUnlock() + + if sm.ss.writer == nil { + return nil + } + + ok := sm.ss.writer.push(func() error { + return sm.writePacketRTCPInQueue(payload) + }) + if !ok { + return liberrors.ErrServerWriteQueueFull{} + } + + return nil +} + +func (sm *serverSessionMedia) writePacketRTCPInQueueUDP(payload []byte) error { + err := sm.ss.s.udpRTCPListener.write(payload, sm.udpRTCPWriteAddr) + if err != nil { + return err + } + + atomic.AddUint64(sm.bytesSent, uint64(len(payload))) + atomic.AddUint64(sm.rtcpPacketsSent, 1) + return nil +} + +func (sm *serverSessionMedia) writePacketRTCPInQueueTCP(payload []byte) error { + sm.ss.tcpFrame.Channel = sm.tcpChannel + 1 + sm.ss.tcpFrame.Payload = payload + sm.ss.tcpConn.nconn.SetWriteDeadline(time.Now().Add(sm.ss.s.WriteTimeout)) + err := sm.ss.tcpConn.conn.WriteInterleavedFrame(sm.ss.tcpFrame, sm.ss.tcpBuffer) + if err != nil { + return err + } + + atomic.AddUint64(sm.bytesSent, uint64(len(payload))) + atomic.AddUint64(sm.rtcpPacketsSent, 1) + return nil +} diff --git a/server_stream.go b/server_stream.go index a5b58786..97cea81a 100644 --- a/server_stream.go +++ b/server_stream.go @@ -11,20 +11,9 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/format" - "github.com/bluenviron/gortsplib/v4/pkg/headers" "github.com/bluenviron/gortsplib/v4/pkg/liberrors" ) -func firstFormat(formats map[uint8]*serverStreamFormat) *serverStreamFormat { - var firstKey uint8 - for key := range formats { - firstKey = key - break - } - - return formats[firstKey] -} - // NewServerStream allocates a ServerStream. // // Deprecated: replaced by ServerStream.Initialize(). @@ -42,9 +31,9 @@ func NewServerStream(s *Server, desc *description.Session) *ServerStream { // ServerStream represents a data stream. // This is in charge of +// - storing stream description and statistics // - distributing the stream to each reader // - allocating multicast listeners -// - gathering infos about the stream in order to generate SSRC and RTP-Info type ServerStream struct { Server *Server Desc *description.Session @@ -73,7 +62,14 @@ func (st *ServerStream) Initialize() error { media: medi, trackID: i, } - sm.initialize() + err := sm.initialize() + if err != nil { + for _, medi := range st.Desc.Medias[:i] { + st.medias[medi].close() + } + return err + } + st.medias[medi] = sm } @@ -152,12 +148,7 @@ func (st *ServerStream) Stats() *ServerStreamStats { for _, fo := range sm.formats { ret[fo.format] = ServerStreamStatsFormat{ RTPPacketsSent: atomic.LoadUint64(fo.rtpPacketsSent), - LocalSSRC: func() uint32 { - if v, ok := fo.localSSRC(); ok { - return v - } - return 0 - }(), + LocalSSRC: fo.localSSRC, } } @@ -171,47 +162,6 @@ func (st *ServerStream) Stats() *ServerStreamStats { } } -func (st *ServerStream) rtpInfoEntry(medi *description.Media, now time.Time) *headers.RTPInfoEntry { - st.mutex.Lock() - defer st.mutex.Unlock() - - sm := st.medias[medi] - - // if there are multiple formats inside a single media stream, - // do not generate a RTP-Info entry, since RTP-Info doesn't support - // multiple sequence numbers / timestamps. - if len(sm.formats) > 1 { - return nil - } - - format := firstFormat(sm.formats) - - stats := format.rtcpSender.Stats() - if stats == nil { - return nil - } - - clockRate := format.format.ClockRate() - if clockRate == 0 { - return nil - } - - // sequence number of the first packet of the stream - seqNum := stats.LastSequenceNumber + 1 - - // RTP timestamp corresponding to the time value in - // the Range response header. - // remove a small quantity in order to avoid DTS > PTS - ts := uint32(uint64(stats.LastRTP) + - uint64(now.Sub(stats.LastNTP).Seconds()*float64(clockRate)) - - uint64(clockRate)/10) - - return &headers.RTPInfoEntry{ - SequenceNumber: &seqNum, - Timestamp: &ts, - } -} - func (st *ServerStream) readerAdd( ss *ServerSession, clientPorts *[2]int, @@ -325,13 +275,6 @@ func (st *ServerStream) WritePacketRTP(medi *description.Media, pkt *rtp.Packet) // WritePacketRTPWithNTP writes a RTP packet to all the readers of the stream. // ntp is the absolute time of the packet, and is sent with periodic RTCP sender reports. func (st *ServerStream) WritePacketRTPWithNTP(medi *description.Media, pkt *rtp.Packet, ntp time.Time) error { - byts := make([]byte, st.Server.MaxPacketSize) - n, err := pkt.MarshalTo(byts) - if err != nil { - return err - } - byts = byts[:n] - st.mutex.RLock() defer st.mutex.RUnlock() @@ -341,16 +284,11 @@ func (st *ServerStream) WritePacketRTPWithNTP(medi *description.Media, pkt *rtp. sm := st.medias[medi] sf := sm.formats[pkt.PayloadType] - return sf.writePacketRTP(byts, pkt, ntp) + return sf.writePacketRTP(pkt, ntp) } // WritePacketRTCP writes a RTCP packet to all the readers of the stream. func (st *ServerStream) WritePacketRTCP(medi *description.Media, pkt rtcp.Packet) error { - byts, err := pkt.Marshal() - if err != nil { - return err - } - st.mutex.RLock() defer st.mutex.RUnlock() @@ -359,5 +297,5 @@ func (st *ServerStream) WritePacketRTCP(medi *description.Media, pkt rtcp.Packet } sm := st.medias[medi] - return sm.writePacketRTCP(byts) + return sm.writePacketRTCP(pkt) } diff --git a/server_stream_format.go b/server_stream_format.go index 8e7bb653..30d26048 100644 --- a/server_stream_format.go +++ b/server_stream_format.go @@ -1,6 +1,7 @@ package gortsplib import ( + "crypto/rand" "sync/atomic" "time" @@ -11,15 +12,55 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/rtcpsender" ) +func randUint32() (uint32, error) { + var b [4]byte + _, err := rand.Read(b[:]) + if err != nil { + return 0, err + } + return uint32(b[0])<<24 | uint32(b[1])<<16 | uint32(b[2])<<8 | uint32(b[3]), nil +} + +func isServerStreamLocalSSRCTaken(ssrc uint32, stream *ServerStream, exclude *serverStreamFormat) bool { + for _, sm := range stream.medias { + for _, sf := range sm.formats { + if sf != exclude && sf.localSSRC == ssrc { + return true + } + } + } + return false +} + +func serverStreamPickLocalSSRC(sf *serverStreamFormat) (uint32, error) { + for { + ssrc, err := randUint32() + if err != nil { + return 0, err + } + + if ssrc != 0 && !isServerStreamLocalSSRCTaken(ssrc, sf.sm.st, sf) { + return ssrc, nil + } + } +} + type serverStreamFormat struct { sm *serverStreamMedia format format.Format + localSSRC uint32 rtcpSender *rtcpsender.RTCPSender rtpPacketsSent *uint64 } -func (sf *serverStreamFormat) initialize() { +func (sf *serverStreamFormat) initialize() error { + var err error + sf.localSSRC, err = serverStreamPickLocalSSRC(sf) + if err != nil { + return err + } + sf.rtpPacketsSent = new(uint64) sf.rtcpSender = &rtcpsender.RTCPSender{ @@ -33,18 +74,26 @@ func (sf *serverStreamFormat) initialize() { }, } sf.rtcpSender.Initialize() + + return nil } -func (sf *serverStreamFormat) localSSRC() (uint32, bool) { - stats := sf.rtcpSender.Stats() - if stats != nil { - return stats.LocalSSRC, true +func (sf *serverStreamFormat) close() { + if sf.rtcpSender != nil { + sf.rtcpSender.Close() } - - return 0, false } -func (sf *serverStreamFormat) writePacketRTP(byts []byte, pkt *rtp.Packet, ntp time.Time) error { +func (sf *serverStreamFormat) writePacketRTP(pkt *rtp.Packet, ntp time.Time) error { + pkt.SSRC = sf.localSSRC + + byts := make([]byte, sf.sm.st.Server.MaxPacketSize) + n, err := pkt.MarshalTo(byts) + if err != nil { + return err + } + byts = byts[:n] + sf.rtcpSender.ProcessPacket(pkt, ntp, sf.format.PTSEqualsDTS(pkt)) le := uint64(len(byts)) @@ -52,7 +101,7 @@ func (sf *serverStreamFormat) writePacketRTP(byts []byte, pkt *rtp.Packet, ntp t // send unicast for r := range sf.sm.st.activeUnicastReaders { if _, ok := r.setuppedMedias[sf.sm.media]; ok { - err := r.writePacketRTP(sf.sm.media, pkt.PayloadType, byts) + err := r.writePacketRTPEncoded(sf.sm.media, pkt.PayloadType, byts) if err != nil { r.onStreamWriteError(err) continue diff --git a/server_stream_media.go b/server_stream_media.go index 26bfc8d7..5a6ed050 100644 --- a/server_stream_media.go +++ b/server_stream_media.go @@ -4,6 +4,7 @@ import ( "sync/atomic" "github.com/bluenviron/gortsplib/v4/pkg/description" + "github.com/pion/rtcp" ) type serverStreamMedia struct { @@ -17,26 +18,34 @@ type serverStreamMedia struct { rtcpPacketsSent *uint64 } -func (sm *serverStreamMedia) initialize() { +func (sm *serverStreamMedia) initialize() error { sm.bytesSent = new(uint64) sm.rtcpPacketsSent = new(uint64) sm.formats = make(map[uint8]*serverStreamFormat) - for _, forma := range sm.media.Formats { + + for i, forma := range sm.media.Formats { sf := &serverStreamFormat{ sm: sm, format: forma, } - sf.initialize() + err := sf.initialize() + if err != nil { + for _, forma := range sm.media.Formats[:i] { + sm.formats[forma.PayloadType()].close() + } + return err + } + sm.formats[forma.PayloadType()] = sf } + + return nil } func (sm *serverStreamMedia) close() { - for _, tr := range sm.formats { - if tr.rtcpSender != nil { - tr.rtcpSender.Close() - } + for _, sf := range sm.formats { + sf.close() } if sm.multicastWriter != nil { @@ -44,13 +53,18 @@ func (sm *serverStreamMedia) close() { } } -func (sm *serverStreamMedia) writePacketRTCP(byts []byte) error { +func (sm *serverStreamMedia) writePacketRTCP(pkt rtcp.Packet) error { + byts, err := pkt.Marshal() + if err != nil { + return err + } + le := len(byts) // send unicast for r := range sm.st.activeUnicastReaders { if _, ok := r.setuppedMedias[sm.media]; ok { - err := r.writePacketRTCP(sm.media, byts) + err := r.writePacketRTCPEncoded(sm.media, byts) if err != nil { r.onStreamWriteError(err) continue diff --git a/server_test.go b/server_test.go index a2f98072..1e74f0d7 100644 --- a/server_test.go +++ b/server_test.go @@ -754,6 +754,7 @@ func TestServerSetupMultipleTransports(t *testing.T) { Delivery: deliveryPtr(headers.TransportDeliveryUnicast), Protocol: headers.TransportProtocolTCP, InterleavedIDs: &[2]int{0, 1}, + SSRC: th.SSRC, }, th) }