support formats with dynamic SSRC (#687) (#940)

This commit is contained in:
Alessandro Ros
2025-11-11 12:27:05 +01:00
committed by GitHub
parent 9a257e65c8
commit b8acb24a71
8 changed files with 696 additions and 296 deletions

View File

@@ -1,6 +1,8 @@
package gortsplib
import (
"fmt"
"sync"
"sync/atomic"
"time"
@@ -19,6 +21,9 @@ type clientFormat struct {
localSSRC uint32
onPacketRTP OnPacketRTPFunc
remoteSSRCMutex sync.RWMutex // play
remoteSSRCFilled bool // play
remoteSSRCValue uint32 // play
rtpReceiver *rtpreceiver.Receiver // play
rtpSender *rtpsender.Sender // record or back channel
writePacketRTPInQueue func([]byte) error
@@ -82,24 +87,34 @@ func (cf *clientFormat) close() {
}
func (cf *clientFormat) remoteSSRC() (uint32, bool) {
if cf.rtpReceiver != nil {
stats := cf.rtpReceiver.Stats()
if stats != nil {
return stats.RemoteSSRC, true
}
}
return 0, false
cf.remoteSSRCMutex.RLock()
defer cf.remoteSSRCMutex.RUnlock()
return cf.remoteSSRCValue, cf.remoteSSRCFilled
}
func (cf *clientFormat) readPacketRTP(pkt *rtp.Packet) {
now := cf.cm.c.timeNow()
func (cf *clientFormat) readPacketRTP(payload []byte, header *rtp.Header, now time.Time) bool {
if !cf.remoteSSRCFilled {
cf.remoteSSRCMutex.Lock()
cf.remoteSSRCFilled = true
cf.remoteSSRCValue = header.SSRC
cf.remoteSSRCMutex.Unlock()
pkts, lost, err := cf.rtpReceiver.ProcessPacket(pkt, now, cf.format.PTSEqualsDTS(pkt))
// a wrong SSRC is an issue only when encryption is enabled, since it spams srtp.Context.DecryptRTP.
} else if cf.cm.srtpInCtx != nil &&
header.SSRC != cf.remoteSSRCValue {
cf.cm.onPacketRTPDecodeError(fmt.Errorf("received packet with wrong SSRC %d, expected %d",
header.SSRC, cf.remoteSSRCValue))
return false
}
pkt, err := cf.cm.decodeRTP(payload, header)
if err != nil {
cf.cm.onPacketRTPDecodeError(err)
return
return false
}
pkts, lost := cf.rtpReceiver.ProcessPacket2(pkt, now, cf.format.PTSEqualsDTS(pkt))
if lost != 0 {
atomic.AddUint64(cf.rtpPacketsLost, lost)
cf.cm.c.OnPacketsLost(lost)
@@ -110,6 +125,8 @@ func (cf *clientFormat) readPacketRTP(pkt *rtp.Packet) {
for _, pkt := range pkts {
cf.onPacketRTP(pkt)
}
return true
}
func (cf *clientFormat) writePacketRTP(pkt *rtp.Packet, ntp time.Time) error {

View File

@@ -184,10 +184,10 @@ func (cm *clientMedia) findFormatByRemoteSSRC(ssrc uint32) *clientFormat {
return nil
}
func (cm *clientMedia) decodeRTP(payload []byte) (*rtp.Packet, error) {
func (cm *clientMedia) decodeRTP(payload []byte, header *rtp.Header) (*rtp.Packet, error) {
if cm.srtpInCtx != nil {
var err error
payload, err = cm.srtpInCtx.decryptRTP(payload, payload, nil)
payload, err = cm.srtpInCtx.decryptRTP(payload, payload, header)
if err != nil {
return nil, err
}
@@ -215,46 +215,32 @@ func (cm *clientMedia) decodeRTCP(payload []byte) ([]rtcp.Packet, error) {
return pkts, nil
}
func (cm *clientMedia) readPacketRTPTCPPlay(payload []byte) bool {
atomic.AddUint64(cm.bytesReceived, uint64(len(payload)))
now := cm.c.timeNow()
atomic.StoreInt64(cm.c.tcpLastFrameTime, now.Unix())
pkt, err := cm.decodeRTP(payload)
func (cm *clientMedia) readPacketRTP(payload []byte, now time.Time) bool {
var header rtp.Header
_, err := header.Unmarshal(payload)
if err != nil {
cm.onPacketRTPDecodeError(err)
return false
}
forma, ok := cm.formats[pkt.PayloadType]
forma, ok := cm.formats[header.PayloadType]
if !ok {
cm.onPacketRTPDecodeError(liberrors.ErrClientRTPPacketUnknownPayloadType{PayloadType: pkt.PayloadType})
cm.onPacketRTPDecodeError(liberrors.ErrClientRTPPacketUnknownPayloadType{PayloadType: header.PayloadType})
return false
}
forma.readPacketRTP(pkt)
return true
return forma.readPacketRTP(payload, &header, now)
}
func (cm *clientMedia) readPacketRTCPTCPPlay(payload []byte) bool {
atomic.AddUint64(cm.bytesReceived, uint64(len(payload)))
now := cm.c.timeNow()
atomic.StoreInt64(cm.c.tcpLastFrameTime, now.Unix())
if len(payload) > udpMaxPayloadSize {
cm.onPacketRTCPDecodeError(liberrors.ErrClientRTCPPacketTooBig{L: len(payload), Max: udpMaxPayloadSize})
return false
}
func (cm *clientMedia) readPacketRTCPPlay(payload []byte) bool {
packets, err := cm.decodeRTCP(payload)
if err != nil {
cm.onPacketRTCPDecodeError(err)
return false
}
now := cm.c.timeNow()
atomic.AddUint64(cm.rtcpPacketsReceived, uint64(len(packets)))
for _, pkt := range packets {
@@ -271,6 +257,45 @@ func (cm *clientMedia) readPacketRTCPTCPPlay(payload []byte) bool {
return true
}
func (cm *clientMedia) readPacketRTCPRecord(payload []byte) bool {
packets, err := cm.decodeRTCP(payload)
if err != nil {
cm.onPacketRTCPDecodeError(err)
return false
}
atomic.AddUint64(cm.rtcpPacketsReceived, uint64(len(packets)))
for _, pkt := range packets {
cm.onPacketRTCP(pkt)
}
return true
}
func (cm *clientMedia) readPacketRTPTCPPlay(payload []byte) bool {
atomic.AddUint64(cm.bytesReceived, uint64(len(payload)))
now := cm.c.timeNow()
atomic.StoreInt64(cm.c.tcpLastFrameTime, now.Unix())
return cm.readPacketRTP(payload, now)
}
func (cm *clientMedia) readPacketRTCPTCPPlay(payload []byte) bool {
atomic.AddUint64(cm.bytesReceived, uint64(len(payload)))
now := cm.c.timeNow()
atomic.StoreInt64(cm.c.tcpLastFrameTime, now.Unix())
if len(payload) > udpMaxPayloadSize {
cm.onPacketRTCPDecodeError(liberrors.ErrClientRTCPPacketTooBig{L: len(payload), Max: udpMaxPayloadSize})
return false
}
return cm.readPacketRTCPPlay(payload)
}
func (cm *clientMedia) readPacketRTPTCPRecord(_ []byte) bool {
return false
}
@@ -283,19 +308,7 @@ func (cm *clientMedia) readPacketRTCPTCPRecord(payload []byte) bool {
return false
}
packets, err := cm.decodeRTCP(payload)
if err != nil {
cm.onPacketRTCPDecodeError(err)
return false
}
atomic.AddUint64(cm.rtcpPacketsReceived, uint64(len(packets)))
for _, pkt := range packets {
cm.onPacketRTCP(pkt)
}
return true
return cm.readPacketRTCPRecord(payload)
}
func (cm *clientMedia) readPacketRTPUDPPlay(payload []byte) bool {
@@ -306,21 +319,7 @@ func (cm *clientMedia) readPacketRTPUDPPlay(payload []byte) bool {
return false
}
pkt, err := cm.decodeRTP(payload)
if err != nil {
cm.onPacketRTPDecodeError(err)
return false
}
forma, ok := cm.formats[pkt.PayloadType]
if !ok {
cm.onPacketRTPDecodeError(liberrors.ErrClientRTPPacketUnknownPayloadType{PayloadType: pkt.PayloadType})
return false
}
forma.readPacketRTP(pkt)
return true
return cm.readPacketRTP(payload, cm.c.timeNow())
}
func (cm *clientMedia) readPacketRTCPUDPPlay(payload []byte) bool {
@@ -331,28 +330,7 @@ func (cm *clientMedia) readPacketRTCPUDPPlay(payload []byte) bool {
return false
}
packets, err := cm.decodeRTCP(payload)
if err != nil {
cm.onPacketRTCPDecodeError(err)
return false
}
now := cm.c.timeNow()
atomic.AddUint64(cm.rtcpPacketsReceived, uint64(len(packets)))
for _, pkt := range packets {
if sr, ok := pkt.(*rtcp.SenderReport); ok {
format := cm.findFormatByRemoteSSRC(sr.SSRC)
if format != nil {
format.rtpReceiver.ProcessSenderReport(sr, now)
}
}
cm.onPacketRTCP(pkt)
}
return true
return cm.readPacketRTCPPlay(payload)
}
func (cm *clientMedia) readPacketRTPUDPRecord(_ []byte) bool {
@@ -367,19 +345,7 @@ func (cm *clientMedia) readPacketRTCPUDPRecord(payload []byte) bool {
return false
}
packets, err := cm.decodeRTCP(payload)
if err != nil {
cm.onPacketRTCPDecodeError(err)
return false
}
atomic.AddUint64(cm.rtcpPacketsReceived, uint64(len(packets)))
for _, pkt := range packets {
cm.onPacketRTCP(pkt)
}
return true
return cm.readPacketRTCPRecord(payload)
}
func (cm *clientMedia) onPacketRTPDecodeError(err error) {

View File

@@ -3193,13 +3193,11 @@ func TestClientPlayDecodeErrors(t *testing.T) {
{"udp", "rtcp invalid"},
{"udp", "rtp packets lost"},
{"udp", "rtp unknown payload type"},
{"udp", "wrong ssrc"},
{"udp", "rtcp too big"},
{"udp", "rtp too big"},
{"tcp", "rtp invalid"},
{"tcp", "rtcp invalid"},
{"tcp", "rtp unknown payload type"},
{"tcp", "wrong ssrc"},
{"tcp", "rtcp too big"},
} {
t.Run(ca.proto+" "+ca.name, func(t *testing.T) {
@@ -3378,23 +3376,6 @@ func TestClientPlayDecodeErrors(t *testing.T) {
},
}))
case ca.name == "wrong ssrc":
writeRTP(mustMarshalPacketRTP(&rtp.Packet{
Header: rtp.Header{
PayloadType: 97,
SequenceNumber: 1,
SSRC: 123,
},
}))
writeRTP(mustMarshalPacketRTP(&rtp.Packet{
Header: rtp.Header{
PayloadType: 97,
SequenceNumber: 2,
SSRC: 456,
},
}))
case ca.proto == "udp" && ca.name == "rtp too big":
_, err2 = l1.WriteTo(bytes.Repeat([]byte{0x01, 0x02}, 2000/2), &net.UDPAddr{
IP: net.ParseIP("127.0.0.1"),
@@ -3437,9 +3418,6 @@ func TestClientPlayDecodeErrors(t *testing.T) {
case ca.name == "rtp unknown payload type":
require.EqualError(t, err, "received RTP packet with unknown payload type: 111")
case ca.name == "wrong ssrc":
require.EqualError(t, err, "received packet with wrong SSRC 456, expected 123")
case ca.proto == "udp" && ca.name == "rtcp too big":
require.EqualError(t, err, "RTCP packet is too big to be read with UDP")
@@ -3979,3 +3957,280 @@ func TestClientPlaySetupErrorBackChannel(t *testing.T) {
}, 0, 0)
require.EqualError(t, err, "we are setupping a back channel but we did not request back channels")
}
func TestClientPlayDifferentSSRCs(t *testing.T) {
for _, ca := range []string{
"unsecure",
"secure",
} {
t.Run(ca, func(t *testing.T) {
cert, err := tls.X509KeyPair(serverCert, serverKey)
require.NoError(t, err)
l, err := tls.Listen("tcp", "127.0.0.1:8554", &tls.Config{Certificates: []tls.Certificate{cert}})
require.NoError(t, err)
defer l.Close()
serverDone := make(chan struct{})
defer func() { <-serverDone }()
go func() {
defer close(serverDone)
nconn, err2 := l.Accept()
require.NoError(t, err2)
defer nconn.Close()
conn := conn.NewConn(bufio.NewReader(nconn), nconn)
req, err2 := conn.ReadRequest()
require.NoError(t, err2)
require.Equal(t, base.Options, req.Method)
err2 = conn.WriteResponse(&base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Public": base.HeaderValue{strings.Join([]string{
string(base.Describe),
string(base.Setup),
string(base.Play),
}, ", ")},
},
})
require.NoError(t, err2)
req, err2 = conn.ReadRequest()
require.NoError(t, err2)
require.Equal(t, base.Describe, req.Method)
forma := &format.Generic{
PayloadTyp: 96,
RTPMa: "private/90000",
}
err2 = forma.Init()
require.NoError(t, err2)
var profile headers.TransportProfile
if ca == "secure" {
profile = headers.TransportProfileSAVP
} else {
profile = headers.TransportProfileAVP
}
medias := []*description.Media{{
Type: "application",
Formats: []format.Format{forma},
Profile: profile,
}}
err2 = conn.WriteResponse(&base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Content-Type": base.HeaderValue{"application/sdp"},
"Content-Base": base.HeaderValue{"rtsps://127.0.0.1:8554/test/stream/"},
},
Body: mediasToSDP(medias),
})
require.NoError(t, err2)
req, err2 = conn.ReadRequest()
require.NoError(t, err2)
require.Equal(t, base.Setup, req.Method)
var inTH headers.Transport
err2 = inTH.Unmarshal(req.Header["Transport"])
require.NoError(t, err2)
require.Equal(t, (*headers.TransportMode)(nil), inTH.Mode)
h := base.Header{}
th := headers.Transport{
Profile: inTH.Profile,
Delivery: ptrOf(headers.TransportDeliveryUnicast),
Protocol: headers.TransportProtocolTCP,
InterleavedIDs: &[2]int{0, 1},
}
var srtpOutCtx *wrappedSRTPContext
if ca == "secure" {
outKey := make([]byte, srtpKeyLength)
_, err2 = rand.Read(outKey)
require.NoError(t, err2)
srtpOutCtx = &wrappedSRTPContext{
key: outKey,
ssrcs: []uint32{0x38F27A2F},
startROCs: []uint32{10},
}
err2 = srtpOutCtx.initialize()
require.NoError(t, err2)
var mikeyMsg *mikey.Message
mikeyMsg, err = mikeyGenerate(srtpOutCtx)
require.NoError(t, err)
var enc base.HeaderValue
enc, err = headers.KeyMgmt{
URL: req.URL.String(),
MikeyMessage: mikeyMsg,
}.Marshal()
require.NoError(t, err)
h["KeyMgmt"] = enc
}
h["Transport"] = th.Marshal()
err2 = conn.WriteResponse(&base.Response{
StatusCode: base.StatusOK,
Header: h,
})
require.NoError(t, err2)
req, err2 = conn.ReadRequest()
require.NoError(t, err2)
require.Equal(t, base.Play, req.Method)
require.Equal(t, base.HeaderValue{"npt=0-"}, req.Header["Range"])
err2 = conn.WriteResponse(&base.Response{
StatusCode: base.StatusOK,
})
require.NoError(t, err2)
for i := range 2 { //nolint:dupl
var buf []byte
if i == 0 {
buf = mustMarshalPacketRTP(&rtp.Packet{
Header: rtp.Header{
Version: 2,
PayloadType: 96,
CSRC: []uint32{},
SSRC: 4326234,
SequenceNumber: 1,
},
Payload: []byte{1, 2, 3, 4},
})
} else {
buf = mustMarshalPacketRTP(&rtp.Packet{
Header: rtp.Header{
Version: 2,
PayloadType: 96,
CSRC: []uint32{},
SSRC: 1235762,
SequenceNumber: 2,
},
Payload: []byte{5, 6, 7, 8},
})
}
if ca == "secure" {
encr := make([]byte, 2000)
encr, err2 = srtpOutCtx.encryptRTP(encr, buf, nil)
require.NoError(t, err2)
buf = encr
}
err2 = conn.WriteInterleavedFrame(&base.InterleavedFrame{
Channel: 0,
Payload: buf,
}, make([]byte, 1024))
require.NoError(t, err2)
}
req, err2 = conn.ReadRequest()
require.NoError(t, err2)
require.Equal(t, base.Teardown, req.Method)
err2 = conn.WriteResponse(&base.Response{
StatusCode: base.StatusOK,
})
require.NoError(t, err2)
}()
u, err := base.ParseURL("rtsps://127.0.0.1:8554/test/stream")
require.NoError(t, err)
c := Client{
Scheme: u.Scheme,
Host: u.Host,
TLSConfig: &tls.Config{InsecureSkipVerify: true},
Protocol: ptrOf(ProtocolTCP),
}
err = c.Start()
require.NoError(t, err)
defer c.Close()
sd, _, err := c.Describe(u)
require.NoError(t, err)
// test that properties can be accessed in parallel
go func() {
c.Stats()
c.Transport()
}()
err = c.SetupAll(sd.BaseURL, sd.Medias)
require.NoError(t, err)
n := 0
done := make(chan struct{})
c.OnDecodeError = func(err error) {
n++
if ca == "unsecure" {
t.Errorf("should not happen")
} else {
require.Equal(t, 2, n)
require.EqualError(t, err, "received packet with wrong SSRC 1235762, expected 4326234")
close(done)
}
}
c.OnPacketRTPAny(func(_ *description.Media, _ format.Format, pkt *rtp.Packet) {
n++
switch n {
case 1:
require.Equal(t, &rtp.Packet{
Header: rtp.Header{
Version: 2,
PayloadType: 96,
CSRC: []uint32{},
SSRC: 4326234,
SequenceNumber: 1,
},
Payload: []byte{1, 2, 3, 4},
}, pkt)
case 2:
if ca == "secure" {
t.Errorf("should not happen")
}
require.Equal(t, &rtp.Packet{
Header: rtp.Header{
Version: 2,
PayloadType: 96,
CSRC: []uint32{},
SSRC: 1235762,
SequenceNumber: 2,
},
Payload: []byte{5, 6, 7, 8},
}, pkt)
close(done)
default:
t.Errorf("should not happen")
}
})
_, err = c.Play(nil)
require.NoError(t, err)
<-done
})
}
}

View File

@@ -12,7 +12,6 @@ import (
)
// Receiver is a utility to receive RTP packets. It is in charge of:
// - removing packets with wrong SSRC
// - removing duplicate packets (when transport is unreliable)
// - reordering packets (when transport is unrealiable)
// - counting lost packets
@@ -164,11 +163,24 @@ func (rr *Receiver) report() rtcp.Packet {
// ProcessPacket processes an incoming RTP packet.
// It returns reordered packets and number of lost packets.
//
// Deprecated: replaced by ProcessPacket2.
func (rr *Receiver) ProcessPacket(
pkt *rtp.Packet,
system time.Time,
ptsEqualsDTS bool,
) ([]*rtp.Packet, uint64, error) {
pkts, lost := rr.ProcessPacket2(pkt, system, ptsEqualsDTS)
return pkts, lost, nil
}
// ProcessPacket2 processes an incoming RTP packet.
// It returns reordered packets and number of lost packets.
func (rr *Receiver) ProcessPacket2(
pkt *rtp.Packet,
system time.Time,
ptsEqualsDTS bool,
) ([]*rtp.Packet, uint64) {
rr.mutex.Lock()
defer rr.mutex.Unlock()
@@ -185,11 +197,7 @@ func (rr *Receiver) ProcessPacket(
rr.lastTimeSystem = system
}
return []*rtp.Packet{pkt}, 0, nil
}
if pkt.SSRC != rr.remoteSSRC {
return nil, 0, fmt.Errorf("received packet with wrong SSRC %d, expected %d", pkt.SSRC, rr.remoteSSRC)
return []*rtp.Packet{pkt}, 0
}
var pkts []*rtp.Packet
@@ -242,7 +250,7 @@ func (rr *Receiver) ProcessPacket(
}
}
return pkts, lost, nil
return pkts, lost
}
func (rr *Receiver) reorder(pkt *rtp.Packet) ([]*rtp.Packet, uint64) {
@@ -372,7 +380,9 @@ func (rr *Receiver) PacketNTP(ts uint32) (time.Time, bool) {
// Stats are statistics.
type Stats struct {
RemoteSSRC uint32
// Deprecated: will be removed in next version.
RemoteSSRC uint32
LastSequenceNumber uint16
LastRTP uint32
LastNTP time.Time

View File

@@ -56,7 +56,7 @@ func TestErrorDifferentSSRC(t *testing.T) {
}
ts = time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
_, _, err = rr.ProcessPacket(&rtpPkt, ts, true)
require.EqualError(t, err, "received packet with wrong SSRC 754623214, expected 1434523")
require.NoError(t, err)
}
func TestStatsBeforeData(t *testing.T) {

View File

@@ -1392,13 +1392,11 @@ func TestServerRecordDecodeErrors(t *testing.T) {
{"udp", "rtcp invalid"},
{"udp", "rtp packets lost"},
{"udp", "rtp unknown payload type"},
{"udp", "wrong ssrc"},
{"udp", "rtcp too big"},
{"udp", "rtp too big"},
{"tcp", "rtcp invalid"},
{"tcp", "rtp packets lost"},
{"tcp", "rtp unknown payload type"},
{"tcp", "wrong ssrc"},
{"tcp", "rtcp too big"},
} {
t.Run(ca.proto+" "+ca.name, func(t *testing.T) {
@@ -1436,9 +1434,6 @@ func TestServerRecordDecodeErrors(t *testing.T) {
case ca.name == "rtp unknown payload type":
require.EqualError(t, ctx.Error, "received RTP packet with unknown payload type: 111")
case ca.name == "wrong ssrc":
require.EqualError(t, ctx.Error, "received packet with wrong SSRC 456, expected 123")
case ca.proto == "udp" && ca.name == "rtcp too big":
require.EqualError(t, ctx.Error, "RTCP packet is too big to be read with UDP")
@@ -1579,23 +1574,6 @@ func TestServerRecordDecodeErrors(t *testing.T) {
},
}))
case ca.name == "wrong ssrc":
writeRTP(mustMarshalPacketRTP(&rtp.Packet{
Header: rtp.Header{
PayloadType: 97,
SequenceNumber: 1,
SSRC: 123,
},
}))
writeRTP(mustMarshalPacketRTP(&rtp.Packet{
Header: rtp.Header{
PayloadType: 97,
SequenceNumber: 2,
SSRC: 456,
},
}))
case ca.proto == "udp" && ca.name == "rtp too big":
_, err = l1.WriteTo(bytes.Repeat([]byte{0x01, 0x02}, 2000/2), &net.UDPAddr{
IP: net.ParseIP("127.0.0.1"),
@@ -1804,3 +1782,218 @@ func TestServerRecordPausePause(t *testing.T) {
doPause(t, conn, "rtsp://localhost:8554/teststream", session)
}
func TestServerRecordDifferentSSRCs(t *testing.T) {
for _, ca := range []string{
"unsecure",
"secure",
} {
t.Run(ca, func(t *testing.T) {
n := 0
done := make(chan struct{})
s := &Server{
Handler: &testServerHandler{
onAnnounce: func(_ *ServerHandlerOnAnnounceCtx) (*base.Response, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, nil
},
onSetup: func(_ *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, nil, nil
},
onDecodeError: func(ctx *ServerHandlerOnDecodeErrorCtx) {
n++
if ca == "unsecure" {
t.Errorf("should not happen")
} else {
require.Equal(t, 2, n)
require.EqualError(t, ctx.Error, "received packet with wrong SSRC 1235762, expected 4326234")
close(done)
}
},
onRecord: func(ctx *ServerHandlerOnRecordCtx) (*base.Response, error) {
ctx.Session.OnPacketRTP(
ctx.Session.AnnouncedDescription().Medias[0],
ctx.Session.AnnouncedDescription().Medias[0].Formats[0],
func(pkt *rtp.Packet) {
n++
switch n {
case 1:
require.Equal(t, &rtp.Packet{
Header: rtp.Header{
Version: 2,
PayloadType: 96,
CSRC: []uint32{},
SSRC: 4326234,
SequenceNumber: 1,
},
Payload: []byte{1, 2, 3, 4},
}, pkt)
case 2:
if ca == "secure" {
t.Errorf("should not happen")
}
require.Equal(t, &rtp.Packet{
Header: rtp.Header{
Version: 2,
PayloadType: 96,
CSRC: []uint32{},
SSRC: 1235762,
SequenceNumber: 2,
},
Payload: []byte{5, 6, 7, 8},
}, pkt)
close(done)
default:
t.Errorf("should not happen")
}
})
return &base.Response{
StatusCode: base.StatusOK,
}, nil
},
},
RTSPAddress: "localhost:8554",
}
cert, err := tls.X509KeyPair(serverCert, serverKey)
require.NoError(t, err)
s.TLSConfig = &tls.Config{Certificates: []tls.Certificate{cert}}
err = s.Start()
require.NoError(t, err)
defer s.Close()
nconn, err := net.Dial("tcp", "localhost:8554")
require.NoError(t, err)
defer nconn.Close()
nconn = tls.Client(nconn, &tls.Config{InsecureSkipVerify: true})
conn := conn.NewConn(bufio.NewReader(nconn), nconn)
medias := []*description.Media{{
Type: description.MediaTypeVideo,
Formats: []format.Format{&format.H264{
PayloadTyp: 96,
SPS: testH264Media.Formats[0].(*format.H264).SPS,
PPS: testH264Media.Formats[0].(*format.H264).PPS,
PacketizationMode: 1,
}},
}}
doAnnounce(t, conn, "rtsps://localhost:8554/teststream", medias)
var srtpOutCtx *wrappedSRTPContext
inTH := &headers.Transport{
Delivery: ptrOf(headers.TransportDeliveryUnicast),
Mode: ptrOf(headers.TransportModeRecord),
Protocol: headers.TransportProtocolTCP,
InterleavedIDs: &[2]int{0, 1},
}
h := base.Header{
"CSeq": base.HeaderValue{"1"},
}
if ca == "secure" {
inTH.Profile = headers.TransportProfileSAVP
key := make([]byte, srtpKeyLength)
_, err = rand.Read(key)
require.NoError(t, err)
srtpOutCtx = &wrappedSRTPContext{
key: key,
ssrcs: []uint32{2345423},
}
err = srtpOutCtx.initialize()
require.NoError(t, err)
var mikeyMsg *mikey.Message
mikeyMsg, err = mikeyGenerate(srtpOutCtx)
require.NoError(t, err)
var enc base.HeaderValue
enc, err = headers.KeyMgmt{
URL: "rtsp://localhost:8554/teststream/" + medias[0].Control,
MikeyMessage: mikeyMsg,
}.Marshal()
require.NoError(t, err)
h["KeyMgmt"] = enc
}
h["Transport"] = inTH.Marshal()
var res *base.Response
res, err = writeReqReadRes(conn, base.Request{
Method: base.Setup,
URL: mustParseURL("rtsp://localhost:8554/teststream/" + medias[0].Control),
Header: h,
})
require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode)
var th headers.Transport
err = th.Unmarshal(res.Header["Transport"])
require.NoError(t, err)
session := readSession(t, res)
doRecord(t, conn, "rtsp://localhost:8554/teststream", session)
for i := range 2 { //nolint:dupl
var buf []byte
if i == 0 {
buf = mustMarshalPacketRTP(&rtp.Packet{
Header: rtp.Header{
Version: 2,
PayloadType: 96,
CSRC: []uint32{},
SSRC: 4326234,
SequenceNumber: 1,
},
Payload: []byte{1, 2, 3, 4},
})
} else {
buf = mustMarshalPacketRTP(&rtp.Packet{
Header: rtp.Header{
Version: 2,
PayloadType: 96,
CSRC: []uint32{},
SSRC: 1235762,
SequenceNumber: 2,
},
Payload: []byte{5, 6, 7, 8},
})
}
if ca == "secure" {
encr := make([]byte, 2000)
encr, err = srtpOutCtx.encryptRTP(encr, buf, nil)
require.NoError(t, err)
buf = encr
}
err = conn.WriteInterleavedFrame(&base.InterleavedFrame{
Channel: 0,
Payload: buf,
}, make([]byte, 1024))
require.NoError(t, err)
}
<-done
nconn.Close()
})
}
}

View File

@@ -1,7 +1,9 @@
package gortsplib
import (
"fmt"
"log"
"sync"
"sync/atomic"
"time"
@@ -19,7 +21,10 @@ type serverSessionFormat struct {
localSSRC uint32
onPacketRTP OnPacketRTPFunc
rtpReceiver *rtpreceiver.Receiver
remoteSSRCMutex sync.RWMutex // record
remoteSSRCFilled bool // record
remoteSSRCValue uint32 // record
rtpReceiver *rtpreceiver.Receiver // record
writePacketRTPInQueue func([]byte) error
rtpPacketsReceived *uint64
rtpPacketsSent *uint64
@@ -68,22 +73,34 @@ func (sf *serverSessionFormat) close() {
}
func (sf *serverSessionFormat) remoteSSRC() (uint32, bool) {
if sf.rtpReceiver != nil {
stats := sf.rtpReceiver.Stats()
if stats != nil {
return stats.RemoteSSRC, true
}
}
return 0, false
sf.remoteSSRCMutex.RLock()
defer sf.remoteSSRCMutex.RUnlock()
return sf.remoteSSRCValue, sf.remoteSSRCFilled
}
func (sf *serverSessionFormat) readPacketRTP(pkt *rtp.Packet, now time.Time) {
pkts, lost, err := sf.rtpReceiver.ProcessPacket(pkt, now, sf.format.PTSEqualsDTS(pkt))
func (sf *serverSessionFormat) readPacketRTP(payload []byte, header *rtp.Header, now time.Time) bool {
if !sf.remoteSSRCFilled {
sf.remoteSSRCMutex.Lock()
sf.remoteSSRCFilled = true
sf.remoteSSRCValue = header.SSRC
sf.remoteSSRCMutex.Unlock()
// a wrong SSRC is an issue only when encryption is enabled, since it spams srtp.Context.DecryptRTP.
} else if sf.sm.srtpInCtx != nil &&
header.SSRC != sf.remoteSSRCValue {
sf.sm.onPacketRTPDecodeError(fmt.Errorf("received packet with wrong SSRC %d, expected %d",
header.SSRC, sf.remoteSSRCValue))
return false
}
pkt, err := sf.sm.decodeRTP(payload, header)
if err != nil {
sf.sm.onPacketRTPDecodeError(err)
return
return false
}
pkts, lost := sf.rtpReceiver.ProcessPacket2(pkt, now, sf.format.PTSEqualsDTS(pkt))
if lost != 0 {
atomic.AddUint64(sf.rtpPacketsLost, lost)
@@ -109,6 +126,8 @@ func (sf *serverSessionFormat) readPacketRTP(pkt *rtp.Packet, now time.Time) {
for _, pkt := range pkts {
sf.onPacketRTP(pkt)
}
return true
}
func (sf *serverSessionFormat) writePacketRTP(pkt *rtp.Packet) error {

View File

@@ -143,19 +143,18 @@ func (sm *serverSessionMedia) stop() {
}
func (sm *serverSessionMedia) findFormatByRemoteSSRC(ssrc uint32) *serverSessionFormat {
for _, format := range sm.formats {
stats := format.rtpReceiver.Stats()
if stats != nil && stats.RemoteSSRC == ssrc {
return format
for _, sf := range sm.formats {
if v, ok := sf.remoteSSRC(); ok && v == ssrc {
return sf
}
}
return nil
}
func (sm *serverSessionMedia) decodeRTP(payload []byte) (*rtp.Packet, error) {
func (sm *serverSessionMedia) decodeRTP(payload []byte, header *rtp.Header) (*rtp.Packet, error) {
if sm.srtpInCtx != nil {
var err error
payload, err = sm.srtpInCtx.decryptRTP(payload, payload, nil)
payload, err = sm.srtpInCtx.decryptRTP(payload, payload, header)
if err != nil {
return nil, err
}
@@ -183,48 +182,30 @@ func (sm *serverSessionMedia) decodeRTCP(payload []byte) ([]rtcp.Packet, error)
return pkts, nil
}
func (sm *serverSessionMedia) readPacketRTPUDPPlay(payload []byte) bool {
atomic.AddUint64(sm.bytesReceived, uint64(len(payload)))
if len(payload) == (udpMaxPayloadSize + 1) {
sm.onPacketRTPDecodeError(liberrors.ErrServerRTPPacketTooBigUDP{})
return false
}
pkt, err := sm.decodeRTP(payload)
func (sm *serverSessionMedia) readPacketRTP(payload []byte, now time.Time) bool {
var header rtp.Header
_, err := header.Unmarshal(payload)
if err != nil {
sm.onPacketRTPDecodeError(err)
return false
}
forma, ok := sm.formats[pkt.PayloadType]
forma, ok := sm.formats[header.PayloadType]
if !ok {
sm.onPacketRTPDecodeError(liberrors.ErrServerRTPPacketUnknownPayloadType{PayloadType: pkt.PayloadType})
sm.onPacketRTPDecodeError(liberrors.ErrServerRTPPacketUnknownPayloadType{PayloadType: header.PayloadType})
return false
}
forma.readPacketRTP(pkt, sm.ss.s.timeNow())
return true
return forma.readPacketRTP(payload, &header, now)
}
func (sm *serverSessionMedia) readPacketRTCPUDPPlay(payload []byte) bool {
atomic.AddUint64(sm.bytesReceived, uint64(len(payload)))
if len(payload) == (udpMaxPayloadSize + 1) {
sm.onPacketRTCPDecodeError(liberrors.ErrServerRTCPPacketTooBigUDP{})
return false
}
func (sm *serverSessionMedia) readPacketRTCPPlay(payload []byte) bool {
packets, err := sm.decodeRTCP(payload)
if err != nil {
sm.onPacketRTCPDecodeError(err)
return false
}
now := sm.ss.s.timeNow()
atomic.StoreInt64(sm.ss.udpLastPacketTime, now.Unix())
atomic.AddUint64(sm.rtcpPacketsReceived, uint64(len(packets)))
for _, pkt := range packets {
@@ -234,42 +215,7 @@ func (sm *serverSessionMedia) readPacketRTCPUDPPlay(payload []byte) bool {
return true
}
func (sm *serverSessionMedia) readPacketRTPUDPRecord(payload []byte) bool {
atomic.AddUint64(sm.bytesReceived, uint64(len(payload)))
if len(payload) == (udpMaxPayloadSize + 1) {
sm.onPacketRTPDecodeError(liberrors.ErrServerRTPPacketTooBigUDP{})
return false
}
pkt, err := sm.decodeRTP(payload)
if err != nil {
sm.onPacketRTPDecodeError(err)
return false
}
forma, ok := sm.formats[pkt.PayloadType]
if !ok {
sm.onPacketRTPDecodeError(liberrors.ErrServerRTPPacketUnknownPayloadType{PayloadType: pkt.PayloadType})
return false
}
now := sm.ss.s.timeNow()
atomic.StoreInt64(sm.ss.udpLastPacketTime, now.Unix())
forma.readPacketRTP(pkt, now)
return true
}
func (sm *serverSessionMedia) readPacketRTCPUDPRecord(payload []byte) bool {
atomic.AddUint64(sm.bytesReceived, uint64(len(payload)))
if len(payload) == (udpMaxPayloadSize + 1) {
sm.onPacketRTCPDecodeError(liberrors.ErrServerRTCPPacketTooBigUDP{})
return false
}
func (sm *serverSessionMedia) readPacketRTCPRecord(payload []byte) bool {
packets, err := sm.decodeRTCP(payload)
if err != nil {
sm.onPacketRTCPDecodeError(err)
@@ -277,7 +223,6 @@ func (sm *serverSessionMedia) readPacketRTCPUDPRecord(payload []byte) bool {
}
now := sm.ss.s.timeNow()
atomic.StoreInt64(sm.ss.udpLastPacketTime, now.Unix())
atomic.AddUint64(sm.rtcpPacketsReceived, uint64(len(packets)))
@@ -295,6 +240,62 @@ func (sm *serverSessionMedia) readPacketRTCPUDPRecord(payload []byte) bool {
return true
}
func (sm *serverSessionMedia) readPacketRTPUDPPlay(payload []byte) bool {
atomic.AddUint64(sm.bytesReceived, uint64(len(payload)))
now := sm.ss.s.timeNow()
atomic.StoreInt64(sm.ss.udpLastPacketTime, now.Unix())
if len(payload) == (udpMaxPayloadSize + 1) {
sm.onPacketRTPDecodeError(liberrors.ErrServerRTPPacketTooBigUDP{})
return false
}
return sm.readPacketRTP(payload, sm.ss.s.timeNow())
}
func (sm *serverSessionMedia) readPacketRTCPUDPPlay(payload []byte) bool {
atomic.AddUint64(sm.bytesReceived, uint64(len(payload)))
now := sm.ss.s.timeNow()
atomic.StoreInt64(sm.ss.udpLastPacketTime, now.Unix())
if len(payload) == (udpMaxPayloadSize + 1) {
sm.onPacketRTCPDecodeError(liberrors.ErrServerRTCPPacketTooBigUDP{})
return false
}
return sm.readPacketRTCPPlay(payload)
}
func (sm *serverSessionMedia) readPacketRTPUDPRecord(payload []byte) bool {
atomic.AddUint64(sm.bytesReceived, uint64(len(payload)))
now := sm.ss.s.timeNow()
atomic.StoreInt64(sm.ss.udpLastPacketTime, now.Unix())
if len(payload) == (udpMaxPayloadSize + 1) {
sm.onPacketRTPDecodeError(liberrors.ErrServerRTPPacketTooBigUDP{})
return false
}
return sm.readPacketRTP(payload, sm.ss.s.timeNow())
}
func (sm *serverSessionMedia) readPacketRTCPUDPRecord(payload []byte) bool {
atomic.AddUint64(sm.bytesReceived, uint64(len(payload)))
now := sm.ss.s.timeNow()
atomic.StoreInt64(sm.ss.udpLastPacketTime, now.Unix())
if len(payload) == (udpMaxPayloadSize + 1) {
sm.onPacketRTCPDecodeError(liberrors.ErrServerRTCPPacketTooBigUDP{})
return false
}
return sm.readPacketRTCPRecord(payload)
}
func (sm *serverSessionMedia) readPacketRTPTCPPlay(payload []byte) bool {
if !sm.media.IsBackChannel {
return false
@@ -302,21 +303,7 @@ func (sm *serverSessionMedia) readPacketRTPTCPPlay(payload []byte) bool {
atomic.AddUint64(sm.bytesReceived, uint64(len(payload)))
pkt, err := sm.decodeRTP(payload)
if err != nil {
sm.onPacketRTPDecodeError(err)
return false
}
forma, ok := sm.formats[pkt.PayloadType]
if !ok {
sm.onPacketRTPDecodeError(liberrors.ErrServerRTPPacketUnknownPayloadType{PayloadType: pkt.PayloadType})
return false
}
forma.readPacketRTP(pkt, sm.ss.s.timeNow())
return true
return sm.readPacketRTP(payload, sm.ss.s.timeNow())
}
func (sm *serverSessionMedia) readPacketRTCPTCPPlay(payload []byte) bool {
@@ -327,39 +314,13 @@ func (sm *serverSessionMedia) readPacketRTCPTCPPlay(payload []byte) bool {
return false
}
packets, err := sm.decodeRTCP(payload)
if err != nil {
sm.onPacketRTCPDecodeError(err)
return false
}
atomic.AddUint64(sm.rtcpPacketsReceived, uint64(len(packets)))
for _, pkt := range packets {
sm.onPacketRTCP(pkt)
}
return true
return sm.readPacketRTCPPlay(payload)
}
func (sm *serverSessionMedia) readPacketRTPTCPRecord(payload []byte) bool {
atomic.AddUint64(sm.bytesReceived, uint64(len(payload)))
pkt, err := sm.decodeRTP(payload)
if err != nil {
sm.onPacketRTPDecodeError(err)
return false
}
forma, ok := sm.formats[pkt.PayloadType]
if !ok {
sm.onPacketRTPDecodeError(liberrors.ErrServerRTPPacketUnknownPayloadType{PayloadType: pkt.PayloadType})
return false
}
forma.readPacketRTP(pkt, sm.ss.s.timeNow())
return true
return sm.readPacketRTP(payload, sm.ss.s.timeNow())
}
func (sm *serverSessionMedia) readPacketRTCPTCPRecord(payload []byte) bool {
@@ -370,28 +331,7 @@ func (sm *serverSessionMedia) readPacketRTCPTCPRecord(payload []byte) bool {
return false
}
packets, err := sm.decodeRTCP(payload)
if err != nil {
sm.onPacketRTCPDecodeError(err)
return false
}
now := sm.ss.s.timeNow()
atomic.AddUint64(sm.rtcpPacketsReceived, uint64(len(packets)))
for _, pkt := range packets {
if sr, ok := pkt.(*rtcp.SenderReport); ok {
format := sm.findFormatByRemoteSSRC(sr.SSRC)
if format != nil {
format.rtpReceiver.ProcessSenderReport(sr, now)
}
}
sm.onPacketRTCP(pkt)
}
return true
return sm.readPacketRTCPRecord(payload)
}
func (sm *serverSessionMedia) onPacketRTPDecodeError(err error) {