use native timestamps instead of time.Duration (#627)

this improves timestamp precision
This commit is contained in:
Alessandro Ros
2024-10-07 15:58:43 +02:00
committed by GitHub
parent 5ec470c827
commit 2ca0bffa20
33 changed files with 220 additions and 109 deletions

View File

@@ -317,6 +317,7 @@ type Client struct {
writer asyncProcessor
reader *clientReader
timeDecoder *rtptime.GlobalDecoder
timeDecoder2 *rtptime.GlobalDecoder2
mustClose bool
// in
@@ -799,6 +800,7 @@ func (c *Client) startReadRoutines() {
}
c.timeDecoder = rtptime.NewGlobalDecoder()
c.timeDecoder2 = rtptime.NewGlobalDecoder2()
for _, cm := range c.medias {
cm.start()
@@ -1879,12 +1881,22 @@ func (c *Client) WritePacketRTCP(medi *description.Media, pkt rtcp.Packet) error
// PacketPTS returns the PTS of an incoming RTP packet.
// It is computed by decoding the packet timestamp and sychronizing it with other tracks.
//
// Deprecated: replaced by PacketPTS2.
func (c *Client) PacketPTS(medi *description.Media, pkt *rtp.Packet) (time.Duration, bool) {
cm := c.medias[medi]
ct := cm.formats[pkt.PayloadType]
return c.timeDecoder.Decode(ct.format, pkt)
}
// PacketPTS returns the PTS of an incoming RTP packet.
// It is computed by decoding the packet timestamp and sychronizing it with other tracks.
func (c *Client) PacketPTS2(medi *description.Media, pkt *rtp.Packet) (int64, bool) {
cm := c.medias[medi]
ct := cm.formats[pkt.PayloadType]
return c.timeDecoder2.Decode(ct.format, pkt)
}
// PacketNTP returns the NTP timestamp of an incoming RTP packet.
// The NTP timestamp is computed from RTCP sender reports.
func (c *Client) PacketNTP(medi *description.Media, pkt *rtp.Packet) (time.Time, bool) {

View File

@@ -59,7 +59,7 @@ func main() {
// called when a RTP packet arrives
c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
// decode timestamp
pts, ok := c.PacketPTS(medi, pkt)
pts, ok := c.PacketPTS2(medi, pkt)
if !ok {
log.Printf("waiting for timestamp")
return

View File

@@ -58,7 +58,7 @@ func main() {
// called when a RTP packet arrives
c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
// decode timestamp
pts, ok := c.PacketPTS(medi, pkt)
pts, ok := c.PacketPTS2(medi, pkt)
if !ok {
log.Printf("waiting for timestamp")
return

View File

@@ -58,7 +58,7 @@ func main() {
// called when a RTP packet arrives
c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
// decode timestamp
pts, ok := c.PacketPTS(medi, pkt)
pts, ok := c.PacketPTS2(medi, pkt)
if !ok {
log.Printf("waiting for timestamp")
return

View File

@@ -84,7 +84,7 @@ func main() {
// called when a H264/RTP packet arrives
c.OnPacketRTP(h264Media, h264Format, func(pkt *rtp.Packet) {
// decode timestamp
pts, ok := c.PacketPTS(h264Media, pkt)
pts, ok := c.PacketPTS2(h264Media, pkt)
if !ok {
log.Printf("waiting for timestamp")
return
@@ -112,7 +112,7 @@ func main() {
// called when a MPEG-4 audio / RTP packet arrives
c.OnPacketRTP(mpeg4AudioMedia, mpeg4AudioFormat, func(pkt *rtp.Packet) {
// decode timestamp
pts, ok := c.PacketPTS(mpeg4AudioMedia, pkt)
pts, ok := c.PacketPTS2(mpeg4AudioMedia, pkt)
if !ok {
log.Printf("waiting for timestamp")
return

View File

@@ -4,15 +4,16 @@ import (
"bufio"
"os"
"sync"
"time"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/mediacommon/pkg/codecs/h264"
"github.com/bluenviron/mediacommon/pkg/formats/mpegts"
)
func durationGoToMPEGTS(v time.Duration) int64 {
return int64(v.Seconds() * 90000)
func multiplyAndDivide(v, m, d int64) int64 {
secs := v / d
dec := v % d
return (secs*m + dec*m/d)
}
// mpegtsMuxer allows to save a H264 / MPEG-4 audio stream into a MPEG-TS file.
@@ -26,7 +27,7 @@ type mpegtsMuxer struct {
w *mpegts.Writer
h264Track *mpegts.Track
mpeg4AudioTrack *mpegts.Track
dtsExtractor *h264.DTSExtractor
dtsExtractor *h264.DTSExtractor2
mutex sync.Mutex
}
@@ -61,7 +62,7 @@ func (e *mpegtsMuxer) close() {
}
// writeH264 writes a H264 access unit into MPEG-TS.
func (e *mpegtsMuxer) writeH264(au [][]byte, pts time.Duration) error {
func (e *mpegtsMuxer) writeH264(au [][]byte, pts int64) error {
e.mutex.Lock()
defer e.mutex.Unlock()
@@ -105,30 +106,27 @@ func (e *mpegtsMuxer) writeH264(au [][]byte, pts time.Duration) error {
au = append([][]byte{e.h264Format.SPS, e.h264Format.PPS}, au...)
}
var dts time.Duration
if e.dtsExtractor == nil {
// skip samples silently until we find one with a IDR
if !idrPresent {
return nil
}
e.dtsExtractor = h264.NewDTSExtractor()
e.dtsExtractor = h264.NewDTSExtractor2()
}
var err error
dts, err = e.dtsExtractor.Extract(au, pts)
dts, err := e.dtsExtractor.Extract(au, pts)
if err != nil {
return err
}
// encode into MPEG-TS
return e.w.WriteH264(e.h264Track, durationGoToMPEGTS(pts), durationGoToMPEGTS(dts), idrPresent, au)
return e.w.WriteH264(e.h264Track, pts, dts, idrPresent, au)
}
// writeMPEG4Audio writes MPEG-4 audio access units into MPEG-TS.
func (e *mpegtsMuxer) writeMPEG4Audio(aus [][]byte, pts time.Duration) error {
func (e *mpegtsMuxer) writeMPEG4Audio(aus [][]byte, pts int64) error {
e.mutex.Lock()
defer e.mutex.Unlock()
return e.w.WriteMPEG4Audio(e.mpeg4AudioTrack, durationGoToMPEGTS(pts), aus)
return e.w.WriteMPEG4Audio(e.mpeg4AudioTrack, multiplyAndDivide(pts, 90000, int64(e.mpeg4AudioFormat.ClockRate())), aus)
}

View File

@@ -71,7 +71,7 @@ func main() {
// called when a RTP packet arrives
c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
// decode timestamp
pts, ok := c.PacketPTS(medi, pkt)
pts, ok := c.PacketPTS2(medi, pkt)
if !ok {
log.Printf("waiting for timestamp")
return

View File

@@ -3,16 +3,11 @@ package main
import (
"bufio"
"os"
"time"
"github.com/bluenviron/mediacommon/pkg/codecs/h264"
"github.com/bluenviron/mediacommon/pkg/formats/mpegts"
)
func durationGoToMPEGTS(v time.Duration) int64 {
return int64(v.Seconds() * 90000)
}
// mpegtsMuxer allows to save a H264 stream into a MPEG-TS file.
type mpegtsMuxer struct {
fileName string
@@ -23,7 +18,7 @@ type mpegtsMuxer struct {
b *bufio.Writer
w *mpegts.Writer
track *mpegts.Track
dtsExtractor *h264.DTSExtractor
dtsExtractor *h264.DTSExtractor2
}
// initialize initializes a mpegtsMuxer.
@@ -51,7 +46,7 @@ func (e *mpegtsMuxer) close() {
}
// writeH264 writes a H264 access unit into MPEG-TS.
func (e *mpegtsMuxer) writeH264(au [][]byte, pts time.Duration) error {
func (e *mpegtsMuxer) writeH264(au [][]byte, pts int64) error {
var filteredAU [][]byte
nonIDRPresent := false
@@ -92,22 +87,19 @@ func (e *mpegtsMuxer) writeH264(au [][]byte, pts time.Duration) error {
au = append([][]byte{e.sps, e.pps}, au...)
}
var dts time.Duration
if e.dtsExtractor == nil {
// skip samples silently until we find one with a IDR
if !idrPresent {
return nil
}
e.dtsExtractor = h264.NewDTSExtractor()
e.dtsExtractor = h264.NewDTSExtractor2()
}
var err error
dts, err = e.dtsExtractor.Extract(au, pts)
dts, err := e.dtsExtractor.Extract(au, pts)
if err != nil {
return err
}
// encode into MPEG-TS
return e.w.WriteH264(e.track, durationGoToMPEGTS(pts), durationGoToMPEGTS(dts), idrPresent, au)
return e.w.WriteH264(e.track, pts, dts, idrPresent, au)
}

View File

@@ -78,7 +78,7 @@ func main() {
// called when a RTP packet arrives
c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
// decode timestamp
pts, ok := c.PacketPTS(medi, pkt)
pts, ok := c.PacketPTS2(medi, pkt)
if !ok {
log.Printf("waiting for timestamp")
return

View File

@@ -72,7 +72,7 @@ func main() {
// called when a RTP packet arrives
c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
// decode timestamp
pts, ok := c.PacketPTS(medi, pkt)
pts, ok := c.PacketPTS2(medi, pkt)
if !ok {
log.Printf("waiting for timestamp")
return

View File

@@ -3,16 +3,11 @@ package main
import (
"bufio"
"os"
"time"
"github.com/bluenviron/mediacommon/pkg/codecs/h265"
"github.com/bluenviron/mediacommon/pkg/formats/mpegts"
)
func durationGoToMPEGTS(v time.Duration) int64 {
return int64(v.Seconds() * 90000)
}
// mpegtsMuxer allows to save a H265 stream into a MPEG-TS file.
type mpegtsMuxer struct {
fileName string
@@ -24,7 +19,7 @@ type mpegtsMuxer struct {
b *bufio.Writer
w *mpegts.Writer
track *mpegts.Track
dtsExtractor *h265.DTSExtractor
dtsExtractor *h265.DTSExtractor2
}
// initialize initializes a mpegtsMuxer.
@@ -52,7 +47,7 @@ func (e *mpegtsMuxer) close() {
}
// writeH265 writes a H265 access unit into MPEG-TS.
func (e *mpegtsMuxer) writeH265(au [][]byte, pts time.Duration) error {
func (e *mpegtsMuxer) writeH265(au [][]byte, pts int64) error {
var filteredAU [][]byte
isRandomAccess := false
@@ -93,22 +88,19 @@ func (e *mpegtsMuxer) writeH265(au [][]byte, pts time.Duration) error {
au = append([][]byte{e.vps, e.sps, e.pps}, au...)
}
var dts time.Duration
if e.dtsExtractor == nil {
// skip samples silently until we find one with a IDR
if !isRandomAccess {
return nil
}
e.dtsExtractor = h265.NewDTSExtractor()
e.dtsExtractor = h265.NewDTSExtractor2()
}
var err error
dts, err = e.dtsExtractor.Extract(au, pts)
dts, err := e.dtsExtractor.Extract(au, pts)
if err != nil {
return err
}
// encode into MPEG-TS
return e.w.WriteH265(e.track, durationGoToMPEGTS(pts), durationGoToMPEGTS(dts), isRandomAccess, au)
return e.w.WriteH265(e.track, pts, dts, isRandomAccess, au)
}

View File

@@ -81,7 +81,7 @@ func main() {
// called when a RTP packet arrives
c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
// decode timestamp
pts, ok := c.PacketPTS(medi, pkt)
pts, ok := c.PacketPTS2(medi, pkt)
if !ok {
log.Printf("waiting for timestamp")
return

View File

@@ -58,7 +58,7 @@ func main() {
// called when a RTP packet arrives
c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
// decode timestamp
pts, ok := c.PacketPTS(medi, pkt)
pts, ok := c.PacketPTS2(medi, pkt)
if !ok {
log.Printf("waiting for timestamp")
return

View File

@@ -62,7 +62,7 @@ func main() {
// called when a RTP packet arrives
c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
// decode timestamp
pts, ok := c.PacketPTS(medi, pkt)
pts, ok := c.PacketPTS2(medi, pkt)
if !ok {
log.Printf("waiting for timestamp")
return

View File

@@ -53,6 +53,7 @@ func main() {
// setup MPEG-4 audio -> MPEG-TS muxer
mpegtsMuxer := &mpegtsMuxer{
fileName: "mystream.ts",
format: forma,
track: &mpegts.Track{
Codec: &mpegts.CodecMPEG4Audio{
Config: *forma.Config,
@@ -74,7 +75,7 @@ func main() {
// called when a RTP packet arrives
c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
// decode timestamp
pts, ok := c.PacketPTS(medi, pkt)
pts, ok := c.PacketPTS2(medi, pkt)
if !ok {
log.Printf("waiting for timestamp")
return

View File

@@ -3,18 +3,21 @@ package main
import (
"bufio"
"os"
"time"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/mediacommon/pkg/formats/mpegts"
)
func durationGoToMPEGTS(v time.Duration) int64 {
return int64(v.Seconds() * 90000)
func multiplyAndDivide(v, m, d int64) int64 {
secs := v / d
dec := v % d
return (secs*m + dec*m/d)
}
// mpegtsMuxer allows to save a MPEG-4 audio stream into a MPEG-TS file.
type mpegtsMuxer struct {
fileName string
format format.Format
track *mpegts.Track
f *os.File
@@ -43,6 +46,6 @@ func (e *mpegtsMuxer) close() {
}
// writeMPEG4Audio writes MPEG-4 audio access units into MPEG-TS.
func (e *mpegtsMuxer) writeMPEG4Audio(aus [][]byte, pts time.Duration) error {
return e.w.WriteMPEG4Audio(e.track, durationGoToMPEGTS(pts), aus)
func (e *mpegtsMuxer) writeMPEG4Audio(aus [][]byte, pts int64) error {
return e.w.WriteMPEG4Audio(e.track, multiplyAndDivide(pts, 90000, int64(e.format.ClockRate())), aus)
}

View File

@@ -58,7 +58,7 @@ func main() {
// called when a RTP packet arrives
c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
// decode timestamp
pts, ok := c.PacketPTS(medi, pkt)
pts, ok := c.PacketPTS2(medi, pkt)
if !ok {
log.Printf("waiting for timestamp")
return

View File

@@ -53,6 +53,7 @@ func main() {
// setup Opus -> MPEG-TS muxer
mpegtsMuxer := &mpegtsMuxer{
fileName: "mystream.ts",
format: forma,
track: &mpegts.Track{
Codec: &mpegts.CodecOpus{
ChannelCount: forma.ChannelCount,
@@ -74,7 +75,7 @@ func main() {
// called when a RTP packet arrives
c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
// decode timestamp
pts, ok := c.PacketPTS(medi, pkt)
pts, ok := c.PacketPTS2(medi, pkt)
if !ok {
log.Printf("waiting for timestamp")
return

View File

@@ -3,18 +3,21 @@ package main
import (
"bufio"
"os"
"time"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/mediacommon/pkg/formats/mpegts"
)
func durationGoToMPEGTS(v time.Duration) int64 {
return int64(v.Seconds() * 90000)
func multiplyAndDivide(v, m, d int64) int64 {
secs := v / d
dec := v % d
return (secs*m + dec*m/d)
}
// mpegtsMuxer allows to save a MPEG-4 audio stream into a MPEG-TS file.
type mpegtsMuxer struct {
fileName string
format format.Format
track *mpegts.Track
f *os.File
@@ -43,6 +46,6 @@ func (e *mpegtsMuxer) close() {
}
// writeOpus writes Opus packets into MPEG-TS.
func (e *mpegtsMuxer) writeOpus(pkt []byte, pts time.Duration) error {
return e.w.WriteOpus(e.track, durationGoToMPEGTS(pts), [][]byte{pkt})
func (e *mpegtsMuxer) writeOpus(pkt []byte, pts int64) error {
return e.w.WriteOpus(e.track, multiplyAndDivide(pts, 90000, int64(e.format.ClockRate())), [][]byte{pkt})
}

View File

@@ -58,7 +58,7 @@ func main() {
// called when a RTP packet arrives
c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
// decode timestamp
pts, ok := c.PacketPTS(medi, pkt)
pts, ok := c.PacketPTS2(medi, pkt)
if !ok {
log.Printf("waiting for timestamp")
return

View File

@@ -59,7 +59,7 @@ func main() {
// called when a RTP packet arrives
c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
// decode timestamp
pts, ok := c.PacketPTS(medi, pkt)
pts, ok := c.PacketPTS2(medi, pkt)
if !ok {
log.Printf("waiting for timestamp")
return

View File

@@ -59,7 +59,7 @@ func main() {
// called when a RTP packet arrives
c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
// decode timestamp
pts, ok := c.PacketPTS(medi, pkt)
pts, ok := c.PacketPTS2(medi, pkt)
if !ok {
log.Printf("waiting for timestamp")
return

View File

@@ -46,7 +46,7 @@ func main() {
// called when a RTP packet arrives
c.OnPacketRTPAny(func(medi *description.Media, forma format.Format, pkt *rtp.Packet) {
// get the PTS timestamp of the packet, i.e. timestamp relative to the start of the session
pts, ptsAvailable := c.PacketPTS(medi, pkt)
pts, ptsAvailable := c.PacketPTS2(medi, pkt)
log.Printf("PTS: available=%v, value=%v\n", ptsAvailable, pts)
// get the NTP timestamp of the packet, i.e. the absolute timestamp

View File

@@ -9,7 +9,6 @@ import (
"github.com/bluenviron/gortsplib/v4"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/gortsplib/v4/pkg/rtptime"
"github.com/bluenviron/mediacommon/pkg/formats/mpegts"
)
@@ -74,18 +73,15 @@ func main() {
panic(err)
}
// setup RTP timestamp generator
rtpTime := &rtptime.Encoder{ClockRate: forma.ClockRate()}
err = rtpTime.Initialize()
if err != nil {
panic(err)
}
timeDecoder := mpegts.NewTimeDecoder2()
var firstDTS *int64
var startTime time.Time
// setup a callback that is called whenever a H264 access unit is read from the file
r.OnDataH264(track, func(pts, dts int64, au [][]byte) error {
dts = timeDecoder.Decode(dts)
pts = timeDecoder.Decode(pts)
// sleep between access units
if firstDTS != nil {
timeDrift := time.Duration(dts-*firstDTS)*time.Second/90000 - time.Since(startTime)
@@ -105,10 +101,11 @@ func main() {
return err
}
// set timestamp
rtpTime := rtpTime.Encode(time.Duration(pts) * time.Second / 90000)
// set packet timestamp
// we don't have to perform any conversion
// since H264 clock rate is the same in both MPEG-TS and RTSP
for _, packet := range packets {
packet.Timestamp = rtpTime
packet.Timestamp = uint32(pts)
}
// write packets to the server

View File

@@ -10,7 +10,6 @@ import (
"github.com/bluenviron/gortsplib/v4"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/gortsplib/v4/pkg/rtptime"
)
// This example shows how to
@@ -20,6 +19,12 @@ import (
// 4. generate RTP packets from the JPEG image
// 5. write packets to the server
func multiplyAndDivide(v, m, d int64) int64 {
secs := v / d
dec := v % d
return (secs*m + dec*m/d)
}
func createRandomImage(i int) *image.RGBA {
img := image.NewRGBA(image.Rect(0, 0, 640, 480))
@@ -66,12 +71,6 @@ func main() {
panic(err)
}
// setup RTP timestamp generator
rtpTime := &rtptime.Encoder{ClockRate: forma.ClockRate()}
err = rtpTime.Initialize()
if err != nil {
panic(err)
}
start := time.Now()
// setup a ticker to sleep between frames
@@ -99,11 +98,11 @@ func main() {
}
// get current timestamp
ts := rtpTime.Encode(time.Since(start))
pts := uint32(multiplyAndDivide(int64(time.Since(start)), int64(forma.ClockRate()), int64(time.Second)))
// write packets to the server
for _, pkt := range pkts {
pkt.Timestamp = ts
pkt.Timestamp = pts
err = c.WritePacketRTP(desc.Medias[0], pkt)
if err != nil {

View File

@@ -122,7 +122,7 @@ func (sh *serverHandler) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*bas
// called when receiving a RTP packet
ctx.Session.OnPacketRTP(sh.media, sh.format, func(pkt *rtp.Packet) {
// decode timestamp
pts, ok := ctx.Session.PacketPTS(sh.media, pkt)
pts, ok := ctx.Session.PacketPTS2(sh.media, pkt)
if !ok {
return
}

View File

@@ -3,16 +3,11 @@ package main
import (
"bufio"
"os"
"time"
"github.com/bluenviron/mediacommon/pkg/codecs/h264"
"github.com/bluenviron/mediacommon/pkg/formats/mpegts"
)
func durationGoToMPEGTS(v time.Duration) int64 {
return int64(v.Seconds() * 90000)
}
// mpegtsMuxer allows to save a H264 stream into a MPEG-TS file.
type mpegtsMuxer struct {
fileName string
@@ -23,7 +18,7 @@ type mpegtsMuxer struct {
b *bufio.Writer
w *mpegts.Writer
track *mpegts.Track
dtsExtractor *h264.DTSExtractor
dtsExtractor *h264.DTSExtractor2
}
// initialize initializes a mpegtsMuxer.
@@ -51,7 +46,7 @@ func (e *mpegtsMuxer) close() {
}
// writeH264 writes a H264 access unit into MPEG-TS.
func (e *mpegtsMuxer) writeH264(au [][]byte, pts time.Duration) error {
func (e *mpegtsMuxer) writeH264(au [][]byte, pts int64) error {
var filteredAU [][]byte
nonIDRPresent := false
@@ -92,22 +87,19 @@ func (e *mpegtsMuxer) writeH264(au [][]byte, pts time.Duration) error {
au = append([][]byte{e.sps, e.pps}, au...)
}
var dts time.Duration
if e.dtsExtractor == nil {
// skip samples silently until we find one with a IDR
if !idrPresent {
return nil
}
e.dtsExtractor = h264.NewDTSExtractor()
e.dtsExtractor = h264.NewDTSExtractor2()
}
var err error
dts, err = e.dtsExtractor.Extract(au, pts)
dts, err := e.dtsExtractor.Extract(au, pts)
if err != nil {
return err
}
// encode into MPEG-TS
return e.w.WriteH265(e.track, durationGoToMPEGTS(pts), durationGoToMPEGTS(dts), idrPresent, au)
return e.w.WriteH265(e.track, pts, dts, idrPresent, au)
}

2
go.mod
View File

@@ -3,7 +3,7 @@ module github.com/bluenviron/gortsplib/v4
go 1.20
require (
github.com/bluenviron/mediacommon v1.12.4
github.com/bluenviron/mediacommon v1.12.5-0.20241007134151-8883ba897cfc
github.com/google/uuid v1.6.0
github.com/pion/rtcp v1.2.14
github.com/pion/rtp v1.8.7-0.20240429002300-bc5124c9d0d0

4
go.sum
View File

@@ -2,8 +2,8 @@ github.com/asticode/go-astikit v0.30.0 h1:DkBkRQRIxYcknlaU7W7ksNfn4gMFsB0tqMJflx
github.com/asticode/go-astikit v0.30.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0=
github.com/asticode/go-astits v1.13.0 h1:XOgkaadfZODnyZRR5Y0/DWkA9vrkLLPLeeOvDwfKZ1c=
github.com/asticode/go-astits v1.13.0/go.mod h1:QSHmknZ51pf6KJdHKZHJTLlMegIrhega3LPWz3ND/iI=
github.com/bluenviron/mediacommon v1.12.4 h1:7VrA/W/iDB7VELquXqRjgjzUSJT3llZYgXjFN9WkByo=
github.com/bluenviron/mediacommon v1.12.4/go.mod h1:HDyW2CzjvhYJXtdxstdFPio3G0qSocPhqkhUt/qffec=
github.com/bluenviron/mediacommon v1.12.5-0.20241007134151-8883ba897cfc h1:walYSlRh0oE5Vn+H8dHoZCAOX/XjPUhy9umlckpsn3k=
github.com/bluenviron/mediacommon v1.12.5-0.20241007134151-8883ba897cfc/go.mod h1:HDyW2CzjvhYJXtdxstdFPio3G0qSocPhqkhUt/qffec=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=

View File

@@ -23,6 +23,8 @@ func randUint32() (uint32, error) {
}
// Encoder is a RTP timestamp encoder.
//
// Deprecated: not used anymore.
type Encoder struct {
// Clock rate.
ClockRate int

View File

@@ -51,6 +51,8 @@ type GlobalDecoderTrack interface {
}
// GlobalDecoder is a RTP timestamp decoder.
//
// Deprecated: replaced by GlobalDecoder2.
type GlobalDecoder struct {
mutex sync.Mutex
leadingTrack GlobalDecoderTrack
@@ -60,6 +62,8 @@ type GlobalDecoder struct {
}
// NewGlobalDecoder allocates a GlobalDecoder.
//
// Deprecated: replaced by NewGlobalDecoder2.
func NewGlobalDecoder() *GlobalDecoder {
return &GlobalDecoder{
tracks: make(map[GlobalDecoderTrack]*globalDecoderTrackData),
@@ -104,16 +108,14 @@ func (d *GlobalDecoder) Decode(
return df.startPTS, true
}
// update startNTP / startPTS
if d.leadingTrack == track && track.PTSEqualsDTS(pkt) {
pts := df.decode(pkt.Timestamp)
// update startNTP / startPTS
if d.leadingTrack == track && track.PTSEqualsDTS(pkt) {
now := timeNow()
d.startNTP = now
d.startPTS = pts
}
return pts, true
}
return df.decode(pkt.Timestamp), true
}

View File

@@ -0,0 +1,103 @@
package rtptime
import (
"sync"
"time"
"github.com/pion/rtp"
)
// avoid an int64 overflow and preserve resolution by splitting division into two parts:
// first add the integer part, then the decimal part.
func multiplyAndDivide2(v, m, d int64) int64 {
secs := v / d
dec := v % d
return (secs*m + dec*m/d)
}
type globalDecoder2TrackData struct {
overall int64
prev uint32
}
func (d *globalDecoder2TrackData) decode(ts uint32) int64 {
d.overall += int64(int32(ts - d.prev))
d.prev = ts
return d.overall
}
// GlobalDecoder2Track is a track (RTSP format or WebRTC track) of GlobalDecoder2.
type GlobalDecoder2Track interface {
ClockRate() int
PTSEqualsDTS(*rtp.Packet) bool
}
// GlobalDecoder2 is a RTP timestamp decoder.
type GlobalDecoder2 struct {
mutex sync.Mutex
leadingTrack GlobalDecoderTrack
startNTP time.Time
startPTS int64
startPTSClockRate int64
tracks map[GlobalDecoder2Track]*globalDecoder2TrackData
}
// NewGlobalDecoder2 allocates a GlobalDecoder.
func NewGlobalDecoder2() *GlobalDecoder2 {
return &GlobalDecoder2{
tracks: make(map[GlobalDecoder2Track]*globalDecoder2TrackData),
}
}
// Decode decodes a timestamp.
func (d *GlobalDecoder2) Decode(
track GlobalDecoder2Track,
pkt *rtp.Packet,
) (int64, bool) {
if track.ClockRate() == 0 {
return 0, false
}
d.mutex.Lock()
defer d.mutex.Unlock()
df, ok := d.tracks[track]
// never seen before track
if !ok {
if !track.PTSEqualsDTS(pkt) {
return 0, false
}
now := timeNow()
if d.leadingTrack == nil {
d.leadingTrack = track
d.startNTP = now
d.startPTS = 0
d.startPTSClockRate = int64(track.ClockRate())
}
// start from the PTS of the leading track
startPTS := multiplyAndDivide2(d.startPTS, int64(track.ClockRate()), d.startPTSClockRate)
startPTS += multiplyAndDivide2(int64(now.Sub(d.startNTP)), int64(track.ClockRate()), int64(time.Second))
d.tracks[track] = &globalDecoder2TrackData{
overall: startPTS,
prev: pkt.Timestamp,
}
return startPTS, true
}
pts := df.decode(pkt.Timestamp)
// update startNTP / startPTS
if d.leadingTrack == track && track.PTSEqualsDTS(pkt) {
now := timeNow()
d.startNTP = now
d.startPTS = pts
}
return pts, true
}

View File

@@ -248,6 +248,7 @@ type ServerSession struct {
udpCheckStreamTimer *time.Timer
writer asyncProcessor
timeDecoder *rtptime.GlobalDecoder
timeDecoder2 *rtptime.GlobalDecoder2
// in
chHandleRequest chan sessionRequestReq
@@ -948,6 +949,7 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) (
ss.udpLastPacketTime = &v
ss.timeDecoder = rtptime.NewGlobalDecoder()
ss.timeDecoder2 = rtptime.NewGlobalDecoder2()
for _, sm := range ss.setuppedMedias {
sm.start()
@@ -1034,6 +1036,7 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) (
ss.udpLastPacketTime = &v
ss.timeDecoder = rtptime.NewGlobalDecoder()
ss.timeDecoder2 = rtptime.NewGlobalDecoder2()
for _, sm := range ss.setuppedMedias {
sm.start()
@@ -1088,6 +1091,7 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) (
}
ss.timeDecoder = nil
ss.timeDecoder2 = nil
switch ss.state {
case ServerSessionStatePlay:
@@ -1255,12 +1259,22 @@ func (ss *ServerSession) WritePacketRTCP(medi *description.Media, pkt rtcp.Packe
// PacketPTS returns the PTS of an incoming RTP packet.
// It is computed by decoding the packet timestamp and sychronizing it with other tracks.
//
// Deprecated: replaced by PacketPTS2.
func (ss *ServerSession) PacketPTS(medi *description.Media, pkt *rtp.Packet) (time.Duration, bool) {
sm := ss.setuppedMedias[medi]
sf := sm.formats[pkt.PayloadType]
return ss.timeDecoder.Decode(sf.format, pkt)
}
// PacketPTS2 returns the PTS of an incoming RTP packet.
// It is computed by decoding the packet timestamp and sychronizing it with other tracks.
func (ss *ServerSession) PacketPTS2(medi *description.Media, pkt *rtp.Packet) (int64, bool) {
sm := ss.setuppedMedias[medi]
sf := sm.formats[pkt.PayloadType]
return ss.timeDecoder2.Decode(sf.format, pkt)
}
// PacketNTP returns the NTP timestamp of an incoming RTP packet.
// The NTP timestamp is computed from RTCP sender reports.
func (ss *ServerSession) PacketNTP(medi *description.Media, pkt *rtp.Packet) (time.Time, bool) {