rtsp: route original absolute timestamp of packets (#1300) (#4378)

This commit is contained in:
Alessandro Ros
2025-03-29 19:08:42 +01:00
committed by GitHub
parent 4bbedc4955
commit 8b98c02903
10 changed files with 168 additions and 60 deletions

View File

@@ -1712,6 +1712,7 @@ The command inserted into `runOnDemand` will start only when a client requests t
Some streaming protocols allow to route absolute timestamps, associated with each frame, that are useful for synchronizing several video or data streams together. In particular, _MediaMTX_ supports receiving absolute timestamps with the following protocols:
* HLS (through the `EXT-X-PROGRAM-DATE-TIME` tag in playlists)
* RTSP (through RTCP reports, when `rtspAbsoluteTimestamp` is `true` in settings)
and supports sending absolute timestamps with the following protocols:

View File

@@ -324,6 +324,8 @@ components:
type: string
fallback:
type: string
rtspAbsoluteTimestamp:
type: boolean
# Record
record:

View File

@@ -118,6 +118,7 @@ type Path struct {
MaxReaders int `json:"maxReaders"`
SRTReadPassphrase string `json:"srtReadPassphrase"`
Fallback string `json:"fallback"`
RTSPAbsoluteTimestamp bool `json:"rtspAbsoluteTimestamp"`
// Record
Record bool `json:"record"`

View File

@@ -0,0 +1,89 @@
// Package rtsp provides RTSP utilities.
package rtsp
import (
"time"
"github.com/bluenviron/gortsplib/v4"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/stream"
"github.com/pion/rtp"
)
type ntpState int
const (
ntpStateInitial ntpState = iota
ntpStateReplace
ntpStateAvailable
)
type rtspSource interface {
PacketPTS2(*description.Media, *rtp.Packet) (int64, bool)
PacketNTP(*description.Media, *rtp.Packet) (time.Time, bool)
OnPacketRTP(*description.Media, format.Format, gortsplib.OnPacketRTPFunc)
}
// ToStream maps a RTSP stream to a MediaMTX stream.
func ToStream(
source rtspSource,
medias []*description.Media,
pathConf *conf.Path,
stream *stream.Stream,
log logger.Writer,
) {
for _, medi := range medias {
for _, forma := range medi.Formats {
cmedi := medi
cforma := forma
var ntpStat ntpState
if !pathConf.RTSPAbsoluteTimestamp {
ntpStat = ntpStateReplace
}
handleNTP := func(pkt *rtp.Packet) (time.Time, bool) {
switch ntpStat {
case ntpStateReplace:
return time.Now(), true
case ntpStateInitial:
ntp, avail := source.PacketNTP(cmedi, pkt)
if !avail {
log.Log(logger.Warn, "received RTP packet without absolute time, skipping it")
return time.Time{}, false
}
ntpStat = ntpStateAvailable
return ntp, true
default: // ntpStateAvailable
ntp, avail := source.PacketNTP(cmedi, pkt)
if !avail {
panic("should not happen")
}
return ntp, true
}
}
source.OnPacketRTP(cmedi, cforma, func(pkt *rtp.Packet) {
pts, ok := source.PacketPTS2(cmedi, pkt)
if !ok {
return
}
ntp, ok := handleNTP(pkt)
if !ok {
return
}
stream.WriteRTPPacket(cmedi, cforma, pkt, ntp, pts)
})
}
}
}

View File

@@ -11,7 +11,6 @@ import (
"github.com/bluenviron/gortsplib/v4"
"github.com/bluenviron/gortsplib/v4/pkg/base"
"github.com/google/uuid"
"github.com/pion/rtp"
"github.com/bluenviron/mediamtx/internal/auth"
"github.com/bluenviron/mediamtx/internal/conf"
@@ -20,6 +19,7 @@ import (
"github.com/bluenviron/mediamtx/internal/externalcmd"
"github.com/bluenviron/mediamtx/internal/hooks"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/protocols/rtsp"
"github.com/bluenviron/mediamtx/internal/stream"
)
@@ -317,21 +317,12 @@ func (s *session) onRecord(_ *gortsplib.ServerHandlerOnRecordCtx) (*base.Respons
s.stream = stream
for _, medi := range s.rsession.AnnouncedDescription().Medias {
for _, forma := range medi.Formats {
cmedi := medi
cforma := forma
s.rsession.OnPacketRTP(cmedi, cforma, func(pkt *rtp.Packet) {
pts, ok := s.rsession.PacketPTS2(cmedi, pkt)
if !ok {
return
}
stream.WriteRTPPacket(cmedi, cforma, pkt, time.Now(), pts)
})
}
}
rtsp.ToStream(
s.rsession,
s.rsession.AnnouncedDescription().Medias,
s.path.SafeConf(),
stream,
s)
s.mutex.Lock()
s.state = gortsplib.ServerSessionStateRecord

View File

@@ -7,12 +7,12 @@ import (
"github.com/bluenviron/gortsplib/v4"
"github.com/bluenviron/gortsplib/v4/pkg/base"
"github.com/bluenviron/gortsplib/v4/pkg/headers"
"github.com/pion/rtp"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/counterdumper"
"github.com/bluenviron/mediamtx/internal/defs"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/protocols/rtsp"
"github.com/bluenviron/mediamtx/internal/protocols/tls"
)
@@ -168,21 +168,12 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error {
defer s.Parent.SetNotReady(defs.PathSourceStaticSetNotReadyReq{})
for _, medi := range desc.Medias {
for _, forma := range medi.Formats {
cmedi := medi
cforma := forma
c.OnPacketRTP(cmedi, cforma, func(pkt *rtp.Packet) {
pts, ok := c.PacketPTS2(cmedi, pkt)
if !ok {
return
}
res.Stream.WriteRTPPacket(cmedi, cforma, pkt, time.Now(), pts)
})
}
}
rtsp.ToStream(
c,
desc.Medias,
params.Conf,
res.Stream,
s)
rangeHeader, err := createRangeHeader(params.Conf)
if err != nil {

View File

@@ -178,7 +178,7 @@ func TestSource(t *testing.T) {
}
}
func TestRTSPSourceNoPassword(t *testing.T) {
func TestSourceNoPassword(t *testing.T) {
var stream *gortsplib.ServerStream
nonce, err := auth.GenerateNonce()
@@ -267,7 +267,7 @@ func TestRTSPSourceNoPassword(t *testing.T) {
<-te.Unit
}
func TestRTSPSourceRange(t *testing.T) {
func TestSourceRange(t *testing.T) {
for _, ca := range []string{"clock", "npt", "smpte"} {
t.Run(ca, func(t *testing.T) {
var stream *gortsplib.ServerStream

View File

@@ -75,10 +75,10 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error {
return rres.Err
}
stream = rres.Stream
defer s.Parent.SetNotReady(defs.PathSourceStaticSetNotReadyReq{})
stream = rres.Stream
client.StartReading()
return client.Wait(params.Context)

View File

@@ -3,9 +3,9 @@ package main
import (
"os"
"reflect"
"sort"
"strings"
"testing"
"time"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/conf/yamlwrapper"
@@ -13,20 +13,34 @@ import (
"github.com/stretchr/testify/require"
)
type openAPIProperty struct {
Ref string `json:"$ref"`
Type string `json:"type"`
Nullable bool `json:"nullable"`
}
type openAPISchema struct {
Type string `json:"type"`
Properties map[string]openAPIProperty `json:"properties"`
}
type openAPI struct {
Components struct {
Schemas map[string]openAPISchema `json:"schemas"`
} `json:"components"`
}
func TestAPIDocs(t *testing.T) {
byts, err := os.ReadFile("../../apidocs/openapi.yaml")
require.NoError(t, err)
var raw map[string]interface{}
err = yamlwrapper.Unmarshal(byts, &raw)
var doc openAPI
err = yamlwrapper.Unmarshal(byts, &doc)
require.NoError(t, err)
components := raw["components"].(map[string]interface{})
schemas := components["schemas"].(map[string]interface{})
for _, ca := range []struct {
yamlKey string
goStruct interface{}
openAPIKey string
goStruct any
}{
{
"AuthInternalUser",
@@ -125,30 +139,46 @@ func TestAPIDocs(t *testing.T) {
defs.APIWebRTCSessionList{},
},
} {
t.Run(ca.yamlKey, func(t *testing.T) {
yamlContent := schemas[ca.yamlKey].(map[string]interface{})
props := yamlContent["properties"].(map[string]interface{})
key1 := make([]string, len(props))
i := 0
for key := range props {
key1[i] = key
i++
}
t.Run(ca.openAPIKey, func(t *testing.T) {
content1 := doc.Components.Schemas[ca.openAPIKey]
var key2 []string
content2 := openAPISchema{
Type: "object",
Properties: make(map[string]openAPIProperty),
}
ty := reflect.TypeOf(ca.goStruct)
for i := 0; i < ty.NumField(); i++ {
for i := range ty.NumField() {
sf := ty.Field(i)
js := sf.Tag.Get("json")
if js != "-" && js != "paths" && js != "pathDefaults" && !strings.Contains(js, ",omitempty") {
key2 = append(key2, js)
switch {
case sf.Type == reflect.TypeOf(""):
content2.Properties[js] = openAPIProperty{Type: "string"}
case sf.Type == reflect.TypeOf(int(0)):
content2.Properties[js] = openAPIProperty{Type: "integer"}
case sf.Type == reflect.TypeOf(false):
content2.Properties[js] = openAPIProperty{Type: "boolean"}
case sf.Type == reflect.TypeOf(time.Time{}):
content2.Properties[js] = openAPIProperty{Type: "string"}
case sf.Type == reflect.TypeOf(&time.Time{}):
content2.Properties[js] = openAPIProperty{
Type: "string",
Nullable: true,
}
default:
if existing, ok := content1.Properties[js]; ok {
content2.Properties[js] = existing
}
}
}
}
sort.Strings(key1)
sort.Strings(key2)
require.Equal(t, key1, key2)
require.Equal(t, content2, content1)
})
}
}

View File

@@ -451,6 +451,9 @@ pathDefaults:
# If the stream is not available, redirect readers to this path.
# It can be can be a relative path (i.e. /otherstream) or an absolute RTSP URL.
fallback:
# Route absolute timestamps of RTSP packets published or proxied through this path.
# Absolute timestamps are transported through RTCP reports.
rtspAbsoluteTimestamp: false
###############################################
# Default path settings -> Record