server: add parameter 'containsKeyFrame' to WritePacketRTP; fix RTCP sender reports and RTP-Info

This commit is contained in:
aler9
2022-04-07 18:41:48 +02:00
committed by Alessandro Ros
parent 0463e6b510
commit d0cab3c8bd
8 changed files with 47 additions and 29 deletions

View File

@@ -1842,7 +1842,7 @@ func (c *Client) WritePacketRTP(trackID int, pkt *rtp.Packet) error {
}
if c.tracks[trackID].rtcpSender != nil {
c.tracks[trackID].rtcpSender.ProcessPacketRTP(time.Now(), pkt)
c.tracks[trackID].rtcpSender.ProcessPacketRTP(time.Now(), pkt, true)
}
c.writeBuffer.Push(trackTypePayload{

View File

@@ -131,7 +131,7 @@ func (sh *serverHandler) OnPacketRTP(ctx *gortsplib.ServerHandlerOnPacketRTPCtx)
// if we are the publisher, route the RTP packet to readers
if ctx.Session == sh.publisher {
sh.stream.WritePacketRTP(ctx.TrackID, ctx.Packet)
sh.stream.WritePacketRTP(ctx.TrackID, ctx.Packet, ctx.PTSEqualsDTS)
}
}

View File

@@ -130,7 +130,7 @@ func (sh *serverHandler) OnPacketRTP(ctx *gortsplib.ServerHandlerOnPacketRTPCtx)
// if we are the publisher, route the RTP packet to readers
if ctx.Session == sh.publisher {
sh.stream.WritePacketRTP(ctx.TrackID, ctx.Packet)
sh.stream.WritePacketRTP(ctx.TrackID, ctx.Packet, ctx.PTSEqualsDTS)
}
}

View File

@@ -99,7 +99,7 @@ func (rs *RTCPSender) report(ts time.Time) rtcp.Packet {
}
// ProcessPacketRTP extracts the needed data from RTP packets.
func (rs *RTCPSender) ProcessPacketRTP(ts time.Time, pkt *rtp.Packet) {
func (rs *RTCPSender) ProcessPacketRTP(ts time.Time, pkt *rtp.Packet, ptsEqualsDTS bool) {
rs.mutex.Lock()
defer rs.mutex.Unlock()
@@ -108,9 +108,10 @@ func (rs *RTCPSender) ProcessPacketRTP(ts time.Time, pkt *rtp.Packet) {
rs.senderSSRC = pkt.SSRC
}
// always update time to minimize errors
if ptsEqualsDTS {
rs.lastRTPTimeRTP = pkt.Timestamp
rs.lastRTPTimeTime = ts
}
rs.packetCount++
rs.octetCount += uint32(len(pkt.Payload))

View File

@@ -20,8 +20,8 @@ func TestRTCPSender(t *testing.T) {
SSRC: 0xba9da416,
NTPTime: 0xcbddcc34999997ff,
RTPTime: 0x4d185ae8,
PacketCount: 2,
OctetCount: 4,
PacketCount: 3,
OctetCount: 6,
}, pkt)
close(done)
})
@@ -39,7 +39,7 @@ func TestRTCPSender(t *testing.T) {
Payload: []byte("\x00\x00"),
}
ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC)
rs.ProcessPacketRTP(ts, &rtpPkt)
rs.ProcessPacketRTP(ts, &rtpPkt, true)
rtpPkt = rtp.Packet{
Header: rtp.Header{
@@ -53,7 +53,21 @@ func TestRTCPSender(t *testing.T) {
Payload: []byte("\x00\x00"),
}
ts = time.Date(2008, 0o5, 20, 22, 15, 20, 500000000, time.UTC)
rs.ProcessPacketRTP(ts, &rtpPkt)
rs.ProcessPacketRTP(ts, &rtpPkt, true)
rtpPkt = rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 948,
Timestamp: 1287987768 + 90000,
SSRC: 0xba9da416,
},
Payload: []byte("\x00\x00"),
}
ts = time.Date(2008, 0o5, 20, 22, 15, 20, 500000000, time.UTC)
rs.ProcessPacketRTP(ts, &rtpPkt, false)
<-done
}

View File

@@ -303,7 +303,7 @@ func TestServerRead(t *testing.T) {
go func() {
time.Sleep(1 * time.Second)
stream.WritePacketRTCP(0, &testRTCPPacket)
stream.WritePacketRTP(0, &testRTPPacket)
stream.WritePacketRTP(0, &testRTPPacket, true)
}()
return &base.Response{
@@ -686,8 +686,8 @@ func TestServerReadRTCPReport(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode)
stream.WritePacketRTP(0, &testRTPPacket)
stream.WritePacketRTP(0, &testRTPPacket)
stream.WritePacketRTP(0, &testRTPPacket, true)
stream.WritePacketRTP(0, &testRTPPacket, true)
buf := make([]byte, 2048)
n, _, err := l2.ReadFrom(buf)
@@ -790,7 +790,7 @@ func TestServerReadNonStandardFrameSize(t *testing.T) {
onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) {
go func() {
time.Sleep(1 * time.Second)
stream.WritePacketRTP(0, &packet)
stream.WritePacketRTP(0, &packet, true)
}()
return &base.Response{
@@ -882,7 +882,7 @@ func TestServerReadTCPResponseBeforeFrames(t *testing.T) {
go func() {
defer close(writerDone)
stream.WritePacketRTP(0, &testRTPPacket)
stream.WritePacketRTP(0, &testRTPPacket, true)
t := time.NewTicker(50 * time.Millisecond)
defer t.Stop()
@@ -890,7 +890,7 @@ func TestServerReadTCPResponseBeforeFrames(t *testing.T) {
for {
select {
case <-t.C:
stream.WritePacketRTP(0, &testRTPPacket)
stream.WritePacketRTP(0, &testRTPPacket, true)
case <-writerTerminate:
return
}
@@ -1075,7 +1075,7 @@ func TestServerReadPlayPausePlay(t *testing.T) {
for {
select {
case <-t.C:
stream.WritePacketRTP(0, &testRTPPacket)
stream.WritePacketRTP(0, &testRTPPacket, true)
case <-writerTerminate:
return
}
@@ -1196,7 +1196,7 @@ func TestServerReadPlayPausePause(t *testing.T) {
for {
select {
case <-t.C:
stream.WritePacketRTP(0, &testRTPPacket)
stream.WritePacketRTP(0, &testRTPPacket, true)
case <-writerTerminate:
return
}
@@ -1640,8 +1640,8 @@ func TestServerReadPartialTracks(t *testing.T) {
onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) {
go func() {
time.Sleep(1 * time.Second)
stream.WritePacketRTP(0, &testRTPPacket)
stream.WritePacketRTP(1, &testRTPPacket)
stream.WritePacketRTP(0, &testRTPPacket, true)
stream.WritePacketRTP(1, &testRTPPacket, true)
}()
return &base.Response{
@@ -1833,7 +1833,7 @@ func TestServerReadAdditionalInfos(t *testing.T) {
SSRC: 96342362,
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
})
}, true)
rtpInfo, ssrcs := getInfos()
require.Equal(t, &headers.RTPInfo{
@@ -1867,7 +1867,7 @@ func TestServerReadAdditionalInfos(t *testing.T) {
SSRC: 536474323,
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
})
}, true)
rtpInfo, ssrcs = getInfos()
require.Equal(t, &headers.RTPInfo{

View File

@@ -412,7 +412,7 @@ func TestServerHighLevelPublishRead(t *testing.T) {
defer mutex.Unlock()
if ctx.Session == publisher {
stream.WritePacketRTP(ctx.TrackID, ctx.Packet)
stream.WritePacketRTP(ctx.TrackID, ctx.Packet, true)
}
},
},

View File

@@ -14,9 +14,9 @@ import (
type serverStreamTrack struct {
lastSequenceNumber uint32
lastSSRC uint32
lastTimeRTP uint32
lastTimeNTP int64
lastSSRC uint32
rtcpSender *rtcpsender.RTCPSender
}
@@ -216,7 +216,7 @@ func (st *ServerStream) readerSetInactive(ss *ServerSession) {
}
// WritePacketRTP writes a RTP packet to all the readers of the stream.
func (st *ServerStream) WritePacketRTP(trackID int, pkt *rtp.Packet) {
func (st *ServerStream) WritePacketRTP(trackID int, pkt *rtp.Packet, ptsEqualsDTS bool) {
byts, err := pkt.Marshal()
if err != nil {
return
@@ -227,15 +227,18 @@ func (st *ServerStream) WritePacketRTP(trackID int, pkt *rtp.Packet) {
atomic.StoreUint32(&track.lastSequenceNumber,
uint32(pkt.Header.SequenceNumber))
atomic.StoreUint32(&track.lastSSRC, pkt.Header.SSRC)
if ptsEqualsDTS {
atomic.StoreUint32(&track.lastTimeRTP, pkt.Header.Timestamp)
atomic.StoreInt64(&track.lastTimeNTP, now.Unix())
atomic.StoreUint32(&track.lastSSRC, pkt.Header.SSRC)
}
st.mutex.RLock()
defer st.mutex.RUnlock()
if track.rtcpSender != nil {
track.rtcpSender.ProcessPacketRTP(now, pkt)
track.rtcpSender.ProcessPacketRTP(now, pkt, ptsEqualsDTS)
}
// send unicast