diff --git a/client.go b/client.go index ab9e7804..8e8a313d 100644 --- a/client.go +++ b/client.go @@ -161,9 +161,8 @@ type clientRes struct { // ClientOnPacketRTPCtx is the context of a RTP packet. type ClientOnPacketRTPCtx struct { - TrackID int - Packet *rtp.Packet - PTSEqualsDTS bool + TrackID int + Packet *rtp.Packet } // ClientOnPacketRTCPCtx is the context of a RTCP packet. @@ -814,9 +813,8 @@ func (c *Client) runReader() { } c.OnPacketRTP(&ClientOnPacketRTPCtx{ - TrackID: track.id, - Packet: pkt, - PTSEqualsDTS: ptsEqualsDTS(track.track, pkt), + TrackID: track.id, + Packet: pkt, }) } else { if len(payload) > maxPacketSize { @@ -1872,8 +1870,15 @@ func (c *Client) runWriter() { } } -// WritePacketRTP writes a RTP packet. -func (c *Client) WritePacketRTP(trackID int, pkt *rtp.Packet, ptsEqualsDTS bool) error { +// WritePacketRTP writes a RTP packet to all the readers of the stream. +func (c *Client) WritePacketRTP(trackID int, pkt *rtp.Packet) error { + return c.WritePacketRTPWithNTP(trackID, pkt, time.Now()) +} + +// WritePacketRTPWithNTP writes a RTP packet. +// ntp is the absolute time of the packet, and is needed to generate RTCP sender reports +// that allows the receiver to reconstruct the absolute time of the packet. +func (c *Client) WritePacketRTPWithNTP(trackID int, pkt *rtp.Packet, ntp time.Time) error { c.writeMutex.RLock() defer c.writeMutex.RUnlock() @@ -1895,7 +1900,7 @@ func (c *Client) WritePacketRTP(trackID int, pkt *rtp.Packet, ptsEqualsDTS bool) t := c.tracks[trackID] if t.rtcpSender != nil { - t.rtcpSender.ProcessPacketRTP(time.Now(), pkt, ptsEqualsDTS) + t.rtcpSender.ProcessPacketRTP(ntp, pkt, ptsEqualsDTS(c.tracks[trackID].track, pkt)) } c.writeBuffer.Push(trackTypePayload{ diff --git a/client_publish_test.go b/client_publish_test.go index 32fea9f8..1513c8cf 100644 --- a/client_publish_test.go +++ b/client_publish_test.go @@ -246,14 +246,14 @@ func TestClientPublishSerial(t *testing.T) { c.Wait() }() - err = c.WritePacketRTP(0, &testRTPPacket, true) + err = c.WritePacketRTP(0, &testRTPPacket) require.NoError(t, err) <-recvDone c.Close() <-done - err = c.WritePacketRTP(0, &testRTPPacket, true) + err = c.WritePacketRTP(0, &testRTPPacket) require.Error(t, err) }) } @@ -403,7 +403,7 @@ func TestClientPublishParallel(t *testing.T) { defer t.Stop() for range t.C { - err := c.WritePacketRTP(0, &testRTPPacket, true) + err := c.WritePacketRTP(0, &testRTPPacket) if err != nil { return } @@ -552,7 +552,7 @@ func TestClientPublishPauseSerial(t *testing.T) { require.NoError(t, err) defer c.Close() - err = c.WritePacketRTP(0, &testRTPPacket, true) + err = c.WritePacketRTP(0, &testRTPPacket) require.NoError(t, err) _, err = c.Pause() @@ -561,7 +561,7 @@ func TestClientPublishPauseSerial(t *testing.T) { _, err = c.Record() require.NoError(t, err) - err = c.WritePacketRTP(0, &testRTPPacket, true) + err = c.WritePacketRTP(0, &testRTPPacket) require.NoError(t, err) }) } @@ -693,7 +693,7 @@ func TestClientPublishPauseParallel(t *testing.T) { defer t.Stop() for range t.C { - err := c.WritePacketRTP(0, &testRTPPacket, true) + err := c.WritePacketRTP(0, &testRTPPacket) if err != nil { return } @@ -829,7 +829,7 @@ func TestClientPublishAutomaticProtocol(t *testing.T) { require.NoError(t, err) defer c.Close() - err = c.WritePacketRTP(0, &testRTPPacket, true) + err = c.WritePacketRTP(0, &testRTPPacket) require.NoError(t, err) } @@ -952,7 +952,7 @@ func TestClientPublishRTCPReport(t *testing.T) { NTPTime: packets[0].(*rtcp.SenderReport).NTPTime, RTPTime: packets[0].(*rtcp.SenderReport).RTPTime, PacketCount: 2, - OctetCount: 8, + OctetCount: 2, }, packets[0]) close(reportReceived) @@ -988,11 +988,17 @@ func TestClientPublishRTCPReport(t *testing.T) { require.NoError(t, err) defer c.Close() - err = c.WritePacketRTP(0, &testRTPPacket, true) - require.NoError(t, err) - - err = c.WritePacketRTP(0, &testRTPPacket, true) - require.NoError(t, err) + for i := 0; i < 2; i++ { + err = c.WritePacketRTP(0, &rtp.Packet{ + Header: rtp.Header{ + Version: 2, + PayloadType: 96, + SSRC: 0x38F27A2F, + }, + Payload: []byte{0x05}, // IDR + }) + require.NoError(t, err) + } <-reportReceived }) diff --git a/clientudpl.go b/clientudpl.go index be4e5136..f2d9bb52 100644 --- a/clientudpl.go +++ b/clientudpl.go @@ -218,9 +218,8 @@ func (u *clientUDPListener) processPlayRTP(now time.Time, payload []byte) { u.ct.udpRTCPReceiver.ProcessPacketRTP(time.Now(), pkt, ptsEqualsDTS) u.c.OnPacketRTP(&ClientOnPacketRTPCtx{ - TrackID: u.ct.id, - Packet: pkt, - PTSEqualsDTS: ptsEqualsDTS, + TrackID: u.ct.id, + Packet: pkt, }) } } diff --git a/examples/client-publish-codec-h264/main.go b/examples/client-publish-codec-h264/main.go index cad65398..f87545d5 100644 --- a/examples/client-publish-codec-h264/main.go +++ b/examples/client-publish-codec-h264/main.go @@ -57,7 +57,7 @@ func main() { } // route RTP packet to the server - err = c.WritePacketRTP(0, &pkt, true) + err = c.WritePacketRTP(0, &pkt) if err != nil { panic(err) } diff --git a/examples/client-publish-codec-mpeg4audio/main.go b/examples/client-publish-codec-mpeg4audio/main.go index c072386a..6435a912 100644 --- a/examples/client-publish-codec-mpeg4audio/main.go +++ b/examples/client-publish-codec-mpeg4audio/main.go @@ -65,7 +65,7 @@ func main() { } // route RTP packet to the server - err = c.WritePacketRTP(0, &pkt, true) + err = c.WritePacketRTP(0, &pkt) if err != nil { panic(err) } diff --git a/examples/client-publish-codec-opus/main.go b/examples/client-publish-codec-opus/main.go index 6ff99028..df771b4a 100644 --- a/examples/client-publish-codec-opus/main.go +++ b/examples/client-publish-codec-opus/main.go @@ -59,7 +59,7 @@ func main() { } // route RTP packet to the server - err = c.WritePacketRTP(0, &pkt, true) + err = c.WritePacketRTP(0, &pkt) if err != nil { panic(err) } diff --git a/examples/client-publish-codec-pcma/main.go b/examples/client-publish-codec-pcma/main.go index d5900435..becafd2e 100644 --- a/examples/client-publish-codec-pcma/main.go +++ b/examples/client-publish-codec-pcma/main.go @@ -55,7 +55,7 @@ func main() { } // route RTP packet to the server - err = c.WritePacketRTP(0, &pkt, true) + err = c.WritePacketRTP(0, &pkt) if err != nil { panic(err) } diff --git a/examples/client-publish-codec-pcmu/main.go b/examples/client-publish-codec-pcmu/main.go index 10176243..13e1adb5 100644 --- a/examples/client-publish-codec-pcmu/main.go +++ b/examples/client-publish-codec-pcmu/main.go @@ -55,7 +55,7 @@ func main() { } // route RTP packet to the server - err = c.WritePacketRTP(0, &pkt, true) + err = c.WritePacketRTP(0, &pkt) if err != nil { panic(err) } diff --git a/examples/client-publish-options/main.go b/examples/client-publish-options/main.go index 86278038..bb48906e 100644 --- a/examples/client-publish-options/main.go +++ b/examples/client-publish-options/main.go @@ -67,7 +67,7 @@ func main() { } // route RTP packet to the server - err = c.WritePacketRTP(0, &pkt, true) + err = c.WritePacketRTP(0, &pkt) if err != nil { panic(err) } diff --git a/examples/client-publish-pause/main.go b/examples/client-publish-pause/main.go index 34328d40..177b0ae9 100644 --- a/examples/client-publish-pause/main.go +++ b/examples/client-publish-pause/main.go @@ -61,7 +61,7 @@ func main() { } // route RTP packet to the server - c.WritePacketRTP(0, &pkt, true) + c.WritePacketRTP(0, &pkt) // read another RTP packet from source n, _, err = pc.ReadFrom(buf) diff --git a/examples/client-read-republish/main.go b/examples/client-read-republish/main.go index 4c43b928..cdd7f032 100644 --- a/examples/client-read-republish/main.go +++ b/examples/client-read-republish/main.go @@ -46,7 +46,7 @@ func main() { // called when a RTP packet arrives reader.OnPacketRTP = func(ctx *gortsplib.ClientOnPacketRTPCtx) { - publisher.WritePacketRTP(ctx.TrackID, ctx.Packet, ctx.PTSEqualsDTS) + publisher.WritePacketRTP(ctx.TrackID, ctx.Packet) } // setup and read all tracks diff --git a/examples/server-tls/main.go b/examples/server-tls/main.go index 674c8621..6dbcd403 100644 --- a/examples/server-tls/main.go +++ b/examples/server-tls/main.go @@ -134,7 +134,7 @@ func (sh *serverHandler) OnPacketRTP(ctx *gortsplib.ServerHandlerOnPacketRTPCtx) // if we are the publisher, route the RTP packet to all readers if ctx.Session == sh.publisher { - sh.stream.WritePacketRTP(ctx.TrackID, ctx.Packet, ctx.PTSEqualsDTS) + sh.stream.WritePacketRTP(ctx.TrackID, ctx.Packet) } } diff --git a/examples/server/main.go b/examples/server/main.go index 680eab66..9f399bac 100644 --- a/examples/server/main.go +++ b/examples/server/main.go @@ -133,7 +133,7 @@ func (sh *serverHandler) OnPacketRTP(ctx *gortsplib.ServerHandlerOnPacketRTPCtx) // if we are the publisher, route the RTP packet to all readers if ctx.Session == sh.publisher { - sh.stream.WritePacketRTP(ctx.TrackID, ctx.Packet, ctx.PTSEqualsDTS) + sh.stream.WritePacketRTP(ctx.TrackID, ctx.Packet) } } diff --git a/internal/highleveltests/server_test.go b/internal/highleveltests/server_test.go index d1317ea2..576f3e88 100644 --- a/internal/highleveltests/server_test.go +++ b/internal/highleveltests/server_test.go @@ -405,7 +405,7 @@ func TestServerPublishRead(t *testing.T) { defer mutex.Unlock() if ctx.Session == publisher { - stream.WritePacketRTP(ctx.TrackID, ctx.Packet, true) + stream.WritePacketRTP(ctx.TrackID, ctx.Packet) } }, }, diff --git a/server_read_test.go b/server_read_test.go index b470cfea..9811bc36 100644 --- a/server_read_test.go +++ b/server_read_test.go @@ -415,7 +415,7 @@ func TestServerRead(t *testing.T) { go func() { time.Sleep(1 * time.Second) stream.WritePacketRTCP(0, &testRTCPPacket) - stream.WritePacketRTP(0, &testRTPPacket, true) + stream.WritePacketRTP(0, &testRTPPacket) }() return &base.Response{ @@ -803,8 +803,16 @@ func TestServerReadRTCPReport(t *testing.T) { require.NoError(t, err) require.Equal(t, base.StatusOK, res.StatusCode) - stream.WritePacketRTP(0, &testRTPPacket, true) - stream.WritePacketRTP(0, &testRTPPacket, true) + for i := 0; i < 2; i++ { + stream.WritePacketRTP(0, &rtp.Packet{ + Header: rtp.Header{ + Version: 2, + PayloadType: 96, + SSRC: 0x38F27A2F, + }, + Payload: []byte{0x05}, // IDR + }) + } var buf []byte @@ -833,7 +841,7 @@ func TestServerReadRTCPReport(t *testing.T) { NTPTime: packets[0].(*rtcp.SenderReport).NTPTime, RTPTime: packets[0].(*rtcp.SenderReport).RTPTime, PacketCount: 2, - OctetCount: 8, + OctetCount: 2, }, packets[0]) res, err = writeReqReadRes(conn, base.Request{ @@ -931,7 +939,7 @@ func TestServerReadTCPResponseBeforeFrames(t *testing.T) { go func() { defer close(writerDone) - stream.WritePacketRTP(0, &testRTPPacket, true) + stream.WritePacketRTP(0, &testRTPPacket) t := time.NewTicker(50 * time.Millisecond) defer t.Stop() @@ -939,7 +947,7 @@ func TestServerReadTCPResponseBeforeFrames(t *testing.T) { for { select { case <-t.C: - stream.WritePacketRTP(0, &testRTPPacket, true) + stream.WritePacketRTP(0, &testRTPPacket) case <-writerTerminate: return } @@ -1128,7 +1136,7 @@ func TestServerReadPlayPausePlay(t *testing.T) { for { select { case <-t.C: - stream.WritePacketRTP(0, &testRTPPacket, true) + stream.WritePacketRTP(0, &testRTPPacket) case <-writerTerminate: return } @@ -1252,7 +1260,7 @@ func TestServerReadPlayPausePause(t *testing.T) { for { select { case <-t.C: - stream.WritePacketRTP(0, &testRTPPacket, true) + stream.WritePacketRTP(0, &testRTPPacket) case <-writerTerminate: return } @@ -1708,8 +1716,8 @@ func TestServerReadPartialTracks(t *testing.T) { onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) { go func() { time.Sleep(1 * time.Second) - stream.WritePacketRTP(0, &testRTPPacket, true) - stream.WritePacketRTP(1, &testRTPPacket, true) + stream.WritePacketRTP(0, &testRTPPacket) + stream.WritePacketRTP(1, &testRTPPacket) }() return &base.Response{ @@ -1864,11 +1872,15 @@ func TestServerReadAdditionalInfos(t *testing.T) { return &ri, ssrcs } - track := &TrackH264{ - PayloadType: 96, - SPS: []byte{0x01, 0x02, 0x03, 0x04}, - PPS: []byte{0x01, 0x02, 0x03, 0x04}, + track := &TrackGeneric{ + Media: "application", + Payloads: []TrackGenericPayload{{ + Type: 96, + RTPMap: "private/90000", + }}, } + err := track.Init() + require.NoError(t, err) stream := NewServerStream(Tracks{track, track}) defer stream.Close() @@ -1889,7 +1901,7 @@ func TestServerReadAdditionalInfos(t *testing.T) { RTSPAddress: "localhost:8554", } - err := s.Start() + err = s.Start() require.NoError(t, err) defer s.Close() @@ -1902,7 +1914,7 @@ func TestServerReadAdditionalInfos(t *testing.T) { SSRC: 96342362, }, Payload: []byte{0x01, 0x02, 0x03, 0x04}, - }, true) + }) rtpInfo, ssrcs := getInfos() require.Equal(t, &headers.RTPInfo{ @@ -1936,7 +1948,7 @@ func TestServerReadAdditionalInfos(t *testing.T) { SSRC: 536474323, }, Payload: []byte{0x01, 0x02, 0x03, 0x04}, - }, true) + }) rtpInfo, ssrcs = getInfos() require.Equal(t, &headers.RTPInfo{ diff --git a/serverconn.go b/serverconn.go index 089f4068..e742eab3 100644 --- a/serverconn.go +++ b/serverconn.go @@ -280,10 +280,9 @@ func (sc *ServerConn) readFuncTCP(readRequest chan readReq) error { if h, ok := sc.s.Handler.(ServerHandlerOnPacketRTP); ok { h.OnPacketRTP(&ServerHandlerOnPacketRTPCtx{ - Session: sc.session, - TrackID: track.id, - Packet: pkt, - PTSEqualsDTS: ptsEqualsDTS(track.track, pkt), + Session: sc.session, + TrackID: track.id, + Packet: pkt, }) } } else { diff --git a/serverhandler.go b/serverhandler.go index d90dfadf..62421266 100644 --- a/serverhandler.go +++ b/serverhandler.go @@ -199,10 +199,9 @@ type ServerHandlerOnSetParameter interface { // ServerHandlerOnPacketRTPCtx is the context of OnPacketRTP. type ServerHandlerOnPacketRTPCtx struct { - Session *ServerSession - TrackID int - Packet *rtp.Packet - PTSEqualsDTS bool + Session *ServerSession + TrackID int + Packet *rtp.Packet } // ServerHandlerOnPacketRTP can be implemented by a ServerHandler. diff --git a/serverstream.go b/serverstream.go index 08556d15..808cd95a 100644 --- a/serverstream.go +++ b/serverstream.go @@ -232,7 +232,14 @@ func (st *ServerStream) readerSetInactive(ss *ServerSession) { } // WritePacketRTP writes a RTP packet to all the readers of the stream. -func (st *ServerStream) WritePacketRTP(trackID int, pkt *rtp.Packet, ptsEqualsDTS bool) { +func (st *ServerStream) WritePacketRTP(trackID int, pkt *rtp.Packet) { + st.WritePacketRTPWithNTP(trackID, pkt, time.Now()) +} + +// WritePacketRTPWithNTP writes a RTP packet to all the readers of the stream. +// ntp is the absolute time of the packet, and is needed to generate RTCP sender reports +// that allows the receiver to reconstruct the absolute time of the packet. +func (st *ServerStream) WritePacketRTPWithNTP(trackID int, pkt *rtp.Packet, ntp time.Time) { byts := make([]byte, maxPacketSize) n, err := pkt.MarshalTo(byts) if err != nil { @@ -248,19 +255,19 @@ func (st *ServerStream) WritePacketRTP(trackID int, pkt *rtp.Packet, ptsEqualsDT } track := st.streamTracks[trackID] - now := time.Now() + ptsEqualsDTS := ptsEqualsDTS(st.tracks[trackID], pkt) if ptsEqualsDTS { track.lastTimeFilled = true track.lastTimeRTP = pkt.Header.Timestamp - track.lastTimeNTP = now + track.lastTimeNTP = ntp } track.lastSequenceNumber = pkt.Header.SequenceNumber track.lastSSRC = pkt.Header.SSRC if track.rtcpSender != nil { - track.rtcpSender.ProcessPacketRTP(now, pkt, ptsEqualsDTS) + track.rtcpSender.ProcessPacketRTP(ntp, pkt, ptsEqualsDTS) } // send unicast diff --git a/serverudpl.go b/serverudpl.go index c8e551a9..f5fb741d 100644 --- a/serverudpl.go +++ b/serverudpl.go @@ -234,10 +234,9 @@ func (u *serverUDPListener) processRTP(clientData *clientData, payload []byte) { if h, ok := clientData.session.s.Handler.(ServerHandlerOnPacketRTP); ok { h.OnPacketRTP(&ServerHandlerOnPacketRTPCtx{ - Session: clientData.session, - TrackID: clientData.track.id, - Packet: pkt, - PTSEqualsDTS: ptsEqualsDTS, + Session: clientData.session, + TrackID: clientData.track.id, + Packet: pkt, }) } }