diff --git a/app/api/api.go b/app/api/api.go index d8722c30..b1f5d76b 100644 --- a/app/api/api.go +++ b/app/api/api.go @@ -31,6 +31,7 @@ import ( "github.com/datarhei/core/v16/net" "github.com/datarhei/core/v16/prometheus" "github.com/datarhei/core/v16/restream" + restreamapp "github.com/datarhei/core/v16/restream/app" "github.com/datarhei/core/v16/restream/replace" "github.com/datarhei/core/v16/restream/store" "github.com/datarhei/core/v16/rtmp" @@ -449,39 +450,62 @@ func (a *api) start() error { a.replacer = replace.New() { - a.replacer.RegisterTemplate("diskfs", a.diskfs.Base(), nil) - a.replacer.RegisterTemplate("memfs", a.memfs.Base(), nil) + a.replacer.RegisterTemplateFunc("diskfs", func(config *restreamapp.Config, section string) string { + return a.diskfs.Base() + }, nil) - host, port, _ := gonet.SplitHostPort(cfg.RTMP.Address) - if len(host) == 0 { - host = "localhost" - } + a.replacer.RegisterTemplateFunc("fs:disk", func(config *restreamapp.Config, section string) string { + return a.diskfs.Base() + }, nil) - template := "rtmp://" + host + ":" + port - if cfg.RTMP.App != "/" { - template += cfg.RTMP.App - } - template += "/{name}" + a.replacer.RegisterTemplateFunc("memfs", func(config *restreamapp.Config, section string) string { + return a.memfs.Base() + }, nil) - if len(cfg.RTMP.Token) != 0 { - template += "?token=" + cfg.RTMP.Token - } + a.replacer.RegisterTemplateFunc("fs:mem", func(config *restreamapp.Config, section string) string { + return a.memfs.Base() + }, nil) - a.replacer.RegisterTemplate("rtmp", template, nil) + a.replacer.RegisterTemplateFunc("rtmp", func(config *restreamapp.Config, section string) string { + host, port, _ := gonet.SplitHostPort(cfg.RTMP.Address) + if len(host) == 0 { + host = "localhost" + } - host, port, _ = gonet.SplitHostPort(cfg.SRT.Address) - if len(host) == 0 { - host = "localhost" - } + template := "rtmp://" + host + ":" + port + if cfg.RTMP.App != "/" { + template += cfg.RTMP.App + } + template += "/{name}" - template = "srt://" + host + ":" + port + "?mode=caller&transtype=live&latency={latency}&streamid={name},mode:{mode}" - if len(cfg.SRT.Token) != 0 { - template += ",token:" + cfg.SRT.Token - } - if len(cfg.SRT.Passphrase) != 0 { - template += "&passphrase=" + cfg.SRT.Passphrase - } - a.replacer.RegisterTemplate("srt", template, map[string]string{ + if len(cfg.RTMP.Token) != 0 { + template += "?token=" + cfg.RTMP.Token + } + + return template + }, nil) + + a.replacer.RegisterTemplateFunc("srt", func(config *restreamapp.Config, section string) string { + host, port, _ = gonet.SplitHostPort(cfg.SRT.Address) + if len(host) == 0 { + host = "localhost" + } + + template := "srt://" + host + ":" + port + "?mode=caller&transtype=live&latency={latency}&streamid={name}" + if section == "output" { + template += ",mode:publish" + } else { + template += ",mode:request" + } + if len(cfg.SRT.Token) != 0 { + template += ",token:" + cfg.SRT.Token + } + if len(cfg.SRT.Passphrase) != 0 { + template += "&passphrase=" + cfg.SRT.Passphrase + } + + return template + }, map[string]string{ "latency": "20000", // 20 milliseconds, FFmpeg requires microseconds }) } diff --git a/restream/app/process.go b/restream/app/process.go index 1d62220b..4ec6036a 100644 --- a/restream/app/process.go +++ b/restream/app/process.go @@ -2,7 +2,6 @@ package app import ( "github.com/datarhei/core/v16/process" - "github.com/datarhei/core/v16/restream/replace" ) type ConfigIOCleanup struct { @@ -80,79 +79,6 @@ func (config *Config) Clone() *Config { return clone } -// ReplacePlaceholders replaces all placeholders in the config. The config -// will be modified in place. -func (config *Config) ResolvePlaceholders(r replace.Replacer) { - for i, option := range config.Options { - // Replace any known placeholders - option = r.Replace(option, "diskfs", "") - - config.Options[i] = option - } - - // Resolving the given inputs - for i, input := range config.Input { - // Replace any known placeholders - input.ID = r.Replace(input.ID, "processid", config.ID) - input.ID = r.Replace(input.ID, "reference", config.Reference) - input.Address = r.Replace(input.Address, "inputid", input.ID) - input.Address = r.Replace(input.Address, "processid", config.ID) - input.Address = r.Replace(input.Address, "reference", config.Reference) - input.Address = r.Replace(input.Address, "diskfs", "") - input.Address = r.Replace(input.Address, "memfs", "") - input.Address = r.Replace(input.Address, "rtmp", "") - input.Address = r.Replace(input.Address, "srt", "") - - for j, option := range input.Options { - // Replace any known placeholders - option = r.Replace(option, "inputid", input.ID) - option = r.Replace(option, "processid", config.ID) - option = r.Replace(option, "reference", config.Reference) - option = r.Replace(option, "diskfs", "") - option = r.Replace(option, "memfs", "") - - input.Options[j] = option - } - - config.Input[i] = input - } - - // Resolving the given outputs - for i, output := range config.Output { - // Replace any known placeholders - output.ID = r.Replace(output.ID, "processid", config.ID) - output.Address = r.Replace(output.Address, "outputid", output.ID) - output.Address = r.Replace(output.Address, "processid", config.ID) - output.Address = r.Replace(output.Address, "reference", config.Reference) - output.Address = r.Replace(output.Address, "diskfs", "") - output.Address = r.Replace(output.Address, "memfs", "") - output.Address = r.Replace(output.Address, "rtmp", "") - output.Address = r.Replace(output.Address, "srt", "") - - for j, option := range output.Options { - // Replace any known placeholders - option = r.Replace(option, "outputid", output.ID) - option = r.Replace(option, "processid", config.ID) - option = r.Replace(option, "reference", config.Reference) - option = r.Replace(option, "diskfs", "") - option = r.Replace(option, "memfs", "") - - output.Options[j] = option - } - - for j, cleanup := range output.Cleanup { - // Replace any known placeholders - cleanup.Pattern = r.Replace(cleanup.Pattern, "outputid", output.ID) - cleanup.Pattern = r.Replace(cleanup.Pattern, "processid", config.ID) - cleanup.Pattern = r.Replace(cleanup.Pattern, "reference", config.Reference) - - output.Cleanup[j] = cleanup - } - - config.Output[i] = output - } -} - // CreateCommand created the FFmpeg command from this config. func (config *Config) CreateCommand() []string { var command []string diff --git a/restream/replace/replace.go b/restream/replace/replace.go index f87757eb..83202ff1 100644 --- a/restream/replace/replace.go +++ b/restream/replace/replace.go @@ -4,8 +4,13 @@ import ( "net/url" "regexp" "strings" + + "github.com/datarhei/core/v16/glob" + "github.com/datarhei/core/v16/restream/app" ) +type TemplateFn func(config *app.Config, section string) string + type Replacer interface { // RegisterTemplate registers a template for a specific placeholder. Template // may contain placeholders as well of the form {name}. They will be replaced @@ -15,7 +20,7 @@ type Replacer interface { // RegisterTemplateFunc does the same as RegisterTemplate, but the template // is returned by the template function. - RegisterTemplateFunc(placeholder string, template func() string, defaults map[string]string) + RegisterTemplateFunc(placeholder string, template TemplateFn, defaults map[string]string) // Replace replaces all occurences of placeholder in str with value. The placeholder is of the // form {placeholder}. It is possible to escape a characters in value with \\ by appending a ^ @@ -25,12 +30,13 @@ type Replacer interface { // the value of the corresponding key in the parameters. // If the value is an empty string, the registered templates will be searched for that // placeholder. If no template is found, the placeholder will be replaced by the empty string. - // A placeholder name may consist on of the letters a-z. - Replace(str, placeholder, value string) string + // A placeholder name may consist on of the letters a-z and ':'. The placeholder may contain + // a glob pattern to find the appropriate template. + Replace(str, placeholder, value string, vars map[string]string, config *app.Config, section string) string } type template struct { - fn func() string + fn TemplateFn defaults map[string]string } @@ -45,38 +51,38 @@ type replacer struct { func New() Replacer { r := &replacer{ templates: make(map[string]template), - re: regexp.MustCompile(`{([a-z]+)(?:\^(.))?(?:,(.*?))?}`), - templateRe: regexp.MustCompile(`{([a-z]+)}`), + re: regexp.MustCompile(`{([a-z:]+)(?:\^(.))?(?:,(.*?))?}`), + templateRe: regexp.MustCompile(`{([a-z:]+)}`), } return r } func (r *replacer) RegisterTemplate(placeholder, tmpl string, defaults map[string]string) { + r.RegisterTemplateFunc(placeholder, func(*app.Config, string) string { return tmpl }, defaults) +} + +func (r *replacer) RegisterTemplateFunc(placeholder string, templateFn TemplateFn, defaults map[string]string) { r.templates[placeholder] = template{ - fn: func() string { return tmpl }, + fn: templateFn, defaults: defaults, } } -func (r *replacer) RegisterTemplateFunc(placeholder string, tmplFn func() string, defaults map[string]string) { - r.templates[placeholder] = template{ - fn: tmplFn, - defaults: defaults, - } -} - -func (r *replacer) Replace(str, placeholder, value string) string { +func (r *replacer) Replace(str, placeholder, value string, vars map[string]string, config *app.Config, kind string) string { str = r.re.ReplaceAllStringFunc(str, func(match string) string { matches := r.re.FindStringSubmatch(match) - if matches[1] != placeholder { + + if ok, _ := glob.Match(placeholder, matches[1], ':'); !ok { return match } + placeholder := matches[1] + // We need a copy from the value v := value var tmpl template = template{ - fn: func() string { return v }, + fn: func(*app.Config, string) string { return v }, } // Check for a registered template @@ -87,8 +93,8 @@ func (r *replacer) Replace(str, placeholder, value string) string { } } - v = tmpl.fn() - v = r.compileTemplate(v, matches[3], tmpl.defaults) + v = tmpl.fn(config, kind) + v = r.compileTemplate(v, matches[3], vars, tmpl.defaults) if len(matches[2]) != 0 { // If there's a character to escape, we also have to escape the @@ -113,7 +119,7 @@ func (r *replacer) Replace(str, placeholder, value string) string { // placeholder name and will be replaced with the value. The resulting string is "Hello World!". // If a placeholder name is not present in the params string, it will not be replaced. The key // and values can be escaped as in net/url.QueryEscape. -func (r *replacer) compileTemplate(str, params string, defaults map[string]string) string { +func (r *replacer) compileTemplate(str, params string, vars map[string]string, defaults map[string]string) string { if len(params) == 0 && len(defaults) == 0 { return str } @@ -132,15 +138,22 @@ func (r *replacer) compileTemplate(str, params string, defaults map[string]strin if key == "" { continue } + key, value, _ := strings.Cut(key, "=") key, err := url.QueryUnescape(key) if err != nil { continue } + value, err = url.QueryUnescape(value) if err != nil { continue } + + for name, v := range vars { + value = strings.ReplaceAll(value, "$"+name, v) + } + p[key] = value } diff --git a/restream/replace/replace_test.go b/restream/replace/replace_test.go index f1ebcceb..1d9ccfe0 100644 --- a/restream/replace/replace_test.go +++ b/restream/replace/replace_test.go @@ -3,6 +3,7 @@ package replace import ( "testing" + "github.com/datarhei/core/v16/restream/app" "github.com/stretchr/testify/require" ) @@ -24,25 +25,39 @@ func TestReplace(t *testing.T) { r := New() for _, e := range samples { - replaced := r.Replace(e[0], "foobar", foobar) + replaced := r.Replace(e[0], "foobar", foobar, nil, nil, "") require.Equal(t, e[1], replaced, e[0]) } - replaced := r.Replace("{foobar}", "foobar", "") + replaced := r.Replace("{foobar}", "foobar", "", nil, nil, "") require.Equal(t, "", replaced) } func TestReplaceTemplate(t *testing.T) { r := New() - r.RegisterTemplate("foobar", "Hello {who}! {what}?", nil) + r.RegisterTemplate("foo:bar", "Hello {who}! {what}?", nil) - replaced := r.Replace("{foobar,who=World}", "foobar", "") + replaced := r.Replace("{foo:bar,who=World}", "foo:bar", "", nil, nil, "") require.Equal(t, "Hello World! {what}?", replaced) - replaced = r.Replace("{foobar,who=World,what=E%3dmc^2}", "foobar", "") + replaced = r.Replace("{foo:bar,who=World,what=E%3dmc^2}", "foo:bar", "", nil, nil, "") require.Equal(t, "Hello World! E=mc^2?", replaced) - replaced = r.Replace("{foobar^:,who=World,what=E%3dmc:2}", "foobar", "") + replaced = r.Replace("{foo:bar^:,who=World,what=E%3dmc:2}", "foo:bar", "", nil, nil, "") + require.Equal(t, "Hello World! E=mc\\\\:2?", replaced) +} + +func TestReplaceTemplateFunc(t *testing.T) { + r := New() + r.RegisterTemplateFunc("foo:bar", func(config *app.Config, kind string) string { return "Hello {who}! {what}?" }, nil) + + replaced := r.Replace("{foo:bar,who=World}", "foo:bar", "", nil, nil, "") + require.Equal(t, "Hello World! {what}?", replaced) + + replaced = r.Replace("{foo:bar,who=World,what=E%3dmc^2}", "foo:bar", "", nil, nil, "") + require.Equal(t, "Hello World! E=mc^2?", replaced) + + replaced = r.Replace("{foo:bar^:,who=World,what=E%3dmc:2}", "foo:bar", "", nil, nil, "") require.Equal(t, "Hello World! E=mc\\\\:2?", replaced) } @@ -53,10 +68,10 @@ func TestReplaceTemplateDefaults(t *testing.T) { "what": "something", }) - replaced := r.Replace("{foobar}", "foobar", "") + replaced := r.Replace("{foobar}", "foobar", "", nil, nil, "") require.Equal(t, "Hello someone! something?", replaced) - replaced = r.Replace("{foobar,who=World}", "foobar", "") + replaced = r.Replace("{foobar,who=World}", "foobar", "", nil, nil, "") require.Equal(t, "Hello World! something?", replaced) } @@ -72,7 +87,7 @@ func TestReplaceCompileTemplate(t *testing.T) { r := New().(*replacer) for _, e := range samples { - replaced := r.compileTemplate(e[0], e[1], nil) + replaced := r.compileTemplate(e[0], e[1], nil, nil) require.Equal(t, e[2], replaced, e[0]) } } @@ -89,10 +104,41 @@ func TestReplaceCompileTemplateDefaults(t *testing.T) { r := New().(*replacer) for _, e := range samples { - replaced := r.compileTemplate(e[0], e[1], map[string]string{ + replaced := r.compileTemplate(e[0], e[1], nil, map[string]string{ "who": "someone", "what": "something", }) require.Equal(t, e[2], replaced, e[0]) } } + +func TestReplaceCompileTemplateWithVars(t *testing.T) { + samples := [][3]string{ + {"Hello {who}!", "who=$processid", "Hello 123456789!"}, + {"Hello {who}! {what}?", "who=$location", "Hello World! {what}?"}, + {"Hello {who}! {what}?", "who=$location,what=Yeah", "Hello World! Yeah?"}, + {"Hello {who}! {what}?", "who=$location,what=$processid", "Hello World! 123456789?"}, + {"Hello {who}!", "who=$processidxxx", "Hello 123456789xxx!"}, + } + + vars := map[string]string{ + "processid": "123456789", + "location": "World", + } + + r := New().(*replacer) + + for _, e := range samples { + replaced := r.compileTemplate(e[0], e[1], vars, nil) + require.Equal(t, e[2], replaced, e[0]) + } +} + +func TestReplaceGlob(t *testing.T) { + r := New() + r.RegisterTemplate("foo:bar", "Hello foobar", nil) + r.RegisterTemplate("foo:baz", "Hello foobaz", nil) + + replaced := r.Replace("{foo:baz}, {foo:bar}", "foo:*", "", nil, nil, "") + require.Equal(t, "Hello foobaz, Hello foobar", replaced) +} diff --git a/restream/restream.go b/restream/restream.go index 4c5f0b28..66f71b1b 100644 --- a/restream/restream.go +++ b/restream/restream.go @@ -290,7 +290,7 @@ func (r *restream) load() error { } // Replace all placeholders in the config - t.config.ResolvePlaceholders(r.replace) + resolvePlaceholders(t.config, r.replace) tasks[id] = t } @@ -463,7 +463,7 @@ func (r *restream) createTask(config *app.Config) (*task, error) { logger: r.logger.WithField("id", process.ID), } - t.config.ResolvePlaceholders(r.replace) + resolvePlaceholders(t.config, r.replace) err := r.resolveAddresses(r.tasks, t.config) if err != nil { @@ -1089,7 +1089,7 @@ func (r *restream) reloadProcess(id string) error { t.config = t.process.Config.Clone() - t.config.ResolvePlaceholders(r.replace) + resolvePlaceholders(t.config, r.replace) err := r.resolveAddresses(r.tasks, t.config) if err != nil { @@ -1437,3 +1437,94 @@ func (r *restream) GetMetadata(key string) (interface{}, error) { return data, nil } + +// resolvePlaceholders replaces all placeholders in the config. The config +// will be modified in place. +func resolvePlaceholders(config *app.Config, r replace.Replacer) { + vars := map[string]string{ + "processid": config.ID, + "reference": config.Reference, + } + + for i, option := range config.Options { + // Replace any known placeholders + option = r.Replace(option, "diskfs", "", vars, config, "global") + option = r.Replace(option, "fs:*", "", vars, config, "global") + + config.Options[i] = option + } + + // Resolving the given inputs + for i, input := range config.Input { + vars["inputid"] = input.ID + + // Replace any known placeholders + input.ID = r.Replace(input.ID, "processid", config.ID, nil, nil, "input") + input.ID = r.Replace(input.ID, "reference", config.Reference, nil, nil, "input") + input.Address = r.Replace(input.Address, "inputid", input.ID, nil, nil, "input") + input.Address = r.Replace(input.Address, "processid", config.ID, nil, nil, "input") + input.Address = r.Replace(input.Address, "reference", config.Reference, nil, nil, "input") + input.Address = r.Replace(input.Address, "diskfs", "", vars, config, "input") + input.Address = r.Replace(input.Address, "memfs", "", vars, config, "input") + input.Address = r.Replace(input.Address, "fs:*", "", vars, config, "input") + input.Address = r.Replace(input.Address, "rtmp", "", vars, config, "input") + input.Address = r.Replace(input.Address, "srt", "", vars, config, "input") + + for j, option := range input.Options { + // Replace any known placeholders + option = r.Replace(option, "inputid", input.ID, nil, nil, "input") + option = r.Replace(option, "processid", config.ID, nil, nil, "input") + option = r.Replace(option, "reference", config.Reference, nil, nil, "input") + option = r.Replace(option, "diskfs", "", vars, config, "input") + option = r.Replace(option, "memfs", "", vars, config, "input") + option = r.Replace(option, "fs:*", "", vars, config, "input") + + input.Options[j] = option + } + + delete(vars, "inputid") + + config.Input[i] = input + } + + // Resolving the given outputs + for i, output := range config.Output { + vars["outputid"] = output.ID + + // Replace any known placeholders + output.ID = r.Replace(output.ID, "processid", config.ID, nil, nil, "output") + output.Address = r.Replace(output.Address, "outputid", output.ID, nil, nil, "output") + output.Address = r.Replace(output.Address, "processid", config.ID, nil, nil, "output") + output.Address = r.Replace(output.Address, "reference", config.Reference, nil, nil, "output") + output.Address = r.Replace(output.Address, "diskfs", "", vars, config, "output") + output.Address = r.Replace(output.Address, "memfs", "", vars, config, "output") + output.Address = r.Replace(output.Address, "fs:*", "", vars, config, "output") + output.Address = r.Replace(output.Address, "rtmp", "", vars, config, "output") + output.Address = r.Replace(output.Address, "srt", "", vars, config, "output") + + for j, option := range output.Options { + // Replace any known placeholders + option = r.Replace(option, "outputid", output.ID, nil, nil, "output") + option = r.Replace(option, "processid", config.ID, nil, nil, "output") + option = r.Replace(option, "reference", config.Reference, nil, nil, "output") + option = r.Replace(option, "diskfs", "", vars, config, "output") + option = r.Replace(option, "memfs", "", vars, config, "output") + option = r.Replace(option, "fs:*", "", vars, config, "output") + + output.Options[j] = option + } + + for j, cleanup := range output.Cleanup { + // Replace any known placeholders + cleanup.Pattern = r.Replace(cleanup.Pattern, "outputid", output.ID, nil, nil, "output") + cleanup.Pattern = r.Replace(cleanup.Pattern, "processid", config.ID, nil, nil, "output") + cleanup.Pattern = r.Replace(cleanup.Pattern, "reference", config.Reference, nil, nil, "output") + + output.Cleanup[j] = cleanup + } + + delete(vars, "outputid") + + config.Output[i] = output + } +}