set SSRC of outgoing packets (#803)

In client and server, each format now has a fixed, unique, known in
advance SSRC, that is applied to outgoing packets belonging to each
format.

This is needed to support SRTP/MIKEY, that require each format to have
a fixed, unique, and known in advance SSRC.

A secondary effect is that SETUP responses now always contain SSRCs of
each format, regardless of the fact that the first packet has been
produced or not (previously we needed at least one packet, from which
the SSRC was extracted).
This commit is contained in:
Alessandro Ros
2025-07-05 11:08:57 +02:00
committed by GitHub
parent 319fd4bf97
commit 4f3337f56c
16 changed files with 554 additions and 301 deletions

View File

@@ -1423,7 +1423,10 @@ func (c *Client) doSetup(
c: c,
media: medi,
}
cm.initialize()
err = cm.initialize()
if err != nil {
return nil, err
}
if c.effectiveTransport == nil {
if c.connURL.Scheme == "rtsps" { // always use TCP if encrypted
@@ -1981,13 +1984,6 @@ func (c *Client) WritePacketRTP(medi *description.Media, pkt *rtp.Packet) error
// WritePacketRTPWithNTP writes a RTP packet to the server.
// ntp is the absolute time of the packet, and is sent with periodic RTCP sender reports.
func (c *Client) WritePacketRTPWithNTP(medi *description.Media, pkt *rtp.Packet, ntp time.Time) error {
byts := make([]byte, c.MaxPacketSize)
n, err := pkt.MarshalTo(byts)
if err != nil {
return err
}
byts = byts[:n]
select {
case <-c.done:
return c.closeError
@@ -2003,26 +1999,11 @@ func (c *Client) WritePacketRTPWithNTP(medi *description.Media, pkt *rtp.Packet,
cm := c.setuppedMedias[medi]
cf := cm.formats[pkt.PayloadType]
cf.rtcpSender.ProcessPacket(pkt, ntp, cf.format.PTSEqualsDTS(pkt))
ok := c.writer.push(func() error {
return cf.writePacketRTPInQueue(byts)
})
if !ok {
return liberrors.ErrClientWriteQueueFull{}
}
return nil
return cf.writePacketRTP(pkt, ntp)
}
// WritePacketRTCP writes a RTCP packet to the server.
func (c *Client) WritePacketRTCP(medi *description.Media, pkt rtcp.Packet) error {
byts, err := pkt.Marshal()
if err != nil {
return err
}
select {
case <-c.done:
return c.closeError
@@ -2037,15 +2018,7 @@ func (c *Client) WritePacketRTCP(medi *description.Media, pkt rtcp.Packet) error
}
cm := c.setuppedMedias[medi]
ok := c.writer.push(func() error {
return cm.writePacketRTCPInQueue(byts)
})
if !ok {
return liberrors.ErrClientWriteQueueFull{}
}
return nil
return cm.writePacketRTCP(pkt)
}
// PacketPTS returns the PTS of an incoming RTP packet.
@@ -2208,18 +2181,10 @@ func (c *Client) Stats() *ClientStats {
RTPPacketsReceived: atomic.LoadUint64(fo.rtpPacketsReceived),
RTPPacketsSent: atomic.LoadUint64(fo.rtpPacketsSent),
RTPPacketsLost: atomic.LoadUint64(fo.rtpPacketsLost),
LocalSSRC: func() uint32 {
if fo.rtcpReceiver != nil {
return *fo.rtcpReceiver.LocalSSRC
}
if sentStats != nil {
return sentStats.LocalSSRC
}
return 0
}(),
LocalSSRC: fo.localSSRC,
RemoteSSRC: func() uint32 {
if recvStats != nil {
return recvStats.RemoteSSRC
if v, ok := fo.remoteSSRC(); ok {
return v
}
return 0
}(),

View File

@@ -8,17 +8,43 @@ import (
"github.com/pion/rtp"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/gortsplib/v4/pkg/liberrors"
"github.com/bluenviron/gortsplib/v4/pkg/rtcpreceiver"
"github.com/bluenviron/gortsplib/v4/pkg/rtcpsender"
"github.com/bluenviron/gortsplib/v4/pkg/rtplossdetector"
"github.com/bluenviron/gortsplib/v4/pkg/rtpreorderer"
)
func isClientLocalSSRCTaken(ssrc uint32, c *Client, exclude *clientFormat) bool {
for _, cm := range c.setuppedMedias {
for _, cf := range cm.formats {
if cf != exclude && cf.localSSRC == ssrc {
return true
}
}
}
return false
}
func clientPickLocalSSRC(cf *clientFormat) (uint32, error) {
for {
ssrc, err := randUint32()
if err != nil {
return 0, err
}
if ssrc != 0 && !isClientLocalSSRCTaken(ssrc, cf.cm.c, cf) {
return ssrc, nil
}
}
}
type clientFormat struct {
cm *clientMedia
format format.Format
onPacketRTP OnPacketRTPFunc
localSSRC uint32
udpReorderer *rtpreorderer.Reorderer // play
tcpLossDetector *rtplossdetector.LossDetector // play
rtcpReceiver *rtcpreceiver.RTCPReceiver // play
@@ -29,10 +55,18 @@ type clientFormat struct {
rtpPacketsLost *uint64
}
func (cf *clientFormat) initialize() {
func (cf *clientFormat) initialize() error {
var err error
cf.localSSRC, err = clientPickLocalSSRC(cf)
if err != nil {
return err
}
cf.rtpPacketsReceived = new(uint64)
cf.rtpPacketsSent = new(uint64)
cf.rtpPacketsLost = new(uint64)
return nil
}
func (cf *clientFormat) start() {
@@ -64,6 +98,7 @@ func (cf *clientFormat) start() {
cf.rtcpReceiver = &rtcpreceiver.RTCPReceiver{
ClockRate: cf.format.ClockRate(),
LocalSSRC: &cf.localSSRC,
Period: cf.cm.c.receiverReportPeriod,
TimeNow: cf.cm.c.timeNow,
WritePacketRTCP: func(pkt rtcp.Packet) {
@@ -90,6 +125,16 @@ func (cf *clientFormat) stop() {
}
}
func (cf *clientFormat) remoteSSRC() (uint32, bool) {
if cf.rtcpReceiver != nil {
stats := cf.rtcpReceiver.Stats()
if stats != nil {
return stats.RemoteSSRC, true
}
}
return 0, false
}
func (cf *clientFormat) readPacketRTPUDP(pkt *rtp.Packet) {
packets, lost := cf.udpReorderer.Process(pkt)
if lost != 0 {
@@ -133,6 +178,28 @@ func (cf *clientFormat) handlePacketsLost(lost uint64) {
cf.cm.c.OnPacketsLost(lost)
}
func (cf *clientFormat) writePacketRTP(pkt *rtp.Packet, ntp time.Time) error {
pkt.SSRC = cf.localSSRC
byts := make([]byte, cf.cm.c.MaxPacketSize)
n, err := pkt.MarshalTo(byts)
if err != nil {
return err
}
byts = byts[:n]
cf.rtcpSender.ProcessPacket(pkt, ntp, cf.format.PTSEqualsDTS(pkt))
ok := cf.cm.c.writer.push(func() error {
return cf.writePacketRTPInQueue(byts)
})
if !ok {
return liberrors.ErrClientWriteQueueFull{}
}
return nil
}
func (cf *clientFormat) writePacketRTPInQueueUDP(payload []byte) error {
err := cf.cm.udpRTPListener.write(payload)
if err != nil {

View File

@@ -30,7 +30,7 @@ type clientMedia struct {
rtcpPacketsInError *uint64
}
func (cm *clientMedia) initialize() {
func (cm *clientMedia) initialize() error {
cm.onPacketRTCP = func(rtcp.Packet) {}
cm.bytesReceived = new(uint64)
cm.bytesSent = new(uint64)
@@ -47,9 +47,15 @@ func (cm *clientMedia) initialize() {
format: forma,
onPacketRTP: func(*rtp.Packet) {},
}
f.initialize()
err := f.initialize()
if err != nil {
return err
}
cm.formats[forma.PayloadType()] = f
}
return nil
}
func (cm *clientMedia) close() {
@@ -146,41 +152,15 @@ func (cm *clientMedia) stop() {
}
}
func (cm *clientMedia) findFormatBySSRC(ssrc uint32) *clientFormat {
for _, format := range cm.formats {
stats := format.rtcpReceiver.Stats()
if stats != nil && stats.RemoteSSRC == ssrc {
return format
func (cm *clientMedia) findFormatByRemoteSSRC(ssrc uint32) *clientFormat {
for _, cf := range cm.formats {
if v, ok := cf.remoteSSRC(); ok && v == ssrc {
return cf
}
}
return nil
}
func (cm *clientMedia) writePacketRTCPInQueueUDP(payload []byte) error {
err := cm.udpRTCPListener.write(payload)
if err != nil {
return err
}
atomic.AddUint64(cm.bytesSent, uint64(len(payload)))
atomic.AddUint64(cm.rtcpPacketsSent, 1)
return nil
}
func (cm *clientMedia) writePacketRTCPInQueueTCP(payload []byte) error {
cm.c.tcpFrame.Channel = cm.tcpChannel + 1
cm.c.tcpFrame.Payload = payload
cm.c.nconn.SetWriteDeadline(time.Now().Add(cm.c.WriteTimeout))
err := cm.c.conn.WriteInterleavedFrame(cm.c.tcpFrame, cm.c.tcpBuffer)
if err != nil {
return err
}
atomic.AddUint64(cm.bytesSent, uint64(len(payload)))
atomic.AddUint64(cm.rtcpPacketsSent, 1)
return nil
}
func (cm *clientMedia) readPacketRTPTCPPlay(payload []byte) bool {
atomic.AddUint64(cm.bytesReceived, uint64(len(payload)))
@@ -226,7 +206,7 @@ func (cm *clientMedia) readPacketRTCPTCPPlay(payload []byte) bool {
for _, pkt := range packets {
if sr, ok := pkt.(*rtcp.SenderReport); ok {
format := cm.findFormatBySSRC(sr.SSRC)
format := cm.findFormatByRemoteSSRC(sr.SSRC)
if format != nil {
format.rtcpReceiver.ProcessSenderReport(sr, now)
}
@@ -311,7 +291,7 @@ func (cm *clientMedia) readPacketRTCPUDPPlay(payload []byte) bool {
for _, pkt := range packets {
if sr, ok := pkt.(*rtcp.SenderReport); ok {
format := cm.findFormatBySSRC(sr.SSRC)
format := cm.findFormatByRemoteSSRC(sr.SSRC)
if format != nil {
format.rtcpReceiver.ProcessSenderReport(sr, now)
}
@@ -359,3 +339,44 @@ func (cm *clientMedia) onPacketRTCPDecodeError(err error) {
atomic.AddUint64(cm.rtcpPacketsInError, 1)
cm.c.OnDecodeError(err)
}
func (cm *clientMedia) writePacketRTCP(pkt rtcp.Packet) error {
byts, err := pkt.Marshal()
if err != nil {
return err
}
ok := cm.c.writer.push(func() error {
return cm.writePacketRTCPInQueue(byts)
})
if !ok {
return liberrors.ErrClientWriteQueueFull{}
}
return nil
}
func (cm *clientMedia) writePacketRTCPInQueueUDP(payload []byte) error {
err := cm.udpRTCPListener.write(payload)
if err != nil {
return err
}
atomic.AddUint64(cm.bytesSent, uint64(len(payload)))
atomic.AddUint64(cm.rtcpPacketsSent, 1)
return nil
}
func (cm *clientMedia) writePacketRTCPInQueueTCP(payload []byte) error {
cm.c.tcpFrame.Channel = cm.tcpChannel + 1
cm.c.tcpFrame.Payload = payload
cm.c.nconn.SetWriteDeadline(time.Now().Add(cm.c.WriteTimeout))
err := cm.c.conn.WriteInterleavedFrame(cm.c.tcpFrame, cm.c.tcpBuffer)
if err != nil {
return err
}
atomic.AddUint64(cm.bytesSent, uint64(len(payload)))
atomic.AddUint64(cm.rtcpPacketsSent, 1)
return nil
}

View File

@@ -558,6 +558,48 @@ func TestClientPlay(t *testing.T) {
<-packetRecv
s := c.Stats()
require.Equal(t, &ClientStats{
Conn: StatsConn{
BytesReceived: s.Conn.BytesReceived,
BytesSent: s.Conn.BytesSent,
},
Session: StatsSession{
BytesReceived: s.Session.BytesReceived,
BytesSent: s.Session.BytesSent,
RTPPacketsReceived: s.Session.RTPPacketsReceived,
RTCPPacketsReceived: s.Session.RTCPPacketsReceived,
RTCPPacketsSent: s.Session.RTCPPacketsSent,
Medias: map[*description.Media]StatsSessionMedia{
sd.Medias[0]: { //nolint:dupl
BytesReceived: s.Session.Medias[sd.Medias[0]].BytesReceived,
BytesSent: s.Session.Medias[sd.Medias[0]].BytesSent,
RTCPPacketsReceived: s.Session.Medias[sd.Medias[0]].RTCPPacketsReceived,
RTCPPacketsSent: s.Session.Medias[sd.Medias[0]].RTCPPacketsSent,
Formats: map[format.Format]StatsSessionFormat{
sd.Medias[0].Formats[0]: {
RTPPacketsReceived: s.Session.Medias[sd.Medias[0]].Formats[sd.Medias[0].Formats[0]].RTPPacketsReceived,
LocalSSRC: s.Session.Medias[sd.Medias[0]].Formats[sd.Medias[0].Formats[0]].LocalSSRC,
RemoteSSRC: s.Session.Medias[sd.Medias[0]].Formats[sd.Medias[0].Formats[0]].RemoteSSRC,
},
},
},
sd.Medias[1]: { //nolint:dupl
BytesReceived: s.Session.Medias[sd.Medias[1]].BytesReceived,
BytesSent: s.Session.Medias[sd.Medias[1]].BytesSent,
RTCPPacketsReceived: s.Session.Medias[sd.Medias[1]].RTCPPacketsReceived,
RTCPPacketsSent: s.Session.Medias[sd.Medias[1]].RTCPPacketsSent,
Formats: map[format.Format]StatsSessionFormat{
sd.Medias[1].Formats[0]: {
RTPPacketsReceived: s.Session.Medias[sd.Medias[1]].Formats[sd.Medias[1].Formats[0]].RTPPacketsReceived,
LocalSSRC: s.Session.Medias[sd.Medias[1]].Formats[sd.Medias[1].Formats[0]].LocalSSRC,
RemoteSSRC: s.Session.Medias[sd.Medias[1]].Formats[sd.Medias[1].Formats[0]].RemoteSSRC,
},
},
},
},
},
}, s)
require.Greater(t, s.Session.BytesSent, uint64(19))
require.Less(t, s.Session.BytesSent, uint64(41))
require.Greater(t, s.Session.BytesReceived, uint64(31))

View File

@@ -364,6 +364,38 @@ func TestClientRecord(t *testing.T) {
<-recvDone
s := c.Stats()
require.Equal(t, &ClientStats{
Conn: StatsConn{
BytesReceived: s.Conn.BytesReceived,
BytesSent: s.Conn.BytesSent,
},
Session: StatsSession{
BytesReceived: s.Session.BytesReceived,
BytesSent: s.Session.BytesSent,
RTPPacketsSent: s.Session.RTPPacketsSent,
RTPPacketsReceived: s.Session.RTPPacketsReceived,
RTCPPacketsReceived: s.Session.RTCPPacketsReceived,
RTCPPacketsSent: s.Session.RTCPPacketsSent,
Medias: map[*description.Media]StatsSessionMedia{
medias[0]: {
BytesReceived: s.Session.Medias[medias[0]].BytesReceived,
BytesSent: s.Session.Medias[medias[0]].BytesSent,
RTCPPacketsReceived: s.Session.Medias[medias[0]].RTCPPacketsReceived,
RTCPPacketsSent: s.Session.Medias[medias[0]].RTCPPacketsSent,
Formats: map[format.Format]StatsSessionFormat{
medias[0].Formats[0]: {
RTPPacketsSent: s.Session.Medias[medias[0]].Formats[medias[0].Formats[0]].RTPPacketsSent,
RTPPacketsReceived: s.Session.Medias[medias[0]].Formats[medias[0].Formats[0]].RTPPacketsReceived,
LocalSSRC: s.Session.Medias[medias[0]].Formats[medias[0].Formats[0]].LocalSSRC,
RemoteSSRC: s.Session.Medias[medias[0]].Formats[medias[0].Formats[0]].RemoteSSRC,
RTPPacketsLastNTP: s.Session.Medias[medias[0]].Formats[medias[0].Formats[0]].RTPPacketsLastNTP,
},
},
},
},
},
}, s)
require.Greater(t, s.Session.BytesSent, uint64(15))
require.Less(t, s.Session.BytesSent, uint64(17))
require.Greater(t, s.Session.BytesReceived, uint64(19))
@@ -1256,13 +1288,15 @@ func TestClientRecordRTCPReport(t *testing.T) {
packets, err2 := rtcp.Unmarshal(buf)
require.NoError(t, err2)
require.Equal(t, &rtcp.SenderReport{
SSRC: 0x38F27A2F,
NTPTime: ntpTimeGoToRTCP(time.Date(1996, 2, 13, 14, 33, 5, 0, time.UTC)),
RTPTime: 1300000 + 60*90000,
PacketCount: 1,
OctetCount: 1,
}, packets[0])
require.Equal(t, []rtcp.Packet{
&rtcp.SenderReport{
SSRC: packets[0].(*rtcp.SenderReport).SSRC,
NTPTime: ntpTimeGoToRTCP(time.Date(1996, 2, 13, 14, 33, 5, 0, time.UTC)),
RTPTime: 1300000 + 60*90000,
PacketCount: 1,
OctetCount: 1,
},
}, packets)
close(reportReceived)

View File

@@ -87,6 +87,8 @@ func New(
// Initialize initializes RTCPReceiver.
func (rr *RTCPReceiver) Initialize() error {
// Deprecated: passing a nil LocalSSRC will be deprecated from next version.
// Please use a fixed LocalSSRC.
if rr.LocalSSRC == nil {
v, err := randUint32()
if err != nil {

View File

@@ -163,7 +163,10 @@ func (rs *RTCPSender) LastPacketData() (uint16, uint32, time.Time, bool) {
// Stats are statistics.
type Stats struct {
LocalSSRC uint32
// Deprecated: this is not a statistics anymore but a fixed parameter.
// it will be removed in next version.
LocalSSRC uint32
LastSequenceNumber uint16
LastRTP uint32
LastNTP time.Time

View File

@@ -32,10 +32,6 @@ func uint16Ptr(v uint16) *uint16 {
return &v
}
func uint32Ptr(v uint32) *uint32 {
return &v
}
func multicastCapableIP(t *testing.T) string {
intfs, err := net.Interfaces()
require.NoError(t, err)
@@ -848,7 +844,12 @@ func TestServerPlay(t *testing.T) {
var n int
n, _, err = l1.ReadFrom(buf)
require.NoError(t, err)
require.Equal(t, testRTPPacketMarshaled, buf[:n])
var pkt rtp.Packet
err = pkt.Unmarshal(buf[:n])
require.NoError(t, err)
pkt.SSRC = testRTPPacket.SSRC
require.Equal(t, testRTPPacket, pkt)
buf = make([]byte, 2048)
n, _, err = l2.ReadFrom(buf)
@@ -864,7 +865,11 @@ func TestServerPlay(t *testing.T) {
f, err = conn.ReadInterleavedFrame()
require.NoError(t, err)
require.Equal(t, 5, f.Channel)
require.Equal(t, testRTPPacketMarshaled, f.Payload)
var pkt rtp.Packet
err = pkt.Unmarshal(f.Payload)
require.NoError(t, err)
pkt.SSRC = testRTPPacket.SSRC
require.Equal(t, testRTPPacket, pkt)
}
// client -> server
@@ -1385,13 +1390,15 @@ func TestServerPlayRTCPReport(t *testing.T) {
packets, err := rtcp.Unmarshal(buf)
require.NoError(t, err)
require.Equal(t, &rtcp.SenderReport{
SSRC: 0x38F27A2F,
NTPTime: ntpTimeGoToRTCP(time.Date(2017, 8, 10, 12, 22, 30, 0, time.UTC)),
RTPTime: 240000 + 90000*30,
PacketCount: 1,
OctetCount: 1,
}, packets[0])
require.Equal(t, []rtcp.Packet{
&rtcp.SenderReport{
SSRC: packets[0].(*rtcp.SenderReport).SSRC,
NTPTime: ntpTimeGoToRTCP(time.Date(2017, 8, 10, 12, 22, 30, 0, time.UTC)),
RTPTime: 240000 + 90000*30,
PacketCount: 1,
OctetCount: 1,
},
}, packets)
doTeardown(t, conn, "rtsp://localhost:8554/teststream", session)
})
@@ -2072,7 +2079,12 @@ func TestServerPlayPartialMedias(t *testing.T) {
f, err := conn.ReadInterleavedFrame()
require.NoError(t, err)
require.Equal(t, 4, f.Channel)
require.Equal(t, testRTPPacketMarshaled, f.Payload)
var pkt rtp.Packet
err = pkt.Unmarshal(f.Payload)
require.NoError(t, err)
pkt.SSRC = testRTPPacket.SSRC
require.Equal(t, testRTPPacket, pkt)
}
func TestServerPlayAdditionalInfos(t *testing.T) {
@@ -2203,10 +2215,9 @@ func TestServerPlayAdditionalInfos(t *testing.T) {
}).String(),
},
}, rtpInfo)
require.Equal(t, []*uint32{
uint32Ptr(96342362),
nil,
}, ssrcs)
require.Len(t, ssrcs, 2)
require.NotNil(t, ssrcs[0])
require.NotNil(t, ssrcs[1])
err = stream.WritePacketRTP(stream.Description().Medias[1], &rtp.Packet{
Header: rtp.Header{
@@ -2242,10 +2253,9 @@ func TestServerPlayAdditionalInfos(t *testing.T) {
Timestamp: (*rtpInfo)[1].Timestamp,
},
}, rtpInfo)
require.Equal(t, []*uint32{
uint32Ptr(96342362),
uint32Ptr(536474323),
}, ssrcs)
require.Len(t, ssrcs, 2)
require.NotNil(t, ssrcs[0])
require.NotNil(t, ssrcs[1])
}
func TestServerPlayNoInterleavedIDs(t *testing.T) {
@@ -2327,14 +2337,19 @@ func TestServerPlayNoInterleavedIDs(t *testing.T) {
doPlay(t, conn, "rtsp://localhost:8554/teststream", session)
for i := 0; i < 2; i++ {
for i := range 2 {
err := stream.WritePacketRTP(stream.Description().Medias[i], &testRTPPacket)
require.NoError(t, err)
f, err := conn.ReadInterleavedFrame()
require.NoError(t, err)
require.Equal(t, i*2, f.Channel)
require.Equal(t, testRTPPacketMarshaled, f.Payload)
var pkt rtp.Packet
err = pkt.Unmarshal(f.Payload)
require.NoError(t, err)
pkt.SSRC = testRTPPacket.SSRC
require.Equal(t, testRTPPacket, pkt)
}
}
@@ -2422,7 +2437,8 @@ func TestServerPlayStreamStats(t *testing.T) {
Formats: map[format.Format]ServerStreamStatsFormat{
stream.Description().Medias[0].Formats[0]: {
RTPPacketsSent: 2,
LocalSSRC: 955415087,
LocalSSRC: st.Medias[stream.Description().Medias[0]].
Formats[stream.Description().Medias[0].Formats[0]].LocalSSRC,
},
},
},

View File

@@ -606,6 +606,7 @@ func TestServerRecord(t *testing.T) {
ctx.Session.AnnouncedDescription().Medias[i],
ctx.Session.AnnouncedDescription().Medias[i].Formats[0],
func(pkt *rtp.Packet) {
pkt.SSRC = testRTPPacket.SSRC
require.Equal(t, &testRTPPacket, pkt)
})

View File

@@ -180,26 +180,65 @@ func findFirstSupportedTransportHeader(s *Server, tsh headers.Transports) *heade
return nil
}
func generateRTPInfoEntry(ssm *serverStreamMedia, now time.Time) *headers.RTPInfoEntry {
// do not generate a RTP-Info entry when
// there are multiple formats inside a single media stream,
// since RTP-Info does not support multiple sequence numbers / timestamps.
if len(ssm.media.Formats) > 1 {
return nil
}
format := ssm.formats[ssm.media.Formats[0].PayloadType()]
stats := format.rtcpSender.Stats()
if stats == nil {
return nil
}
clockRate := format.format.ClockRate()
if clockRate == 0 {
return nil
}
// sequence number of the first packet of the stream
seqNum := stats.LastSequenceNumber + 1
// RTP timestamp corresponding to the time value in
// the Range response header.
// remove a small quantity in order to avoid DTS > PTS
ts := uint32(uint64(stats.LastRTP) +
uint64(now.Sub(stats.LastNTP).Seconds()*float64(clockRate)) -
uint64(clockRate)/10)
return &headers.RTPInfoEntry{
SequenceNumber: &seqNum,
Timestamp: &ts,
}
}
func generateRTPInfo(
now time.Time,
setuppedMediasOrdered []*serverSessionMedia,
setuppedStream *ServerStream,
setuppedPath string,
mediasOrdered []*serverSessionMedia,
stream *ServerStream,
path string,
u *base.URL,
) (headers.RTPInfo, bool) {
var ri headers.RTPInfo
for _, sm := range setuppedMediasOrdered {
entry := setuppedStream.rtpInfoEntry(sm.media, now)
for _, sm := range mediasOrdered {
ssm := stream.medias[sm.media]
entry := generateRTPInfoEntry(ssm, now)
if entry == nil {
entry = &headers.RTPInfoEntry{}
}
entry.URL = (&base.URL{
Scheme: u.Scheme,
Host: u.Host,
Path: setuppedPath + "/trackID=" +
strconv.FormatInt(int64(setuppedStream.medias[sm.media].trackID), 10),
Path: path + "/trackID=" +
strconv.FormatInt(int64(ssm.trackID), 10),
}).String()
ri = append(ri, entry)
}
@@ -500,18 +539,10 @@ func (ss *ServerSession) Stats() *StatsSession {
RTPPacketsReceived: atomic.LoadUint64(fo.rtpPacketsReceived),
RTPPacketsSent: atomic.LoadUint64(fo.rtpPacketsSent),
RTPPacketsLost: atomic.LoadUint64(fo.rtpPacketsLost),
LocalSSRC: func() uint32 {
if fo.rtcpReceiver != nil {
return *fo.rtcpReceiver.LocalSSRC
}
if sentStats != nil {
return sentStats.LocalSSRC
}
return 0
}(),
LocalSSRC: fo.localSSRC,
RemoteSSRC: func() uint32 {
if recvStats != nil {
return recvStats.RemoteSSRC
if v, ok := fo.remoteSSRC(); ok {
return v
}
return 0
}(),
@@ -1089,10 +1120,7 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) (
// since the Transport header does not support multiple SSRCs.
if len(stream.medias[medi].formats) == 1 {
format := stream.medias[medi].formats[medi.Formats[0].PayloadType()]
ssrc, ok := format.localSSRC()
if ok {
th.SSRC = &ssrc
}
th.SSRC = &format.localSSRC
}
}
@@ -1105,7 +1133,12 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) (
media: medi,
onPacketRTCP: func(_ rtcp.Packet) {},
}
sm.initialize()
err = sm.initialize()
if err != nil {
return &base.Response{
StatusCode: base.StatusInternalServerError,
}, err
}
switch transport {
case TransportUDP:
@@ -1490,57 +1523,22 @@ func (ss *ServerSession) OnPacketRTCP(medi *description.Media, cb OnPacketRTCPFu
sm.onPacketRTCP = cb
}
func (ss *ServerSession) writePacketRTP(medi *description.Media, payloadType uint8, byts []byte) error {
func (ss *ServerSession) writePacketRTPEncoded(medi *description.Media, payloadType uint8, byts []byte) error {
sm := ss.setuppedMedias[medi]
sf := sm.formats[payloadType]
ss.writerMutex.RLock()
defer ss.writerMutex.RUnlock()
if ss.writer == nil {
return nil
}
ok := ss.writer.push(func() error {
return sf.writePacketRTPInQueue(byts)
})
if !ok {
return liberrors.ErrServerWriteQueueFull{}
}
return nil
return sf.writePacketRTPEncoded(byts)
}
// WritePacketRTP writes a RTP packet to the session.
func (ss *ServerSession) WritePacketRTP(medi *description.Media, pkt *rtp.Packet) error {
byts := make([]byte, ss.s.MaxPacketSize)
n, err := pkt.MarshalTo(byts)
if err != nil {
return err
}
byts = byts[:n]
return ss.writePacketRTP(medi, pkt.PayloadType, byts)
sm := ss.setuppedMedias[medi]
sf := sm.formats[pkt.PayloadType]
return sf.writePacketRTP(pkt)
}
func (ss *ServerSession) writePacketRTCP(medi *description.Media, byts []byte) error {
func (ss *ServerSession) writePacketRTCPEncoded(medi *description.Media, byts []byte) error {
sm := ss.setuppedMedias[medi]
ss.writerMutex.RLock()
defer ss.writerMutex.RUnlock()
if ss.writer == nil {
return nil
}
ok := ss.writer.push(func() error {
return sm.writePacketRTCPInQueue(byts)
})
if !ok {
return liberrors.ErrServerWriteQueueFull{}
}
return nil
return sm.writePacketRTCPEncoded(byts)
}
// WritePacketRTCP writes a RTCP packet to the session.
@@ -1550,7 +1548,7 @@ func (ss *ServerSession) WritePacketRTCP(medi *description.Media, pkt rtcp.Packe
return err
}
return ss.writePacketRTCP(medi, byts)
return ss.writePacketRTCPEncoded(medi, byts)
}
// PacketPTS returns the PTS of an incoming RTP packet.

View File

@@ -15,11 +15,36 @@ import (
"github.com/bluenviron/gortsplib/v4/pkg/rtpreorderer"
)
func isServerSessionLocalSSRCTaken(ssrc uint32, ss *ServerSession, exclude *serverSessionFormat) bool {
for _, sm := range ss.setuppedMedias {
for _, sf := range sm.formats {
if sf != exclude && sf.localSSRC == ssrc {
return true
}
}
}
return false
}
func serverSessionPickLocalSSRC(sf *serverSessionFormat) (uint32, error) {
for {
ssrc, err := randUint32()
if err != nil {
return 0, err
}
if ssrc != 0 && !isServerSessionLocalSSRCTaken(ssrc, sf.sm.ss, sf) {
return ssrc, nil
}
}
}
type serverSessionFormat struct {
sm *serverSessionMedia
format format.Format
onPacketRTP OnPacketRTPFunc
localSSRC uint32
udpReorderer *rtpreorderer.Reorderer // publish or back channel
tcpLossDetector *rtplossdetector.LossDetector
rtcpReceiver *rtcpreceiver.RTCPReceiver
@@ -29,10 +54,22 @@ type serverSessionFormat struct {
rtpPacketsLost *uint64
}
func (sf *serverSessionFormat) initialize() {
func (sf *serverSessionFormat) initialize() error {
if sf.sm.ss.state == ServerSessionStatePreRecord || sf.sm.media.IsBackChannel {
var err error
sf.localSSRC, err = serverSessionPickLocalSSRC(sf)
if err != nil {
return err
}
} else {
sf.localSSRC = sf.sm.ss.setuppedStream.medias[sf.sm.media].formats[sf.format.PayloadType()].localSSRC
}
sf.rtpPacketsReceived = new(uint64)
sf.rtpPacketsSent = new(uint64)
sf.rtpPacketsLost = new(uint64)
return nil
}
func (sf *serverSessionFormat) start() {
@@ -54,6 +91,7 @@ func (sf *serverSessionFormat) start() {
sf.rtcpReceiver = &rtcpreceiver.RTCPReceiver{
ClockRate: sf.format.ClockRate(),
LocalSSRC: &sf.localSSRC,
Period: sf.sm.ss.s.receiverReportPeriod,
TimeNow: sf.sm.ss.s.timeNow,
WritePacketRTCP: func(pkt rtcp.Packet) {
@@ -76,6 +114,16 @@ func (sf *serverSessionFormat) stop() {
}
}
func (sf *serverSessionFormat) remoteSSRC() (uint32, bool) {
if sf.rtcpReceiver != nil {
stats := sf.rtcpReceiver.Stats()
if stats != nil {
return stats.RemoteSSRC, true
}
}
return 0, false
}
func (sf *serverSessionFormat) readPacketRTPUDP(pkt *rtp.Packet, now time.Time) {
packets, lost := sf.udpReorderer.Process(pkt)
if lost != 0 {
@@ -137,6 +185,37 @@ func (sf *serverSessionFormat) onPacketRTPLost(lost uint64) {
}
}
func (sf *serverSessionFormat) writePacketRTP(pkt *rtp.Packet) error {
pkt.SSRC = sf.localSSRC
byts := make([]byte, sf.sm.ss.s.MaxPacketSize)
n, err := pkt.MarshalTo(byts)
if err != nil {
return err
}
byts = byts[:n]
return sf.writePacketRTPEncoded(byts)
}
func (sf *serverSessionFormat) writePacketRTPEncoded(payload []byte) error {
sf.sm.ss.writerMutex.RLock()
defer sf.sm.ss.writerMutex.RUnlock()
if sf.sm.ss.writer == nil {
return nil
}
ok := sf.sm.ss.writer.push(func() error {
return sf.writePacketRTPInQueue(payload)
})
if !ok {
return liberrors.ErrServerWriteQueueFull{}
}
return nil
}
func (sf *serverSessionFormat) writePacketRTPInQueueUDP(payload []byte) error {
err := sf.sm.ss.s.udpRTPListener.write(payload, sf.sm.udpRTPWriteAddr)
if err != nil {

View File

@@ -33,7 +33,7 @@ type serverSessionMedia struct {
rtcpPacketsInError *uint64
}
func (sm *serverSessionMedia) initialize() {
func (sm *serverSessionMedia) initialize() error {
sm.bytesReceived = new(uint64)
sm.bytesSent = new(uint64)
sm.rtpPacketsInError = new(uint64)
@@ -49,9 +49,14 @@ func (sm *serverSessionMedia) initialize() {
format: forma,
onPacketRTP: func(*rtp.Packet) {},
}
f.initialize()
err := f.initialize()
if err != nil {
return err
}
sm.formats[forma.PayloadType()] = f
}
return nil
}
func (sm *serverSessionMedia) start() {
@@ -114,7 +119,7 @@ func (sm *serverSessionMedia) stop() {
}
}
func (sm *serverSessionMedia) findFormatBySSRC(ssrc uint32) *serverSessionFormat {
func (sm *serverSessionMedia) findFormatByRemoteSSRC(ssrc uint32) *serverSessionFormat {
for _, format := range sm.formats {
stats := format.rtcpReceiver.Stats()
if stats != nil && stats.RemoteSSRC == ssrc {
@@ -124,31 +129,6 @@ func (sm *serverSessionMedia) findFormatBySSRC(ssrc uint32) *serverSessionFormat
return nil
}
func (sm *serverSessionMedia) writePacketRTCPInQueueUDP(payload []byte) error {
err := sm.ss.s.udpRTCPListener.write(payload, sm.udpRTCPWriteAddr)
if err != nil {
return err
}
atomic.AddUint64(sm.bytesSent, uint64(len(payload)))
atomic.AddUint64(sm.rtcpPacketsSent, 1)
return nil
}
func (sm *serverSessionMedia) writePacketRTCPInQueueTCP(payload []byte) error {
sm.ss.tcpFrame.Channel = sm.tcpChannel + 1
sm.ss.tcpFrame.Payload = payload
sm.ss.tcpConn.nconn.SetWriteDeadline(time.Now().Add(sm.ss.s.WriteTimeout))
err := sm.ss.tcpConn.conn.WriteInterleavedFrame(sm.ss.tcpFrame, sm.ss.tcpBuffer)
if err != nil {
return err
}
atomic.AddUint64(sm.bytesSent, uint64(len(payload)))
atomic.AddUint64(sm.rtcpPacketsSent, 1)
return nil
}
func (sm *serverSessionMedia) readPacketRTPUDPPlay(payload []byte) bool {
atomic.AddUint64(sm.bytesReceived, uint64(len(payload)))
@@ -253,7 +233,7 @@ func (sm *serverSessionMedia) readPacketRTCPUDPRecord(payload []byte) bool {
for _, pkt := range packets {
if sr, ok := pkt.(*rtcp.SenderReport); ok {
format := sm.findFormatBySSRC(sr.SSRC)
format := sm.findFormatByRemoteSSRC(sr.SSRC)
if format != nil {
format.rtcpReceiver.ProcessSenderReport(sr, now)
}
@@ -354,7 +334,7 @@ func (sm *serverSessionMedia) readPacketRTCPTCPRecord(payload []byte) bool {
for _, pkt := range packets {
if sr, ok := pkt.(*rtcp.SenderReport); ok {
format := sm.findFormatBySSRC(sr.SSRC)
format := sm.findFormatByRemoteSSRC(sr.SSRC)
if format != nil {
format.rtcpReceiver.ProcessSenderReport(sr, now)
}
@@ -391,3 +371,46 @@ func (sm *serverSessionMedia) onPacketRTCPDecodeError(err error) {
log.Println(err.Error())
}
}
func (sm *serverSessionMedia) writePacketRTCPEncoded(payload []byte) error {
sm.ss.writerMutex.RLock()
defer sm.ss.writerMutex.RUnlock()
if sm.ss.writer == nil {
return nil
}
ok := sm.ss.writer.push(func() error {
return sm.writePacketRTCPInQueue(payload)
})
if !ok {
return liberrors.ErrServerWriteQueueFull{}
}
return nil
}
func (sm *serverSessionMedia) writePacketRTCPInQueueUDP(payload []byte) error {
err := sm.ss.s.udpRTCPListener.write(payload, sm.udpRTCPWriteAddr)
if err != nil {
return err
}
atomic.AddUint64(sm.bytesSent, uint64(len(payload)))
atomic.AddUint64(sm.rtcpPacketsSent, 1)
return nil
}
func (sm *serverSessionMedia) writePacketRTCPInQueueTCP(payload []byte) error {
sm.ss.tcpFrame.Channel = sm.tcpChannel + 1
sm.ss.tcpFrame.Payload = payload
sm.ss.tcpConn.nconn.SetWriteDeadline(time.Now().Add(sm.ss.s.WriteTimeout))
err := sm.ss.tcpConn.conn.WriteInterleavedFrame(sm.ss.tcpFrame, sm.ss.tcpBuffer)
if err != nil {
return err
}
atomic.AddUint64(sm.bytesSent, uint64(len(payload)))
atomic.AddUint64(sm.rtcpPacketsSent, 1)
return nil
}

View File

@@ -11,20 +11,9 @@ import (
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/gortsplib/v4/pkg/headers"
"github.com/bluenviron/gortsplib/v4/pkg/liberrors"
)
func firstFormat(formats map[uint8]*serverStreamFormat) *serverStreamFormat {
var firstKey uint8
for key := range formats {
firstKey = key
break
}
return formats[firstKey]
}
// NewServerStream allocates a ServerStream.
//
// Deprecated: replaced by ServerStream.Initialize().
@@ -42,9 +31,9 @@ func NewServerStream(s *Server, desc *description.Session) *ServerStream {
// ServerStream represents a data stream.
// This is in charge of
// - storing stream description and statistics
// - distributing the stream to each reader
// - allocating multicast listeners
// - gathering infos about the stream in order to generate SSRC and RTP-Info
type ServerStream struct {
Server *Server
Desc *description.Session
@@ -73,7 +62,14 @@ func (st *ServerStream) Initialize() error {
media: medi,
trackID: i,
}
sm.initialize()
err := sm.initialize()
if err != nil {
for _, medi := range st.Desc.Medias[:i] {
st.medias[medi].close()
}
return err
}
st.medias[medi] = sm
}
@@ -152,12 +148,7 @@ func (st *ServerStream) Stats() *ServerStreamStats {
for _, fo := range sm.formats {
ret[fo.format] = ServerStreamStatsFormat{
RTPPacketsSent: atomic.LoadUint64(fo.rtpPacketsSent),
LocalSSRC: func() uint32 {
if v, ok := fo.localSSRC(); ok {
return v
}
return 0
}(),
LocalSSRC: fo.localSSRC,
}
}
@@ -171,47 +162,6 @@ func (st *ServerStream) Stats() *ServerStreamStats {
}
}
func (st *ServerStream) rtpInfoEntry(medi *description.Media, now time.Time) *headers.RTPInfoEntry {
st.mutex.Lock()
defer st.mutex.Unlock()
sm := st.medias[medi]
// if there are multiple formats inside a single media stream,
// do not generate a RTP-Info entry, since RTP-Info doesn't support
// multiple sequence numbers / timestamps.
if len(sm.formats) > 1 {
return nil
}
format := firstFormat(sm.formats)
stats := format.rtcpSender.Stats()
if stats == nil {
return nil
}
clockRate := format.format.ClockRate()
if clockRate == 0 {
return nil
}
// sequence number of the first packet of the stream
seqNum := stats.LastSequenceNumber + 1
// RTP timestamp corresponding to the time value in
// the Range response header.
// remove a small quantity in order to avoid DTS > PTS
ts := uint32(uint64(stats.LastRTP) +
uint64(now.Sub(stats.LastNTP).Seconds()*float64(clockRate)) -
uint64(clockRate)/10)
return &headers.RTPInfoEntry{
SequenceNumber: &seqNum,
Timestamp: &ts,
}
}
func (st *ServerStream) readerAdd(
ss *ServerSession,
clientPorts *[2]int,
@@ -325,13 +275,6 @@ func (st *ServerStream) WritePacketRTP(medi *description.Media, pkt *rtp.Packet)
// WritePacketRTPWithNTP writes a RTP packet to all the readers of the stream.
// ntp is the absolute time of the packet, and is sent with periodic RTCP sender reports.
func (st *ServerStream) WritePacketRTPWithNTP(medi *description.Media, pkt *rtp.Packet, ntp time.Time) error {
byts := make([]byte, st.Server.MaxPacketSize)
n, err := pkt.MarshalTo(byts)
if err != nil {
return err
}
byts = byts[:n]
st.mutex.RLock()
defer st.mutex.RUnlock()
@@ -341,16 +284,11 @@ func (st *ServerStream) WritePacketRTPWithNTP(medi *description.Media, pkt *rtp.
sm := st.medias[medi]
sf := sm.formats[pkt.PayloadType]
return sf.writePacketRTP(byts, pkt, ntp)
return sf.writePacketRTP(pkt, ntp)
}
// WritePacketRTCP writes a RTCP packet to all the readers of the stream.
func (st *ServerStream) WritePacketRTCP(medi *description.Media, pkt rtcp.Packet) error {
byts, err := pkt.Marshal()
if err != nil {
return err
}
st.mutex.RLock()
defer st.mutex.RUnlock()
@@ -359,5 +297,5 @@ func (st *ServerStream) WritePacketRTCP(medi *description.Media, pkt rtcp.Packet
}
sm := st.medias[medi]
return sm.writePacketRTCP(byts)
return sm.writePacketRTCP(pkt)
}

View File

@@ -1,6 +1,7 @@
package gortsplib
import (
"crypto/rand"
"sync/atomic"
"time"
@@ -11,15 +12,55 @@ import (
"github.com/bluenviron/gortsplib/v4/pkg/rtcpsender"
)
func randUint32() (uint32, error) {
var b [4]byte
_, err := rand.Read(b[:])
if err != nil {
return 0, err
}
return uint32(b[0])<<24 | uint32(b[1])<<16 | uint32(b[2])<<8 | uint32(b[3]), nil
}
func isServerStreamLocalSSRCTaken(ssrc uint32, stream *ServerStream, exclude *serverStreamFormat) bool {
for _, sm := range stream.medias {
for _, sf := range sm.formats {
if sf != exclude && sf.localSSRC == ssrc {
return true
}
}
}
return false
}
func serverStreamPickLocalSSRC(sf *serverStreamFormat) (uint32, error) {
for {
ssrc, err := randUint32()
if err != nil {
return 0, err
}
if ssrc != 0 && !isServerStreamLocalSSRCTaken(ssrc, sf.sm.st, sf) {
return ssrc, nil
}
}
}
type serverStreamFormat struct {
sm *serverStreamMedia
format format.Format
localSSRC uint32
rtcpSender *rtcpsender.RTCPSender
rtpPacketsSent *uint64
}
func (sf *serverStreamFormat) initialize() {
func (sf *serverStreamFormat) initialize() error {
var err error
sf.localSSRC, err = serverStreamPickLocalSSRC(sf)
if err != nil {
return err
}
sf.rtpPacketsSent = new(uint64)
sf.rtcpSender = &rtcpsender.RTCPSender{
@@ -33,18 +74,26 @@ func (sf *serverStreamFormat) initialize() {
},
}
sf.rtcpSender.Initialize()
return nil
}
func (sf *serverStreamFormat) localSSRC() (uint32, bool) {
stats := sf.rtcpSender.Stats()
if stats != nil {
return stats.LocalSSRC, true
func (sf *serverStreamFormat) close() {
if sf.rtcpSender != nil {
sf.rtcpSender.Close()
}
return 0, false
}
func (sf *serverStreamFormat) writePacketRTP(byts []byte, pkt *rtp.Packet, ntp time.Time) error {
func (sf *serverStreamFormat) writePacketRTP(pkt *rtp.Packet, ntp time.Time) error {
pkt.SSRC = sf.localSSRC
byts := make([]byte, sf.sm.st.Server.MaxPacketSize)
n, err := pkt.MarshalTo(byts)
if err != nil {
return err
}
byts = byts[:n]
sf.rtcpSender.ProcessPacket(pkt, ntp, sf.format.PTSEqualsDTS(pkt))
le := uint64(len(byts))
@@ -52,7 +101,7 @@ func (sf *serverStreamFormat) writePacketRTP(byts []byte, pkt *rtp.Packet, ntp t
// send unicast
for r := range sf.sm.st.activeUnicastReaders {
if _, ok := r.setuppedMedias[sf.sm.media]; ok {
err := r.writePacketRTP(sf.sm.media, pkt.PayloadType, byts)
err := r.writePacketRTPEncoded(sf.sm.media, pkt.PayloadType, byts)
if err != nil {
r.onStreamWriteError(err)
continue

View File

@@ -4,6 +4,7 @@ import (
"sync/atomic"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/pion/rtcp"
)
type serverStreamMedia struct {
@@ -17,26 +18,34 @@ type serverStreamMedia struct {
rtcpPacketsSent *uint64
}
func (sm *serverStreamMedia) initialize() {
func (sm *serverStreamMedia) initialize() error {
sm.bytesSent = new(uint64)
sm.rtcpPacketsSent = new(uint64)
sm.formats = make(map[uint8]*serverStreamFormat)
for _, forma := range sm.media.Formats {
for i, forma := range sm.media.Formats {
sf := &serverStreamFormat{
sm: sm,
format: forma,
}
sf.initialize()
err := sf.initialize()
if err != nil {
for _, forma := range sm.media.Formats[:i] {
sm.formats[forma.PayloadType()].close()
}
return err
}
sm.formats[forma.PayloadType()] = sf
}
return nil
}
func (sm *serverStreamMedia) close() {
for _, tr := range sm.formats {
if tr.rtcpSender != nil {
tr.rtcpSender.Close()
}
for _, sf := range sm.formats {
sf.close()
}
if sm.multicastWriter != nil {
@@ -44,13 +53,18 @@ func (sm *serverStreamMedia) close() {
}
}
func (sm *serverStreamMedia) writePacketRTCP(byts []byte) error {
func (sm *serverStreamMedia) writePacketRTCP(pkt rtcp.Packet) error {
byts, err := pkt.Marshal()
if err != nil {
return err
}
le := len(byts)
// send unicast
for r := range sm.st.activeUnicastReaders {
if _, ok := r.setuppedMedias[sm.media]; ok {
err := r.writePacketRTCP(sm.media, byts)
err := r.writePacketRTCPEncoded(sm.media, byts)
if err != nil {
r.onStreamWriteError(err)
continue

View File

@@ -754,6 +754,7 @@ func TestServerSetupMultipleTransports(t *testing.T) {
Delivery: deliveryPtr(headers.TransportDeliveryUnicast),
Protocol: headers.TransportProtocolTCP,
InterleavedIDs: &[2]int{0, 1},
SSRC: th.SSRC,
}, th)
}