diff --git a/server_read_test.go b/server_read_test.go index fee7c476..ddd63424 100644 --- a/server_read_test.go +++ b/server_read_test.go @@ -1743,7 +1743,7 @@ func TestServerReadAdditionalInfos(t *testing.T) { Path: "/teststream/trackID=0", }).String(), SequenceNumber: func() *uint16 { - v := uint16(556) + v := uint16(557) return &v }(), Timestamp: (*rtpInfo)[0].Timestamp, @@ -1777,7 +1777,7 @@ func TestServerReadAdditionalInfos(t *testing.T) { Path: "/teststream/trackID=0", }).String(), SequenceNumber: func() *uint16 { - v := uint16(556) + v := uint16(557) return &v }(), Timestamp: (*rtpInfo)[0].Timestamp, @@ -1789,7 +1789,7 @@ func TestServerReadAdditionalInfos(t *testing.T) { Path: "/teststream/trackID=1", }).String(), SequenceNumber: func() *uint16 { - v := uint16(87) + v := uint16(88) return &v }(), Timestamp: (*rtpInfo)[1].Timestamp, diff --git a/serversession.go b/serversession.go index 51ba9a76..bcf4a6ae 100644 --- a/serversession.go +++ b/serversession.go @@ -881,18 +881,21 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base ss.setuppedStream.readerSetActive(ss) - // add RTP-Info var trackIDs []int for trackID := range ss.setuppedTracks { trackIDs = append(trackIDs, trackID) } + sort.Slice(trackIDs, func(a, b int) bool { return trackIDs[a] < trackIDs[b] }) + var ri headers.RTPInfo + now := time.Now() + for _, trackID := range trackIDs { - ts := ss.setuppedStream.timestamp(trackID) - if ts == 0 { + seqNum, ts, ok := ss.setuppedStream.rtpInfo(trackID, now) + if !ok { continue } @@ -903,11 +906,9 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base Path: "/" + *ss.setuppedPath + "/trackID=" + strconv.FormatInt(int64(trackID), 10), } - lsn := ss.setuppedStream.lastSequenceNumber(trackID) - ri = append(ri, &headers.RTPInfoEntry{ URL: u.String(), - SequenceNumber: &lsn, + SequenceNumber: &seqNum, Timestamp: &ts, }) } diff --git a/serverstream.go b/serverstream.go index ac67a6ed..556eaac3 100644 --- a/serverstream.go +++ b/serverstream.go @@ -2,7 +2,6 @@ package gortsplib import ( "sync" - "sync/atomic" "time" "github.com/pion/rtcp" @@ -13,11 +12,11 @@ import ( ) type serverStreamTrack struct { - padding uint32 //nolint:structcheck,unused - lastSequenceNumber uint32 + firstPacketSent bool + lastSequenceNumber uint16 lastSSRC uint32 lastTimeRTP uint32 - lastTimeNTP int64 + lastTimeNTP time.Time rtcpSender *rtcpsender.RTCPSender } @@ -84,23 +83,33 @@ func (st *ServerStream) Tracks() Tracks { } func (st *ServerStream) ssrc(trackID int) uint32 { - return atomic.LoadUint32(&st.stTracks[trackID].lastSSRC) + st.mutex.Lock() + defer st.mutex.Unlock() + return st.stTracks[trackID].lastSSRC } -func (st *ServerStream) timestamp(trackID int) uint32 { - lastTimeRTP := atomic.LoadUint32(&st.stTracks[trackID].lastTimeRTP) - lastTimeNTP := atomic.LoadInt64(&st.stTracks[trackID].lastTimeNTP) +func (st *ServerStream) rtpInfo(trackID int, now time.Time) (uint16, uint32, bool) { + st.mutex.Lock() + defer st.mutex.Unlock() - if lastTimeRTP == 0 || lastTimeNTP == 0 { - return 0 + track := st.stTracks[trackID] + + if !track.firstPacketSent { + return 0, 0, false } - return uint32(uint64(lastTimeRTP) + - uint64(time.Since(time.Unix(lastTimeNTP, 0)).Seconds()*float64(st.tracks[trackID].ClockRate()))) -} + // sequence number of the first packet of the stream + seq := track.lastSequenceNumber + 1 -func (st *ServerStream) lastSequenceNumber(trackID int) uint16 { - return uint16(atomic.LoadUint32(&st.stTracks[trackID].lastSequenceNumber)) + // RTP timestamp corresponding to the time value in + // the Range response header. + // remove a small quantity in order to avoid DTS > PTS + cr := st.tracks[trackID].ClockRate() + ts := uint32(uint64(track.lastTimeRTP) + + uint64(now.Sub(track.lastTimeNTP).Seconds()*float64(cr)) - + uint64(cr)/10) + + return seq, ts, true } func (st *ServerStream) readerAdd( @@ -225,21 +234,26 @@ func (st *ServerStream) WritePacketRTP(trackID int, pkt *rtp.Packet, ptsEqualsDT } byts = byts[:n] + st.mutex.RLock() + defer st.mutex.RUnlock() + track := st.stTracks[trackID] now := time.Now() - atomic.StoreUint32(&track.lastSequenceNumber, - uint32(pkt.Header.SequenceNumber)) - atomic.StoreUint32(&track.lastSSRC, pkt.Header.SSRC) + if !track.firstPacketSent || + ptsEqualsDTS || + pkt.Header.SequenceNumber > track.lastSequenceNumber || + (track.lastSequenceNumber-pkt.Header.SequenceNumber) > 0xFFF { + if !track.firstPacketSent || ptsEqualsDTS { + track.lastTimeRTP = pkt.Header.Timestamp + track.lastTimeNTP = now + } - if ptsEqualsDTS { - atomic.StoreUint32(&track.lastTimeRTP, pkt.Header.Timestamp) - atomic.StoreInt64(&track.lastTimeNTP, now.Unix()) + track.firstPacketSent = true + track.lastSequenceNumber = pkt.Header.SequenceNumber + track.lastSSRC = pkt.Header.SSRC } - st.mutex.RLock() - defer st.mutex.RUnlock() - if track.rtcpSender != nil { track.rtcpSender.ProcessPacketRTP(now, pkt, ptsEqualsDTS) }