From 0147651de6193b3a1c7649607374a2ee3dcf4593 Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Fri, 20 Jan 2023 13:38:33 +0100 Subject: [PATCH] Extend placeholders 1. Allow variables in placeholders for parameter values, e.g. {rtmp,name=$processid}. The variable starts with a $ letter. The recognized variables are provided with the Replace func. 2. The template func recieves the process config and the name of the section where this placeholder is located, i.e. "global", "input", or "output". --- app/api/api.go | 78 ++++++++++++++++--------- restream/app/process.go | 74 ------------------------ restream/replace/replace.go | 53 ++++++++++------- restream/replace/replace_test.go | 66 ++++++++++++++++++---- restream/restream.go | 97 +++++++++++++++++++++++++++++++- 5 files changed, 234 insertions(+), 134 deletions(-) diff --git a/app/api/api.go b/app/api/api.go index d8722c30..b1f5d76b 100644 --- a/app/api/api.go +++ b/app/api/api.go @@ -31,6 +31,7 @@ import ( "github.com/datarhei/core/v16/net" "github.com/datarhei/core/v16/prometheus" "github.com/datarhei/core/v16/restream" + restreamapp "github.com/datarhei/core/v16/restream/app" "github.com/datarhei/core/v16/restream/replace" "github.com/datarhei/core/v16/restream/store" "github.com/datarhei/core/v16/rtmp" @@ -449,39 +450,62 @@ func (a *api) start() error { a.replacer = replace.New() { - a.replacer.RegisterTemplate("diskfs", a.diskfs.Base(), nil) - a.replacer.RegisterTemplate("memfs", a.memfs.Base(), nil) + a.replacer.RegisterTemplateFunc("diskfs", func(config *restreamapp.Config, section string) string { + return a.diskfs.Base() + }, nil) - host, port, _ := gonet.SplitHostPort(cfg.RTMP.Address) - if len(host) == 0 { - host = "localhost" - } + a.replacer.RegisterTemplateFunc("fs:disk", func(config *restreamapp.Config, section string) string { + return a.diskfs.Base() + }, nil) - template := "rtmp://" + host + ":" + port - if cfg.RTMP.App != "/" { - template += cfg.RTMP.App - } - template += "/{name}" + a.replacer.RegisterTemplateFunc("memfs", func(config *restreamapp.Config, section string) string { + return a.memfs.Base() + }, nil) - if len(cfg.RTMP.Token) != 0 { - template += "?token=" + cfg.RTMP.Token - } + a.replacer.RegisterTemplateFunc("fs:mem", func(config *restreamapp.Config, section string) string { + return a.memfs.Base() + }, nil) - a.replacer.RegisterTemplate("rtmp", template, nil) + a.replacer.RegisterTemplateFunc("rtmp", func(config *restreamapp.Config, section string) string { + host, port, _ := gonet.SplitHostPort(cfg.RTMP.Address) + if len(host) == 0 { + host = "localhost" + } - host, port, _ = gonet.SplitHostPort(cfg.SRT.Address) - if len(host) == 0 { - host = "localhost" - } + template := "rtmp://" + host + ":" + port + if cfg.RTMP.App != "/" { + template += cfg.RTMP.App + } + template += "/{name}" - template = "srt://" + host + ":" + port + "?mode=caller&transtype=live&latency={latency}&streamid={name},mode:{mode}" - if len(cfg.SRT.Token) != 0 { - template += ",token:" + cfg.SRT.Token - } - if len(cfg.SRT.Passphrase) != 0 { - template += "&passphrase=" + cfg.SRT.Passphrase - } - a.replacer.RegisterTemplate("srt", template, map[string]string{ + if len(cfg.RTMP.Token) != 0 { + template += "?token=" + cfg.RTMP.Token + } + + return template + }, nil) + + a.replacer.RegisterTemplateFunc("srt", func(config *restreamapp.Config, section string) string { + host, port, _ = gonet.SplitHostPort(cfg.SRT.Address) + if len(host) == 0 { + host = "localhost" + } + + template := "srt://" + host + ":" + port + "?mode=caller&transtype=live&latency={latency}&streamid={name}" + if section == "output" { + template += ",mode:publish" + } else { + template += ",mode:request" + } + if len(cfg.SRT.Token) != 0 { + template += ",token:" + cfg.SRT.Token + } + if len(cfg.SRT.Passphrase) != 0 { + template += "&passphrase=" + cfg.SRT.Passphrase + } + + return template + }, map[string]string{ "latency": "20000", // 20 milliseconds, FFmpeg requires microseconds }) } diff --git a/restream/app/process.go b/restream/app/process.go index 1d62220b..4ec6036a 100644 --- a/restream/app/process.go +++ b/restream/app/process.go @@ -2,7 +2,6 @@ package app import ( "github.com/datarhei/core/v16/process" - "github.com/datarhei/core/v16/restream/replace" ) type ConfigIOCleanup struct { @@ -80,79 +79,6 @@ func (config *Config) Clone() *Config { return clone } -// ReplacePlaceholders replaces all placeholders in the config. The config -// will be modified in place. -func (config *Config) ResolvePlaceholders(r replace.Replacer) { - for i, option := range config.Options { - // Replace any known placeholders - option = r.Replace(option, "diskfs", "") - - config.Options[i] = option - } - - // Resolving the given inputs - for i, input := range config.Input { - // Replace any known placeholders - input.ID = r.Replace(input.ID, "processid", config.ID) - input.ID = r.Replace(input.ID, "reference", config.Reference) - input.Address = r.Replace(input.Address, "inputid", input.ID) - input.Address = r.Replace(input.Address, "processid", config.ID) - input.Address = r.Replace(input.Address, "reference", config.Reference) - input.Address = r.Replace(input.Address, "diskfs", "") - input.Address = r.Replace(input.Address, "memfs", "") - input.Address = r.Replace(input.Address, "rtmp", "") - input.Address = r.Replace(input.Address, "srt", "") - - for j, option := range input.Options { - // Replace any known placeholders - option = r.Replace(option, "inputid", input.ID) - option = r.Replace(option, "processid", config.ID) - option = r.Replace(option, "reference", config.Reference) - option = r.Replace(option, "diskfs", "") - option = r.Replace(option, "memfs", "") - - input.Options[j] = option - } - - config.Input[i] = input - } - - // Resolving the given outputs - for i, output := range config.Output { - // Replace any known placeholders - output.ID = r.Replace(output.ID, "processid", config.ID) - output.Address = r.Replace(output.Address, "outputid", output.ID) - output.Address = r.Replace(output.Address, "processid", config.ID) - output.Address = r.Replace(output.Address, "reference", config.Reference) - output.Address = r.Replace(output.Address, "diskfs", "") - output.Address = r.Replace(output.Address, "memfs", "") - output.Address = r.Replace(output.Address, "rtmp", "") - output.Address = r.Replace(output.Address, "srt", "") - - for j, option := range output.Options { - // Replace any known placeholders - option = r.Replace(option, "outputid", output.ID) - option = r.Replace(option, "processid", config.ID) - option = r.Replace(option, "reference", config.Reference) - option = r.Replace(option, "diskfs", "") - option = r.Replace(option, "memfs", "") - - output.Options[j] = option - } - - for j, cleanup := range output.Cleanup { - // Replace any known placeholders - cleanup.Pattern = r.Replace(cleanup.Pattern, "outputid", output.ID) - cleanup.Pattern = r.Replace(cleanup.Pattern, "processid", config.ID) - cleanup.Pattern = r.Replace(cleanup.Pattern, "reference", config.Reference) - - output.Cleanup[j] = cleanup - } - - config.Output[i] = output - } -} - // CreateCommand created the FFmpeg command from this config. func (config *Config) CreateCommand() []string { var command []string diff --git a/restream/replace/replace.go b/restream/replace/replace.go index f87757eb..83202ff1 100644 --- a/restream/replace/replace.go +++ b/restream/replace/replace.go @@ -4,8 +4,13 @@ import ( "net/url" "regexp" "strings" + + "github.com/datarhei/core/v16/glob" + "github.com/datarhei/core/v16/restream/app" ) +type TemplateFn func(config *app.Config, section string) string + type Replacer interface { // RegisterTemplate registers a template for a specific placeholder. Template // may contain placeholders as well of the form {name}. They will be replaced @@ -15,7 +20,7 @@ type Replacer interface { // RegisterTemplateFunc does the same as RegisterTemplate, but the template // is returned by the template function. - RegisterTemplateFunc(placeholder string, template func() string, defaults map[string]string) + RegisterTemplateFunc(placeholder string, template TemplateFn, defaults map[string]string) // Replace replaces all occurences of placeholder in str with value. The placeholder is of the // form {placeholder}. It is possible to escape a characters in value with \\ by appending a ^ @@ -25,12 +30,13 @@ type Replacer interface { // the value of the corresponding key in the parameters. // If the value is an empty string, the registered templates will be searched for that // placeholder. If no template is found, the placeholder will be replaced by the empty string. - // A placeholder name may consist on of the letters a-z. - Replace(str, placeholder, value string) string + // A placeholder name may consist on of the letters a-z and ':'. The placeholder may contain + // a glob pattern to find the appropriate template. + Replace(str, placeholder, value string, vars map[string]string, config *app.Config, section string) string } type template struct { - fn func() string + fn TemplateFn defaults map[string]string } @@ -45,38 +51,38 @@ type replacer struct { func New() Replacer { r := &replacer{ templates: make(map[string]template), - re: regexp.MustCompile(`{([a-z]+)(?:\^(.))?(?:,(.*?))?}`), - templateRe: regexp.MustCompile(`{([a-z]+)}`), + re: regexp.MustCompile(`{([a-z:]+)(?:\^(.))?(?:,(.*?))?}`), + templateRe: regexp.MustCompile(`{([a-z:]+)}`), } return r } func (r *replacer) RegisterTemplate(placeholder, tmpl string, defaults map[string]string) { + r.RegisterTemplateFunc(placeholder, func(*app.Config, string) string { return tmpl }, defaults) +} + +func (r *replacer) RegisterTemplateFunc(placeholder string, templateFn TemplateFn, defaults map[string]string) { r.templates[placeholder] = template{ - fn: func() string { return tmpl }, + fn: templateFn, defaults: defaults, } } -func (r *replacer) RegisterTemplateFunc(placeholder string, tmplFn func() string, defaults map[string]string) { - r.templates[placeholder] = template{ - fn: tmplFn, - defaults: defaults, - } -} - -func (r *replacer) Replace(str, placeholder, value string) string { +func (r *replacer) Replace(str, placeholder, value string, vars map[string]string, config *app.Config, kind string) string { str = r.re.ReplaceAllStringFunc(str, func(match string) string { matches := r.re.FindStringSubmatch(match) - if matches[1] != placeholder { + + if ok, _ := glob.Match(placeholder, matches[1], ':'); !ok { return match } + placeholder := matches[1] + // We need a copy from the value v := value var tmpl template = template{ - fn: func() string { return v }, + fn: func(*app.Config, string) string { return v }, } // Check for a registered template @@ -87,8 +93,8 @@ func (r *replacer) Replace(str, placeholder, value string) string { } } - v = tmpl.fn() - v = r.compileTemplate(v, matches[3], tmpl.defaults) + v = tmpl.fn(config, kind) + v = r.compileTemplate(v, matches[3], vars, tmpl.defaults) if len(matches[2]) != 0 { // If there's a character to escape, we also have to escape the @@ -113,7 +119,7 @@ func (r *replacer) Replace(str, placeholder, value string) string { // placeholder name and will be replaced with the value. The resulting string is "Hello World!". // If a placeholder name is not present in the params string, it will not be replaced. The key // and values can be escaped as in net/url.QueryEscape. -func (r *replacer) compileTemplate(str, params string, defaults map[string]string) string { +func (r *replacer) compileTemplate(str, params string, vars map[string]string, defaults map[string]string) string { if len(params) == 0 && len(defaults) == 0 { return str } @@ -132,15 +138,22 @@ func (r *replacer) compileTemplate(str, params string, defaults map[string]strin if key == "" { continue } + key, value, _ := strings.Cut(key, "=") key, err := url.QueryUnescape(key) if err != nil { continue } + value, err = url.QueryUnescape(value) if err != nil { continue } + + for name, v := range vars { + value = strings.ReplaceAll(value, "$"+name, v) + } + p[key] = value } diff --git a/restream/replace/replace_test.go b/restream/replace/replace_test.go index f1ebcceb..1d9ccfe0 100644 --- a/restream/replace/replace_test.go +++ b/restream/replace/replace_test.go @@ -3,6 +3,7 @@ package replace import ( "testing" + "github.com/datarhei/core/v16/restream/app" "github.com/stretchr/testify/require" ) @@ -24,25 +25,39 @@ func TestReplace(t *testing.T) { r := New() for _, e := range samples { - replaced := r.Replace(e[0], "foobar", foobar) + replaced := r.Replace(e[0], "foobar", foobar, nil, nil, "") require.Equal(t, e[1], replaced, e[0]) } - replaced := r.Replace("{foobar}", "foobar", "") + replaced := r.Replace("{foobar}", "foobar", "", nil, nil, "") require.Equal(t, "", replaced) } func TestReplaceTemplate(t *testing.T) { r := New() - r.RegisterTemplate("foobar", "Hello {who}! {what}?", nil) + r.RegisterTemplate("foo:bar", "Hello {who}! {what}?", nil) - replaced := r.Replace("{foobar,who=World}", "foobar", "") + replaced := r.Replace("{foo:bar,who=World}", "foo:bar", "", nil, nil, "") require.Equal(t, "Hello World! {what}?", replaced) - replaced = r.Replace("{foobar,who=World,what=E%3dmc^2}", "foobar", "") + replaced = r.Replace("{foo:bar,who=World,what=E%3dmc^2}", "foo:bar", "", nil, nil, "") require.Equal(t, "Hello World! E=mc^2?", replaced) - replaced = r.Replace("{foobar^:,who=World,what=E%3dmc:2}", "foobar", "") + replaced = r.Replace("{foo:bar^:,who=World,what=E%3dmc:2}", "foo:bar", "", nil, nil, "") + require.Equal(t, "Hello World! E=mc\\\\:2?", replaced) +} + +func TestReplaceTemplateFunc(t *testing.T) { + r := New() + r.RegisterTemplateFunc("foo:bar", func(config *app.Config, kind string) string { return "Hello {who}! {what}?" }, nil) + + replaced := r.Replace("{foo:bar,who=World}", "foo:bar", "", nil, nil, "") + require.Equal(t, "Hello World! {what}?", replaced) + + replaced = r.Replace("{foo:bar,who=World,what=E%3dmc^2}", "foo:bar", "", nil, nil, "") + require.Equal(t, "Hello World! E=mc^2?", replaced) + + replaced = r.Replace("{foo:bar^:,who=World,what=E%3dmc:2}", "foo:bar", "", nil, nil, "") require.Equal(t, "Hello World! E=mc\\\\:2?", replaced) } @@ -53,10 +68,10 @@ func TestReplaceTemplateDefaults(t *testing.T) { "what": "something", }) - replaced := r.Replace("{foobar}", "foobar", "") + replaced := r.Replace("{foobar}", "foobar", "", nil, nil, "") require.Equal(t, "Hello someone! something?", replaced) - replaced = r.Replace("{foobar,who=World}", "foobar", "") + replaced = r.Replace("{foobar,who=World}", "foobar", "", nil, nil, "") require.Equal(t, "Hello World! something?", replaced) } @@ -72,7 +87,7 @@ func TestReplaceCompileTemplate(t *testing.T) { r := New().(*replacer) for _, e := range samples { - replaced := r.compileTemplate(e[0], e[1], nil) + replaced := r.compileTemplate(e[0], e[1], nil, nil) require.Equal(t, e[2], replaced, e[0]) } } @@ -89,10 +104,41 @@ func TestReplaceCompileTemplateDefaults(t *testing.T) { r := New().(*replacer) for _, e := range samples { - replaced := r.compileTemplate(e[0], e[1], map[string]string{ + replaced := r.compileTemplate(e[0], e[1], nil, map[string]string{ "who": "someone", "what": "something", }) require.Equal(t, e[2], replaced, e[0]) } } + +func TestReplaceCompileTemplateWithVars(t *testing.T) { + samples := [][3]string{ + {"Hello {who}!", "who=$processid", "Hello 123456789!"}, + {"Hello {who}! {what}?", "who=$location", "Hello World! {what}?"}, + {"Hello {who}! {what}?", "who=$location,what=Yeah", "Hello World! Yeah?"}, + {"Hello {who}! {what}?", "who=$location,what=$processid", "Hello World! 123456789?"}, + {"Hello {who}!", "who=$processidxxx", "Hello 123456789xxx!"}, + } + + vars := map[string]string{ + "processid": "123456789", + "location": "World", + } + + r := New().(*replacer) + + for _, e := range samples { + replaced := r.compileTemplate(e[0], e[1], vars, nil) + require.Equal(t, e[2], replaced, e[0]) + } +} + +func TestReplaceGlob(t *testing.T) { + r := New() + r.RegisterTemplate("foo:bar", "Hello foobar", nil) + r.RegisterTemplate("foo:baz", "Hello foobaz", nil) + + replaced := r.Replace("{foo:baz}, {foo:bar}", "foo:*", "", nil, nil, "") + require.Equal(t, "Hello foobaz, Hello foobar", replaced) +} diff --git a/restream/restream.go b/restream/restream.go index 4c5f0b28..66f71b1b 100644 --- a/restream/restream.go +++ b/restream/restream.go @@ -290,7 +290,7 @@ func (r *restream) load() error { } // Replace all placeholders in the config - t.config.ResolvePlaceholders(r.replace) + resolvePlaceholders(t.config, r.replace) tasks[id] = t } @@ -463,7 +463,7 @@ func (r *restream) createTask(config *app.Config) (*task, error) { logger: r.logger.WithField("id", process.ID), } - t.config.ResolvePlaceholders(r.replace) + resolvePlaceholders(t.config, r.replace) err := r.resolveAddresses(r.tasks, t.config) if err != nil { @@ -1089,7 +1089,7 @@ func (r *restream) reloadProcess(id string) error { t.config = t.process.Config.Clone() - t.config.ResolvePlaceholders(r.replace) + resolvePlaceholders(t.config, r.replace) err := r.resolveAddresses(r.tasks, t.config) if err != nil { @@ -1437,3 +1437,94 @@ func (r *restream) GetMetadata(key string) (interface{}, error) { return data, nil } + +// resolvePlaceholders replaces all placeholders in the config. The config +// will be modified in place. +func resolvePlaceholders(config *app.Config, r replace.Replacer) { + vars := map[string]string{ + "processid": config.ID, + "reference": config.Reference, + } + + for i, option := range config.Options { + // Replace any known placeholders + option = r.Replace(option, "diskfs", "", vars, config, "global") + option = r.Replace(option, "fs:*", "", vars, config, "global") + + config.Options[i] = option + } + + // Resolving the given inputs + for i, input := range config.Input { + vars["inputid"] = input.ID + + // Replace any known placeholders + input.ID = r.Replace(input.ID, "processid", config.ID, nil, nil, "input") + input.ID = r.Replace(input.ID, "reference", config.Reference, nil, nil, "input") + input.Address = r.Replace(input.Address, "inputid", input.ID, nil, nil, "input") + input.Address = r.Replace(input.Address, "processid", config.ID, nil, nil, "input") + input.Address = r.Replace(input.Address, "reference", config.Reference, nil, nil, "input") + input.Address = r.Replace(input.Address, "diskfs", "", vars, config, "input") + input.Address = r.Replace(input.Address, "memfs", "", vars, config, "input") + input.Address = r.Replace(input.Address, "fs:*", "", vars, config, "input") + input.Address = r.Replace(input.Address, "rtmp", "", vars, config, "input") + input.Address = r.Replace(input.Address, "srt", "", vars, config, "input") + + for j, option := range input.Options { + // Replace any known placeholders + option = r.Replace(option, "inputid", input.ID, nil, nil, "input") + option = r.Replace(option, "processid", config.ID, nil, nil, "input") + option = r.Replace(option, "reference", config.Reference, nil, nil, "input") + option = r.Replace(option, "diskfs", "", vars, config, "input") + option = r.Replace(option, "memfs", "", vars, config, "input") + option = r.Replace(option, "fs:*", "", vars, config, "input") + + input.Options[j] = option + } + + delete(vars, "inputid") + + config.Input[i] = input + } + + // Resolving the given outputs + for i, output := range config.Output { + vars["outputid"] = output.ID + + // Replace any known placeholders + output.ID = r.Replace(output.ID, "processid", config.ID, nil, nil, "output") + output.Address = r.Replace(output.Address, "outputid", output.ID, nil, nil, "output") + output.Address = r.Replace(output.Address, "processid", config.ID, nil, nil, "output") + output.Address = r.Replace(output.Address, "reference", config.Reference, nil, nil, "output") + output.Address = r.Replace(output.Address, "diskfs", "", vars, config, "output") + output.Address = r.Replace(output.Address, "memfs", "", vars, config, "output") + output.Address = r.Replace(output.Address, "fs:*", "", vars, config, "output") + output.Address = r.Replace(output.Address, "rtmp", "", vars, config, "output") + output.Address = r.Replace(output.Address, "srt", "", vars, config, "output") + + for j, option := range output.Options { + // Replace any known placeholders + option = r.Replace(option, "outputid", output.ID, nil, nil, "output") + option = r.Replace(option, "processid", config.ID, nil, nil, "output") + option = r.Replace(option, "reference", config.Reference, nil, nil, "output") + option = r.Replace(option, "diskfs", "", vars, config, "output") + option = r.Replace(option, "memfs", "", vars, config, "output") + option = r.Replace(option, "fs:*", "", vars, config, "output") + + output.Options[j] = option + } + + for j, cleanup := range output.Cleanup { + // Replace any known placeholders + cleanup.Pattern = r.Replace(cleanup.Pattern, "outputid", output.ID, nil, nil, "output") + cleanup.Pattern = r.Replace(cleanup.Pattern, "processid", config.ID, nil, nil, "output") + cleanup.Pattern = r.Replace(cleanup.Pattern, "reference", config.Reference, nil, nil, "output") + + output.Cleanup[j] = cleanup + } + + delete(vars, "outputid") + + config.Output[i] = output + } +}