allow using MTX_QUERY inside source (#3486)

this allows to pass query parameters to sources, for instance:

source: rtsp://my_host/my_path?$MTX_QUERY
sourceOnDemand: true
This commit is contained in:
Alessandro Ros
2024-06-18 22:10:26 +02:00
committed by GitHub
parent dfa2e81e61
commit 65d90f7cc6
18 changed files with 118 additions and 106 deletions

View File

@@ -169,26 +169,19 @@ func (pa *path) run() {
if pa.conf.Source == "redirect" {
pa.source = &sourceRedirect{}
} else if pa.conf.HasStaticSource() {
resolvedSource := pa.conf.Source
if len(pa.matches) > 1 {
for i, ma := range pa.matches[1:] {
resolvedSource = strings.ReplaceAll(resolvedSource, "$G"+strconv.FormatInt(int64(i+1), 10), ma)
}
}
pa.source = &staticSourceHandler{
conf: pa.conf,
logLevel: pa.logLevel,
readTimeout: pa.readTimeout,
writeTimeout: pa.writeTimeout,
writeQueueSize: pa.writeQueueSize,
resolvedSource: resolvedSource,
matches: pa.matches,
parent: pa,
}
pa.source.(*staticSourceHandler).initialize()
if !pa.conf.SourceOnDemand {
pa.source.(*staticSourceHandler).start(false)
pa.source.(*staticSourceHandler).start(false, "")
}
}
@@ -431,7 +424,7 @@ func (pa *path) doDescribe(req defs.PathDescribeReq) {
if pa.conf.HasOnDemandStaticSource() {
if pa.onDemandStaticSourceState == pathOnDemandStateInitial {
pa.onDemandStaticSourceStart()
pa.onDemandStaticSourceStart(req.AccessRequest.Query)
}
pa.describeRequestsOnHold = append(pa.describeRequestsOnHold, req)
return
@@ -539,7 +532,7 @@ func (pa *path) doAddReader(req defs.PathAddReaderReq) {
if pa.conf.HasOnDemandStaticSource() {
if pa.onDemandStaticSourceState == pathOnDemandStateInitial {
pa.onDemandStaticSourceStart()
pa.onDemandStaticSourceStart(req.AccessRequest.Query)
}
pa.readerAddRequestsOnHold = append(pa.readerAddRequestsOnHold, req)
return
@@ -655,8 +648,8 @@ func (pa *path) shouldClose() bool {
len(pa.readerAddRequestsOnHold) == 0
}
func (pa *path) onDemandStaticSourceStart() {
pa.source.(*staticSourceHandler).start(true)
func (pa *path) onDemandStaticSourceStart(query string) {
pa.source.(*staticSourceHandler).start(true, query)
pa.onDemandStaticSourceReadyTimer.Stop()
pa.onDemandStaticSourceReadyTimer = time.NewTimer(time.Duration(pa.conf.SourceOnDemandStartTimeout))

View File

@@ -700,13 +700,14 @@ func TestPathFallback(t *testing.T) {
}
}
func TestPathSourceRegexp(t *testing.T) {
func TestPathResolveSource(t *testing.T) {
var stream *gortsplib.ServerStream
s := gortsplib.Server{
Handler: &testServer{
onDescribe: func(ctx *gortsplib.ServerHandlerOnDescribeCtx,
) (*base.Response, *gortsplib.ServerStream, error) {
require.Equal(t, "key=val", ctx.Query)
require.Equal(t, "/a", ctx.Path)
return &base.Response{
StatusCode: base.StatusOK,
@@ -736,7 +737,7 @@ func TestPathSourceRegexp(t *testing.T) {
p, ok := newInstance(
"paths:\n" +
" '~^test_(.+)$':\n" +
" source: rtsp://127.0.0.1:8555/$G1\n" +
" source: rtsp://127.0.0.1:8555/$G1?$MTX_QUERY\n" +
" sourceOnDemand: yes\n" +
" 'all':\n")
require.Equal(t, true, ok)
@@ -744,7 +745,7 @@ func TestPathSourceRegexp(t *testing.T) {
reader := gortsplib.Client{}
u, err := base.ParseURL("rtsp://127.0.0.1:8554/test_a")
u, err := base.ParseURL("rtsp://127.0.0.1:8554/test_a?key=val")
require.NoError(t, err)
err = reader.Start(u.Scheme, u.Host)

View File

@@ -3,6 +3,7 @@ package core
import (
"context"
"fmt"
"strconv"
"strings"
"time"
@@ -22,6 +23,18 @@ const (
staticSourceHandlerRetryPause = 5 * time.Second
)
func resolveSource(s string, matches []string, query string) string {
if len(matches) > 1 {
for i, ma := range matches[1:] {
s = strings.ReplaceAll(s, "$G"+strconv.FormatInt(int64(i+1), 10), ma)
}
}
s = strings.ReplaceAll(s, "$MTX_QUERY", query)
return s
}
type staticSourceHandlerParent interface {
logger.Writer
staticSourceHandlerSetReady(context.Context, defs.PathSourceStaticSetReadyReq)
@@ -35,13 +48,14 @@ type staticSourceHandler struct {
readTimeout conf.StringDuration
writeTimeout conf.StringDuration
writeQueueSize int
resolvedSource string
matches []string
parent staticSourceHandlerParent
ctx context.Context
ctxCancel func()
instance defs.StaticSource
running bool
query string
// in
chReloadConf chan *conf.Path
@@ -58,60 +72,57 @@ func (s *staticSourceHandler) initialize() {
s.chInstanceSetNotReady = make(chan defs.PathSourceStaticSetNotReadyReq)
switch {
case strings.HasPrefix(s.resolvedSource, "rtsp://") ||
strings.HasPrefix(s.resolvedSource, "rtsps://"):
case strings.HasPrefix(s.conf.Source, "rtsp://") ||
strings.HasPrefix(s.conf.Source, "rtsps://"):
s.instance = &rtspsource.Source{
ResolvedSource: s.resolvedSource,
ReadTimeout: s.readTimeout,
WriteTimeout: s.writeTimeout,
WriteQueueSize: s.writeQueueSize,
Parent: s,
}
case strings.HasPrefix(s.resolvedSource, "rtmp://") ||
strings.HasPrefix(s.resolvedSource, "rtmps://"):
case strings.HasPrefix(s.conf.Source, "rtmp://") ||
strings.HasPrefix(s.conf.Source, "rtmps://"):
s.instance = &rtmpsource.Source{
ResolvedSource: s.resolvedSource,
ReadTimeout: s.readTimeout,
WriteTimeout: s.writeTimeout,
Parent: s,
ReadTimeout: s.readTimeout,
WriteTimeout: s.writeTimeout,
Parent: s,
}
case strings.HasPrefix(s.resolvedSource, "http://") ||
strings.HasPrefix(s.resolvedSource, "https://"):
case strings.HasPrefix(s.conf.Source, "http://") ||
strings.HasPrefix(s.conf.Source, "https://"):
s.instance = &hlssource.Source{
ResolvedSource: s.resolvedSource,
ReadTimeout: s.readTimeout,
Parent: s,
ReadTimeout: s.readTimeout,
Parent: s,
}
case strings.HasPrefix(s.resolvedSource, "udp://"):
case strings.HasPrefix(s.conf.Source, "udp://"):
s.instance = &udpsource.Source{
ResolvedSource: s.resolvedSource,
ReadTimeout: s.readTimeout,
Parent: s,
ReadTimeout: s.readTimeout,
Parent: s,
}
case strings.HasPrefix(s.resolvedSource, "srt://"):
case strings.HasPrefix(s.conf.Source, "srt://"):
s.instance = &srtsource.Source{
ResolvedSource: s.resolvedSource,
ReadTimeout: s.readTimeout,
Parent: s,
ReadTimeout: s.readTimeout,
Parent: s,
}
case strings.HasPrefix(s.resolvedSource, "whep://") ||
strings.HasPrefix(s.resolvedSource, "wheps://"):
case strings.HasPrefix(s.conf.Source, "whep://") ||
strings.HasPrefix(s.conf.Source, "wheps://"):
s.instance = &webrtcsource.Source{
ResolvedSource: s.resolvedSource,
ReadTimeout: s.readTimeout,
Parent: s,
ReadTimeout: s.readTimeout,
Parent: s,
}
case s.resolvedSource == "rpiCamera":
case s.conf.Source == "rpiCamera":
s.instance = &rpicamerasource.Source{
LogLevel: s.logLevel,
Parent: s,
}
default:
panic("should not happen")
}
}
@@ -119,12 +130,16 @@ func (s *staticSourceHandler) close(reason string) {
s.stop(reason)
}
func (s *staticSourceHandler) start(onDemand bool) {
func (s *staticSourceHandler) start(onDemand bool, query string) {
if s.running {
panic("should not happen")
}
s.running = true
s.query = query
s.ctx, s.ctxCancel = context.WithCancel(context.Background())
s.done = make(chan struct{})
s.instance.Log(logger.Info, "started%s",
func() string {
if onDemand {
@@ -133,9 +148,6 @@ func (s *staticSourceHandler) start(onDemand bool) {
return ""
}())
s.ctx, s.ctxCancel = context.WithCancel(context.Background())
s.done = make(chan struct{})
go s.run()
}
@@ -145,6 +157,7 @@ func (s *staticSourceHandler) stop(reason string) {
}
s.running = false
s.instance.Log(logger.Info, "stopped: %s", reason)
s.ctxCancel()
@@ -167,12 +180,15 @@ func (s *staticSourceHandler) run() {
runReloadConf := make(chan *conf.Path)
recreate := func() {
resolvedSource := resolveSource(s.conf.Source, s.matches, s.query)
runCtx, runCtxCancel = context.WithCancel(context.Background())
go func() {
runErr <- s.instance.Run(defs.StaticSourceRunParams{
Context: runCtx,
Conf: s.conf,
ReloadConf: runReloadConf,
Context: runCtx,
ResolvedSource: resolvedSource,
Conf: s.conf,
ReloadConf: runReloadConf,
})
}()
}

View File

@@ -23,7 +23,8 @@ type StaticSourceParent interface {
// StaticSourceRunParams is the set of params passed to Run().
type StaticSourceRunParams struct {
Context context.Context
Conf *conf.Path
ReloadConf chan *conf.Path
Context context.Context
ResolvedSource string
Conf *conf.Path
ReloadConf chan *conf.Path
}

View File

@@ -20,9 +20,8 @@ import (
// Source is a HLS static source.
type Source struct {
ResolvedSource string
ReadTimeout conf.StringDuration
Parent defs.StaticSourceParent
ReadTimeout conf.StringDuration
Parent defs.StaticSourceParent
}
// Log implements logger.Writer.
@@ -49,7 +48,7 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error {
var c *gohlslib.Client
c = &gohlslib.Client{
URI: s.ResolvedSource,
URI: params.ResolvedSource,
HTTPClient: &http.Client{
Timeout: time.Duration(s.ReadTimeout),
Transport: tr,

View File

@@ -90,10 +90,10 @@ func TestSource(t *testing.T) {
te := test.NewSourceTester(
func(p defs.StaticSourceParent) defs.StaticSource {
return &Source{
ResolvedSource: "http://localhost:5780/stream.m3u8",
Parent: p,
Parent: p,
}
},
"http://localhost:5780/stream.m3u8",
&conf.Path{},
)
defer te.Close()

View File

@@ -23,10 +23,9 @@ import (
// Source is a RTMP static source.
type Source struct {
ResolvedSource string
ReadTimeout conf.StringDuration
WriteTimeout conf.StringDuration
Parent defs.StaticSourceParent
ReadTimeout conf.StringDuration
WriteTimeout conf.StringDuration
Parent defs.StaticSourceParent
}
// Log implements logger.Writer.
@@ -38,7 +37,7 @@ func (s *Source) Log(level logger.Level, format string, args ...interface{}) {
func (s *Source) Run(params defs.StaticSourceRunParams) error {
s.Log(logger.Debug, "connecting")
u, err := url.Parse(s.ResolvedSource)
u, err := url.Parse(params.ResolvedSource)
if err != nil {
return err
}

View File

@@ -64,24 +64,24 @@ func TestSource(t *testing.T) {
te = test.NewSourceTester(
func(p defs.StaticSourceParent) defs.StaticSource {
return &Source{
ResolvedSource: "rtmp://localhost/teststream",
ReadTimeout: conf.StringDuration(10 * time.Second),
WriteTimeout: conf.StringDuration(10 * time.Second),
Parent: p,
ReadTimeout: conf.StringDuration(10 * time.Second),
WriteTimeout: conf.StringDuration(10 * time.Second),
Parent: p,
}
},
"rtmp://localhost/teststream",
&conf.Path{},
)
} else {
te = test.NewSourceTester(
func(p defs.StaticSourceParent) defs.StaticSource {
return &Source{
ResolvedSource: "rtmps://localhost/teststream",
ReadTimeout: conf.StringDuration(10 * time.Second),
WriteTimeout: conf.StringDuration(10 * time.Second),
Parent: p,
ReadTimeout: conf.StringDuration(10 * time.Second),
WriteTimeout: conf.StringDuration(10 * time.Second),
Parent: p,
}
},
"rtmps://localhost/teststream",
&conf.Path{
SourceFingerprint: "33949E05FFFB5FF3E8AA16F8213A6251B4D9363804BA53233C4DA9A46D6F2739",
},

View File

@@ -62,7 +62,6 @@ func createRangeHeader(cnf *conf.Path) (*headers.Range, error) {
// Source is a RTSP static source.
type Source struct {
ResolvedSource string
ReadTimeout conf.StringDuration
WriteTimeout conf.StringDuration
WriteQueueSize int
@@ -104,7 +103,7 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error {
},
}
u, err := base.ParseURL(s.ResolvedSource)
u, err := base.ParseURL(params.ResolvedSource)
if err != nil {
return err
}

View File

@@ -138,13 +138,13 @@ func TestSource(t *testing.T) {
te = test.NewSourceTester(
func(p defs.StaticSourceParent) defs.StaticSource {
return &Source{
ResolvedSource: "rtsp://testuser:testpass@localhost:8555/teststream",
ReadTimeout: conf.StringDuration(10 * time.Second),
WriteTimeout: conf.StringDuration(10 * time.Second),
WriteQueueSize: 2048,
Parent: p,
}
},
"rtsp://testuser:testpass@localhost:8555/teststream",
&conf.Path{
RTSPTransport: sp,
},
@@ -153,13 +153,13 @@ func TestSource(t *testing.T) {
te = test.NewSourceTester(
func(p defs.StaticSourceParent) defs.StaticSource {
return &Source{
ResolvedSource: "rtsps://testuser:testpass@localhost:8555/teststream",
ReadTimeout: conf.StringDuration(10 * time.Second),
WriteTimeout: conf.StringDuration(10 * time.Second),
WriteQueueSize: 2048,
Parent: p,
}
},
"rtsps://testuser:testpass@localhost:8555/teststream",
&conf.Path{
SourceFingerprint: "33949E05FFFB5FF3E8AA16F8213A6251B4D9363804BA53233C4DA9A46D6F2739",
},
@@ -241,13 +241,13 @@ func TestRTSPSourceNoPassword(t *testing.T) {
te := test.NewSourceTester(
func(p defs.StaticSourceParent) defs.StaticSource {
return &Source{
ResolvedSource: "rtsp://testuser:@127.0.0.1:8555/teststream",
ReadTimeout: conf.StringDuration(10 * time.Second),
WriteTimeout: conf.StringDuration(10 * time.Second),
WriteQueueSize: 2048,
Parent: p,
}
},
"rtsp://testuser:@127.0.0.1:8555/teststream",
&conf.Path{
RTSPTransport: sp,
},
@@ -338,13 +338,13 @@ func TestRTSPSourceRange(t *testing.T) {
te := test.NewSourceTester(
func(p defs.StaticSourceParent) defs.StaticSource {
return &Source{
ResolvedSource: "rtsp://127.0.0.1:8555/teststream",
ReadTimeout: conf.StringDuration(10 * time.Second),
WriteTimeout: conf.StringDuration(10 * time.Second),
WriteQueueSize: 2048,
Parent: p,
}
},
"rtsp://127.0.0.1:8555/teststream",
cnf,
)
defer te.Close()

View File

@@ -17,9 +17,8 @@ import (
// Source is a SRT static source.
type Source struct {
ResolvedSource string
ReadTimeout conf.StringDuration
Parent defs.StaticSourceParent
ReadTimeout conf.StringDuration
Parent defs.StaticSourceParent
}
// Log implements logger.Writer.
@@ -32,7 +31,7 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error {
s.Log(logger.Debug, "connecting")
conf := srt.DefaultConfig()
address, err := conf.UnmarshalURL(s.ResolvedSource)
address, err := conf.UnmarshalURL(params.ResolvedSource)
if err != nil {
return err
}

View File

@@ -55,11 +55,11 @@ func TestSource(t *testing.T) {
te := test.NewSourceTester(
func(p defs.StaticSourceParent) defs.StaticSource {
return &Source{
ResolvedSource: "srt://127.0.0.1:9002?streamid=sidname&passphrase=ttest1234567",
ReadTimeout: conf.StringDuration(10 * time.Second),
Parent: p,
ReadTimeout: conf.StringDuration(10 * time.Second),
Parent: p,
}
},
"srt://127.0.0.1:9002?streamid=sidname&passphrase=ttest1234567",
&conf.Path{},
)
defer te.Close()

View File

@@ -45,9 +45,8 @@ type packetConn interface {
// Source is a UDP static source.
type Source struct {
ResolvedSource string
ReadTimeout conf.StringDuration
Parent defs.StaticSourceParent
ReadTimeout conf.StringDuration
Parent defs.StaticSourceParent
}
// Log implements logger.Writer.
@@ -59,7 +58,7 @@ func (s *Source) Log(level logger.Level, format string, args ...interface{}) {
func (s *Source) Run(params defs.StaticSourceRunParams) error {
s.Log(logger.Debug, "connecting")
hostPort := s.ResolvedSource[len("udp://"):]
hostPort := params.ResolvedSource[len("udp://"):]
addr, err := net.ResolveUDPAddr("udp", hostPort)
if err != nil {

View File

@@ -18,11 +18,11 @@ func TestSource(t *testing.T) {
te := test.NewSourceTester(
func(p defs.StaticSourceParent) defs.StaticSource {
return &Source{
ResolvedSource: "udp://127.0.0.1:9001",
ReadTimeout: conf.StringDuration(10 * time.Second),
Parent: p,
ReadTimeout: conf.StringDuration(10 * time.Second),
Parent: p,
}
},
"udp://127.0.0.1:9001",
&conf.Path{},
)
defer te.Close()

View File

@@ -19,9 +19,8 @@ import (
// Source is a WebRTC static source.
type Source struct {
ResolvedSource string
ReadTimeout conf.StringDuration
Parent defs.StaticSourceParent
ReadTimeout conf.StringDuration
Parent defs.StaticSourceParent
}
// Log implements logger.Writer.
@@ -33,7 +32,7 @@ func (s *Source) Log(level logger.Level, format string, args ...interface{}) {
func (s *Source) Run(params defs.StaticSourceRunParams) error {
s.Log(logger.Debug, "connecting")
u, err := url.Parse(s.ResolvedSource)
u, err := url.Parse(params.ResolvedSource)
if err != nil {
return err
}

View File

@@ -121,11 +121,11 @@ func TestSource(t *testing.T) {
te := test.NewSourceTester(
func(p defs.StaticSourceParent) defs.StaticSource {
return &Source{
ResolvedSource: "whep://localhost:9003/my/resource",
ReadTimeout: conf.StringDuration(10 * time.Second),
Parent: p,
ReadTimeout: conf.StringDuration(10 * time.Second),
Parent: p,
}
},
"whep://localhost:9003/my/resource",
&conf.Path{},
)
defer te.Close()

View File

@@ -24,7 +24,11 @@ type SourceTester struct {
}
// NewSourceTester allocates a SourceTester.
func NewSourceTester(createFunc func(defs.StaticSourceParent) defs.StaticSource, conf *conf.Path) *SourceTester {
func NewSourceTester(
createFunc func(defs.StaticSourceParent) defs.StaticSource,
resolvedSource string,
conf *conf.Path,
) *SourceTester {
ctx, ctxCancel := context.WithCancel(context.Background())
t := &SourceTester{
@@ -38,8 +42,9 @@ func NewSourceTester(createFunc func(defs.StaticSourceParent) defs.StaticSource,
go func() {
s.Run(defs.StaticSourceRunParams{ //nolint:errcheck
Context: ctx,
Conf: conf,
Context: ctx,
ResolvedSource: resolvedSource,
Conf: conf,
})
close(t.done)
}()

View File

@@ -418,8 +418,10 @@ pathDefaults:
# * wheps://existing-url -> the stream is pulled from another WebRTC server / camera with HTTPS
# * redirect -> the stream is provided by another path or server
# * rpiCamera -> the stream is provided by a Raspberry Pi Camera
# If path name is a regular expression, $G1, G2, etc will be replaced
# with regular expression groups.
# The following variables can be used in the source string:
# * $MTX_QUERY: query parameters (passed by first reader)
# * $G1, $G2, ...: regular expression groups, if path name is
# a regular expression.
source: publisher
# If the source is a URL, and the source certificate is self-signed
# or invalid, you can provide the fingerprint of the certificate in order to