diff --git a/internal/core/path.go b/internal/core/path.go index 70ab2d54..2119927c 100644 --- a/internal/core/path.go +++ b/internal/core/path.go @@ -169,26 +169,19 @@ func (pa *path) run() { if pa.conf.Source == "redirect" { pa.source = &sourceRedirect{} } else if pa.conf.HasStaticSource() { - resolvedSource := pa.conf.Source - if len(pa.matches) > 1 { - for i, ma := range pa.matches[1:] { - resolvedSource = strings.ReplaceAll(resolvedSource, "$G"+strconv.FormatInt(int64(i+1), 10), ma) - } - } - pa.source = &staticSourceHandler{ conf: pa.conf, logLevel: pa.logLevel, readTimeout: pa.readTimeout, writeTimeout: pa.writeTimeout, writeQueueSize: pa.writeQueueSize, - resolvedSource: resolvedSource, + matches: pa.matches, parent: pa, } pa.source.(*staticSourceHandler).initialize() if !pa.conf.SourceOnDemand { - pa.source.(*staticSourceHandler).start(false) + pa.source.(*staticSourceHandler).start(false, "") } } @@ -431,7 +424,7 @@ func (pa *path) doDescribe(req defs.PathDescribeReq) { if pa.conf.HasOnDemandStaticSource() { if pa.onDemandStaticSourceState == pathOnDemandStateInitial { - pa.onDemandStaticSourceStart() + pa.onDemandStaticSourceStart(req.AccessRequest.Query) } pa.describeRequestsOnHold = append(pa.describeRequestsOnHold, req) return @@ -539,7 +532,7 @@ func (pa *path) doAddReader(req defs.PathAddReaderReq) { if pa.conf.HasOnDemandStaticSource() { if pa.onDemandStaticSourceState == pathOnDemandStateInitial { - pa.onDemandStaticSourceStart() + pa.onDemandStaticSourceStart(req.AccessRequest.Query) } pa.readerAddRequestsOnHold = append(pa.readerAddRequestsOnHold, req) return @@ -655,8 +648,8 @@ func (pa *path) shouldClose() bool { len(pa.readerAddRequestsOnHold) == 0 } -func (pa *path) onDemandStaticSourceStart() { - pa.source.(*staticSourceHandler).start(true) +func (pa *path) onDemandStaticSourceStart(query string) { + pa.source.(*staticSourceHandler).start(true, query) pa.onDemandStaticSourceReadyTimer.Stop() pa.onDemandStaticSourceReadyTimer = time.NewTimer(time.Duration(pa.conf.SourceOnDemandStartTimeout)) diff --git a/internal/core/path_test.go b/internal/core/path_test.go index 145c00c7..5b123061 100644 --- a/internal/core/path_test.go +++ b/internal/core/path_test.go @@ -700,13 +700,14 @@ func TestPathFallback(t *testing.T) { } } -func TestPathSourceRegexp(t *testing.T) { +func TestPathResolveSource(t *testing.T) { var stream *gortsplib.ServerStream s := gortsplib.Server{ Handler: &testServer{ onDescribe: func(ctx *gortsplib.ServerHandlerOnDescribeCtx, ) (*base.Response, *gortsplib.ServerStream, error) { + require.Equal(t, "key=val", ctx.Query) require.Equal(t, "/a", ctx.Path) return &base.Response{ StatusCode: base.StatusOK, @@ -736,7 +737,7 @@ func TestPathSourceRegexp(t *testing.T) { p, ok := newInstance( "paths:\n" + " '~^test_(.+)$':\n" + - " source: rtsp://127.0.0.1:8555/$G1\n" + + " source: rtsp://127.0.0.1:8555/$G1?$MTX_QUERY\n" + " sourceOnDemand: yes\n" + " 'all':\n") require.Equal(t, true, ok) @@ -744,7 +745,7 @@ func TestPathSourceRegexp(t *testing.T) { reader := gortsplib.Client{} - u, err := base.ParseURL("rtsp://127.0.0.1:8554/test_a") + u, err := base.ParseURL("rtsp://127.0.0.1:8554/test_a?key=val") require.NoError(t, err) err = reader.Start(u.Scheme, u.Host) diff --git a/internal/core/static_source_handler.go b/internal/core/static_source_handler.go index 57f4c983..adec4f5b 100644 --- a/internal/core/static_source_handler.go +++ b/internal/core/static_source_handler.go @@ -3,6 +3,7 @@ package core import ( "context" "fmt" + "strconv" "strings" "time" @@ -22,6 +23,18 @@ const ( staticSourceHandlerRetryPause = 5 * time.Second ) +func resolveSource(s string, matches []string, query string) string { + if len(matches) > 1 { + for i, ma := range matches[1:] { + s = strings.ReplaceAll(s, "$G"+strconv.FormatInt(int64(i+1), 10), ma) + } + } + + s = strings.ReplaceAll(s, "$MTX_QUERY", query) + + return s +} + type staticSourceHandlerParent interface { logger.Writer staticSourceHandlerSetReady(context.Context, defs.PathSourceStaticSetReadyReq) @@ -35,13 +48,14 @@ type staticSourceHandler struct { readTimeout conf.StringDuration writeTimeout conf.StringDuration writeQueueSize int - resolvedSource string + matches []string parent staticSourceHandlerParent ctx context.Context ctxCancel func() instance defs.StaticSource running bool + query string // in chReloadConf chan *conf.Path @@ -58,60 +72,57 @@ func (s *staticSourceHandler) initialize() { s.chInstanceSetNotReady = make(chan defs.PathSourceStaticSetNotReadyReq) switch { - case strings.HasPrefix(s.resolvedSource, "rtsp://") || - strings.HasPrefix(s.resolvedSource, "rtsps://"): + case strings.HasPrefix(s.conf.Source, "rtsp://") || + strings.HasPrefix(s.conf.Source, "rtsps://"): s.instance = &rtspsource.Source{ - ResolvedSource: s.resolvedSource, ReadTimeout: s.readTimeout, WriteTimeout: s.writeTimeout, WriteQueueSize: s.writeQueueSize, Parent: s, } - case strings.HasPrefix(s.resolvedSource, "rtmp://") || - strings.HasPrefix(s.resolvedSource, "rtmps://"): + case strings.HasPrefix(s.conf.Source, "rtmp://") || + strings.HasPrefix(s.conf.Source, "rtmps://"): s.instance = &rtmpsource.Source{ - ResolvedSource: s.resolvedSource, - ReadTimeout: s.readTimeout, - WriteTimeout: s.writeTimeout, - Parent: s, + ReadTimeout: s.readTimeout, + WriteTimeout: s.writeTimeout, + Parent: s, } - case strings.HasPrefix(s.resolvedSource, "http://") || - strings.HasPrefix(s.resolvedSource, "https://"): + case strings.HasPrefix(s.conf.Source, "http://") || + strings.HasPrefix(s.conf.Source, "https://"): s.instance = &hlssource.Source{ - ResolvedSource: s.resolvedSource, - ReadTimeout: s.readTimeout, - Parent: s, + ReadTimeout: s.readTimeout, + Parent: s, } - case strings.HasPrefix(s.resolvedSource, "udp://"): + case strings.HasPrefix(s.conf.Source, "udp://"): s.instance = &udpsource.Source{ - ResolvedSource: s.resolvedSource, - ReadTimeout: s.readTimeout, - Parent: s, + ReadTimeout: s.readTimeout, + Parent: s, } - case strings.HasPrefix(s.resolvedSource, "srt://"): + case strings.HasPrefix(s.conf.Source, "srt://"): s.instance = &srtsource.Source{ - ResolvedSource: s.resolvedSource, - ReadTimeout: s.readTimeout, - Parent: s, + ReadTimeout: s.readTimeout, + Parent: s, } - case strings.HasPrefix(s.resolvedSource, "whep://") || - strings.HasPrefix(s.resolvedSource, "wheps://"): + case strings.HasPrefix(s.conf.Source, "whep://") || + strings.HasPrefix(s.conf.Source, "wheps://"): s.instance = &webrtcsource.Source{ - ResolvedSource: s.resolvedSource, - ReadTimeout: s.readTimeout, - Parent: s, + ReadTimeout: s.readTimeout, + Parent: s, } - case s.resolvedSource == "rpiCamera": + case s.conf.Source == "rpiCamera": s.instance = &rpicamerasource.Source{ LogLevel: s.logLevel, Parent: s, } + + default: + panic("should not happen") } } @@ -119,12 +130,16 @@ func (s *staticSourceHandler) close(reason string) { s.stop(reason) } -func (s *staticSourceHandler) start(onDemand bool) { +func (s *staticSourceHandler) start(onDemand bool, query string) { if s.running { panic("should not happen") } s.running = true + s.query = query + s.ctx, s.ctxCancel = context.WithCancel(context.Background()) + s.done = make(chan struct{}) + s.instance.Log(logger.Info, "started%s", func() string { if onDemand { @@ -133,9 +148,6 @@ func (s *staticSourceHandler) start(onDemand bool) { return "" }()) - s.ctx, s.ctxCancel = context.WithCancel(context.Background()) - s.done = make(chan struct{}) - go s.run() } @@ -145,6 +157,7 @@ func (s *staticSourceHandler) stop(reason string) { } s.running = false + s.instance.Log(logger.Info, "stopped: %s", reason) s.ctxCancel() @@ -167,12 +180,15 @@ func (s *staticSourceHandler) run() { runReloadConf := make(chan *conf.Path) recreate := func() { + resolvedSource := resolveSource(s.conf.Source, s.matches, s.query) + runCtx, runCtxCancel = context.WithCancel(context.Background()) go func() { runErr <- s.instance.Run(defs.StaticSourceRunParams{ - Context: runCtx, - Conf: s.conf, - ReloadConf: runReloadConf, + Context: runCtx, + ResolvedSource: resolvedSource, + Conf: s.conf, + ReloadConf: runReloadConf, }) }() } diff --git a/internal/defs/static_source.go b/internal/defs/static_source.go index eaf1cf42..dd5adbac 100644 --- a/internal/defs/static_source.go +++ b/internal/defs/static_source.go @@ -23,7 +23,8 @@ type StaticSourceParent interface { // StaticSourceRunParams is the set of params passed to Run(). type StaticSourceRunParams struct { - Context context.Context - Conf *conf.Path - ReloadConf chan *conf.Path + Context context.Context + ResolvedSource string + Conf *conf.Path + ReloadConf chan *conf.Path } diff --git a/internal/staticsources/hls/source.go b/internal/staticsources/hls/source.go index 4e81d42f..7e333f72 100644 --- a/internal/staticsources/hls/source.go +++ b/internal/staticsources/hls/source.go @@ -20,9 +20,8 @@ import ( // Source is a HLS static source. type Source struct { - ResolvedSource string - ReadTimeout conf.StringDuration - Parent defs.StaticSourceParent + ReadTimeout conf.StringDuration + Parent defs.StaticSourceParent } // Log implements logger.Writer. @@ -49,7 +48,7 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error { var c *gohlslib.Client c = &gohlslib.Client{ - URI: s.ResolvedSource, + URI: params.ResolvedSource, HTTPClient: &http.Client{ Timeout: time.Duration(s.ReadTimeout), Transport: tr, diff --git a/internal/staticsources/hls/source_test.go b/internal/staticsources/hls/source_test.go index bd38bede..6da53776 100644 --- a/internal/staticsources/hls/source_test.go +++ b/internal/staticsources/hls/source_test.go @@ -90,10 +90,10 @@ func TestSource(t *testing.T) { te := test.NewSourceTester( func(p defs.StaticSourceParent) defs.StaticSource { return &Source{ - ResolvedSource: "http://localhost:5780/stream.m3u8", - Parent: p, + Parent: p, } }, + "http://localhost:5780/stream.m3u8", &conf.Path{}, ) defer te.Close() diff --git a/internal/staticsources/rtmp/source.go b/internal/staticsources/rtmp/source.go index 8ad22667..91a62100 100644 --- a/internal/staticsources/rtmp/source.go +++ b/internal/staticsources/rtmp/source.go @@ -23,10 +23,9 @@ import ( // Source is a RTMP static source. type Source struct { - ResolvedSource string - ReadTimeout conf.StringDuration - WriteTimeout conf.StringDuration - Parent defs.StaticSourceParent + ReadTimeout conf.StringDuration + WriteTimeout conf.StringDuration + Parent defs.StaticSourceParent } // Log implements logger.Writer. @@ -38,7 +37,7 @@ func (s *Source) Log(level logger.Level, format string, args ...interface{}) { func (s *Source) Run(params defs.StaticSourceRunParams) error { s.Log(logger.Debug, "connecting") - u, err := url.Parse(s.ResolvedSource) + u, err := url.Parse(params.ResolvedSource) if err != nil { return err } diff --git a/internal/staticsources/rtmp/source_test.go b/internal/staticsources/rtmp/source_test.go index 925a157f..2258ee33 100644 --- a/internal/staticsources/rtmp/source_test.go +++ b/internal/staticsources/rtmp/source_test.go @@ -64,24 +64,24 @@ func TestSource(t *testing.T) { te = test.NewSourceTester( func(p defs.StaticSourceParent) defs.StaticSource { return &Source{ - ResolvedSource: "rtmp://localhost/teststream", - ReadTimeout: conf.StringDuration(10 * time.Second), - WriteTimeout: conf.StringDuration(10 * time.Second), - Parent: p, + ReadTimeout: conf.StringDuration(10 * time.Second), + WriteTimeout: conf.StringDuration(10 * time.Second), + Parent: p, } }, + "rtmp://localhost/teststream", &conf.Path{}, ) } else { te = test.NewSourceTester( func(p defs.StaticSourceParent) defs.StaticSource { return &Source{ - ResolvedSource: "rtmps://localhost/teststream", - ReadTimeout: conf.StringDuration(10 * time.Second), - WriteTimeout: conf.StringDuration(10 * time.Second), - Parent: p, + ReadTimeout: conf.StringDuration(10 * time.Second), + WriteTimeout: conf.StringDuration(10 * time.Second), + Parent: p, } }, + "rtmps://localhost/teststream", &conf.Path{ SourceFingerprint: "33949E05FFFB5FF3E8AA16F8213A6251B4D9363804BA53233C4DA9A46D6F2739", }, diff --git a/internal/staticsources/rtsp/source.go b/internal/staticsources/rtsp/source.go index be603149..e7a2d8ab 100644 --- a/internal/staticsources/rtsp/source.go +++ b/internal/staticsources/rtsp/source.go @@ -62,7 +62,6 @@ func createRangeHeader(cnf *conf.Path) (*headers.Range, error) { // Source is a RTSP static source. type Source struct { - ResolvedSource string ReadTimeout conf.StringDuration WriteTimeout conf.StringDuration WriteQueueSize int @@ -104,7 +103,7 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error { }, } - u, err := base.ParseURL(s.ResolvedSource) + u, err := base.ParseURL(params.ResolvedSource) if err != nil { return err } diff --git a/internal/staticsources/rtsp/source_test.go b/internal/staticsources/rtsp/source_test.go index 91b5ae7e..10494e28 100644 --- a/internal/staticsources/rtsp/source_test.go +++ b/internal/staticsources/rtsp/source_test.go @@ -138,13 +138,13 @@ func TestSource(t *testing.T) { te = test.NewSourceTester( func(p defs.StaticSourceParent) defs.StaticSource { return &Source{ - ResolvedSource: "rtsp://testuser:testpass@localhost:8555/teststream", ReadTimeout: conf.StringDuration(10 * time.Second), WriteTimeout: conf.StringDuration(10 * time.Second), WriteQueueSize: 2048, Parent: p, } }, + "rtsp://testuser:testpass@localhost:8555/teststream", &conf.Path{ RTSPTransport: sp, }, @@ -153,13 +153,13 @@ func TestSource(t *testing.T) { te = test.NewSourceTester( func(p defs.StaticSourceParent) defs.StaticSource { return &Source{ - ResolvedSource: "rtsps://testuser:testpass@localhost:8555/teststream", ReadTimeout: conf.StringDuration(10 * time.Second), WriteTimeout: conf.StringDuration(10 * time.Second), WriteQueueSize: 2048, Parent: p, } }, + "rtsps://testuser:testpass@localhost:8555/teststream", &conf.Path{ SourceFingerprint: "33949E05FFFB5FF3E8AA16F8213A6251B4D9363804BA53233C4DA9A46D6F2739", }, @@ -241,13 +241,13 @@ func TestRTSPSourceNoPassword(t *testing.T) { te := test.NewSourceTester( func(p defs.StaticSourceParent) defs.StaticSource { return &Source{ - ResolvedSource: "rtsp://testuser:@127.0.0.1:8555/teststream", ReadTimeout: conf.StringDuration(10 * time.Second), WriteTimeout: conf.StringDuration(10 * time.Second), WriteQueueSize: 2048, Parent: p, } }, + "rtsp://testuser:@127.0.0.1:8555/teststream", &conf.Path{ RTSPTransport: sp, }, @@ -338,13 +338,13 @@ func TestRTSPSourceRange(t *testing.T) { te := test.NewSourceTester( func(p defs.StaticSourceParent) defs.StaticSource { return &Source{ - ResolvedSource: "rtsp://127.0.0.1:8555/teststream", ReadTimeout: conf.StringDuration(10 * time.Second), WriteTimeout: conf.StringDuration(10 * time.Second), WriteQueueSize: 2048, Parent: p, } }, + "rtsp://127.0.0.1:8555/teststream", cnf, ) defer te.Close() diff --git a/internal/staticsources/srt/source.go b/internal/staticsources/srt/source.go index 8026ed3e..69e65c3f 100644 --- a/internal/staticsources/srt/source.go +++ b/internal/staticsources/srt/source.go @@ -17,9 +17,8 @@ import ( // Source is a SRT static source. type Source struct { - ResolvedSource string - ReadTimeout conf.StringDuration - Parent defs.StaticSourceParent + ReadTimeout conf.StringDuration + Parent defs.StaticSourceParent } // Log implements logger.Writer. @@ -32,7 +31,7 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error { s.Log(logger.Debug, "connecting") conf := srt.DefaultConfig() - address, err := conf.UnmarshalURL(s.ResolvedSource) + address, err := conf.UnmarshalURL(params.ResolvedSource) if err != nil { return err } diff --git a/internal/staticsources/srt/source_test.go b/internal/staticsources/srt/source_test.go index 6243a8e1..f4257ec7 100644 --- a/internal/staticsources/srt/source_test.go +++ b/internal/staticsources/srt/source_test.go @@ -55,11 +55,11 @@ func TestSource(t *testing.T) { te := test.NewSourceTester( func(p defs.StaticSourceParent) defs.StaticSource { return &Source{ - ResolvedSource: "srt://127.0.0.1:9002?streamid=sidname&passphrase=ttest1234567", - ReadTimeout: conf.StringDuration(10 * time.Second), - Parent: p, + ReadTimeout: conf.StringDuration(10 * time.Second), + Parent: p, } }, + "srt://127.0.0.1:9002?streamid=sidname&passphrase=ttest1234567", &conf.Path{}, ) defer te.Close() diff --git a/internal/staticsources/udp/source.go b/internal/staticsources/udp/source.go index 1202b2f1..0d899f32 100644 --- a/internal/staticsources/udp/source.go +++ b/internal/staticsources/udp/source.go @@ -45,9 +45,8 @@ type packetConn interface { // Source is a UDP static source. type Source struct { - ResolvedSource string - ReadTimeout conf.StringDuration - Parent defs.StaticSourceParent + ReadTimeout conf.StringDuration + Parent defs.StaticSourceParent } // Log implements logger.Writer. @@ -59,7 +58,7 @@ func (s *Source) Log(level logger.Level, format string, args ...interface{}) { func (s *Source) Run(params defs.StaticSourceRunParams) error { s.Log(logger.Debug, "connecting") - hostPort := s.ResolvedSource[len("udp://"):] + hostPort := params.ResolvedSource[len("udp://"):] addr, err := net.ResolveUDPAddr("udp", hostPort) if err != nil { diff --git a/internal/staticsources/udp/source_test.go b/internal/staticsources/udp/source_test.go index 65fb759a..fa64dc7e 100644 --- a/internal/staticsources/udp/source_test.go +++ b/internal/staticsources/udp/source_test.go @@ -18,11 +18,11 @@ func TestSource(t *testing.T) { te := test.NewSourceTester( func(p defs.StaticSourceParent) defs.StaticSource { return &Source{ - ResolvedSource: "udp://127.0.0.1:9001", - ReadTimeout: conf.StringDuration(10 * time.Second), - Parent: p, + ReadTimeout: conf.StringDuration(10 * time.Second), + Parent: p, } }, + "udp://127.0.0.1:9001", &conf.Path{}, ) defer te.Close() diff --git a/internal/staticsources/webrtc/source.go b/internal/staticsources/webrtc/source.go index 3fab2a77..f2004a7d 100644 --- a/internal/staticsources/webrtc/source.go +++ b/internal/staticsources/webrtc/source.go @@ -19,9 +19,8 @@ import ( // Source is a WebRTC static source. type Source struct { - ResolvedSource string - ReadTimeout conf.StringDuration - Parent defs.StaticSourceParent + ReadTimeout conf.StringDuration + Parent defs.StaticSourceParent } // Log implements logger.Writer. @@ -33,7 +32,7 @@ func (s *Source) Log(level logger.Level, format string, args ...interface{}) { func (s *Source) Run(params defs.StaticSourceRunParams) error { s.Log(logger.Debug, "connecting") - u, err := url.Parse(s.ResolvedSource) + u, err := url.Parse(params.ResolvedSource) if err != nil { return err } diff --git a/internal/staticsources/webrtc/source_test.go b/internal/staticsources/webrtc/source_test.go index 20edf3e7..c8b2842f 100644 --- a/internal/staticsources/webrtc/source_test.go +++ b/internal/staticsources/webrtc/source_test.go @@ -121,11 +121,11 @@ func TestSource(t *testing.T) { te := test.NewSourceTester( func(p defs.StaticSourceParent) defs.StaticSource { return &Source{ - ResolvedSource: "whep://localhost:9003/my/resource", - ReadTimeout: conf.StringDuration(10 * time.Second), - Parent: p, + ReadTimeout: conf.StringDuration(10 * time.Second), + Parent: p, } }, + "whep://localhost:9003/my/resource", &conf.Path{}, ) defer te.Close() diff --git a/internal/test/source_tester.go b/internal/test/source_tester.go index 7a52501a..92d6e4d3 100644 --- a/internal/test/source_tester.go +++ b/internal/test/source_tester.go @@ -24,7 +24,11 @@ type SourceTester struct { } // NewSourceTester allocates a SourceTester. -func NewSourceTester(createFunc func(defs.StaticSourceParent) defs.StaticSource, conf *conf.Path) *SourceTester { +func NewSourceTester( + createFunc func(defs.StaticSourceParent) defs.StaticSource, + resolvedSource string, + conf *conf.Path, +) *SourceTester { ctx, ctxCancel := context.WithCancel(context.Background()) t := &SourceTester{ @@ -38,8 +42,9 @@ func NewSourceTester(createFunc func(defs.StaticSourceParent) defs.StaticSource, go func() { s.Run(defs.StaticSourceRunParams{ //nolint:errcheck - Context: ctx, - Conf: conf, + Context: ctx, + ResolvedSource: resolvedSource, + Conf: conf, }) close(t.done) }() diff --git a/mediamtx.yml b/mediamtx.yml index dbc451eb..f936a305 100644 --- a/mediamtx.yml +++ b/mediamtx.yml @@ -418,8 +418,10 @@ pathDefaults: # * wheps://existing-url -> the stream is pulled from another WebRTC server / camera with HTTPS # * redirect -> the stream is provided by another path or server # * rpiCamera -> the stream is provided by a Raspberry Pi Camera - # If path name is a regular expression, $G1, G2, etc will be replaced - # with regular expression groups. + # The following variables can be used in the source string: + # * $MTX_QUERY: query parameters (passed by first reader) + # * $G1, $G2, ...: regular expression groups, if path name is + # a regular expression. source: publisher # If the source is a URL, and the source certificate is self-signed # or invalid, you can provide the fingerprint of the certificate in order to