From 29ddbbbbf5f10ee46c839805873cf714aa7f76a0 Mon Sep 17 00:00:00 2001 From: Alessandro Ros Date: Tue, 22 Aug 2023 20:53:23 +0200 Subject: [PATCH] align PacketPTS() and PacketNTP(); add example (#374) --- README.md | 1 + client.go | 6 +- examples/client-read-format-g711/main.go | 3 +- examples/client-read-format-g722/main.go | 3 +- .../h264_decoder.go | 4 +- .../main.go | 3 +- .../client-read-format-h264/h264_decoder.go | 4 +- examples/client-read-format-h264/main.go | 3 +- examples/client-read-format-h265/main.go | 3 +- examples/client-read-format-lpcm/main.go | 3 +- examples/client-read-format-mjpeg/main.go | 3 +- .../main.go | 3 +- .../client-read-format-mpeg4audio/main.go | 3 +- examples/client-read-format-opus/main.go | 3 +- examples/client-read-format-vp8/main.go | 3 +- examples/client-read-format-vp9/main.go | 3 +- examples/client-read-republish/main.go | 5 +- examples/client-read-timestamp/main.go | 65 +++++++++++++++++++ examples/server-h264-save-to-disk/main.go | 3 +- server_session.go | 6 +- server_stream.go | 3 +- server_stream_format.go | 43 ++++++++++++ server_stream_media.go | 45 +------------ 23 files changed, 155 insertions(+), 66 deletions(-) create mode 100644 examples/client-read-timestamp/main.go diff --git a/README.md b/README.md index 76963fb7..876a480f 100644 --- a/README.md +++ b/README.md @@ -57,6 +57,7 @@ Features: * [client-query](examples/client-query/main.go) * [client-read](examples/client-read/main.go) +* [client-read-timestamp](examples/client-read-timestamp/main.go) * [client-read-options](examples/client-read-options/main.go) * [client-read-pause](examples/client-read-pause/main.go) * [client-read-republish](examples/client-read-republish/main.go) diff --git a/client.go b/client.go index 429c573d..a131bcaf 100644 --- a/client.go +++ b/client.go @@ -1707,8 +1707,10 @@ func (c *Client) WritePacketRTCP(medi *description.Media, pkt rtcp.Packet) error // PacketPTS returns the PTS of an incoming RTP packet. // It is computed by decoding the packet timestamp and sychronizing it with other tracks. -func (c *Client) PacketPTS(forma format.Format, pkt *rtp.Packet) (time.Duration, bool) { - return c.timeDecoder.Decode(forma, pkt) +func (c *Client) PacketPTS(medi *description.Media, pkt *rtp.Packet) (time.Duration, bool) { + cm := c.medias[medi] + ct := cm.formats[pkt.PayloadType] + return c.timeDecoder.Decode(ct.format, pkt) } // PacketNTP returns the NTP timestamp of an incoming RTP packet. diff --git a/examples/client-read-format-g711/main.go b/examples/client-read-format-g711/main.go index d6deadf7..fd370054 100644 --- a/examples/client-read-format-g711/main.go +++ b/examples/client-read-format-g711/main.go @@ -57,7 +57,8 @@ func main() { // called when a RTP packet arrives c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { - pts, ok := c.PacketPTS(forma, pkt) + // decode timestamp + pts, ok := c.PacketPTS(medi, pkt) if !ok { return } diff --git a/examples/client-read-format-g722/main.go b/examples/client-read-format-g722/main.go index 1d11ccf2..287792ac 100644 --- a/examples/client-read-format-g722/main.go +++ b/examples/client-read-format-g722/main.go @@ -57,7 +57,8 @@ func main() { // called when a RTP packet arrives c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { - pts, ok := c.PacketPTS(forma, pkt) + // decode timestamp + pts, ok := c.PacketPTS(medi, pkt) if !ok { return } diff --git a/examples/client-read-format-h264-convert-to-jpeg/h264_decoder.go b/examples/client-read-format-h264-convert-to-jpeg/h264_decoder.go index c24ed107..7362ad6e 100644 --- a/examples/client-read-format-h264-convert-to-jpeg/h264_decoder.go +++ b/examples/client-read-format-h264-convert-to-jpeg/h264_decoder.go @@ -20,7 +20,7 @@ func frameLineSize(frame *C.AVFrame) *C.int { return (*C.int)(unsafe.Pointer(&frame.linesize[0])) } -// h264Decoder is a wrapper around ffmpeg's H264 decoder. +// h264Decoder is a wrapper around FFmpeg's H264 decoder. type h264Decoder struct { codecCtx *C.AVCodecContext srcFrame *C.AVFrame @@ -122,7 +122,7 @@ func (d *h264Decoder) decode(nalu []byte) (image.Image, error) { d.dstFramePtr = (*[1 << 30]uint8)(unsafe.Pointer(d.dstFrame.data[0]))[:dstFrameSize:dstFrameSize] } - // convert frame from YUV420 to RGB + // convert color space from YUV420 to RGB res = C.sws_scale(d.swsCtx, frameData(d.srcFrame), frameLineSize(d.srcFrame), 0, d.srcFrame.height, frameData(d.dstFrame), frameLineSize(d.dstFrame)) if res < 0 { diff --git a/examples/client-read-format-h264-save-to-disk/main.go b/examples/client-read-format-h264-save-to-disk/main.go index fbda0cf3..1fc35b0a 100644 --- a/examples/client-read-format-h264-save-to-disk/main.go +++ b/examples/client-read-format-h264-save-to-disk/main.go @@ -64,7 +64,8 @@ func main() { // called when a RTP packet arrives c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { - pts, ok := c.PacketPTS(forma, pkt) + // decode timestamp + pts, ok := c.PacketPTS(medi, pkt) if !ok { return } diff --git a/examples/client-read-format-h264/h264_decoder.go b/examples/client-read-format-h264/h264_decoder.go index c24ed107..7362ad6e 100644 --- a/examples/client-read-format-h264/h264_decoder.go +++ b/examples/client-read-format-h264/h264_decoder.go @@ -20,7 +20,7 @@ func frameLineSize(frame *C.AVFrame) *C.int { return (*C.int)(unsafe.Pointer(&frame.linesize[0])) } -// h264Decoder is a wrapper around ffmpeg's H264 decoder. +// h264Decoder is a wrapper around FFmpeg's H264 decoder. type h264Decoder struct { codecCtx *C.AVCodecContext srcFrame *C.AVFrame @@ -122,7 +122,7 @@ func (d *h264Decoder) decode(nalu []byte) (image.Image, error) { d.dstFramePtr = (*[1 << 30]uint8)(unsafe.Pointer(d.dstFrame.data[0]))[:dstFrameSize:dstFrameSize] } - // convert frame from YUV420 to RGB + // convert color space from YUV420 to RGB res = C.sws_scale(d.swsCtx, frameData(d.srcFrame), frameLineSize(d.srcFrame), 0, d.srcFrame.height, frameData(d.dstFrame), frameLineSize(d.dstFrame)) if res < 0 { diff --git a/examples/client-read-format-h264/main.go b/examples/client-read-format-h264/main.go index 12c535f4..983353c0 100644 --- a/examples/client-read-format-h264/main.go +++ b/examples/client-read-format-h264/main.go @@ -76,7 +76,8 @@ func main() { // called when a RTP packet arrives c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { - pts, ok := c.PacketPTS(forma, pkt) + // decode timestamp + pts, ok := c.PacketPTS(medi, pkt) if !ok { return } diff --git a/examples/client-read-format-h265/main.go b/examples/client-read-format-h265/main.go index 06b8901d..325527fa 100644 --- a/examples/client-read-format-h265/main.go +++ b/examples/client-read-format-h265/main.go @@ -58,7 +58,8 @@ func main() { // called when a RTP packet arrives c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { - pts, ok := c.PacketPTS(forma, pkt) + // decode timestamp + pts, ok := c.PacketPTS(medi, pkt) if !ok { return } diff --git a/examples/client-read-format-lpcm/main.go b/examples/client-read-format-lpcm/main.go index 60c70f75..0c660a81 100644 --- a/examples/client-read-format-lpcm/main.go +++ b/examples/client-read-format-lpcm/main.go @@ -57,7 +57,8 @@ func main() { // called when a RTP packet arrives c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { - pts, ok := c.PacketPTS(forma, pkt) + // decode timestamp + pts, ok := c.PacketPTS(medi, pkt) if !ok { return } diff --git a/examples/client-read-format-mjpeg/main.go b/examples/client-read-format-mjpeg/main.go index e473840a..27aee492 100644 --- a/examples/client-read-format-mjpeg/main.go +++ b/examples/client-read-format-mjpeg/main.go @@ -61,7 +61,8 @@ func main() { // called when a RTP packet arrives c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { - pts, ok := c.PacketPTS(forma, pkt) + // decode timestamp + pts, ok := c.PacketPTS(medi, pkt) if !ok { return } diff --git a/examples/client-read-format-mpeg4audio-save-to-disk/main.go b/examples/client-read-format-mpeg4audio-save-to-disk/main.go index b66938e3..78dbf7e1 100644 --- a/examples/client-read-format-mpeg4audio-save-to-disk/main.go +++ b/examples/client-read-format-mpeg4audio-save-to-disk/main.go @@ -63,7 +63,8 @@ func main() { // called when a RTP packet arrives c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { - pts, ok := c.PacketPTS(forma, pkt) + // decode timestamp + pts, ok := c.PacketPTS(medi, pkt) if !ok { return } diff --git a/examples/client-read-format-mpeg4audio/main.go b/examples/client-read-format-mpeg4audio/main.go index a85a72e0..b12b785b 100644 --- a/examples/client-read-format-mpeg4audio/main.go +++ b/examples/client-read-format-mpeg4audio/main.go @@ -57,7 +57,8 @@ func main() { // called when a RTP packet arrives c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { - pts, ok := c.PacketPTS(forma, pkt) + // decode timestamp + pts, ok := c.PacketPTS(medi, pkt) if !ok { return } diff --git a/examples/client-read-format-opus/main.go b/examples/client-read-format-opus/main.go index e9729649..d613774f 100644 --- a/examples/client-read-format-opus/main.go +++ b/examples/client-read-format-opus/main.go @@ -57,7 +57,8 @@ func main() { // called when a RTP packet arrives c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { - pts, ok := c.PacketPTS(forma, pkt) + // decode timestamp + pts, ok := c.PacketPTS(medi, pkt) if !ok { return } diff --git a/examples/client-read-format-vp8/main.go b/examples/client-read-format-vp8/main.go index 194ebd44..6dbe6c36 100644 --- a/examples/client-read-format-vp8/main.go +++ b/examples/client-read-format-vp8/main.go @@ -58,7 +58,8 @@ func main() { // called when a RTP packet arrives c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { - pts, ok := c.PacketPTS(forma, pkt) + // decode timestamp + pts, ok := c.PacketPTS(medi, pkt) if !ok { return } diff --git a/examples/client-read-format-vp9/main.go b/examples/client-read-format-vp9/main.go index 655cc9af..8c9ebd71 100644 --- a/examples/client-read-format-vp9/main.go +++ b/examples/client-read-format-vp9/main.go @@ -58,7 +58,8 @@ func main() { // called when a RTP packet arrives c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) { - pts, ok := c.PacketPTS(forma, pkt) + // decode timestamp + pts, ok := c.PacketPTS(medi, pkt) if !ok { return } diff --git a/examples/client-read-republish/main.go b/examples/client-read-republish/main.go index 92b719a4..148ba62b 100644 --- a/examples/client-read-republish/main.go +++ b/examples/client-read-republish/main.go @@ -11,8 +11,9 @@ import ( ) // This example shows how to -// 1. connect to a RTSP server and read all medias on a path -// 2. re-publish all medias on another path. +// 1. connect to a RTSP server +// 2. read all medias on a path +// 3. re-publish all medias on another path. func main() { reader := gortsplib.Client{} diff --git a/examples/client-read-timestamp/main.go b/examples/client-read-timestamp/main.go new file mode 100644 index 00000000..8d164f67 --- /dev/null +++ b/examples/client-read-timestamp/main.go @@ -0,0 +1,65 @@ +package main + +import ( + "log" + + "github.com/bluenviron/gortsplib/v4" + "github.com/bluenviron/gortsplib/v4/pkg/description" + "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/gortsplib/v4/pkg/url" + "github.com/pion/rtp" +) + +// This example shows how to +// 1. connect to a RTSP server +// 2. read all media streams on a path +// 3. Get the PTS and NTP timestamp of incoming RTP packets + +func main() { + c := gortsplib.Client{} + + // parse URL + u, err := url.Parse("rtsp://localhost:8554/mystream") + if err != nil { + panic(err) + } + + // connect to the server + err = c.Start(u.Scheme, u.Host) + if err != nil { + panic(err) + } + defer c.Close() + + // find published medias + desc, _, err := c.Describe(u) + if err != nil { + panic(err) + } + + // setup all medias + err = c.SetupAll(desc.BaseURL, desc.Medias) + if err != nil { + panic(err) + } + + // called when a RTP packet arrives + c.OnPacketRTPAny(func(medi *description.Media, forma format.Format, pkt *rtp.Packet) { + // get the PTS timestamp of the packet, i.e. timestamp relative to the start of the session + pts, ptsAvailable := c.PacketPTS(medi, pkt) + log.Printf("PTS: available=%v, value=%v\n", ptsAvailable, pts) + + // get the NTP timestamp of the packet, i.e. the absolute timestamp + ntp, ntpAvailable := c.PacketNTP(medi, pkt) + log.Printf("NTP: available=%v, value=%v\n", ntpAvailable, ntp) + }) + + // start playing + _, err = c.Play(nil) + if err != nil { + panic(err) + } + + // wait until a fatal error + panic(c.Wait()) +} diff --git a/examples/server-h264-save-to-disk/main.go b/examples/server-h264-save-to-disk/main.go index 0ddeaaac..fed4889f 100644 --- a/examples/server-h264-save-to-disk/main.go +++ b/examples/server-h264-save-to-disk/main.go @@ -116,7 +116,8 @@ func (sh *serverHandler) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*bas // called when receiving a RTP packet ctx.Session.OnPacketRTP(sh.media, sh.format, func(pkt *rtp.Packet) { - pts, ok := ctx.Session.PacketPTS(sh.format, pkt) + // decode timestamp + pts, ok := ctx.Session.PacketPTS(sh.media, pkt) if !ok { return } diff --git a/server_session.go b/server_session.go index 7257fa65..a2efc8e2 100644 --- a/server_session.go +++ b/server_session.go @@ -1196,8 +1196,10 @@ func (ss *ServerSession) WritePacketRTCP(medi *description.Media, pkt rtcp.Packe // PacketPTS returns the PTS of an incoming RTP packet. // It is computed by decoding the packet timestamp and sychronizing it with other tracks. -func (ss *ServerSession) PacketPTS(forma format.Format, pkt *rtp.Packet) (time.Duration, bool) { - return ss.timeDecoder.Decode(forma, pkt) +func (ss *ServerSession) PacketPTS(medi *description.Media, pkt *rtp.Packet) (time.Duration, bool) { + sm := ss.setuppedMedias[medi] + sf := sm.formats[pkt.PayloadType] + return ss.timeDecoder.Decode(sf.format, pkt) } // PacketNTP returns the NTP timestamp of an incoming RTP packet. diff --git a/server_stream.go b/server_stream.go index 01b986f6..80083765 100644 --- a/server_stream.go +++ b/server_stream.go @@ -255,7 +255,8 @@ func (st *ServerStream) WritePacketRTPWithNTP(medi *description.Media, pkt *rtp. } sm := st.streamMedias[medi] - sm.writePacketRTP(byts, pkt, ntp) + sf := sm.formats[pkt.PayloadType] + sf.writePacketRTP(byts, pkt, ntp) return nil } diff --git a/server_stream_format.go b/server_stream_format.go index d6d45726..f1efc436 100644 --- a/server_stream_format.go +++ b/server_stream_format.go @@ -1,11 +1,54 @@ package gortsplib import ( + "time" + + "github.com/pion/rtcp" + "github.com/pion/rtp" + "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/gortsplib/v4/pkg/rtcpsender" ) type serverStreamFormat struct { + sm *serverStreamMedia format format.Format rtcpSender *rtcpsender.RTCPSender } + +func newServerStreamFormat(sm *serverStreamMedia, forma format.Format) *serverStreamFormat { + sf := &serverStreamFormat{ + sm: sm, + format: forma, + } + + sf.rtcpSender = rtcpsender.New( + forma.ClockRate(), + sm.st.s.senderReportPeriod, + sm.st.s.timeNow, + func(pkt rtcp.Packet) { + if !sm.st.s.DisableRTCPSenderReports { + sm.st.WritePacketRTCP(sm.media, pkt) //nolint:errcheck + } + }, + ) + + return sf +} + +func (sf *serverStreamFormat) writePacketRTP(byts []byte, pkt *rtp.Packet, ntp time.Time) { + sf.rtcpSender.ProcessPacket(pkt, ntp, sf.format.PTSEqualsDTS(pkt)) + + // send unicast + for r := range sf.sm.st.activeUnicastReaders { + sm, ok := r.setuppedMedias[sf.sm.media] + if ok { + sm.writePacketRTP(byts) + } + } + + // send multicast + if sf.sm.multicastWriter != nil { + sf.sm.multicastWriter.writePacketRTP(byts) + } +} diff --git a/server_stream_media.go b/server_stream_media.go index c75f5d77..619c83c2 100644 --- a/server_stream_media.go +++ b/server_stream_media.go @@ -1,13 +1,7 @@ package gortsplib import ( - "time" - - "github.com/pion/rtcp" - "github.com/pion/rtp" - "github.com/bluenviron/gortsplib/v4/pkg/description" - "github.com/bluenviron/gortsplib/v4/pkg/rtcpsender" ) type serverStreamMedia struct { @@ -27,23 +21,9 @@ func newServerStreamMedia(st *ServerStream, medi *description.Media, trackID int sm.formats = make(map[uint8]*serverStreamFormat) for _, forma := range medi.Formats { - tr := &serverStreamFormat{ - format: forma, - } - - cmedia := medi - tr.rtcpSender = rtcpsender.New( - forma.ClockRate(), - st.s.senderReportPeriod, - st.s.timeNow, - func(pkt rtcp.Packet) { - if !st.s.DisableRTCPSenderReports { - st.WritePacketRTCP(cmedia, pkt) //nolint:errcheck - } - }, - ) - - sm.formats[forma.PayloadType()] = tr + sm.formats[forma.PayloadType()] = newServerStreamFormat( + sm, + forma) } return sm @@ -73,25 +53,6 @@ func (sm *serverStreamMedia) allocateMulticastHandler(s *Server) error { return nil } -func (sm *serverStreamMedia) writePacketRTP(byts []byte, pkt *rtp.Packet, ntp time.Time) { - forma := sm.formats[pkt.PayloadType] - - forma.rtcpSender.ProcessPacket(pkt, ntp, forma.format.PTSEqualsDTS(pkt)) - - // send unicast - for r := range sm.st.activeUnicastReaders { - sm, ok := r.setuppedMedias[sm.media] - if ok { - sm.writePacketRTP(byts) - } - } - - // send multicast - if sm.multicastWriter != nil { - sm.multicastWriter.writePacketRTP(byts) - } -} - func (sm *serverStreamMedia) writePacketRTCP(byts []byte) { // send unicast for r := range sm.st.activeUnicastReaders {