diff --git a/client.go b/client.go index 4b52b142..6c0eedac 100644 --- a/client.go +++ b/client.go @@ -1842,7 +1842,7 @@ func (c *Client) WritePacketRTP(trackID int, pkt *rtp.Packet) error { } if c.tracks[trackID].rtcpSender != nil { - c.tracks[trackID].rtcpSender.ProcessPacketRTP(time.Now(), pkt) + c.tracks[trackID].rtcpSender.ProcessPacketRTP(time.Now(), pkt, true) } c.writeBuffer.Push(trackTypePayload{ diff --git a/examples/server-tls/main.go b/examples/server-tls/main.go index 488815ec..4b8af115 100644 --- a/examples/server-tls/main.go +++ b/examples/server-tls/main.go @@ -131,7 +131,7 @@ func (sh *serverHandler) OnPacketRTP(ctx *gortsplib.ServerHandlerOnPacketRTPCtx) // if we are the publisher, route the RTP packet to readers if ctx.Session == sh.publisher { - sh.stream.WritePacketRTP(ctx.TrackID, ctx.Packet) + sh.stream.WritePacketRTP(ctx.TrackID, ctx.Packet, ctx.PTSEqualsDTS) } } diff --git a/examples/server/main.go b/examples/server/main.go index 952b0e51..531daf0c 100644 --- a/examples/server/main.go +++ b/examples/server/main.go @@ -130,7 +130,7 @@ func (sh *serverHandler) OnPacketRTP(ctx *gortsplib.ServerHandlerOnPacketRTPCtx) // if we are the publisher, route the RTP packet to readers if ctx.Session == sh.publisher { - sh.stream.WritePacketRTP(ctx.TrackID, ctx.Packet) + sh.stream.WritePacketRTP(ctx.TrackID, ctx.Packet, ctx.PTSEqualsDTS) } } diff --git a/pkg/rtcpsender/rtcpsender.go b/pkg/rtcpsender/rtcpsender.go index f5fea5f1..ebfc7d20 100644 --- a/pkg/rtcpsender/rtcpsender.go +++ b/pkg/rtcpsender/rtcpsender.go @@ -99,7 +99,7 @@ func (rs *RTCPSender) report(ts time.Time) rtcp.Packet { } // ProcessPacketRTP extracts the needed data from RTP packets. -func (rs *RTCPSender) ProcessPacketRTP(ts time.Time, pkt *rtp.Packet) { +func (rs *RTCPSender) ProcessPacketRTP(ts time.Time, pkt *rtp.Packet, ptsEqualsDTS bool) { rs.mutex.Lock() defer rs.mutex.Unlock() @@ -108,9 +108,10 @@ func (rs *RTCPSender) ProcessPacketRTP(ts time.Time, pkt *rtp.Packet) { rs.senderSSRC = pkt.SSRC } - // always update time to minimize errors - rs.lastRTPTimeRTP = pkt.Timestamp - rs.lastRTPTimeTime = ts + if ptsEqualsDTS { + rs.lastRTPTimeRTP = pkt.Timestamp + rs.lastRTPTimeTime = ts + } rs.packetCount++ rs.octetCount += uint32(len(pkt.Payload)) diff --git a/pkg/rtcpsender/rtcpsender_test.go b/pkg/rtcpsender/rtcpsender_test.go index f50d310e..0b179bcc 100644 --- a/pkg/rtcpsender/rtcpsender_test.go +++ b/pkg/rtcpsender/rtcpsender_test.go @@ -20,8 +20,8 @@ func TestRTCPSender(t *testing.T) { SSRC: 0xba9da416, NTPTime: 0xcbddcc34999997ff, RTPTime: 0x4d185ae8, - PacketCount: 2, - OctetCount: 4, + PacketCount: 3, + OctetCount: 6, }, pkt) close(done) }) @@ -39,7 +39,7 @@ func TestRTCPSender(t *testing.T) { Payload: []byte("\x00\x00"), } ts := time.Date(2008, 0o5, 20, 22, 15, 20, 0, time.UTC) - rs.ProcessPacketRTP(ts, &rtpPkt) + rs.ProcessPacketRTP(ts, &rtpPkt, true) rtpPkt = rtp.Packet{ Header: rtp.Header{ @@ -53,7 +53,21 @@ func TestRTCPSender(t *testing.T) { Payload: []byte("\x00\x00"), } ts = time.Date(2008, 0o5, 20, 22, 15, 20, 500000000, time.UTC) - rs.ProcessPacketRTP(ts, &rtpPkt) + rs.ProcessPacketRTP(ts, &rtpPkt, true) + + rtpPkt = rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 948, + Timestamp: 1287987768 + 90000, + SSRC: 0xba9da416, + }, + Payload: []byte("\x00\x00"), + } + ts = time.Date(2008, 0o5, 20, 22, 15, 20, 500000000, time.UTC) + rs.ProcessPacketRTP(ts, &rtpPkt, false) <-done } diff --git a/server_read_test.go b/server_read_test.go index 44ba5fdb..2904580f 100644 --- a/server_read_test.go +++ b/server_read_test.go @@ -303,7 +303,7 @@ func TestServerRead(t *testing.T) { go func() { time.Sleep(1 * time.Second) stream.WritePacketRTCP(0, &testRTCPPacket) - stream.WritePacketRTP(0, &testRTPPacket) + stream.WritePacketRTP(0, &testRTPPacket, true) }() return &base.Response{ @@ -686,8 +686,8 @@ func TestServerReadRTCPReport(t *testing.T) { require.NoError(t, err) require.Equal(t, base.StatusOK, res.StatusCode) - stream.WritePacketRTP(0, &testRTPPacket) - stream.WritePacketRTP(0, &testRTPPacket) + stream.WritePacketRTP(0, &testRTPPacket, true) + stream.WritePacketRTP(0, &testRTPPacket, true) buf := make([]byte, 2048) n, _, err := l2.ReadFrom(buf) @@ -790,7 +790,7 @@ func TestServerReadNonStandardFrameSize(t *testing.T) { onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) { go func() { time.Sleep(1 * time.Second) - stream.WritePacketRTP(0, &packet) + stream.WritePacketRTP(0, &packet, true) }() return &base.Response{ @@ -882,7 +882,7 @@ func TestServerReadTCPResponseBeforeFrames(t *testing.T) { go func() { defer close(writerDone) - stream.WritePacketRTP(0, &testRTPPacket) + stream.WritePacketRTP(0, &testRTPPacket, true) t := time.NewTicker(50 * time.Millisecond) defer t.Stop() @@ -890,7 +890,7 @@ func TestServerReadTCPResponseBeforeFrames(t *testing.T) { for { select { case <-t.C: - stream.WritePacketRTP(0, &testRTPPacket) + stream.WritePacketRTP(0, &testRTPPacket, true) case <-writerTerminate: return } @@ -1075,7 +1075,7 @@ func TestServerReadPlayPausePlay(t *testing.T) { for { select { case <-t.C: - stream.WritePacketRTP(0, &testRTPPacket) + stream.WritePacketRTP(0, &testRTPPacket, true) case <-writerTerminate: return } @@ -1196,7 +1196,7 @@ func TestServerReadPlayPausePause(t *testing.T) { for { select { case <-t.C: - stream.WritePacketRTP(0, &testRTPPacket) + stream.WritePacketRTP(0, &testRTPPacket, true) case <-writerTerminate: return } @@ -1640,8 +1640,8 @@ func TestServerReadPartialTracks(t *testing.T) { onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) { go func() { time.Sleep(1 * time.Second) - stream.WritePacketRTP(0, &testRTPPacket) - stream.WritePacketRTP(1, &testRTPPacket) + stream.WritePacketRTP(0, &testRTPPacket, true) + stream.WritePacketRTP(1, &testRTPPacket, true) }() return &base.Response{ @@ -1833,7 +1833,7 @@ func TestServerReadAdditionalInfos(t *testing.T) { SSRC: 96342362, }, Payload: []byte{0x01, 0x02, 0x03, 0x04}, - }) + }, true) rtpInfo, ssrcs := getInfos() require.Equal(t, &headers.RTPInfo{ @@ -1867,7 +1867,7 @@ func TestServerReadAdditionalInfos(t *testing.T) { SSRC: 536474323, }, Payload: []byte{0x01, 0x02, 0x03, 0x04}, - }) + }, true) rtpInfo, ssrcs = getInfos() require.Equal(t, &headers.RTPInfo{ diff --git a/server_test.go b/server_test.go index 4e0de20c..868a3c02 100644 --- a/server_test.go +++ b/server_test.go @@ -412,7 +412,7 @@ func TestServerHighLevelPublishRead(t *testing.T) { defer mutex.Unlock() if ctx.Session == publisher { - stream.WritePacketRTP(ctx.TrackID, ctx.Packet) + stream.WritePacketRTP(ctx.TrackID, ctx.Packet, true) } }, }, diff --git a/serverstream.go b/serverstream.go index 1e353f66..537238a5 100644 --- a/serverstream.go +++ b/serverstream.go @@ -14,9 +14,9 @@ import ( type serverStreamTrack struct { lastSequenceNumber uint32 + lastSSRC uint32 lastTimeRTP uint32 lastTimeNTP int64 - lastSSRC uint32 rtcpSender *rtcpsender.RTCPSender } @@ -216,7 +216,7 @@ func (st *ServerStream) readerSetInactive(ss *ServerSession) { } // WritePacketRTP writes a RTP packet to all the readers of the stream. -func (st *ServerStream) WritePacketRTP(trackID int, pkt *rtp.Packet) { +func (st *ServerStream) WritePacketRTP(trackID int, pkt *rtp.Packet, ptsEqualsDTS bool) { byts, err := pkt.Marshal() if err != nil { return @@ -227,15 +227,18 @@ func (st *ServerStream) WritePacketRTP(trackID int, pkt *rtp.Packet) { atomic.StoreUint32(&track.lastSequenceNumber, uint32(pkt.Header.SequenceNumber)) - atomic.StoreUint32(&track.lastTimeRTP, pkt.Header.Timestamp) - atomic.StoreInt64(&track.lastTimeNTP, now.Unix()) atomic.StoreUint32(&track.lastSSRC, pkt.Header.SSRC) + if ptsEqualsDTS { + atomic.StoreUint32(&track.lastTimeRTP, pkt.Header.Timestamp) + atomic.StoreInt64(&track.lastTimeNTP, now.Unix()) + } + st.mutex.RLock() defer st.mutex.RUnlock() if track.rtcpSender != nil { - track.rtcpSender.ProcessPacketRTP(now, pkt) + track.rtcpSender.ProcessPacketRTP(now, pkt, ptsEqualsDTS) } // send unicast