- fix truncation to seconds of RTP time in RTP-Info
- add a small quantity to rtptime in RTP-Info
- add 1 to last sequence number in RTP-Info
This commit is contained in:
aler9
2022-05-14 10:37:50 +02:00
parent 3bd8ad810f
commit 206506a8f0
3 changed files with 48 additions and 33 deletions

View File

@@ -1743,7 +1743,7 @@ func TestServerReadAdditionalInfos(t *testing.T) {
Path: "/teststream/trackID=0", Path: "/teststream/trackID=0",
}).String(), }).String(),
SequenceNumber: func() *uint16 { SequenceNumber: func() *uint16 {
v := uint16(556) v := uint16(557)
return &v return &v
}(), }(),
Timestamp: (*rtpInfo)[0].Timestamp, Timestamp: (*rtpInfo)[0].Timestamp,
@@ -1777,7 +1777,7 @@ func TestServerReadAdditionalInfos(t *testing.T) {
Path: "/teststream/trackID=0", Path: "/teststream/trackID=0",
}).String(), }).String(),
SequenceNumber: func() *uint16 { SequenceNumber: func() *uint16 {
v := uint16(556) v := uint16(557)
return &v return &v
}(), }(),
Timestamp: (*rtpInfo)[0].Timestamp, Timestamp: (*rtpInfo)[0].Timestamp,
@@ -1789,7 +1789,7 @@ func TestServerReadAdditionalInfos(t *testing.T) {
Path: "/teststream/trackID=1", Path: "/teststream/trackID=1",
}).String(), }).String(),
SequenceNumber: func() *uint16 { SequenceNumber: func() *uint16 {
v := uint16(87) v := uint16(88)
return &v return &v
}(), }(),
Timestamp: (*rtpInfo)[1].Timestamp, Timestamp: (*rtpInfo)[1].Timestamp,

View File

@@ -881,18 +881,21 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
ss.setuppedStream.readerSetActive(ss) ss.setuppedStream.readerSetActive(ss)
// add RTP-Info
var trackIDs []int var trackIDs []int
for trackID := range ss.setuppedTracks { for trackID := range ss.setuppedTracks {
trackIDs = append(trackIDs, trackID) trackIDs = append(trackIDs, trackID)
} }
sort.Slice(trackIDs, func(a, b int) bool { sort.Slice(trackIDs, func(a, b int) bool {
return trackIDs[a] < trackIDs[b] return trackIDs[a] < trackIDs[b]
}) })
var ri headers.RTPInfo var ri headers.RTPInfo
now := time.Now()
for _, trackID := range trackIDs { for _, trackID := range trackIDs {
ts := ss.setuppedStream.timestamp(trackID) seqNum, ts, ok := ss.setuppedStream.rtpInfo(trackID, now)
if ts == 0 { if !ok {
continue continue
} }
@@ -903,11 +906,9 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
Path: "/" + *ss.setuppedPath + "/trackID=" + strconv.FormatInt(int64(trackID), 10), Path: "/" + *ss.setuppedPath + "/trackID=" + strconv.FormatInt(int64(trackID), 10),
} }
lsn := ss.setuppedStream.lastSequenceNumber(trackID)
ri = append(ri, &headers.RTPInfoEntry{ ri = append(ri, &headers.RTPInfoEntry{
URL: u.String(), URL: u.String(),
SequenceNumber: &lsn, SequenceNumber: &seqNum,
Timestamp: &ts, Timestamp: &ts,
}) })
} }

View File

@@ -2,7 +2,6 @@ package gortsplib
import ( import (
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/pion/rtcp" "github.com/pion/rtcp"
@@ -13,11 +12,11 @@ import (
) )
type serverStreamTrack struct { type serverStreamTrack struct {
padding uint32 //nolint:structcheck,unused firstPacketSent bool
lastSequenceNumber uint32 lastSequenceNumber uint16
lastSSRC uint32 lastSSRC uint32
lastTimeRTP uint32 lastTimeRTP uint32
lastTimeNTP int64 lastTimeNTP time.Time
rtcpSender *rtcpsender.RTCPSender rtcpSender *rtcpsender.RTCPSender
} }
@@ -84,23 +83,33 @@ func (st *ServerStream) Tracks() Tracks {
} }
func (st *ServerStream) ssrc(trackID int) uint32 { func (st *ServerStream) ssrc(trackID int) uint32 {
return atomic.LoadUint32(&st.stTracks[trackID].lastSSRC) st.mutex.Lock()
defer st.mutex.Unlock()
return st.stTracks[trackID].lastSSRC
} }
func (st *ServerStream) timestamp(trackID int) uint32 { func (st *ServerStream) rtpInfo(trackID int, now time.Time) (uint16, uint32, bool) {
lastTimeRTP := atomic.LoadUint32(&st.stTracks[trackID].lastTimeRTP) st.mutex.Lock()
lastTimeNTP := atomic.LoadInt64(&st.stTracks[trackID].lastTimeNTP) defer st.mutex.Unlock()
if lastTimeRTP == 0 || lastTimeNTP == 0 { track := st.stTracks[trackID]
return 0
if !track.firstPacketSent {
return 0, 0, false
} }
return uint32(uint64(lastTimeRTP) + // sequence number of the first packet of the stream
uint64(time.Since(time.Unix(lastTimeNTP, 0)).Seconds()*float64(st.tracks[trackID].ClockRate()))) seq := track.lastSequenceNumber + 1
}
func (st *ServerStream) lastSequenceNumber(trackID int) uint16 { // RTP timestamp corresponding to the time value in
return uint16(atomic.LoadUint32(&st.stTracks[trackID].lastSequenceNumber)) // the Range response header.
// remove a small quantity in order to avoid DTS > PTS
cr := st.tracks[trackID].ClockRate()
ts := uint32(uint64(track.lastTimeRTP) +
uint64(now.Sub(track.lastTimeNTP).Seconds()*float64(cr)) -
uint64(cr)/10)
return seq, ts, true
} }
func (st *ServerStream) readerAdd( func (st *ServerStream) readerAdd(
@@ -225,21 +234,26 @@ func (st *ServerStream) WritePacketRTP(trackID int, pkt *rtp.Packet, ptsEqualsDT
} }
byts = byts[:n] byts = byts[:n]
st.mutex.RLock()
defer st.mutex.RUnlock()
track := st.stTracks[trackID] track := st.stTracks[trackID]
now := time.Now() now := time.Now()
atomic.StoreUint32(&track.lastSequenceNumber, if !track.firstPacketSent ||
uint32(pkt.Header.SequenceNumber)) ptsEqualsDTS ||
atomic.StoreUint32(&track.lastSSRC, pkt.Header.SSRC) pkt.Header.SequenceNumber > track.lastSequenceNumber ||
(track.lastSequenceNumber-pkt.Header.SequenceNumber) > 0xFFF {
if !track.firstPacketSent || ptsEqualsDTS {
track.lastTimeRTP = pkt.Header.Timestamp
track.lastTimeNTP = now
}
if ptsEqualsDTS { track.firstPacketSent = true
atomic.StoreUint32(&track.lastTimeRTP, pkt.Header.Timestamp) track.lastSequenceNumber = pkt.Header.SequenceNumber
atomic.StoreInt64(&track.lastTimeNTP, now.Unix()) track.lastSSRC = pkt.Header.SSRC
} }
st.mutex.RLock()
defer st.mutex.RUnlock()
if track.rtcpSender != nil { if track.rtcpSender != nil {
track.rtcpSender.ProcessPacketRTP(now, pkt, ptsEqualsDTS) track.rtcpSender.ProcessPacketRTP(now, pkt, ptsEqualsDTS)
} }