diff --git a/README.md b/README.md index bfd81a63..a9401a9c 100644 --- a/README.md +++ b/README.md @@ -32,8 +32,6 @@ Live streams can be published to the server with: |[UDP/MPEG-TS](#udpmpeg-ts)|Unicast, broadcast, multicast|H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3| |[Raspberry Pi Cameras](#raspberry-pi-cameras)||H264|| -Instructions are provided for publishing through these protocols by using [FFmpeg](#ffmpeg), [GStreamer](#gstreamer), [OBS Studio](#obs-studio), [OpenCV](#opencv), [Unity](#unity), [Web browsers](#web-browsers). - Live streams can be read from the server with: |protocol|variants|video codecs|audio codecs| @@ -44,8 +42,6 @@ Live streams can be read from the server with: |[RTMP](#rtmp)|RTMP, RTMPS, Enhanced RTMP|H264|MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3)| |[HLS](#hls)|Low-Latency HLS, MP4-based HLS, legacy HLS|AV1, VP9, H265, H264|Opus, MPEG-4 Audio (AAC)| -Instructions are provided for reading through these protocols by using [FFmpeg](#ffmpeg-1), [GStreamer](#gstreamer-1), [VLC](#vlc), [Unity](#unity-1), [Web browsers](#web-browsers-1). - Live streams be recorded and played back with: |format|video codecs|audio codecs| diff --git a/go.mod b/go.mod index ff39673e..a4e8b17f 100644 --- a/go.mod +++ b/go.mod @@ -9,9 +9,9 @@ require ( github.com/abema/go-mp4 v1.2.0 github.com/alecthomas/kong v1.2.1 github.com/asticode/go-astits v1.13.0 - github.com/bluenviron/gohlslib/v2 v2.0.0-20241003172246-076f27fbe0f8 - github.com/bluenviron/gortsplib/v4 v4.10.6 - github.com/bluenviron/mediacommon v1.12.4 + github.com/bluenviron/gohlslib/v2 v2.0.0-20241007134735-ed88408cd4de + github.com/bluenviron/gortsplib/v4 v4.10.7-0.20241007135843-2ca0bffa20a2 + github.com/bluenviron/mediacommon v1.12.5-0.20241007134151-8883ba897cfc github.com/datarhei/gosrt v0.7.0 github.com/fsnotify/fsnotify v1.7.0 github.com/gin-gonic/gin v1.10.0 @@ -87,7 +87,7 @@ require ( github.com/xo/terminfo v0.0.0-20210125001918-ca9a967f8778 // indirect golang.org/x/arch v0.8.0 // indirect golang.org/x/mod v0.17.0 // indirect - golang.org/x/net v0.29.0 // indirect + golang.org/x/net v0.30.0 // indirect golang.org/x/sync v0.8.0 // indirect golang.org/x/text v0.19.0 // indirect golang.org/x/time v0.5.0 // indirect diff --git a/go.sum b/go.sum index e3fb701d..6b60a551 100644 --- a/go.sum +++ b/go.sum @@ -31,12 +31,12 @@ github.com/asticode/go-astits v1.13.0 h1:XOgkaadfZODnyZRR5Y0/DWkA9vrkLLPLeeOvDwf github.com/asticode/go-astits v1.13.0/go.mod h1:QSHmknZ51pf6KJdHKZHJTLlMegIrhega3LPWz3ND/iI= github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c h1:8XZeJrs4+ZYhJeJ2aZxADI2tGADS15AzIF8MQ8XAhT4= github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c/go.mod h1:x1vxHcL/9AVzuk5HOloOEPrtJY0MaalYr78afXZ+pWI= -github.com/bluenviron/gohlslib/v2 v2.0.0-20241003172246-076f27fbe0f8 h1:OQeYfxJg5otVKa33HWJ63E+IxCJ5Ty0qwCBPD2JcIso= -github.com/bluenviron/gohlslib/v2 v2.0.0-20241003172246-076f27fbe0f8/go.mod h1:DVvQIj+MjYydWuYDCgP+s0/GplDgUSpDNXCA/BVLhu4= -github.com/bluenviron/gortsplib/v4 v4.10.6 h1:KMvVcU21xxQQu1Jqn6D/z/FoIMn+QEKE1dBDWt4aWvg= -github.com/bluenviron/gortsplib/v4 v4.10.6/go.mod h1:/7C8qoGEsIQupuVw8YnXANpqBMNBpZ+51xFreLGiN2g= -github.com/bluenviron/mediacommon v1.12.4 h1:7VrA/W/iDB7VELquXqRjgjzUSJT3llZYgXjFN9WkByo= -github.com/bluenviron/mediacommon v1.12.4/go.mod h1:HDyW2CzjvhYJXtdxstdFPio3G0qSocPhqkhUt/qffec= +github.com/bluenviron/gohlslib/v2 v2.0.0-20241007134735-ed88408cd4de h1:LPxwCQuLE89QMDfGw6qs6sfaRF1R6cmFVsh7uxcWR04= +github.com/bluenviron/gohlslib/v2 v2.0.0-20241007134735-ed88408cd4de/go.mod h1:MY9p+71KQFHNpEUzYATH9ZMVZfCkj6R+SaGFQEIEAS8= +github.com/bluenviron/gortsplib/v4 v4.10.7-0.20241007135843-2ca0bffa20a2 h1:oR8bRYadOQmb2wxBZau96zTsHocd+5vrQaM7B+sjRrk= +github.com/bluenviron/gortsplib/v4 v4.10.7-0.20241007135843-2ca0bffa20a2/go.mod h1:QN1e+ueF5aa8LNQUaBUBbNWUEUUidl3toldYp7rZuhU= +github.com/bluenviron/mediacommon v1.12.5-0.20241007134151-8883ba897cfc h1:walYSlRh0oE5Vn+H8dHoZCAOX/XjPUhy9umlckpsn3k= +github.com/bluenviron/mediacommon v1.12.5-0.20241007134151-8883ba897cfc/go.mod h1:HDyW2CzjvhYJXtdxstdFPio3G0qSocPhqkhUt/qffec= github.com/bwesterb/go-ristretto v1.2.3/go.mod h1:fUIoIZaG73pV5biE2Blr2xEzDoMj7NFEuV9ekS419A0= github.com/bytedance/sonic v1.11.6 h1:oUp34TzMlL+OY1OUWxHqsdkgC/Zfc85zGqw9siXjrc0= github.com/bytedance/sonic v1.11.6/go.mod h1:LysEHSvpvDySVdC2f87zGWf6CIKJcAvqab1ZaiQtds4= @@ -292,8 +292,8 @@ golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.13.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA= golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= -golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= -golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= +golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4= +golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= diff --git a/internal/core/versiongetter/main.go b/internal/core/versiongetter/main.go index 7918094b..91602ef5 100644 --- a/internal/core/versiongetter/main.go +++ b/internal/core/versiongetter/main.go @@ -34,7 +34,6 @@ func gitDescribeTags(repo *git.Repository) (string, error) { } return nil }) - if err != nil { return "", err } diff --git a/internal/formatprocessor/ac3.go b/internal/formatprocessor/ac3.go index f52c9fc8..587fa5e8 100644 --- a/internal/formatprocessor/ac3.go +++ b/internal/formatprocessor/ac3.go @@ -1,24 +1,33 @@ package formatprocessor import ( + "crypto/rand" "errors" "fmt" "time" "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/gortsplib/v4/pkg/format/rtpac3" - "github.com/bluenviron/gortsplib/v4/pkg/rtptime" "github.com/pion/rtp" "github.com/bluenviron/mediamtx/internal/unit" ) +func randUint32() (uint32, error) { + var b [4]byte + _, err := rand.Read(b[:]) + if err != nil { + return 0, err + } + return uint32(b[0])<<24 | uint32(b[1])<<16 | uint32(b[2])<<8 | uint32(b[3]), nil +} + type formatProcessorAC3 struct { udpMaxPayloadSize int format *format.AC3 - timeEncoder *rtptime.Encoder encoder *rtpac3.Encoder decoder *rtpac3.Decoder + randomStart uint32 } func newAC3( @@ -37,10 +46,7 @@ func newAC3( return nil, err } - t.timeEncoder = &rtptime.Encoder{ - ClockRate: forma.ClockRate(), - } - err = t.timeEncoder.Initialize() + t.randomStart, err = randUint32() if err != nil { return nil, err } @@ -65,9 +71,8 @@ func (t *formatProcessorAC3) ProcessUnit(uu unit.Unit) error { //nolint:dupl } u.RTPPackets = pkts - ts := t.timeEncoder.Encode(u.PTS) for _, pkt := range u.RTPPackets { - pkt.Timestamp += ts + pkt.Timestamp += t.randomStart + uint32(u.PTS) } return nil @@ -76,7 +81,7 @@ func (t *formatProcessorAC3) ProcessUnit(uu unit.Unit) error { //nolint:dupl func (t *formatProcessorAC3) ProcessRTPPacket( //nolint:dupl pkt *rtp.Packet, ntp time.Time, - pts time.Duration, + pts int64, hasNonRTSPReaders bool, ) (unit.Unit, error) { u := &unit.AC3{ diff --git a/internal/formatprocessor/av1.go b/internal/formatprocessor/av1.go index e7646bdd..16741aca 100644 --- a/internal/formatprocessor/av1.go +++ b/internal/formatprocessor/av1.go @@ -7,7 +7,6 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/gortsplib/v4/pkg/format/rtpav1" - "github.com/bluenviron/gortsplib/v4/pkg/rtptime" "github.com/pion/rtp" "github.com/bluenviron/mediamtx/internal/unit" @@ -23,9 +22,9 @@ var ( type formatProcessorAV1 struct { udpMaxPayloadSize int format *format.AV1 - timeEncoder *rtptime.Encoder encoder *rtpav1.Encoder decoder *rtpav1.Decoder + randomStart uint32 } func newAV1( @@ -44,10 +43,7 @@ func newAV1( return nil, err } - t.timeEncoder = &rtptime.Encoder{ - ClockRate: forma.ClockRate(), - } - err = t.timeEncoder.Initialize() + t.randomStart, err = randUint32() if err != nil { return nil, err } @@ -73,9 +69,8 @@ func (t *formatProcessorAV1) ProcessUnit(uu unit.Unit) error { //nolint:dupl } u.RTPPackets = pkts - ts := t.timeEncoder.Encode(u.PTS) for _, pkt := range u.RTPPackets { - pkt.Timestamp += ts + pkt.Timestamp += t.randomStart + uint32(u.PTS) } return nil @@ -84,7 +79,7 @@ func (t *formatProcessorAV1) ProcessUnit(uu unit.Unit) error { //nolint:dupl func (t *formatProcessorAV1) ProcessRTPPacket( //nolint:dupl pkt *rtp.Packet, ntp time.Time, - pts time.Duration, + pts int64, hasNonRTSPReaders bool, ) (unit.Unit, error) { u := &unit.AV1{ diff --git a/internal/formatprocessor/g711.go b/internal/formatprocessor/g711.go index e9644de3..a7b63925 100644 --- a/internal/formatprocessor/g711.go +++ b/internal/formatprocessor/g711.go @@ -6,7 +6,6 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/gortsplib/v4/pkg/format/rtplpcm" - "github.com/bluenviron/gortsplib/v4/pkg/rtptime" "github.com/pion/rtp" "github.com/bluenviron/mediamtx/internal/unit" @@ -15,9 +14,9 @@ import ( type formatProcessorG711 struct { udpMaxPayloadSize int format *format.G711 - timeEncoder *rtptime.Encoder encoder *rtplpcm.Encoder decoder *rtplpcm.Decoder + randomStart uint32 } func newG711( @@ -36,10 +35,7 @@ func newG711( return nil, err } - t.timeEncoder = &rtptime.Encoder{ - ClockRate: forma.ClockRate(), - } - err = t.timeEncoder.Initialize() + t.randomStart, err = randUint32() if err != nil { return nil, err } @@ -67,9 +63,8 @@ func (t *formatProcessorG711) ProcessUnit(uu unit.Unit) error { //nolint:dupl } u.RTPPackets = pkts - ts := t.timeEncoder.Encode(u.PTS) for _, pkt := range u.RTPPackets { - pkt.Timestamp += ts + pkt.Timestamp += t.randomStart + uint32(u.PTS) } return nil @@ -78,7 +73,7 @@ func (t *formatProcessorG711) ProcessUnit(uu unit.Unit) error { //nolint:dupl func (t *formatProcessorG711) ProcessRTPPacket( //nolint:dupl pkt *rtp.Packet, ntp time.Time, - pts time.Duration, + pts int64, hasNonRTSPReaders bool, ) (unit.Unit, error) { u := &unit.G711{ diff --git a/internal/formatprocessor/generic.go b/internal/formatprocessor/generic.go index c5bca81d..64380685 100644 --- a/internal/formatprocessor/generic.go +++ b/internal/formatprocessor/generic.go @@ -35,7 +35,7 @@ func (t *formatProcessorGeneric) ProcessUnit(_ unit.Unit) error { func (t *formatProcessorGeneric) ProcessRTPPacket( pkt *rtp.Packet, ntp time.Time, - pts time.Duration, + pts int64, _ bool, ) (unit.Unit, error) { u := &unit.Generic{ diff --git a/internal/formatprocessor/h264.go b/internal/formatprocessor/h264.go index d43726e4..4da80ca2 100644 --- a/internal/formatprocessor/h264.go +++ b/internal/formatprocessor/h264.go @@ -7,7 +7,6 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/gortsplib/v4/pkg/format/rtph264" - "github.com/bluenviron/gortsplib/v4/pkg/rtptime" "github.com/bluenviron/mediacommon/pkg/codecs/h264" "github.com/pion/rtp" @@ -85,9 +84,9 @@ func rtpH264ExtractParams(payload []byte) ([]byte, []byte) { type formatProcessorH264 struct { udpMaxPayloadSize int format *format.H264 - timeEncoder *rtptime.Encoder encoder *rtph264.Encoder decoder *rtph264.Decoder + randomStart uint32 } func newH264( @@ -106,10 +105,7 @@ func newH264( return nil, err } - t.timeEncoder = &rtptime.Encoder{ - ClockRate: forma.ClockRate(), - } - err = t.timeEncoder.Initialize() + t.randomStart, err = randUint32() if err != nil { return nil, err } @@ -246,9 +242,8 @@ func (t *formatProcessorH264) ProcessUnit(uu unit.Unit) error { } u.RTPPackets = pkts - ts := t.timeEncoder.Encode(u.PTS) for _, pkt := range u.RTPPackets { - pkt.Timestamp += ts + pkt.Timestamp += t.randomStart + uint32(u.PTS) } } @@ -258,7 +253,7 @@ func (t *formatProcessorH264) ProcessUnit(uu unit.Unit) error { func (t *formatProcessorH264) ProcessRTPPacket( //nolint:dupl pkt *rtp.Packet, ntp time.Time, - pts time.Duration, + pts int64, hasNonRTSPReaders bool, ) (unit.Unit, error) { u := &unit.H264{ diff --git a/internal/formatprocessor/h265.go b/internal/formatprocessor/h265.go index 9f433168..79dd7e81 100644 --- a/internal/formatprocessor/h265.go +++ b/internal/formatprocessor/h265.go @@ -7,7 +7,6 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/gortsplib/v4/pkg/format/rtph265" - "github.com/bluenviron/gortsplib/v4/pkg/rtptime" "github.com/bluenviron/mediacommon/pkg/codecs/h265" "github.com/pion/rtp" @@ -105,9 +104,9 @@ func rtpH265ExtractParams(payload []byte) ([]byte, []byte, []byte) { type formatProcessorH265 struct { udpMaxPayloadSize int format *format.H265 - timeEncoder *rtptime.Encoder encoder *rtph265.Encoder decoder *rtph265.Decoder + randomStart uint32 } func newH265( @@ -126,10 +125,7 @@ func newH265( return nil, err } - t.timeEncoder = &rtptime.Encoder{ - ClockRate: forma.ClockRate(), - } - err = t.timeEncoder.Initialize() + t.randomStart, err = randUint32() if err != nil { return nil, err } @@ -278,9 +274,8 @@ func (t *formatProcessorH265) ProcessUnit(uu unit.Unit) error { //nolint:dupl } u.RTPPackets = pkts - ts := t.timeEncoder.Encode(u.PTS) for _, pkt := range u.RTPPackets { - pkt.Timestamp += ts + pkt.Timestamp += t.randomStart + uint32(u.PTS) } } @@ -290,7 +285,7 @@ func (t *formatProcessorH265) ProcessUnit(uu unit.Unit) error { //nolint:dupl func (t *formatProcessorH265) ProcessRTPPacket( //nolint:dupl pkt *rtp.Packet, ntp time.Time, - pts time.Duration, + pts int64, hasNonRTSPReaders bool, ) (unit.Unit, error) { u := &unit.H265{ diff --git a/internal/formatprocessor/lpcm.go b/internal/formatprocessor/lpcm.go index f326fd70..f8a4fed0 100644 --- a/internal/formatprocessor/lpcm.go +++ b/internal/formatprocessor/lpcm.go @@ -6,7 +6,6 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/gortsplib/v4/pkg/format/rtplpcm" - "github.com/bluenviron/gortsplib/v4/pkg/rtptime" "github.com/pion/rtp" "github.com/bluenviron/mediamtx/internal/unit" @@ -15,9 +14,9 @@ import ( type formatProcessorLPCM struct { udpMaxPayloadSize int format *format.LPCM - timeEncoder *rtptime.Encoder encoder *rtplpcm.Encoder decoder *rtplpcm.Decoder + randomStart uint32 } func newLPCM( @@ -36,10 +35,7 @@ func newLPCM( return nil, err } - t.timeEncoder = &rtptime.Encoder{ - ClockRate: forma.ClockRate(), - } - err = t.timeEncoder.Initialize() + t.randomStart, err = randUint32() if err != nil { return nil, err } @@ -67,9 +63,8 @@ func (t *formatProcessorLPCM) ProcessUnit(uu unit.Unit) error { //nolint:dupl } u.RTPPackets = pkts - ts := t.timeEncoder.Encode(u.PTS) for _, pkt := range u.RTPPackets { - pkt.Timestamp += ts + pkt.Timestamp += t.randomStart + uint32(u.PTS) } return nil @@ -78,7 +73,7 @@ func (t *formatProcessorLPCM) ProcessUnit(uu unit.Unit) error { //nolint:dupl func (t *formatProcessorLPCM) ProcessRTPPacket( //nolint:dupl pkt *rtp.Packet, ntp time.Time, - pts time.Duration, + pts int64, hasNonRTSPReaders bool, ) (unit.Unit, error) { u := &unit.LPCM{ diff --git a/internal/formatprocessor/mjpeg.go b/internal/formatprocessor/mjpeg.go index 5071b861..651ea6df 100644 --- a/internal/formatprocessor/mjpeg.go +++ b/internal/formatprocessor/mjpeg.go @@ -7,7 +7,6 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/gortsplib/v4/pkg/format/rtpmjpeg" - "github.com/bluenviron/gortsplib/v4/pkg/rtptime" "github.com/pion/rtp" "github.com/bluenviron/mediamtx/internal/unit" @@ -16,9 +15,9 @@ import ( type formatProcessorMJPEG struct { udpMaxPayloadSize int format *format.MJPEG - timeEncoder *rtptime.Encoder encoder *rtpmjpeg.Encoder decoder *rtpmjpeg.Decoder + randomStart uint32 } func newMJPEG( @@ -37,10 +36,7 @@ func newMJPEG( return nil, err } - t.timeEncoder = &rtptime.Encoder{ - ClockRate: forma.ClockRate(), - } - err = t.timeEncoder.Initialize() + t.randomStart, err = randUint32() if err != nil { return nil, err } @@ -66,9 +62,8 @@ func (t *formatProcessorMJPEG) ProcessUnit(uu unit.Unit) error { //nolint:dupl } u.RTPPackets = pkts - ts := t.timeEncoder.Encode(u.PTS) for _, pkt := range u.RTPPackets { - pkt.Timestamp += ts + pkt.Timestamp += t.randomStart + uint32(u.PTS) } return nil @@ -77,7 +72,7 @@ func (t *formatProcessorMJPEG) ProcessUnit(uu unit.Unit) error { //nolint:dupl func (t *formatProcessorMJPEG) ProcessRTPPacket( //nolint:dupl pkt *rtp.Packet, ntp time.Time, - pts time.Duration, + pts int64, hasNonRTSPReaders bool, ) (unit.Unit, error) { u := &unit.MJPEG{ diff --git a/internal/formatprocessor/mpeg1_audio.go b/internal/formatprocessor/mpeg1_audio.go index b3428abe..0664c30a 100644 --- a/internal/formatprocessor/mpeg1_audio.go +++ b/internal/formatprocessor/mpeg1_audio.go @@ -7,7 +7,6 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/gortsplib/v4/pkg/format/rtpmpeg1audio" - "github.com/bluenviron/gortsplib/v4/pkg/rtptime" "github.com/pion/rtp" "github.com/bluenviron/mediamtx/internal/unit" @@ -16,9 +15,9 @@ import ( type formatProcessorMPEG1Audio struct { udpMaxPayloadSize int format *format.MPEG1Audio - timeEncoder *rtptime.Encoder encoder *rtpmpeg1audio.Encoder decoder *rtpmpeg1audio.Decoder + randomStart uint32 } func newMPEG1Audio( @@ -37,10 +36,7 @@ func newMPEG1Audio( return nil, err } - t.timeEncoder = &rtptime.Encoder{ - ClockRate: forma.ClockRate(), - } - err = t.timeEncoder.Initialize() + t.randomStart, err = randUint32() if err != nil { return nil, err } @@ -65,9 +61,8 @@ func (t *formatProcessorMPEG1Audio) ProcessUnit(uu unit.Unit) error { //nolint:d } u.RTPPackets = pkts - ts := t.timeEncoder.Encode(u.PTS) for _, pkt := range u.RTPPackets { - pkt.Timestamp += ts + pkt.Timestamp += t.randomStart + uint32(u.PTS) } return nil @@ -76,7 +71,7 @@ func (t *formatProcessorMPEG1Audio) ProcessUnit(uu unit.Unit) error { //nolint:d func (t *formatProcessorMPEG1Audio) ProcessRTPPacket( //nolint:dupl pkt *rtp.Packet, ntp time.Time, - pts time.Duration, + pts int64, hasNonRTSPReaders bool, ) (unit.Unit, error) { u := &unit.MPEG1Audio{ diff --git a/internal/formatprocessor/mpeg1_video.go b/internal/formatprocessor/mpeg1_video.go index 5bf0ef10..145282b3 100644 --- a/internal/formatprocessor/mpeg1_video.go +++ b/internal/formatprocessor/mpeg1_video.go @@ -7,7 +7,6 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/gortsplib/v4/pkg/format/rtpmpeg1video" - "github.com/bluenviron/gortsplib/v4/pkg/rtptime" "github.com/pion/rtp" "github.com/bluenviron/mediamtx/internal/unit" @@ -25,9 +24,9 @@ var ( type formatProcessorMPEG1Video struct { udpMaxPayloadSize int format *format.MPEG1Video - timeEncoder *rtptime.Encoder encoder *rtpmpeg1video.Encoder decoder *rtpmpeg1video.Decoder + randomStart uint32 } func newMPEG1Video( @@ -46,10 +45,7 @@ func newMPEG1Video( return nil, err } - t.timeEncoder = &rtptime.Encoder{ - ClockRate: forma.ClockRate(), - } - err = t.timeEncoder.Initialize() + t.randomStart, err = randUint32() if err != nil { return nil, err } @@ -75,9 +71,8 @@ func (t *formatProcessorMPEG1Video) ProcessUnit(uu unit.Unit) error { //nolint:d } u.RTPPackets = pkts - ts := t.timeEncoder.Encode(u.PTS) for _, pkt := range u.RTPPackets { - pkt.Timestamp += ts + pkt.Timestamp += t.randomStart + uint32(u.PTS) } return nil @@ -86,7 +81,7 @@ func (t *formatProcessorMPEG1Video) ProcessUnit(uu unit.Unit) error { //nolint:d func (t *formatProcessorMPEG1Video) ProcessRTPPacket( //nolint:dupl pkt *rtp.Packet, ntp time.Time, - pts time.Duration, + pts int64, hasNonRTSPReaders bool, ) (unit.Unit, error) { u := &unit.MPEG1Video{ diff --git a/internal/formatprocessor/mpeg4_audio.go b/internal/formatprocessor/mpeg4_audio.go index f91ba1c1..c6880e99 100644 --- a/internal/formatprocessor/mpeg4_audio.go +++ b/internal/formatprocessor/mpeg4_audio.go @@ -7,7 +7,6 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/gortsplib/v4/pkg/format/rtpmpeg4audio" - "github.com/bluenviron/gortsplib/v4/pkg/rtptime" "github.com/pion/rtp" "github.com/bluenviron/mediamtx/internal/unit" @@ -16,9 +15,9 @@ import ( type formatProcessorMPEG4Audio struct { udpMaxPayloadSize int format *format.MPEG4Audio - timeEncoder *rtptime.Encoder encoder *rtpmpeg4audio.Encoder decoder *rtpmpeg4audio.Decoder + randomStart uint32 } func newMPEG4Audio( @@ -37,10 +36,7 @@ func newMPEG4Audio( return nil, err } - t.timeEncoder = &rtptime.Encoder{ - ClockRate: forma.ClockRate(), - } - err = t.timeEncoder.Initialize() + t.randomStart, err = randUint32() if err != nil { return nil, err } @@ -69,9 +65,8 @@ func (t *formatProcessorMPEG4Audio) ProcessUnit(uu unit.Unit) error { //nolint:d } u.RTPPackets = pkts - ts := t.timeEncoder.Encode(u.PTS) for _, pkt := range u.RTPPackets { - pkt.Timestamp += ts + pkt.Timestamp += t.randomStart + uint32(u.PTS) } return nil @@ -80,7 +75,7 @@ func (t *formatProcessorMPEG4Audio) ProcessUnit(uu unit.Unit) error { //nolint:d func (t *formatProcessorMPEG4Audio) ProcessRTPPacket( //nolint:dupl pkt *rtp.Packet, ntp time.Time, - pts time.Duration, + pts int64, hasNonRTSPReaders bool, ) (unit.Unit, error) { u := &unit.MPEG4Audio{ diff --git a/internal/formatprocessor/mpeg4_video.go b/internal/formatprocessor/mpeg4_video.go index c646f23f..796b4dd7 100644 --- a/internal/formatprocessor/mpeg4_video.go +++ b/internal/formatprocessor/mpeg4_video.go @@ -8,7 +8,6 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/gortsplib/v4/pkg/format/rtpmpeg4video" - "github.com/bluenviron/gortsplib/v4/pkg/rtptime" "github.com/bluenviron/mediacommon/pkg/codecs/mpeg4video" "github.com/pion/rtp" @@ -30,9 +29,9 @@ var ( type formatProcessorMPEG4Video struct { udpMaxPayloadSize int format *format.MPEG4Video - timeEncoder *rtptime.Encoder encoder *rtpmpeg4video.Encoder decoder *rtpmpeg4video.Decoder + randomStart uint32 } func newMPEG4Video( @@ -51,10 +50,7 @@ func newMPEG4Video( return nil, err } - t.timeEncoder = &rtptime.Encoder{ - ClockRate: forma.ClockRate(), - } - err = t.timeEncoder.Initialize() + t.randomStart, err = randUint32() if err != nil { return nil, err } @@ -115,9 +111,8 @@ func (t *formatProcessorMPEG4Video) ProcessUnit(uu unit.Unit) error { //nolint:d return err } - ts := t.timeEncoder.Encode(u.PTS) for _, pkt := range u.RTPPackets { - pkt.Timestamp += ts + pkt.Timestamp += t.randomStart + uint32(u.PTS) } u.RTPPackets = pkts @@ -129,7 +124,7 @@ func (t *formatProcessorMPEG4Video) ProcessUnit(uu unit.Unit) error { //nolint:d func (t *formatProcessorMPEG4Video) ProcessRTPPacket( //nolint:dupl pkt *rtp.Packet, ntp time.Time, - pts time.Duration, + pts int64, hasNonRTSPReaders bool, ) (unit.Unit, error) { u := &unit.MPEG4Video{ diff --git a/internal/formatprocessor/opus.go b/internal/formatprocessor/opus.go index 464f2d13..eed02057 100644 --- a/internal/formatprocessor/opus.go +++ b/internal/formatprocessor/opus.go @@ -6,7 +6,6 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/gortsplib/v4/pkg/format/rtpsimpleaudio" - "github.com/bluenviron/gortsplib/v4/pkg/rtptime" "github.com/bluenviron/mediacommon/pkg/codecs/opus" "github.com/pion/rtp" @@ -16,9 +15,9 @@ import ( type formatProcessorOpus struct { udpMaxPayloadSize int format *format.Opus - timeEncoder *rtptime.Encoder encoder *rtpsimpleaudio.Encoder decoder *rtpsimpleaudio.Decoder + randomStart uint32 } func newOpus( @@ -37,10 +36,7 @@ func newOpus( return nil, err } - t.timeEncoder = &rtptime.Encoder{ - ClockRate: forma.ClockRate(), - } - err = t.timeEncoder.Initialize() + t.randomStart, err = randUint32() if err != nil { return nil, err } @@ -69,11 +65,10 @@ func (t *formatProcessorOpus) ProcessUnit(uu unit.Unit) error { //nolint:dupl return err } - ts := t.timeEncoder.Encode(pts) - pkt.Timestamp += ts + pkt.Timestamp += t.randomStart + uint32(pts) rtpPackets = append(rtpPackets, pkt) - pts += opus.PacketDuration(packet) + pts += int64(opus.PacketDuration(packet)) * int64(t.format.ClockRate()) / int64(time.Second) } u.RTPPackets = rtpPackets @@ -84,7 +79,7 @@ func (t *formatProcessorOpus) ProcessUnit(uu unit.Unit) error { //nolint:dupl func (t *formatProcessorOpus) ProcessRTPPacket( pkt *rtp.Packet, ntp time.Time, - pts time.Duration, + pts int64, hasNonRTSPReaders bool, ) (unit.Unit, error) { u := &unit.Opus{ diff --git a/internal/formatprocessor/processor.go b/internal/formatprocessor/processor.go index 2d1ec256..95f93aad 100644 --- a/internal/formatprocessor/processor.go +++ b/internal/formatprocessor/processor.go @@ -19,7 +19,7 @@ type Processor interface { ProcessRTPPacket( pkt *rtp.Packet, ntp time.Time, - pts time.Duration, + pts int64, hasNonRTSPReaders bool, ) (unit.Unit, error) } diff --git a/internal/formatprocessor/vp8.go b/internal/formatprocessor/vp8.go index 17ca15fb..3be605ca 100644 --- a/internal/formatprocessor/vp8.go +++ b/internal/formatprocessor/vp8.go @@ -7,7 +7,6 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/gortsplib/v4/pkg/format/rtpvp8" - "github.com/bluenviron/gortsplib/v4/pkg/rtptime" "github.com/pion/rtp" "github.com/bluenviron/mediamtx/internal/unit" @@ -16,9 +15,9 @@ import ( type formatProcessorVP8 struct { udpMaxPayloadSize int format *format.VP8 - timeEncoder *rtptime.Encoder encoder *rtpvp8.Encoder decoder *rtpvp8.Decoder + randomStart uint32 } func newVP8( @@ -37,10 +36,7 @@ func newVP8( return nil, err } - t.timeEncoder = &rtptime.Encoder{ - ClockRate: forma.ClockRate(), - } - err = t.timeEncoder.Initialize() + t.randomStart, err = randUint32() if err != nil { return nil, err } @@ -66,9 +62,8 @@ func (t *formatProcessorVP8) ProcessUnit(uu unit.Unit) error { //nolint:dupl } u.RTPPackets = pkts - ts := t.timeEncoder.Encode(u.PTS) for _, pkt := range u.RTPPackets { - pkt.Timestamp += ts + pkt.Timestamp += t.randomStart + uint32(u.PTS) } return nil @@ -77,7 +72,7 @@ func (t *formatProcessorVP8) ProcessUnit(uu unit.Unit) error { //nolint:dupl func (t *formatProcessorVP8) ProcessRTPPacket( //nolint:dupl pkt *rtp.Packet, ntp time.Time, - pts time.Duration, + pts int64, hasNonRTSPReaders bool, ) (unit.Unit, error) { u := &unit.VP8{ diff --git a/internal/formatprocessor/vp9.go b/internal/formatprocessor/vp9.go index 079bedcc..ee638b02 100644 --- a/internal/formatprocessor/vp9.go +++ b/internal/formatprocessor/vp9.go @@ -7,7 +7,6 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/gortsplib/v4/pkg/format/rtpvp9" - "github.com/bluenviron/gortsplib/v4/pkg/rtptime" "github.com/pion/rtp" "github.com/bluenviron/mediamtx/internal/unit" @@ -16,9 +15,9 @@ import ( type formatProcessorVP9 struct { udpMaxPayloadSize int format *format.VP9 - timeEncoder *rtptime.Encoder encoder *rtpvp9.Encoder decoder *rtpvp9.Decoder + randomStart uint32 } func newVP9( @@ -37,10 +36,7 @@ func newVP9( return nil, err } - t.timeEncoder = &rtptime.Encoder{ - ClockRate: forma.ClockRate(), - } - err = t.timeEncoder.Initialize() + t.randomStart, err = randUint32() if err != nil { return nil, err } @@ -66,9 +62,8 @@ func (t *formatProcessorVP9) ProcessUnit(uu unit.Unit) error { //nolint:dupl } u.RTPPackets = pkts - ts := t.timeEncoder.Encode(u.PTS) for _, pkt := range u.RTPPackets { - pkt.Timestamp += ts + pkt.Timestamp += t.randomStart + uint32(u.PTS) } return nil @@ -77,7 +72,7 @@ func (t *formatProcessorVP9) ProcessUnit(uu unit.Unit) error { //nolint:dupl func (t *formatProcessorVP9) ProcessRTPPacket( //nolint:dupl pkt *rtp.Packet, ntp time.Time, - pts time.Duration, + pts int64, hasNonRTSPReaders bool, ) (unit.Unit, error) { u := &unit.VP9{ diff --git a/internal/protocols/hls/from_stream.go b/internal/protocols/hls/from_stream.go index f08facb3..96323d75 100644 --- a/internal/protocols/hls/from_stream.go +++ b/internal/protocols/hls/from_stream.go @@ -39,7 +39,10 @@ func setupVideoTrack( videoMedia := strea.Desc().FindFormat(&videoFormatAV1) if videoFormatAV1 != nil { - track := &gohlslib.Track{Codec: &codecs.AV1{}} + track := &gohlslib.Track{ + Codec: &codecs.AV1{}, + ClockRate: videoFormatAV1.ClockRate(), + } addTrack( videoMedia, @@ -52,7 +55,11 @@ func setupVideoTrack( return nil } - err := muxer.WriteAV1(track, tunit.NTP, tunit.PTS, tunit.TU) + err := muxer.WriteAV1( + track, + tunit.NTP, + tunit.PTS, // no conversion is needed since we set gohlslib.Track.ClockRate = format.ClockRate + tunit.TU) if err != nil { return fmt.Errorf("muxer error: %w", err) } @@ -67,7 +74,10 @@ func setupVideoTrack( videoMedia = strea.Desc().FindFormat(&videoFormatVP9) if videoFormatVP9 != nil { - track := &gohlslib.Track{Codec: &codecs.VP9{}} + track := &gohlslib.Track{ + Codec: &codecs.VP9{}, + ClockRate: videoFormatVP9.ClockRate(), + } addTrack( videoMedia, @@ -80,7 +90,11 @@ func setupVideoTrack( return nil } - err := muxer.WriteVP9(track, tunit.NTP, tunit.PTS, tunit.Frame) + err := muxer.WriteVP9( + track, + tunit.NTP, + tunit.PTS, // no conversion is needed since we set gohlslib.Track.ClockRate = format.ClockRate + tunit.Frame) if err != nil { return fmt.Errorf("muxer error: %w", err) } @@ -96,11 +110,14 @@ func setupVideoTrack( if videoFormatH265 != nil { vps, sps, pps := videoFormatH265.SafeParams() - track := &gohlslib.Track{Codec: &codecs.H265{ - VPS: vps, - SPS: sps, - PPS: pps, - }} + track := &gohlslib.Track{ + Codec: &codecs.H265{ + VPS: vps, + SPS: sps, + PPS: pps, + }, + ClockRate: videoFormatH265.ClockRate(), + } addTrack( videoMedia, @@ -113,7 +130,11 @@ func setupVideoTrack( return nil } - err := muxer.WriteH265(track, tunit.NTP, tunit.PTS, tunit.AU) + err := muxer.WriteH265( + track, + tunit.NTP, + tunit.PTS, // no conversion is needed since we set gohlslib.Track.ClockRate = format.ClockRate + tunit.AU) if err != nil { return fmt.Errorf("muxer error: %w", err) } @@ -129,10 +150,13 @@ func setupVideoTrack( if videoFormatH264 != nil { sps, pps := videoFormatH264.SafeParams() - track := &gohlslib.Track{Codec: &codecs.H264{ - SPS: sps, - PPS: pps, - }} + track := &gohlslib.Track{ + Codec: &codecs.H264{ + SPS: sps, + PPS: pps, + }, + ClockRate: videoFormatH264.ClockRate(), + } addTrack( videoMedia, @@ -145,7 +169,11 @@ func setupVideoTrack( return nil } - err := muxer.WriteH264(track, tunit.NTP, tunit.PTS, tunit.AU) + err := muxer.WriteH264( + track, + tunit.NTP, + tunit.PTS, // no conversion is needed since we set gohlslib.Track.ClockRate = format.ClockRate + tunit.AU) if err != nil { return fmt.Errorf("muxer error: %w", err) } @@ -178,9 +206,12 @@ func setupAudioTracks( for _, forma := range media.Formats { switch forma := forma.(type) { case *format.Opus: - track := &gohlslib.Track{Codec: &codecs.Opus{ - ChannelCount: forma.ChannelCount, - }} + track := &gohlslib.Track{ + Codec: &codecs.Opus{ + ChannelCount: forma.ChannelCount, + }, + ClockRate: forma.ClockRate(), + } addTrack( media, @@ -192,7 +223,7 @@ func setupAudioTracks( err := muxer.WriteOpus( track, tunit.NTP, - tunit.PTS, + tunit.PTS, // no conversion is needed since we set gohlslib.Track.ClockRate = format.ClockRate tunit.Packets) if err != nil { return fmt.Errorf("muxer error: %w", err) @@ -204,9 +235,12 @@ func setupAudioTracks( case *format.MPEG4Audio: co := forma.GetConfig() if co != nil { - track := &gohlslib.Track{Codec: &codecs.MPEG4Audio{ - Config: *co, - }} + track := &gohlslib.Track{ + Codec: &codecs.MPEG4Audio{ + Config: *co, + }, + ClockRate: forma.ClockRate(), + } addTrack( media, @@ -222,7 +256,7 @@ func setupAudioTracks( err := muxer.WriteMPEG4Audio( track, tunit.NTP, - tunit.PTS, + tunit.PTS, // no conversion is needed since we set gohlslib.Track.ClockRate = format.ClockRate tunit.AUs) if err != nil { return fmt.Errorf("muxer error: %w", err) diff --git a/internal/protocols/hls/to_stream.go b/internal/protocols/hls/to_stream.go index 9b003e18..28ef7ecb 100644 --- a/internal/protocols/hls/to_stream.go +++ b/internal/protocols/hls/to_stream.go @@ -11,6 +11,12 @@ import ( "github.com/bluenviron/mediamtx/internal/unit" ) +func multiplyAndDivide(v, m, d int64) int64 { + secs := v / d + dec := v % d + return (secs*m + dec*m/d) +} + // ToStream maps a HLS stream to a MediaMTX stream. func ToStream( c *gohlslib.Client, @@ -21,6 +27,7 @@ func ToStream( for _, track := range tracks { var medi *description.Media + clockRate := track.ClockRate switch tcodec := track.Codec.(type) { case *codecs.AV1: @@ -31,11 +38,11 @@ func ToStream( }}, } - c.OnDataAV1(track, func(pts time.Duration, tu [][]byte) { + c.OnDataAV1(track, func(pts int64, tu [][]byte) { (*stream).WriteUnit(medi, medi.Formats[0], &unit.AV1{ Base: unit.Base{ NTP: time.Now(), - PTS: pts, + PTS: pts, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP }, TU: tu, }) @@ -49,37 +56,16 @@ func ToStream( }}, } - c.OnDataVP9(track, func(pts time.Duration, frame []byte) { + c.OnDataVP9(track, func(pts int64, frame []byte) { (*stream).WriteUnit(medi, medi.Formats[0], &unit.VP9{ Base: unit.Base{ NTP: time.Now(), - PTS: pts, + PTS: pts, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP }, Frame: frame, }) }) - case *codecs.H264: - medi = &description.Media{ - Type: description.MediaTypeVideo, - Formats: []format.Format{&format.H264{ - PayloadTyp: 96, - PacketizationMode: 1, - SPS: tcodec.SPS, - PPS: tcodec.PPS, - }}, - } - - c.OnDataH26x(track, func(pts time.Duration, _ time.Duration, au [][]byte) { - (*stream).WriteUnit(medi, medi.Formats[0], &unit.H264{ - Base: unit.Base{ - NTP: time.Now(), - PTS: pts, - }, - AU: au, - }) - }) - case *codecs.H265: medi = &description.Media{ Type: description.MediaTypeVideo, @@ -91,16 +77,56 @@ func ToStream( }}, } - c.OnDataH26x(track, func(pts time.Duration, _ time.Duration, au [][]byte) { + c.OnDataH26x(track, func(pts int64, _ int64, au [][]byte) { (*stream).WriteUnit(medi, medi.Formats[0], &unit.H265{ Base: unit.Base{ NTP: time.Now(), - PTS: pts, + PTS: pts, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP }, AU: au, }) }) + case *codecs.H264: + medi = &description.Media{ + Type: description.MediaTypeVideo, + Formats: []format.Format{&format.H264{ + PayloadTyp: 96, + PacketizationMode: 1, + SPS: tcodec.SPS, + PPS: tcodec.PPS, + }}, + } + + c.OnDataH26x(track, func(pts int64, _ int64, au [][]byte) { + (*stream).WriteUnit(medi, medi.Formats[0], &unit.H264{ + Base: unit.Base{ + NTP: time.Now(), + PTS: pts, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP + }, + AU: au, + }) + }) + + case *codecs.Opus: + medi = &description.Media{ + Type: description.MediaTypeAudio, + Formats: []format.Format{&format.Opus{ + PayloadTyp: 96, + ChannelCount: tcodec.ChannelCount, + }}, + } + + c.OnDataOpus(track, func(pts int64, packets [][]byte) { + (*stream).WriteUnit(medi, medi.Formats[0], &unit.Opus{ + Base: unit.Base{ + NTP: time.Now(), + PTS: multiplyAndDivide(pts, int64(medi.Formats[0].ClockRate()), int64(clockRate)), + }, + Packets: packets, + }) + }) + case *codecs.MPEG4Audio: medi = &description.Media{ Type: description.MediaTypeAudio, @@ -113,35 +139,16 @@ func ToStream( }}, } - c.OnDataMPEG4Audio(track, func(pts time.Duration, aus [][]byte) { + c.OnDataMPEG4Audio(track, func(pts int64, aus [][]byte) { (*stream).WriteUnit(medi, medi.Formats[0], &unit.MPEG4Audio{ Base: unit.Base{ NTP: time.Now(), - PTS: pts, + PTS: multiplyAndDivide(pts, int64(medi.Formats[0].ClockRate()), int64(clockRate)), }, AUs: aus, }) }) - case *codecs.Opus: - medi = &description.Media{ - Type: description.MediaTypeAudio, - Formats: []format.Format{&format.Opus{ - PayloadTyp: 96, - ChannelCount: tcodec.ChannelCount, - }}, - } - - c.OnDataOpus(track, func(pts time.Duration, packets [][]byte) { - (*stream).WriteUnit(medi, medi.Formats[0], &unit.Opus{ - Base: unit.Base{ - NTP: time.Now(), - PTS: pts, - }, - Packets: packets, - }) - }) - default: panic("should not happen") } diff --git a/internal/protocols/mpegts/from_stream.go b/internal/protocols/mpegts/from_stream.go index 7e343129..91925a1f 100644 --- a/internal/protocols/mpegts/from_stream.go +++ b/internal/protocols/mpegts/from_stream.go @@ -18,8 +18,10 @@ import ( "github.com/bluenviron/mediamtx/internal/unit" ) -func durationGoToMPEGTS(v time.Duration) int64 { - return int64(v.Seconds() * 90000) +func multiplyAndDivide(v, m, d int64) int64 { + secs := v / d + dec := v % d + return (secs*m + dec*m/d) } // FromStream maps a MediaMTX stream to a MPEG-TS writer. @@ -47,11 +49,13 @@ func FromStream( for _, media := range strea.Desc().Medias { for _, forma := range media.Formats { + clockRate := forma.ClockRate() + switch forma := forma.(type) { case *format.H265: //nolint:dupl track := &mcmpegts.Track{Codec: &mcmpegts.CodecH265{}} - var dtsExtractor *h265.DTSExtractor + var dtsExtractor *h265.DTSExtractor2 addTrack( media, @@ -69,7 +73,7 @@ func FromStream( if !randomAccess { return nil } - dtsExtractor = h265.NewDTSExtractor() + dtsExtractor = h265.NewDTSExtractor2() } dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS) @@ -78,7 +82,12 @@ func FromStream( } sconn.SetWriteDeadline(time.Now().Add(writeTimeout)) - err = (*w).WriteH265(track, durationGoToMPEGTS(tunit.PTS), durationGoToMPEGTS(dts), randomAccess, tunit.AU) + err = (*w).WriteH265( + track, + tunit.PTS, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP + dts, + randomAccess, + tunit.AU) if err != nil { return err } @@ -88,7 +97,7 @@ func FromStream( case *format.H264: //nolint:dupl track := &mcmpegts.Track{Codec: &mcmpegts.CodecH264{}} - var dtsExtractor *h264.DTSExtractor + var dtsExtractor *h264.DTSExtractor2 addTrack( media, @@ -106,7 +115,7 @@ func FromStream( if !idrPresent { return nil } - dtsExtractor = h264.NewDTSExtractor() + dtsExtractor = h264.NewDTSExtractor2() } dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS) @@ -115,7 +124,12 @@ func FromStream( } sconn.SetWriteDeadline(time.Now().Add(writeTimeout)) - err = (*w).WriteH264(track, durationGoToMPEGTS(tunit.PTS), durationGoToMPEGTS(dts), idrPresent, tunit.AU) + err = (*w).WriteH264( + track, + tunit.PTS, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP + dts, + idrPresent, + tunit.AU) if err != nil { return err } @@ -126,7 +140,7 @@ func FromStream( track := &mcmpegts.Track{Codec: &mcmpegts.CodecMPEG4Video{}} firstReceived := false - var lastPTS time.Duration + var lastPTS int64 addTrack( media, @@ -146,7 +160,10 @@ func FromStream( lastPTS = tunit.PTS sconn.SetWriteDeadline(time.Now().Add(writeTimeout)) - err := (*w).WriteMPEG4Video(track, durationGoToMPEGTS(tunit.PTS), tunit.Frame) + err := (*w).WriteMPEG4Video( + track, + tunit.PTS, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP + tunit.Frame) if err != nil { return err } @@ -157,7 +174,7 @@ func FromStream( track := &mcmpegts.Track{Codec: &mcmpegts.CodecMPEG1Video{}} firstReceived := false - var lastPTS time.Duration + var lastPTS int64 addTrack( media, @@ -177,7 +194,10 @@ func FromStream( lastPTS = tunit.PTS sconn.SetWriteDeadline(time.Now().Add(writeTimeout)) - err := (*w).WriteMPEG1Video(track, durationGoToMPEGTS(tunit.PTS), tunit.Frame) + err := (*w).WriteMPEG1Video( + track, + tunit.PTS, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP + tunit.Frame) if err != nil { return err } @@ -200,7 +220,10 @@ func FromStream( } sconn.SetWriteDeadline(time.Now().Add(writeTimeout)) - err := (*w).WriteOpus(track, durationGoToMPEGTS(tunit.PTS), tunit.Packets) + err := (*w).WriteOpus( + track, + multiplyAndDivide(tunit.PTS, 90000, int64(clockRate)), + tunit.Packets) if err != nil { return err } @@ -225,7 +248,10 @@ func FromStream( } sconn.SetWriteDeadline(time.Now().Add(writeTimeout)) - err := (*w).WriteMPEG4Audio(track, durationGoToMPEGTS(tunit.PTS), tunit.AUs) + err := (*w).WriteMPEG4Audio( + track, + multiplyAndDivide(tunit.PTS, 90000, int64(clockRate)), + tunit.AUs) if err != nil { return err } @@ -247,7 +273,10 @@ func FromStream( } sconn.SetWriteDeadline(time.Now().Add(writeTimeout)) - err := (*w).WriteMPEG1Audio(track, durationGoToMPEGTS(tunit.PTS), tunit.Frames) + err := (*w).WriteMPEG1Audio( + track, + tunit.PTS, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP + tunit.Frames) if err != nil { return err } @@ -257,8 +286,6 @@ func FromStream( case *format.AC3: track := &mcmpegts.Track{Codec: &mcmpegts.CodecAC3{}} - sampleRate := time.Duration(forma.SampleRate) - addTrack( media, forma, @@ -270,11 +297,13 @@ func FromStream( } for i, frame := range tunit.Frames { - framePTS := tunit.PTS + time.Duration(i)*ac3.SamplesPerFrame* - time.Second/sampleRate + framePTS := tunit.PTS + int64(i)*ac3.SamplesPerFrame sconn.SetWriteDeadline(time.Now().Add(writeTimeout)) - err := (*w).WriteAC3(track, durationGoToMPEGTS(framePTS), frame) + err := (*w).WriteAC3( + track, + multiplyAndDivide(framePTS, 90000, int64(clockRate)), + frame) if err != nil { return err } diff --git a/internal/protocols/mpegts/to_stream.go b/internal/protocols/mpegts/to_stream.go index 2a416bbe..d40c5b1e 100644 --- a/internal/protocols/mpegts/to_stream.go +++ b/internal/protocols/mpegts/to_stream.go @@ -27,13 +27,7 @@ func ToStream( var medias []*description.Media //nolint:prealloc var unsupportedTracks []int - var td *mpegts.TimeDecoder - decodeTime := func(t int64) time.Duration { - if td == nil { - td = mpegts.NewTimeDecoder(t) - } - return td.Decode(t) - } + td := mpegts.NewTimeDecoder2() for i, track := range r.Tracks() { //nolint:dupl var medi *description.Media @@ -48,10 +42,12 @@ func ToStream( } r.OnDataH265(track, func(pts int64, _ int64, au [][]byte) error { + pts = td.Decode(pts) + (*stream).WriteUnit(medi, medi.Formats[0], &unit.H265{ Base: unit.Base{ NTP: time.Now(), - PTS: decodeTime(pts), + PTS: pts, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP }, AU: au, }) @@ -68,10 +64,12 @@ func ToStream( } r.OnDataH264(track, func(pts int64, _ int64, au [][]byte) error { + pts = td.Decode(pts) + (*stream).WriteUnit(medi, medi.Formats[0], &unit.H264{ Base: unit.Base{ NTP: time.Now(), - PTS: decodeTime(pts), + PTS: pts, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP }, AU: au, }) @@ -87,10 +85,12 @@ func ToStream( } r.OnDataMPEGxVideo(track, func(pts int64, frame []byte) error { + pts = td.Decode(pts) + (*stream).WriteUnit(medi, medi.Formats[0], &unit.MPEG4Video{ Base: unit.Base{ NTP: time.Now(), - PTS: decodeTime(pts), + PTS: pts, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP }, Frame: frame, }) @@ -104,10 +104,12 @@ func ToStream( } r.OnDataMPEGxVideo(track, func(pts int64, frame []byte) error { + pts = td.Decode(pts) + (*stream).WriteUnit(medi, medi.Formats[0], &unit.MPEG1Video{ Base: unit.Base{ NTP: time.Now(), - PTS: decodeTime(pts), + PTS: pts, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP }, Frame: frame, }) @@ -124,10 +126,12 @@ func ToStream( } r.OnDataOpus(track, func(pts int64, packets [][]byte) error { + pts = td.Decode(pts) + (*stream).WriteUnit(medi, medi.Formats[0], &unit.Opus{ Base: unit.Base{ NTP: time.Now(), - PTS: decodeTime(pts), + PTS: multiplyAndDivide(pts, int64(medi.Formats[0].ClockRate()), 90000), }, Packets: packets, }) @@ -147,10 +151,12 @@ func ToStream( } r.OnDataMPEG4Audio(track, func(pts int64, aus [][]byte) error { + pts = td.Decode(pts) + (*stream).WriteUnit(medi, medi.Formats[0], &unit.MPEG4Audio{ Base: unit.Base{ NTP: time.Now(), - PTS: decodeTime(pts), + PTS: multiplyAndDivide(pts, int64(medi.Formats[0].ClockRate()), 90000), }, AUs: aus, }) @@ -164,10 +170,12 @@ func ToStream( } r.OnDataMPEG1Audio(track, func(pts int64, frames [][]byte) error { + pts = td.Decode(pts) + (*stream).WriteUnit(medi, medi.Formats[0], &unit.MPEG1Audio{ Base: unit.Base{ NTP: time.Now(), - PTS: decodeTime(pts), + PTS: pts, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP }, Frames: frames, }) @@ -185,10 +193,12 @@ func ToStream( } r.OnDataAC3(track, func(pts int64, frame []byte) error { + pts = td.Decode(pts) + (*stream).WriteUnit(medi, medi.Formats[0], &unit.AC3{ Base: unit.Base{ NTP: time.Now(), - PTS: decodeTime(pts), + PTS: multiplyAndDivide(pts, int64(medi.Formats[0].ClockRate()), 90000), }, Frames: [][]byte{frame}, }) diff --git a/internal/protocols/rtmp/from_stream.go b/internal/protocols/rtmp/from_stream.go index e3414bd5..e55975ec 100644 --- a/internal/protocols/rtmp/from_stream.go +++ b/internal/protocols/rtmp/from_stream.go @@ -18,6 +18,16 @@ import ( var errNoSupportedCodecsFrom = errors.New( "the stream doesn't contain any supported codec, which are currently H264, MPEG-4 Audio, MPEG-1/2 Audio") +func multiplyAndDivide2(v, m, d time.Duration) time.Duration { + secs := v / d + dec := v % d + return (secs*m + dec*m/d) +} + +func timestampToDuration(t int64, clockRate int) time.Duration { + return multiplyAndDivide2(time.Duration(t), time.Second, time.Duration(clockRate)) +} + func setupVideo( strea *stream.Stream, reader stream.Reader, @@ -29,7 +39,7 @@ func setupVideo( videoMedia := strea.Desc().FindFormat(&videoFormatH264) if videoFormatH264 != nil { - var videoDTSExtractor *h264.DTSExtractor + var videoDTSExtractor *h264.DTSExtractor2 strea.AddReader( reader, @@ -56,35 +66,28 @@ func setupVideo( } } - var dts time.Duration - // wait until we receive an IDR if videoDTSExtractor == nil { if !idrPresent { return nil } - videoDTSExtractor = h264.NewDTSExtractor() + videoDTSExtractor = h264.NewDTSExtractor2() + } else if !idrPresent && !nonIDRPresent { + return nil + } - var err error - dts, err = videoDTSExtractor.Extract(tunit.AU, tunit.PTS) - if err != nil { - return err - } - } else { - if !idrPresent && !nonIDRPresent { - return nil - } - - var err error - dts, err = videoDTSExtractor.Extract(tunit.AU, tunit.PTS) - if err != nil { - return err - } + dts, err := videoDTSExtractor.Extract(tunit.AU, tunit.PTS) + if err != nil { + return err } nconn.SetWriteDeadline(time.Now().Add(writeTimeout)) - return (*w).WriteH264(tunit.PTS, dts, idrPresent, tunit.AU) + return (*w).WriteH264( + timestampToDuration(tunit.PTS, videoFormatH264.ClockRate()), + timestampToDuration(dts, videoFormatH264.ClockRate()), + idrPresent, + tunit.AU) }) return videoFormatH264 @@ -116,10 +119,11 @@ func setupAudio( } for i, au := range tunit.AUs { + pts := tunit.PTS + int64(i)*mpeg4audio.SamplesPerAccessUnit + nconn.SetWriteDeadline(time.Now().Add(writeTimeout)) err := (*w).WriteMPEG4Audio( - tunit.PTS+time.Duration(i)*mpeg4audio.SamplesPerAccessUnit* - time.Second/time.Duration(audioFormatMPEG4Audio.ClockRate()), + timestampToDuration(pts, audioFormatMPEG4Audio.ClockRate()), au, ) if err != nil { @@ -158,13 +162,16 @@ func setupAudio( } nconn.SetWriteDeadline(time.Now().Add(writeTimeout)) - err = (*w).WriteMPEG1Audio(pts, &h, frame) + err = (*w).WriteMPEG1Audio( + timestampToDuration(pts, audioFormatMPEG1.ClockRate()), + &h, + frame) if err != nil { return err } - pts += time.Duration(h.SampleCount()) * - time.Second / time.Duration(h.SampleRate) + pts += int64(h.SampleCount()) * + int64(audioFormatMPEG1.ClockRate()) / int64(h.SampleRate) } return nil diff --git a/internal/protocols/rtmp/to_stream.go b/internal/protocols/rtmp/to_stream.go index 1853f9ca..4b8f8b0c 100644 --- a/internal/protocols/rtmp/to_stream.go +++ b/internal/protocols/rtmp/to_stream.go @@ -14,6 +14,16 @@ var errNoSupportedCodecsTo = errors.New( "the stream doesn't contain any supported codec, which are currently " + "AV1, VP9, H265, H264, MPEG-4 Audio, MPEG-1/2 Audio, G711, LPCM") +func multiplyAndDivide(v, m, d int64) int64 { + secs := v / d + dec := v % d + return (secs*m + dec*m/d) +} + +func durationToTimestamp(d time.Duration, clockRate int) int64 { + return multiplyAndDivide(int64(d), int64(clockRate), int64(time.Second)) +} + // ToStream maps a RTMP stream to a MediaMTX stream. func ToStream(r *Reader, stream **stream.Stream) ([]*description.Media, error) { videoFormat, audioFormat := r.Tracks() @@ -33,7 +43,7 @@ func ToStream(r *Reader, stream **stream.Stream) ([]*description.Media, error) { (*stream).WriteUnit(medi, videoFormat, &unit.AV1{ Base: unit.Base{ NTP: time.Now(), - PTS: pts, + PTS: durationToTimestamp(pts, videoFormat.ClockRate()), }, TU: tu, }) @@ -44,7 +54,7 @@ func ToStream(r *Reader, stream **stream.Stream) ([]*description.Media, error) { (*stream).WriteUnit(medi, videoFormat, &unit.VP9{ Base: unit.Base{ NTP: time.Now(), - PTS: pts, + PTS: durationToTimestamp(pts, videoFormat.ClockRate()), }, Frame: frame, }) @@ -55,7 +65,7 @@ func ToStream(r *Reader, stream **stream.Stream) ([]*description.Media, error) { (*stream).WriteUnit(medi, videoFormat, &unit.H265{ Base: unit.Base{ NTP: time.Now(), - PTS: pts, + PTS: durationToTimestamp(pts, videoFormat.ClockRate()), }, AU: au, }) @@ -66,7 +76,7 @@ func ToStream(r *Reader, stream **stream.Stream) ([]*description.Media, error) { (*stream).WriteUnit(medi, videoFormat, &unit.H264{ Base: unit.Base{ NTP: time.Now(), - PTS: pts, + PTS: durationToTimestamp(pts, videoFormat.ClockRate()), }, AU: au, }) @@ -90,7 +100,7 @@ func ToStream(r *Reader, stream **stream.Stream) ([]*description.Media, error) { (*stream).WriteUnit(medi, audioFormat, &unit.MPEG4Audio{ Base: unit.Base{ NTP: time.Now(), - PTS: pts, + PTS: durationToTimestamp(pts, audioFormat.ClockRate()), }, AUs: [][]byte{au}, }) @@ -101,7 +111,7 @@ func ToStream(r *Reader, stream **stream.Stream) ([]*description.Media, error) { (*stream).WriteUnit(medi, audioFormat, &unit.MPEG1Audio{ Base: unit.Base{ NTP: time.Now(), - PTS: pts, + PTS: durationToTimestamp(pts, audioFormat.ClockRate()), }, Frames: [][]byte{frame}, }) @@ -112,7 +122,7 @@ func ToStream(r *Reader, stream **stream.Stream) ([]*description.Media, error) { (*stream).WriteUnit(medi, audioFormat, &unit.G711{ Base: unit.Base{ NTP: time.Now(), - PTS: pts, + PTS: durationToTimestamp(pts, audioFormat.ClockRate()), }, Samples: samples, }) @@ -123,7 +133,7 @@ func ToStream(r *Reader, stream **stream.Stream) ([]*description.Media, error) { (*stream).WriteUnit(medi, audioFormat, &unit.LPCM{ Base: unit.Base{ NTP: time.Now(), - PTS: pts, + PTS: durationToTimestamp(pts, audioFormat.ClockRate()), }, Samples: samples, }) diff --git a/internal/protocols/webrtc/from_stream.go b/internal/protocols/webrtc/from_stream.go index 7411eb73..2250e85f 100644 --- a/internal/protocols/webrtc/from_stream.go +++ b/internal/protocols/webrtc/from_stream.go @@ -4,7 +4,6 @@ import ( "crypto/rand" "errors" "fmt" - "time" "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/gortsplib/v4/pkg/format/rtpav1" @@ -213,7 +212,7 @@ func setupVideoTrack( } firstReceived := false - var lastPTS time.Duration + var lastPTS int64 stream.AddReader( reader, diff --git a/internal/protocols/webrtc/to_stream.go b/internal/protocols/webrtc/to_stream.go index 43d05b3a..10978fb4 100644 --- a/internal/protocols/webrtc/to_stream.go +++ b/internal/protocols/webrtc/to_stream.go @@ -24,7 +24,7 @@ func ToStream( stream **stream.Stream, ) ([]*description.Media, error) { var medias []*description.Media //nolint:prealloc - timeDecoder := rtptime.NewGlobalDecoder() + timeDecoder := rtptime.NewGlobalDecoder2() for _, track := range pc.incomingTracks { var typ description.MediaType diff --git a/internal/recorder/format_fmp4.go b/internal/recorder/format_fmp4.go index c152c4ec..95c4e73d 100644 --- a/internal/recorder/format_fmp4.go +++ b/internal/recorder/format_fmp4.go @@ -25,13 +25,6 @@ import ( "github.com/bluenviron/mediamtx/internal/unit" ) -func durationGoToMp4(v time.Duration, timeScale uint32) uint64 { - timeScale64 := uint64(timeScale) - secs := v / time.Second - dec := v % time.Second - return uint64(secs)*timeScale64 + uint64(dec)*timeScale64/uint64(time.Second) -} - func mpeg1audioChannelCount(cm mpeg1audio.ChannelMode) int { switch cm { case mpeg1audio.ChannelModeStereo, @@ -144,6 +137,8 @@ func (f *formatFMP4) initialize() { for _, media := range f.ai.agent.Stream.Desc().Medias { for _, forma := range media.Formats { + clockRate := forma.ClockRate() + switch forma := forma.(type) { case *rtspformat.AV1: codec := &fmp4.CodecAV1{ @@ -298,7 +293,7 @@ func (f *formatFMP4) initialize() { } track := addTrack(forma, codec) - var dtsExtractor *h265.DTSExtractor + var dtsExtractor *h265.DTSExtractor2 f.ai.agent.Stream.AddReader( f.ai, @@ -343,7 +338,7 @@ func (f *formatFMP4) initialize() { if !randomAccess { return nil } - dtsExtractor = h265.NewDTSExtractor() + dtsExtractor = h265.NewDTSExtractor2() } dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS) @@ -352,7 +347,7 @@ func (f *formatFMP4) initialize() { } sampl, err := fmp4.NewPartSampleH26x( - int32(durationGoToMp4(tunit.PTS-dts, 90000)), + int32(tunit.PTS-dts), randomAccess, tunit.AU) if err != nil { @@ -380,7 +375,7 @@ func (f *formatFMP4) initialize() { } track := addTrack(forma, codec) - var dtsExtractor *h264.DTSExtractor + var dtsExtractor *h264.DTSExtractor2 f.ai.agent.Stream.AddReader( f.ai, @@ -418,7 +413,7 @@ func (f *formatFMP4) initialize() { if !randomAccess { return nil } - dtsExtractor = h264.NewDTSExtractor() + dtsExtractor = h264.NewDTSExtractor2() } dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS) @@ -427,7 +422,7 @@ func (f *formatFMP4) initialize() { } sampl, err := fmp4.NewPartSampleH26x( - int32(durationGoToMp4(tunit.PTS-dts, 90000)), + int32(tunit.PTS-dts), randomAccess, tunit.AU) if err != nil { @@ -454,7 +449,7 @@ func (f *formatFMP4) initialize() { track := addTrack(forma, codec) firstReceived := false - var lastPTS time.Duration + var lastPTS int64 f.ai.agent.Stream.AddReader( f.ai, @@ -507,7 +502,7 @@ func (f *formatFMP4) initialize() { track := addTrack(forma, codec) firstReceived := false - var lastPTS time.Duration + var lastPTS int64 f.ai.agent.Stream.AddReader( f.ai, @@ -608,21 +603,21 @@ func (f *formatFMP4) initialize() { return nil } - var dt time.Duration + pts := tunit.PTS for _, packet := range tunit.Packets { err := track.write(&sample{ PartSample: &fmp4.PartSample{ Payload: packet, }, - dts: tunit.PTS + dt, - ntp: tunit.NTP.Add(dt), + dts: pts, + ntp: tunit.NTP.Add(timestampToDuration(pts, clockRate)), }) if err != nil { return err } - dt += opus.PacketDuration(packet) + pts += int64(opus.PacketDuration(packet)) * int64(clockRate) / int64(time.Second) } return nil @@ -636,8 +631,6 @@ func (f *formatFMP4) initialize() { } track := addTrack(forma, codec) - sampleRate := time.Duration(forma.ClockRate()) - f.ai.agent.Stream.AddReader( f.ai, media, @@ -649,15 +642,14 @@ func (f *formatFMP4) initialize() { } for i, au := range tunit.AUs { - dt := time.Duration(i) * mpeg4audio.SamplesPerAccessUnit * - time.Second / sampleRate + pts := tunit.PTS + int64(i)*mpeg4audio.SamplesPerAccessUnit err := track.write(&sample{ PartSample: &fmp4.PartSample{ Payload: au, }, - dts: tunit.PTS + dt, - ntp: tunit.NTP.Add(dt), + dts: pts, + ntp: tunit.NTP.Add(timestampToDuration(pts, clockRate)), }) if err != nil { return err @@ -772,15 +764,14 @@ func (f *formatFMP4) initialize() { updateCodecs() } - dt := time.Duration(i) * time.Duration(ac3.SamplesPerFrame) * - time.Second / time.Duration(codec.SampleRate) + pts := tunit.PTS + int64(i)*ac3.SamplesPerFrame err = track.write(&sample{ PartSample: &fmp4.PartSample{ Payload: frame, }, - dts: tunit.PTS + dt, - ntp: tunit.NTP.Add(dt), + dts: pts, + ntp: tunit.NTP.Add(timestampToDuration(pts, clockRate)), }) if err != nil { return err @@ -881,8 +872,9 @@ func (f *formatFMP4) initialize() { func (f *formatFMP4) close() { if f.currentSegment != nil { for _, track := range f.tracks { - if track.nextSample != nil && track.nextSample.dts > f.currentSegment.lastDTS { - f.currentSegment.lastDTS = track.nextSample.dts + if track.nextSample != nil && + timestampToDuration(track.nextSample.dts, int(track.initTrack.TimeScale)) > f.currentSegment.lastDTS { + f.currentSegment.lastDTS = timestampToDuration(track.nextSample.dts, int(track.initTrack.TimeScale)) } } diff --git a/internal/recorder/format_fmp4_part.go b/internal/recorder/format_fmp4_part.go index b8ed532d..f2f83c53 100644 --- a/internal/recorder/format_fmp4_part.go +++ b/internal/recorder/format_fmp4_part.go @@ -82,18 +82,19 @@ func (p *formatFMP4Part) close() error { return writePart(p.s.fi, p.sequenceNumber, p.partTracks) } -func (p *formatFMP4Part) write(track *formatFMP4Track, sample *sample) error { +func (p *formatFMP4Part) write(track *formatFMP4Track, sample *sample, dtsDuration time.Duration) error { partTrack, ok := p.partTracks[track] if !ok { partTrack = &fmp4.PartTrack{ - ID: track.initTrack.ID, - BaseTime: durationGoToMp4(sample.dts-p.s.startDTS, track.initTrack.TimeScale), + ID: track.initTrack.ID, + BaseTime: uint64(multiplyAndDivide(int64(dtsDuration-p.s.startDTS), + int64(track.initTrack.TimeScale), int64(time.Second))), } p.partTracks[track] = partTrack } partTrack.Samples = append(partTrack.Samples, sample.PartSample) - p.endDTS = sample.dts + p.endDTS = dtsDuration return nil } diff --git a/internal/recorder/format_fmp4_segment.go b/internal/recorder/format_fmp4_segment.go index 5c1d1519..3fa7c3b3 100644 --- a/internal/recorder/format_fmp4_segment.go +++ b/internal/recorder/format_fmp4_segment.go @@ -69,14 +69,14 @@ func (s *formatFMP4Segment) close() error { return err } -func (s *formatFMP4Segment) write(track *formatFMP4Track, sample *sample) error { - s.lastDTS = sample.dts +func (s *formatFMP4Segment) write(track *formatFMP4Track, sample *sample, dtsDuration time.Duration) error { + s.lastDTS = dtsDuration if s.curPart == nil { s.curPart = &formatFMP4Part{ s: s, sequenceNumber: s.f.nextSequenceNumber, - startDTS: sample.dts, + startDTS: dtsDuration, } s.curPart.initialize() s.f.nextSequenceNumber++ @@ -91,11 +91,11 @@ func (s *formatFMP4Segment) write(track *formatFMP4Track, sample *sample) error s.curPart = &formatFMP4Part{ s: s, sequenceNumber: s.f.nextSequenceNumber, - startDTS: sample.dts, + startDTS: dtsDuration, } s.curPart.initialize() s.f.nextSequenceNumber++ } - return s.curPart.write(track, sample) + return s.curPart.write(track, sample, dtsDuration) } diff --git a/internal/recorder/format_fmp4_track.go b/internal/recorder/format_fmp4_track.go index 537fd829..c3fe1214 100644 --- a/internal/recorder/format_fmp4_track.go +++ b/internal/recorder/format_fmp4_track.go @@ -21,29 +21,33 @@ func (t *formatFMP4Track) write(sample *sample) error { if sample == nil { return nil } - sample.Duration = uint32(durationGoToMp4(t.nextSample.dts-sample.dts, t.initTrack.TimeScale)) + sample.Duration = uint32(t.nextSample.dts - sample.dts) + + dtsDuration := timestampToDuration(sample.dts, int(t.initTrack.TimeScale)) if t.f.currentSegment == nil { t.f.currentSegment = &formatFMP4Segment{ f: t.f, - startDTS: sample.dts, + startDTS: dtsDuration, startNTP: sample.ntp, } t.f.currentSegment.initialize() // BaseTime is negative, this is not supported by fMP4. Reject the sample silently. - } else if (sample.dts - t.f.currentSegment.startDTS) < 0 { + } else if (dtsDuration - t.f.currentSegment.startDTS) < 0 { return nil } - err := t.f.currentSegment.write(t, sample) + err := t.f.currentSegment.write(t, sample, dtsDuration) if err != nil { return err } + nextDTSDuration := timestampToDuration(t.nextSample.dts, int(t.initTrack.TimeScale)) + if (!t.f.hasVideo || t.initTrack.Codec.IsVideo()) && !t.nextSample.IsNonSyncSample && - (t.nextSample.dts-t.f.currentSegment.startDTS) >= t.f.ai.agent.SegmentDuration { - t.f.currentSegment.lastDTS = t.nextSample.dts + (nextDTSDuration-t.f.currentSegment.startDTS) >= t.f.ai.agent.SegmentDuration { + t.f.currentSegment.lastDTS = nextDTSDuration err := t.f.currentSegment.close() if err != nil { return err @@ -51,7 +55,7 @@ func (t *formatFMP4Track) write(sample *sample) error { t.f.currentSegment = &formatFMP4Segment{ f: t.f, - startDTS: t.nextSample.dts, + startDTS: nextDTSDuration, startNTP: t.nextSample.ntp, } t.f.currentSegment.initialize() diff --git a/internal/recorder/format_mpegts.go b/internal/recorder/format_mpegts.go index aa201c62..96d3d089 100644 --- a/internal/recorder/format_mpegts.go +++ b/internal/recorder/format_mpegts.go @@ -23,8 +23,20 @@ const ( mpegtsMaxBufferSize = 64 * 1024 ) -func durationGoToMPEGTS(v time.Duration) int64 { - return int64(v.Seconds() * 90000) +func multiplyAndDivide(v, m, d int64) int64 { + secs := v / d + dec := v % d + return (secs*m + dec*m/d) +} + +func multiplyAndDivide2(v, m, d time.Duration) time.Duration { + secs := v / d + dec := v % d + return (secs*m + dec*m/d) +} + +func timestampToDuration(t int64, clockRate int) time.Duration { + return multiplyAndDivide2(time.Duration(t), time.Second, time.Duration(clockRate)) } type dynamicWriter struct { @@ -67,11 +79,13 @@ func (f *formatMPEGTS) initialize() { for _, media := range f.ai.agent.Stream.Desc().Medias { for _, forma := range media.Formats { + clockRate := forma.ClockRate() + switch forma := forma.(type) { case *rtspformat.H265: //nolint:dupl track := addTrack(forma, &mpegts.CodecH265{}) - var dtsExtractor *h265.DTSExtractor + var dtsExtractor *h265.DTSExtractor2 f.ai.agent.Stream.AddReader( f.ai, @@ -89,7 +103,7 @@ func (f *formatMPEGTS) initialize() { if !randomAccess { return nil } - dtsExtractor = h265.NewDTSExtractor() + dtsExtractor = h265.NewDTSExtractor2() } dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS) @@ -98,12 +112,17 @@ func (f *formatMPEGTS) initialize() { } return f.write( - dts, + timestampToDuration(dts, clockRate), tunit.NTP, true, randomAccess, func() error { - return f.mw.WriteH265(track, durationGoToMPEGTS(tunit.PTS), durationGoToMPEGTS(dts), randomAccess, tunit.AU) + return f.mw.WriteH265( + track, + tunit.PTS, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP + dts, + randomAccess, + tunit.AU) }, ) }) @@ -111,7 +130,7 @@ func (f *formatMPEGTS) initialize() { case *rtspformat.H264: //nolint:dupl track := addTrack(forma, &mpegts.CodecH264{}) - var dtsExtractor *h264.DTSExtractor + var dtsExtractor *h264.DTSExtractor2 f.ai.agent.Stream.AddReader( f.ai, @@ -129,7 +148,7 @@ func (f *formatMPEGTS) initialize() { if !randomAccess { return nil } - dtsExtractor = h264.NewDTSExtractor() + dtsExtractor = h264.NewDTSExtractor2() } dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS) @@ -138,12 +157,17 @@ func (f *formatMPEGTS) initialize() { } return f.write( - dts, + timestampToDuration(dts, clockRate), tunit.NTP, true, randomAccess, func() error { - return f.mw.WriteH264(track, durationGoToMPEGTS(tunit.PTS), durationGoToMPEGTS(dts), randomAccess, tunit.AU) + return f.mw.WriteH264( + track, + tunit.PTS, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP + dts, + randomAccess, + tunit.AU) }, ) }) @@ -152,7 +176,7 @@ func (f *formatMPEGTS) initialize() { track := addTrack(forma, &mpegts.CodecMPEG4Video{}) firstReceived := false - var lastPTS time.Duration + var lastPTS int64 f.ai.agent.Stream.AddReader( f.ai, @@ -174,12 +198,15 @@ func (f *formatMPEGTS) initialize() { randomAccess := bytes.Contains(tunit.Frame, []byte{0, 0, 1, byte(mpeg4video.GroupOfVOPStartCode)}) return f.write( - tunit.PTS, + timestampToDuration(tunit.PTS, clockRate), tunit.NTP, true, randomAccess, func() error { - return f.mw.WriteMPEG4Video(track, durationGoToMPEGTS(tunit.PTS), tunit.Frame) + return f.mw.WriteMPEG4Video( + track, + tunit.PTS, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP + tunit.Frame) }, ) }) @@ -188,7 +215,7 @@ func (f *formatMPEGTS) initialize() { track := addTrack(forma, &mpegts.CodecMPEG1Video{}) firstReceived := false - var lastPTS time.Duration + var lastPTS int64 f.ai.agent.Stream.AddReader( f.ai, @@ -210,12 +237,15 @@ func (f *formatMPEGTS) initialize() { randomAccess := bytes.Contains(tunit.Frame, []byte{0, 0, 1, 0xB8}) return f.write( - tunit.PTS, + timestampToDuration(tunit.PTS, clockRate), tunit.NTP, true, randomAccess, func() error { - return f.mw.WriteMPEG1Video(track, durationGoToMPEGTS(tunit.PTS), tunit.Frame) + return f.mw.WriteMPEG1Video( + track, + tunit.PTS, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP + tunit.Frame) }, ) }) @@ -236,12 +266,15 @@ func (f *formatMPEGTS) initialize() { } return f.write( - tunit.PTS, + timestampToDuration(tunit.PTS, clockRate), tunit.NTP, false, true, func() error { - return f.mw.WriteOpus(track, durationGoToMPEGTS(tunit.PTS), tunit.Packets) + return f.mw.WriteOpus( + track, + multiplyAndDivide(tunit.PTS, 90000, int64(clockRate)), + tunit.Packets) }, ) }) @@ -266,12 +299,15 @@ func (f *formatMPEGTS) initialize() { } return f.write( - tunit.PTS, + timestampToDuration(tunit.PTS, clockRate), tunit.NTP, false, true, func() error { - return f.mw.WriteMPEG4Audio(track, durationGoToMPEGTS(tunit.PTS), tunit.AUs) + return f.mw.WriteMPEG4Audio( + track, + multiplyAndDivide(tunit.PTS, 90000, int64(clockRate)), + tunit.AUs) }, ) }) @@ -291,12 +327,15 @@ func (f *formatMPEGTS) initialize() { } return f.write( - tunit.PTS, + timestampToDuration(tunit.PTS, clockRate), tunit.NTP, false, true, func() error { - return f.mw.WriteMPEG1Audio(track, durationGoToMPEGTS(tunit.PTS), tunit.Frames) + return f.mw.WriteMPEG1Audio( + track, + tunit.PTS, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP + tunit.Frames) }, ) }) @@ -304,8 +343,6 @@ func (f *formatMPEGTS) initialize() { case *rtspformat.AC3: track := addTrack(forma, &mpegts.CodecAC3{}) - sampleRate := time.Duration(forma.SampleRate) - f.ai.agent.Stream.AddReader( f.ai, media, @@ -317,16 +354,18 @@ func (f *formatMPEGTS) initialize() { } return f.write( - tunit.PTS, + timestampToDuration(tunit.PTS, clockRate), tunit.NTP, false, true, func() error { for i, frame := range tunit.Frames { - framePTS := tunit.PTS + time.Duration(i)*ac3.SamplesPerFrame* - time.Second/sampleRate + framePTS := tunit.PTS + int64(i)*ac3.SamplesPerFrame - err := f.mw.WriteAC3(track, durationGoToMPEGTS(framePTS), frame) + err := f.mw.WriteAC3( + track, + multiplyAndDivide(framePTS, 90000, int64(clockRate)), + frame) if err != nil { return err } @@ -370,7 +409,7 @@ func (f *formatMPEGTS) close() { } func (f *formatMPEGTS) write( - dts time.Duration, + dtsDuration time.Duration, ntp time.Time, isVideo bool, randomAccess bool, @@ -384,14 +423,14 @@ func (f *formatMPEGTS) write( case f.currentSegment == nil: f.currentSegment = &formatMPEGTSSegment{ f: f, - startDTS: dts, + startDTS: dtsDuration, startNTP: ntp, } f.currentSegment.initialize() case (!f.hasVideo || isVideo) && randomAccess && - (dts-f.currentSegment.startDTS) >= f.ai.agent.SegmentDuration: - f.currentSegment.lastDTS = dts + (dtsDuration-f.currentSegment.startDTS) >= f.ai.agent.SegmentDuration: + f.currentSegment.lastDTS = dtsDuration err := f.currentSegment.close() if err != nil { return err @@ -399,21 +438,21 @@ func (f *formatMPEGTS) write( f.currentSegment = &formatMPEGTSSegment{ f: f, - startDTS: dts, + startDTS: dtsDuration, startNTP: ntp, } f.currentSegment.initialize() - case (dts - f.currentSegment.lastFlush) >= f.ai.agent.PartDuration: + case (dtsDuration - f.currentSegment.lastFlush) >= f.ai.agent.PartDuration: err := f.bw.Flush() if err != nil { return err } - f.currentSegment.lastFlush = dts + f.currentSegment.lastFlush = dtsDuration } - f.currentSegment.lastDTS = dts + f.currentSegment.lastDTS = dtsDuration return writeCB() } diff --git a/internal/recorder/recoder_instance.go b/internal/recorder/recoder_instance.go index e0261e43..3b111b11 100644 --- a/internal/recorder/recoder_instance.go +++ b/internal/recorder/recoder_instance.go @@ -13,7 +13,7 @@ import ( type sample struct { *fmp4.PartSample - dts time.Duration + dts int64 ntp time.Time } diff --git a/internal/recorder/recorder_test.go b/internal/recorder/recorder_test.go index c3fb654b..270cf134 100644 --- a/internal/recorder/recorder_test.go +++ b/internal/recorder/recorder_test.go @@ -70,9 +70,9 @@ func TestRecorder(t *testing.T) { }, }} - writeToStream := func(stream *stream.Stream, startDTS time.Duration, startNTP time.Time) { + writeToStream := func(stream *stream.Stream, startDTS int64, startNTP time.Time) { for i := 0; i < 2; i++ { - pts := startDTS + time.Duration(i)*100*time.Millisecond + pts := startDTS + int64(i)*100*90000/1000 ntp := startNTP.Add(time.Duration(i*60) * time.Second) stream.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.H264{ @@ -101,21 +101,21 @@ func TestRecorder(t *testing.T) { stream.WriteUnit(desc.Medias[2], desc.Medias[2].Formats[0], &unit.MPEG4Audio{ Base: unit.Base{ - PTS: pts, + PTS: pts * int64(desc.Medias[2].Formats[0].ClockRate()) / 90000, }, AUs: [][]byte{{1, 2, 3, 4}}, }) stream.WriteUnit(desc.Medias[3], desc.Medias[3].Formats[0], &unit.G711{ Base: unit.Base{ - PTS: pts, + PTS: pts * int64(desc.Medias[3].Formats[0].ClockRate()) / 90000, }, Samples: []byte{1, 2, 3, 4}, }) stream.WriteUnit(desc.Medias[4], desc.Medias[4].Formats[0], &unit.LPCM{ Base: unit.Base{ - PTS: pts, + PTS: pts * int64(desc.Medias[4].Formats[0].ClockRate()) / 90000, }, Samples: []byte{1, 2, 3, 4}, }) @@ -198,11 +198,11 @@ func TestRecorder(t *testing.T) { w.Initialize() writeToStream(stream, - 50*time.Second, + 50*90000, time.Date(2008, 5, 20, 22, 15, 25, 0, time.UTC)) writeToStream(stream, - 52*time.Second, + 52*90000, time.Date(2008, 5, 20, 22, 16, 25, 0, time.UTC)) // simulate a write error @@ -296,7 +296,7 @@ func TestRecorder(t *testing.T) { time.Sleep(50 * time.Millisecond) writeToStream(stream, - 300*time.Second, + 300*90000, time.Date(2010, 5, 20, 22, 15, 25, 0, time.UTC)) time.Sleep(50 * time.Millisecond) @@ -367,7 +367,7 @@ func TestRecorderFMP4NegativeDTS(t *testing.T) { for i := 0; i < 3; i++ { stream.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.H264{ Base: unit.Base{ - PTS: -50*time.Millisecond + (time.Duration(i) * 200 * time.Millisecond), + PTS: -50*90000/1000 + (int64(i) * 200 * 90000 / 1000), NTP: time.Date(2008, 5, 20, 22, 15, 25, 0, time.UTC), }, AU: [][]byte{ @@ -379,7 +379,7 @@ func TestRecorderFMP4NegativeDTS(t *testing.T) { stream.WriteUnit(desc.Medias[1], desc.Medias[1].Formats[0], &unit.MPEG4Audio{ Base: unit.Base{ - PTS: -100*time.Millisecond + (time.Duration(i) * 200 * time.Millisecond), + PTS: -100*44100/1000 + (int64(i) * 200 * 44100 / 1000), }, AUs: [][]byte{{1, 2, 3, 4}}, }) diff --git a/internal/servers/hls/server_test.go b/internal/servers/hls/server_test.go index ad871fbf..4d19dd26 100644 --- a/internal/servers/hls/server_test.go +++ b/internal/servers/hls/server_test.go @@ -217,12 +217,13 @@ func TestServerRead(t *testing.T) { c.OnTracks = func(tracks []*gohlslib.Track) error { require.Equal(t, []*gohlslib.Track{{ - Codec: &codecs.H264{}, + Codec: &codecs.H264{}, + ClockRate: 90000, }}, tracks) - c.OnDataH26x(tracks[0], func(pts, dts time.Duration, au [][]byte) { - require.Equal(t, time.Duration(0), pts) - require.Equal(t, time.Duration(0), dts) + c.OnDataH26x(tracks[0], func(pts, dts int64, au [][]byte) { + require.Equal(t, int64(0), pts) + require.Equal(t, int64(0), dts) require.Equal(t, [][]byte{ test.FormatH264.SPS, test.FormatH264.PPS, @@ -246,7 +247,7 @@ func TestServerRead(t *testing.T) { str.WriteUnit(test.MediaH264, test.FormatH264, &unit.H264{ Base: unit.Base{ NTP: time.Time{}, - PTS: time.Duration(i) * time.Second, + PTS: int64(i) * 90000, }, AU: [][]byte{ {5, 1}, // IDR @@ -311,7 +312,7 @@ func TestServerRead(t *testing.T) { str.WriteUnit(test.MediaH264, test.FormatH264, &unit.H264{ Base: unit.Base{ NTP: time.Time{}, - PTS: time.Duration(i) * time.Second, + PTS: int64(i) * 90000, }, AU: [][]byte{ {5, 1}, // IDR @@ -327,12 +328,13 @@ func TestServerRead(t *testing.T) { c.OnTracks = func(tracks []*gohlslib.Track) error { require.Equal(t, []*gohlslib.Track{{ - Codec: &codecs.H264{}, + Codec: &codecs.H264{}, + ClockRate: 90000, }}, tracks) - c.OnDataH26x(tracks[0], func(pts, dts time.Duration, au [][]byte) { - require.Equal(t, time.Duration(0), pts) - require.Equal(t, time.Duration(0), dts) + c.OnDataH26x(tracks[0], func(pts, dts int64, au [][]byte) { + require.Equal(t, int64(0), pts) + require.Equal(t, int64(0), dts) require.Equal(t, [][]byte{ test.FormatH264.SPS, test.FormatH264.PPS, diff --git a/internal/servers/rtsp/session.go b/internal/servers/rtsp/session.go index 0a1b3d76..862c8638 100644 --- a/internal/servers/rtsp/session.go +++ b/internal/servers/rtsp/session.go @@ -292,7 +292,7 @@ func (s *session) onRecord(_ *gortsplib.ServerHandlerOnRecordCtx) (*base.Respons cforma := forma s.rsession.OnPacketRTP(cmedi, cforma, func(pkt *rtp.Packet) { - pts, ok := s.rsession.PacketPTS(cmedi, pkt) + pts, ok := s.rsession.PacketPTS2(cmedi, pkt) if !ok { return } diff --git a/internal/staticsources/rpicamera/source.go b/internal/staticsources/rpicamera/source.go index 75e70288..37e4b856 100644 --- a/internal/staticsources/rpicamera/source.go +++ b/internal/staticsources/rpicamera/source.go @@ -14,6 +14,12 @@ import ( "github.com/bluenviron/mediamtx/internal/unit" ) +func multiplyAndDivide(v, m, d int64) int64 { + secs := v / d + dec := v % d + return (secs*m + dec*m/d) +} + func paramsFromConf(logLevel conf.LogLevel, cnf *conf.Path) params { return params{ LogLevel: func() string { @@ -105,7 +111,7 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error { stream.WriteUnit(medi, medi.Formats[0], &unit.H264{ Base: unit.Base{ NTP: time.Now(), - PTS: dts, + PTS: multiplyAndDivide(int64(dts), 90000, int64(time.Second)), }, AU: au, }) diff --git a/internal/staticsources/rtsp/source.go b/internal/staticsources/rtsp/source.go index e7a2d8ab..1c161bb6 100644 --- a/internal/staticsources/rtsp/source.go +++ b/internal/staticsources/rtsp/source.go @@ -143,7 +143,7 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error { cforma := forma c.OnPacketRTP(cmedi, cforma, func(pkt *rtp.Packet) { - pts, ok := c.PacketPTS(cmedi, pkt) + pts, ok := c.PacketPTS2(cmedi, pkt) if !ok { return } diff --git a/internal/stream/stream.go b/internal/stream/stream.go index 8ff8b949..7ca4779c 100644 --- a/internal/stream/stream.go +++ b/internal/stream/stream.go @@ -240,7 +240,7 @@ func (s *Stream) WriteRTPPacket( forma format.Format, pkt *rtp.Packet, ntp time.Time, - pts time.Duration, + pts int64, ) { sm := s.streamMedias[medi] sf := sm.formats[forma] diff --git a/internal/stream/stream_format.go b/internal/stream/stream_format.go index 30130875..cea65f6b 100644 --- a/internal/stream/stream_format.go +++ b/internal/stream/stream_format.go @@ -76,7 +76,7 @@ func (sf *streamFormat) writeRTPPacket( medi *description.Media, pkt *rtp.Packet, ntp time.Time, - pts time.Duration, + pts int64, ) { hasNonRTSPReaders := len(sf.pausedReaders) > 0 || len(sf.runningReaders) > 0 diff --git a/internal/unit/base.go b/internal/unit/base.go index 616f5573..619be1aa 100644 --- a/internal/unit/base.go +++ b/internal/unit/base.go @@ -10,7 +10,7 @@ import ( type Base struct { RTPPackets []*rtp.Packet NTP time.Time - PTS time.Duration + PTS int64 } // GetRTPPackets implements Unit. @@ -24,6 +24,6 @@ func (u *Base) GetNTP() time.Time { } // GetPTS implements Unit. -func (u *Base) GetPTS() time.Duration { +func (u *Base) GetPTS() int64 { return u.PTS } diff --git a/internal/unit/unit.go b/internal/unit/unit.go index 55f986c7..7a58a414 100644 --- a/internal/unit/unit.go +++ b/internal/unit/unit.go @@ -16,5 +16,5 @@ type Unit interface { GetNTP() time.Time // returns the PTS of the unit. - GetPTS() time.Duration + GetPTS() int64 }