hls: support routing absolute timestamps (#1300) (#4372)

This commit is contained in:
Alessandro Ros
2025-03-27 22:18:13 +01:00
committed by GitHub
parent 0d46cf3f74
commit 60cabeea92
4 changed files with 229 additions and 48 deletions

View File

@@ -125,6 +125,7 @@ _rtsp-simple-server_ has been rebranded as _MediaMTX_. The reason is pretty obvi
* [Forward streams to other servers](#forward-streams-to-other-servers)
* [Proxy requests to other servers](#proxy-requests-to-other-servers)
* [On-demand publishing](#on-demand-publishing)
* [Route absolute timestamps](#route-absolute-timestamps)
* [Expose the server in a subfolder](#expose-the-server-in-a-subfolder)
* [Start on boot](#start-on-boot)
* [Linux](#linux)
@@ -1706,6 +1707,17 @@ paths:
The command inserted into `runOnDemand` will start only when a client requests the path `ondemand`, therefore the file will start streaming only when requested.
### Route absolute timestamps
Some streaming protocols allow to route absolute timestamps, associated with each frame, that are useful for synchronizing several video or data streams together. In particular, _MediaMTX_ supports receiving absolute timestamps with the following protocols:
* HLS (through the `EXT-X-PROGRAM-DATE-TIME` tag in playlists)
and supports sending absolute timestamps with the following protocols:
* HLS (through the `EXT-X-PROGRAM-DATE-TIME` tag in playlists)
* RTSP (through RTCP reports)
### Expose the server in a subfolder
HTTP-based services (WebRTC, HLS, Control API, Playback Server, Metrics, pprof) can be exposed in a subfolder of an existing HTTP server or reverse proxy. The reverse proxy must be able to intercept HTTP requests addressed to MediaMTX and corresponding responses, and perform the following changes:

View File

@@ -7,10 +7,20 @@ import (
"github.com/bluenviron/gohlslib/v2/pkg/codecs"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/stream"
"github.com/bluenviron/mediamtx/internal/unit"
)
type ntpState int
const (
ntpStateInitial ntpState = iota
ntpStateUnavailable
ntpStateAvailable
ntpStateDegraded
)
func multiplyAndDivide(v, m, d int64) int64 {
secs := v / d
dec := v % d
@@ -23,6 +33,42 @@ func ToStream(
tracks []*gohlslib.Track,
stream **stream.Stream,
) ([]*description.Media, error) {
var state ntpState
handleNTP := func(track *gohlslib.Track) time.Time {
switch state {
case ntpStateInitial:
ntp, avail := c.AbsoluteTime(track)
if !avail {
state = ntpStateUnavailable
return time.Now()
}
state = ntpStateAvailable
return ntp
case ntpStateAvailable:
ntp, avail := c.AbsoluteTime(track)
if !avail {
panic("should not happen")
}
return ntp
case ntpStateUnavailable:
_, avail := c.AbsoluteTime(track)
if avail {
(*stream).Parent.Log(logger.Warn, "absolute timestamp appeared after stream started, we are not using it")
state = ntpStateDegraded
}
return time.Now()
default: // ntpStateDegraded
return time.Now()
}
}
var medias []*description.Media //nolint:prealloc
for _, track := range tracks {
@@ -41,7 +87,7 @@ func ToStream(
c.OnDataAV1(track, func(pts int64, tu [][]byte) {
(*stream).WriteUnit(medi, medi.Formats[0], &unit.AV1{
Base: unit.Base{
NTP: time.Now(),
NTP: handleNTP(track),
PTS: pts, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP
},
TU: tu,
@@ -59,7 +105,7 @@ func ToStream(
c.OnDataVP9(track, func(pts int64, frame []byte) {
(*stream).WriteUnit(medi, medi.Formats[0], &unit.VP9{
Base: unit.Base{
NTP: time.Now(),
NTP: handleNTP(track),
PTS: pts, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP
},
Frame: frame,
@@ -80,7 +126,7 @@ func ToStream(
c.OnDataH26x(track, func(pts int64, _ int64, au [][]byte) {
(*stream).WriteUnit(medi, medi.Formats[0], &unit.H265{
Base: unit.Base{
NTP: time.Now(),
NTP: handleNTP(track),
PTS: pts, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP
},
AU: au,
@@ -101,7 +147,7 @@ func ToStream(
c.OnDataH26x(track, func(pts int64, _ int64, au [][]byte) {
(*stream).WriteUnit(medi, medi.Formats[0], &unit.H264{
Base: unit.Base{
NTP: time.Now(),
NTP: handleNTP(track),
PTS: pts, // no conversion is needed since clock rate is 90khz in both MPEG-TS and RTSP
},
AU: au,
@@ -120,7 +166,7 @@ func ToStream(
c.OnDataOpus(track, func(pts int64, packets [][]byte) {
(*stream).WriteUnit(medi, medi.Formats[0], &unit.Opus{
Base: unit.Base{
NTP: time.Now(),
NTP: handleNTP(track),
PTS: multiplyAndDivide(pts, int64(medi.Formats[0].ClockRate()), int64(clockRate)),
},
Packets: packets,
@@ -142,7 +188,7 @@ func ToStream(
c.OnDataMPEG4Audio(track, func(pts int64, aus [][]byte) {
(*stream).WriteUnit(medi, medi.Formats[0], &unit.MPEG4Audio{
Base: unit.Base{
NTP: time.Now(),
NTP: handleNTP(track),
PTS: multiplyAndDivide(pts, int64(medi.Formats[0].ClockRate()), int64(clockRate)),
},
AUs: aus,

View File

@@ -1,9 +1,19 @@
package hls
import (
"context"
"net"
"net/http"
"testing"
"time"
"github.com/bluenviron/gohlslib/v2"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/mediacommon/v2/pkg/formats/mpegts"
"github.com/bluenviron/mediamtx/internal/stream"
"github.com/bluenviron/mediamtx/internal/test"
"github.com/bluenviron/mediamtx/internal/unit"
"github.com/stretchr/testify/require"
)
@@ -14,3 +24,119 @@ func TestToStreamNoSupportedCodecs(t *testing.T) {
// this is impossible to test since currently we support all gohlslib.Tracks.
// func TestToStreamSkipUnsupportedTracks(t *testing.T)
func TestToStream(t *testing.T) {
track1 := &mpegts.Track{
Codec: &mpegts.CodecH264{},
}
s := &http.Server{
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch {
case r.Method == http.MethodGet && r.URL.Path == "/stream.m3u8":
w.Header().Set("Content-Type", `application/vnd.apple.mpegurl`)
w.Write([]byte("#EXTM3U\n" +
"#EXT-X-VERSION:3\n" +
"#EXT-X-ALLOW-CACHE:NO\n" +
"#EXT-X-TARGETDURATION:2\n" +
"#EXT-X-MEDIA-SEQUENCE:0\n" +
"#EXT-X-PROGRAM-DATE-TIME:2018-05-20T08:17:15Z\n" +
"#EXTINF:2,\n" +
"segment1.ts\n" +
"#EXTINF:2,\n" +
"segment2.ts\n" +
"#EXTINF:2,\n" +
"segment2.ts\n" +
"#EXT-X-ENDLIST\n"))
case r.Method == http.MethodGet && r.URL.Path == "/segment1.ts":
w.Header().Set("Content-Type", `video/MP2T`)
w := &mpegts.Writer{W: w, Tracks: []*mpegts.Track{track1}}
err := w.Initialize()
require.NoError(t, err)
err = w.WriteH264(track1, 2*90000, 2*90000, [][]byte{
{7, 1, 2, 3}, // SPS
{8}, // PPS
})
require.NoError(t, err)
case r.Method == http.MethodGet && r.URL.Path == "/segment2.ts":
w.Header().Set("Content-Type", `video/MP2T`)
w := &mpegts.Writer{W: w, Tracks: []*mpegts.Track{track1}}
err := w.Initialize()
require.NoError(t, err)
err = w.WriteH264(track1, 2*90000, 2*90000, [][]byte{
{5, 1},
})
require.NoError(t, err)
}
}),
}
ln, err := net.Listen("tcp", "localhost:5780")
require.NoError(t, err)
go s.Serve(ln)
defer s.Shutdown(context.Background())
var strm *stream.Stream
done := make(chan struct{})
reader := test.NilLogger
var c *gohlslib.Client
c = &gohlslib.Client{
URI: "http://localhost:5780/stream.m3u8",
OnTracks: func(tracks []*gohlslib.Track) error {
medias, err2 := ToStream(c, tracks, &strm)
require.NoError(t, err2)
require.Equal(t, []*description.Media{{
Type: description.MediaTypeVideo,
Formats: []format.Format{&format.H264{
PayloadTyp: 96,
PacketizationMode: 1,
}},
}}, medias)
strm = &stream.Stream{
WriteQueueSize: 512,
UDPMaxPayloadSize: 1472,
Desc: &description.Session{Medias: medias},
GenerateRTPPackets: true,
Parent: test.NilLogger,
}
err2 = strm.Initialize()
require.NoError(t, err2)
strm.AddReader(
reader,
medias[0],
medias[0].Formats[0],
func(u unit.Unit) error {
require.Equal(t, time.Date(2018, 0o5, 20, 8, 17, 15, 0, time.UTC), u.GetNTP())
close(done)
return nil
})
strm.StartReader(reader)
return nil
},
}
err = c.Start()
require.NoError(t, err)
defer c.Close()
select {
case <-done:
case err := <-c.Wait():
t.Error(err.Error())
}
strm.RemoveReader(reader)
strm.Close()
}

View File

@@ -8,7 +8,6 @@ import (
"github.com/bluenviron/mediacommon/v2/pkg/codecs/mpeg4audio"
"github.com/bluenviron/mediacommon/v2/pkg/formats/mpegts"
"github.com/gin-gonic/gin"
"github.com/stretchr/testify/require"
"github.com/bluenviron/mediamtx/internal/conf"
@@ -36,54 +35,52 @@ func TestSource(t *testing.T) {
track2,
}
gin.SetMode(gin.ReleaseMode)
router := gin.New()
s := &http.Server{
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch {
case r.Method == http.MethodGet && r.URL.Path == "/stream.m3u8":
w.Header().Set("Content-Type", `application/vnd.apple.mpegurl`)
w.Write([]byte("#EXTM3U\n" +
"#EXT-X-VERSION:3\n" +
"#EXT-X-ALLOW-CACHE:NO\n" +
"#EXT-X-TARGETDURATION:2\n" +
"#EXT-X-MEDIA-SEQUENCE:0\n" +
"#EXTINF:2,\n" +
"segment1.ts\n" +
"#EXTINF:2,\n" +
"segment2.ts\n" +
"#EXTINF:2,\n" +
"segment2.ts\n" +
"#EXT-X-ENDLIST\n"))
router.GET("/stream.m3u8", func(ctx *gin.Context) {
ctx.Header("Content-Type", `application/vnd.apple.mpegurl`)
ctx.Writer.Write([]byte("#EXTM3U\n" +
"#EXT-X-VERSION:3\n" +
"#EXT-X-ALLOW-CACHE:NO\n" +
"#EXT-X-TARGETDURATION:2\n" +
"#EXT-X-MEDIA-SEQUENCE:0\n" +
"#EXTINF:2,\n" +
"segment1.ts\n" +
"#EXTINF:2,\n" +
"segment2.ts\n" +
"#EXTINF:2,\n" +
"segment2.ts\n" +
"#EXT-X-ENDLIST\n"))
})
case r.Method == http.MethodGet && r.URL.Path == "/segment1.ts":
w.Header().Set("Content-Type", `video/MP2T`)
router.GET("/segment1.ts", func(ctx *gin.Context) {
ctx.Header("Content-Type", `video/MP2T`)
w := &mpegts.Writer{W: w, Tracks: tracks}
err := w.Initialize()
require.NoError(t, err)
w := &mpegts.Writer{W: ctx.Writer, Tracks: tracks}
err := w.Initialize()
require.NoError(t, err)
err = w.WriteMPEG4Audio(track2, 1*90000, [][]byte{{1, 2, 3, 4}})
require.NoError(t, err)
err = w.WriteMPEG4Audio(track2, 1*90000, [][]byte{{1, 2, 3, 4}})
require.NoError(t, err)
err = w.WriteH264(track1, 2*90000, 2*90000, [][]byte{
{7, 1, 2, 3}, // SPS
{8}, // PPS
})
require.NoError(t, err)
err = w.WriteH264(track1, 2*90000, 2*90000, [][]byte{
{7, 1, 2, 3}, // SPS
{8}, // PPS
})
require.NoError(t, err)
})
case r.Method == http.MethodGet && r.URL.Path == "/segment2.ts":
w.Header().Set("Content-Type", `video/MP2T`)
router.GET("/segment2.ts", func(ctx *gin.Context) {
ctx.Header("Content-Type", `video/MP2T`)
w := &mpegts.Writer{W: w, Tracks: tracks}
err := w.Initialize()
require.NoError(t, err)
w := &mpegts.Writer{W: ctx.Writer, Tracks: tracks}
err := w.Initialize()
require.NoError(t, err)
err = w.WriteMPEG4Audio(track2, 3*90000, [][]byte{{1, 2, 3, 4}})
require.NoError(t, err)
})
s := &http.Server{Handler: router}
err = w.WriteMPEG4Audio(track2, 3*90000, [][]byte{{1, 2, 3, 4}})
require.NoError(t, err)
}
}),
}
ln, err := net.Listen("tcp", "localhost:5780")
require.NoError(t, err)