align PacketPTS() and PacketNTP(); add example (#374)

This commit is contained in:
Alessandro Ros
2023-08-22 20:53:23 +02:00
committed by GitHub
parent cab426e350
commit 29ddbbbbf5
23 changed files with 155 additions and 66 deletions

View File

@@ -57,6 +57,7 @@ Features:
* [client-query](examples/client-query/main.go)
* [client-read](examples/client-read/main.go)
* [client-read-timestamp](examples/client-read-timestamp/main.go)
* [client-read-options](examples/client-read-options/main.go)
* [client-read-pause](examples/client-read-pause/main.go)
* [client-read-republish](examples/client-read-republish/main.go)

View File

@@ -1707,8 +1707,10 @@ func (c *Client) WritePacketRTCP(medi *description.Media, pkt rtcp.Packet) error
// PacketPTS returns the PTS of an incoming RTP packet.
// It is computed by decoding the packet timestamp and sychronizing it with other tracks.
func (c *Client) PacketPTS(forma format.Format, pkt *rtp.Packet) (time.Duration, bool) {
return c.timeDecoder.Decode(forma, pkt)
func (c *Client) PacketPTS(medi *description.Media, pkt *rtp.Packet) (time.Duration, bool) {
cm := c.medias[medi]
ct := cm.formats[pkt.PayloadType]
return c.timeDecoder.Decode(ct.format, pkt)
}
// PacketNTP returns the NTP timestamp of an incoming RTP packet.

View File

@@ -57,7 +57,8 @@ func main() {
// called when a RTP packet arrives
c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
pts, ok := c.PacketPTS(forma, pkt)
// decode timestamp
pts, ok := c.PacketPTS(medi, pkt)
if !ok {
return
}

View File

@@ -57,7 +57,8 @@ func main() {
// called when a RTP packet arrives
c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
pts, ok := c.PacketPTS(forma, pkt)
// decode timestamp
pts, ok := c.PacketPTS(medi, pkt)
if !ok {
return
}

View File

@@ -20,7 +20,7 @@ func frameLineSize(frame *C.AVFrame) *C.int {
return (*C.int)(unsafe.Pointer(&frame.linesize[0]))
}
// h264Decoder is a wrapper around ffmpeg's H264 decoder.
// h264Decoder is a wrapper around FFmpeg's H264 decoder.
type h264Decoder struct {
codecCtx *C.AVCodecContext
srcFrame *C.AVFrame
@@ -122,7 +122,7 @@ func (d *h264Decoder) decode(nalu []byte) (image.Image, error) {
d.dstFramePtr = (*[1 << 30]uint8)(unsafe.Pointer(d.dstFrame.data[0]))[:dstFrameSize:dstFrameSize]
}
// convert frame from YUV420 to RGB
// convert color space from YUV420 to RGB
res = C.sws_scale(d.swsCtx, frameData(d.srcFrame), frameLineSize(d.srcFrame),
0, d.srcFrame.height, frameData(d.dstFrame), frameLineSize(d.dstFrame))
if res < 0 {

View File

@@ -64,7 +64,8 @@ func main() {
// called when a RTP packet arrives
c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
pts, ok := c.PacketPTS(forma, pkt)
// decode timestamp
pts, ok := c.PacketPTS(medi, pkt)
if !ok {
return
}

View File

@@ -20,7 +20,7 @@ func frameLineSize(frame *C.AVFrame) *C.int {
return (*C.int)(unsafe.Pointer(&frame.linesize[0]))
}
// h264Decoder is a wrapper around ffmpeg's H264 decoder.
// h264Decoder is a wrapper around FFmpeg's H264 decoder.
type h264Decoder struct {
codecCtx *C.AVCodecContext
srcFrame *C.AVFrame
@@ -122,7 +122,7 @@ func (d *h264Decoder) decode(nalu []byte) (image.Image, error) {
d.dstFramePtr = (*[1 << 30]uint8)(unsafe.Pointer(d.dstFrame.data[0]))[:dstFrameSize:dstFrameSize]
}
// convert frame from YUV420 to RGB
// convert color space from YUV420 to RGB
res = C.sws_scale(d.swsCtx, frameData(d.srcFrame), frameLineSize(d.srcFrame),
0, d.srcFrame.height, frameData(d.dstFrame), frameLineSize(d.dstFrame))
if res < 0 {

View File

@@ -76,7 +76,8 @@ func main() {
// called when a RTP packet arrives
c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
pts, ok := c.PacketPTS(forma, pkt)
// decode timestamp
pts, ok := c.PacketPTS(medi, pkt)
if !ok {
return
}

View File

@@ -58,7 +58,8 @@ func main() {
// called when a RTP packet arrives
c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
pts, ok := c.PacketPTS(forma, pkt)
// decode timestamp
pts, ok := c.PacketPTS(medi, pkt)
if !ok {
return
}

View File

@@ -57,7 +57,8 @@ func main() {
// called when a RTP packet arrives
c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
pts, ok := c.PacketPTS(forma, pkt)
// decode timestamp
pts, ok := c.PacketPTS(medi, pkt)
if !ok {
return
}

View File

@@ -61,7 +61,8 @@ func main() {
// called when a RTP packet arrives
c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
pts, ok := c.PacketPTS(forma, pkt)
// decode timestamp
pts, ok := c.PacketPTS(medi, pkt)
if !ok {
return
}

View File

@@ -63,7 +63,8 @@ func main() {
// called when a RTP packet arrives
c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
pts, ok := c.PacketPTS(forma, pkt)
// decode timestamp
pts, ok := c.PacketPTS(medi, pkt)
if !ok {
return
}

View File

@@ -57,7 +57,8 @@ func main() {
// called when a RTP packet arrives
c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
pts, ok := c.PacketPTS(forma, pkt)
// decode timestamp
pts, ok := c.PacketPTS(medi, pkt)
if !ok {
return
}

View File

@@ -57,7 +57,8 @@ func main() {
// called when a RTP packet arrives
c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
pts, ok := c.PacketPTS(forma, pkt)
// decode timestamp
pts, ok := c.PacketPTS(medi, pkt)
if !ok {
return
}

View File

@@ -58,7 +58,8 @@ func main() {
// called when a RTP packet arrives
c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
pts, ok := c.PacketPTS(forma, pkt)
// decode timestamp
pts, ok := c.PacketPTS(medi, pkt)
if !ok {
return
}

View File

@@ -58,7 +58,8 @@ func main() {
// called when a RTP packet arrives
c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
pts, ok := c.PacketPTS(forma, pkt)
// decode timestamp
pts, ok := c.PacketPTS(medi, pkt)
if !ok {
return
}

View File

@@ -11,8 +11,9 @@ import (
)
// This example shows how to
// 1. connect to a RTSP server and read all medias on a path
// 2. re-publish all medias on another path.
// 1. connect to a RTSP server
// 2. read all medias on a path
// 3. re-publish all medias on another path.
func main() {
reader := gortsplib.Client{}

View File

@@ -0,0 +1,65 @@
package main
import (
"log"
"github.com/bluenviron/gortsplib/v4"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/gortsplib/v4/pkg/url"
"github.com/pion/rtp"
)
// This example shows how to
// 1. connect to a RTSP server
// 2. read all media streams on a path
// 3. Get the PTS and NTP timestamp of incoming RTP packets
func main() {
c := gortsplib.Client{}
// parse URL
u, err := url.Parse("rtsp://localhost:8554/mystream")
if err != nil {
panic(err)
}
// connect to the server
err = c.Start(u.Scheme, u.Host)
if err != nil {
panic(err)
}
defer c.Close()
// find published medias
desc, _, err := c.Describe(u)
if err != nil {
panic(err)
}
// setup all medias
err = c.SetupAll(desc.BaseURL, desc.Medias)
if err != nil {
panic(err)
}
// called when a RTP packet arrives
c.OnPacketRTPAny(func(medi *description.Media, forma format.Format, pkt *rtp.Packet) {
// get the PTS timestamp of the packet, i.e. timestamp relative to the start of the session
pts, ptsAvailable := c.PacketPTS(medi, pkt)
log.Printf("PTS: available=%v, value=%v\n", ptsAvailable, pts)
// get the NTP timestamp of the packet, i.e. the absolute timestamp
ntp, ntpAvailable := c.PacketNTP(medi, pkt)
log.Printf("NTP: available=%v, value=%v\n", ntpAvailable, ntp)
})
// start playing
_, err = c.Play(nil)
if err != nil {
panic(err)
}
// wait until a fatal error
panic(c.Wait())
}

View File

@@ -116,7 +116,8 @@ func (sh *serverHandler) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*bas
// called when receiving a RTP packet
ctx.Session.OnPacketRTP(sh.media, sh.format, func(pkt *rtp.Packet) {
pts, ok := ctx.Session.PacketPTS(sh.format, pkt)
// decode timestamp
pts, ok := ctx.Session.PacketPTS(sh.media, pkt)
if !ok {
return
}

View File

@@ -1196,8 +1196,10 @@ func (ss *ServerSession) WritePacketRTCP(medi *description.Media, pkt rtcp.Packe
// PacketPTS returns the PTS of an incoming RTP packet.
// It is computed by decoding the packet timestamp and sychronizing it with other tracks.
func (ss *ServerSession) PacketPTS(forma format.Format, pkt *rtp.Packet) (time.Duration, bool) {
return ss.timeDecoder.Decode(forma, pkt)
func (ss *ServerSession) PacketPTS(medi *description.Media, pkt *rtp.Packet) (time.Duration, bool) {
sm := ss.setuppedMedias[medi]
sf := sm.formats[pkt.PayloadType]
return ss.timeDecoder.Decode(sf.format, pkt)
}
// PacketNTP returns the NTP timestamp of an incoming RTP packet.

View File

@@ -255,7 +255,8 @@ func (st *ServerStream) WritePacketRTPWithNTP(medi *description.Media, pkt *rtp.
}
sm := st.streamMedias[medi]
sm.writePacketRTP(byts, pkt, ntp)
sf := sm.formats[pkt.PayloadType]
sf.writePacketRTP(byts, pkt, ntp)
return nil
}

View File

@@ -1,11 +1,54 @@
package gortsplib
import (
"time"
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/gortsplib/v4/pkg/rtcpsender"
)
type serverStreamFormat struct {
sm *serverStreamMedia
format format.Format
rtcpSender *rtcpsender.RTCPSender
}
func newServerStreamFormat(sm *serverStreamMedia, forma format.Format) *serverStreamFormat {
sf := &serverStreamFormat{
sm: sm,
format: forma,
}
sf.rtcpSender = rtcpsender.New(
forma.ClockRate(),
sm.st.s.senderReportPeriod,
sm.st.s.timeNow,
func(pkt rtcp.Packet) {
if !sm.st.s.DisableRTCPSenderReports {
sm.st.WritePacketRTCP(sm.media, pkt) //nolint:errcheck
}
},
)
return sf
}
func (sf *serverStreamFormat) writePacketRTP(byts []byte, pkt *rtp.Packet, ntp time.Time) {
sf.rtcpSender.ProcessPacket(pkt, ntp, sf.format.PTSEqualsDTS(pkt))
// send unicast
for r := range sf.sm.st.activeUnicastReaders {
sm, ok := r.setuppedMedias[sf.sm.media]
if ok {
sm.writePacketRTP(byts)
}
}
// send multicast
if sf.sm.multicastWriter != nil {
sf.sm.multicastWriter.writePacketRTP(byts)
}
}

View File

@@ -1,13 +1,7 @@
package gortsplib
import (
"time"
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/rtcpsender"
)
type serverStreamMedia struct {
@@ -27,23 +21,9 @@ func newServerStreamMedia(st *ServerStream, medi *description.Media, trackID int
sm.formats = make(map[uint8]*serverStreamFormat)
for _, forma := range medi.Formats {
tr := &serverStreamFormat{
format: forma,
}
cmedia := medi
tr.rtcpSender = rtcpsender.New(
forma.ClockRate(),
st.s.senderReportPeriod,
st.s.timeNow,
func(pkt rtcp.Packet) {
if !st.s.DisableRTCPSenderReports {
st.WritePacketRTCP(cmedia, pkt) //nolint:errcheck
}
},
)
sm.formats[forma.PayloadType()] = tr
sm.formats[forma.PayloadType()] = newServerStreamFormat(
sm,
forma)
}
return sm
@@ -73,25 +53,6 @@ func (sm *serverStreamMedia) allocateMulticastHandler(s *Server) error {
return nil
}
func (sm *serverStreamMedia) writePacketRTP(byts []byte, pkt *rtp.Packet, ntp time.Time) {
forma := sm.formats[pkt.PayloadType]
forma.rtcpSender.ProcessPacket(pkt, ntp, forma.format.PTSEqualsDTS(pkt))
// send unicast
for r := range sm.st.activeUnicastReaders {
sm, ok := r.setuppedMedias[sm.media]
if ok {
sm.writePacketRTP(byts)
}
}
// send multicast
if sm.multicastWriter != nil {
sm.multicastWriter.writePacketRTP(byts)
}
}
func (sm *serverStreamMedia) writePacketRTCP(byts []byte) {
// send unicast
for r := range sm.st.activeUnicastReaders {