diff --git a/CHANGELOG.md b/CHANGELOG.md index d27df3e9..44ea8c81 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ #### Core v16.8.0 > ? +- Add optional escape character to process placeholder - Fix output address validation for tee outputs - Fix updating process config - Add experimental SRT connection stats and logs API diff --git a/README.md b/README.md index 295fcaa4..068c876e 100644 --- a/README.md +++ b/README.md @@ -647,8 +647,10 @@ Currently supported placeholders are: | `{inputid}` | Will be replaced by the ID of the input. | `input.address`, `input.options` | | `{outputid}` | Will be replaced by the ID of the output. | `output.address`, `output.options`, `output.cleanup.pattern` | -Before replacing the placeholder in the process, all references will be resolved, i.e., you can put the placeholder also in the params for an -option. +Before replacing the placeholders in the process config, all references (see below) will be resolved. + +If the value that gets filled in on the place of the placeholder needs escaping, you can define the character to be escaped in the placeholder by adding it to the placeholder name and prefix it with a `^`. +E.g. escape all `:` in the value (`http://example.com:8080`) for `{memfs}` placeholder, write `{memfs^:}`. It will then be replaced by `http\://example.com\:8080`. The escape character is always `\`. ### References diff --git a/http/handler/api/restream.go b/http/handler/api/restream.go index be5741b8..ad73b5c4 100644 --- a/http/handler/api/restream.go +++ b/http/handler/api/restream.go @@ -188,10 +188,10 @@ func (h *RestreamHandler) Update(c echo.Context) error { if err := h.restream.UpdateProcess(id, config); err != nil { if err == restream.ErrUnknownProcess { - return api.Err(http.StatusNotFound, "Process not found: %s", id) + return api.Err(http.StatusNotFound, "Process not found", "%s", id) } - return api.Err(http.StatusBadRequest, "Process can't be updated: %s", err) + return api.Err(http.StatusBadRequest, "Process can't be updated", "%s", err) } p, _ := h.getProcess(config.ID, "config") diff --git a/restream/app/process.go b/restream/app/process.go index 446e2fd5..a84da11c 100644 --- a/restream/app/process.go +++ b/restream/app/process.go @@ -1,6 +1,11 @@ package app -import "github.com/datarhei/core/v16/process" +import ( + "regexp" + "strings" + + "github.com/datarhei/core/v16/process" +) type ConfigIOCleanup struct { Pattern string `json:"pattern"` @@ -75,6 +80,120 @@ func (config *Config) Clone() *Config { return clone } +func replace(what, placeholder, value string) string { + re, err := regexp.Compile(`{` + regexp.QuoteMeta(placeholder) + `(\^(.))?}`) + if err != nil { + return what + } + + innerRe := re.Copy() + what = re.ReplaceAllStringFunc(what, func(match string) string { + matches := innerRe.FindStringSubmatch(match) + var v string + + if matches[2] != "" { + v = strings.ReplaceAll(value, matches[2], `\`+matches[2]) + } else { + v = value + } + + return strings.Replace(match, match, v, 1) + }) + + return what +} + +// ReplacePlaceholders replaces all placeholders in the config. The config +// will be modified in place. +func (config *Config) ResolvePlaceholders(basediskfs, basememfs string) { + for i, option := range config.Options { + // Replace any known placeholders + option = replace(option, "diskfs", basediskfs) + + config.Options[i] = option + } + + // Resolving the given inputs + for i, input := range config.Input { + // Replace any known placeholders + input.ID = replace(input.ID, "processid", config.ID) + input.ID = replace(input.ID, "reference", config.Reference) + input.Address = replace(input.Address, "inputid", input.ID) + input.Address = replace(input.Address, "processid", config.ID) + input.Address = replace(input.Address, "reference", config.Reference) + input.Address = replace(input.Address, "diskfs", basediskfs) + input.Address = replace(input.Address, "memfs", basememfs) + + for j, option := range input.Options { + // Replace any known placeholders + option = replace(option, "inputid", input.ID) + option = replace(option, "processid", config.ID) + option = replace(option, "reference", config.Reference) + option = replace(option, "diskfs", basediskfs) + option = replace(option, "memfs", basememfs) + + input.Options[j] = option + } + + config.Input[i] = input + } + + // Resolving the given outputs + for i, output := range config.Output { + // Replace any known placeholders + output.ID = replace(output.ID, "processid", config.ID) + output.Address = replace(output.Address, "outputid", output.ID) + output.Address = replace(output.Address, "processid", config.ID) + output.Address = replace(output.Address, "reference", config.Reference) + output.Address = replace(output.Address, "diskfs", basediskfs) + output.Address = replace(output.Address, "memfs", basememfs) + + for j, option := range output.Options { + // Replace any known placeholders + option = replace(option, "outputid", output.ID) + option = replace(option, "processid", config.ID) + option = replace(option, "reference", config.Reference) + option = replace(option, "diskfs", basediskfs) + option = replace(option, "memfs", basememfs) + + output.Options[j] = option + } + + for j, cleanup := range output.Cleanup { + // Replace any known placeholders + cleanup.Pattern = replace(cleanup.Pattern, "outputid", output.ID) + cleanup.Pattern = replace(cleanup.Pattern, "processid", config.ID) + cleanup.Pattern = replace(cleanup.Pattern, "reference", config.Reference) + + output.Cleanup[j] = cleanup + } + + config.Output[i] = output + } +} + +// CreateCommand created the FFmpeg command from this config. +func (config *Config) CreateCommand() []string { + var command []string + + // Copy global options + command = append(command, config.Options...) + + for _, input := range config.Input { + // Add the resolved input to the process command + command = append(command, input.Options...) + command = append(command, "-i", input.Address) + } + + for _, output := range config.Output { + // Add the resolved output to the process command + command = append(command, output.Options...) + command = append(command, output.Address) + } + + return command +} + type Process struct { ID string `json:"id"` Reference string `json:"reference"` diff --git a/restream/app/process_test.go b/restream/app/process_test.go new file mode 100644 index 00000000..96ef22a0 --- /dev/null +++ b/restream/app/process_test.go @@ -0,0 +1,45 @@ +package app + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestReplace(t *testing.T) { + foobar := `;:.,-_$£!^` + + samples := [][2]string{ + {"{foobar}", foobar}, + {"{foobar^:}", `;\:.,-_$£!^`}, + {"{foobar^:}barfoo{foobar^:}", `;\:.,-_$£!^barfoo;\:.,-_$£!^`}, + {"{foobar^:.}", "{foobar^:.}"}, + {"{foobar^}", "{foobar^}"}, + {"{barfoo^:}", "{barfoo^:}"}, + {"{foobar^^}", `;:.,-_$£!\^`}, + } + + for _, e := range samples { + replaced := replace(e[0], "foobar", foobar) + require.Equal(t, e[1], replaced) + } +} + +func TestCreateCommand(t *testing.T) { + config := &Config{ + Options: []string{"-global", "global"}, + Input: []ConfigIO{ + {Address: "inputAddress", Options: []string{"-input", "inputoption"}}, + }, + Output: []ConfigIO{ + {Address: "outputAddress", Options: []string{"-output", "oututoption"}}, + }, + } + + command := config.CreateCommand() + require.Equal(t, []string{ + "-global", "global", + "-input", "inputoption", "-i", "inputAddress", + "-output", "oututoption", "outputAddress", + }, command) +} diff --git a/restream/restream.go b/restream/restream.go index ac144dd7..bcd89314 100644 --- a/restream/restream.go +++ b/restream/restream.go @@ -268,7 +268,7 @@ func (r *restream) load() error { } // Replace all placeholders in the config - r.resolvePlaceholders(t.config, r.fs.diskfs.Base(), r.fs.memfs.Base()) + t.config.ResolvePlaceholders(r.fs.diskfs.Base(), r.fs.memfs.Base()) tasks[id] = t } @@ -304,7 +304,7 @@ func (r *restream) load() error { continue } - t.command = r.createCommand(t.config) + t.command = t.config.CreateCommand() t.parser = r.ffmpeg.NewProcessParser(t.logger, t.id, t.reference) ffmpeg, err := r.ffmpeg.New(ffmpeg.ProcessConfig{ @@ -418,7 +418,7 @@ func (r *restream) createTask(config *app.Config) (*task, error) { logger: r.logger.WithField("id", process.ID), } - r.resolvePlaceholders(t.config, r.fs.diskfs.Base(), r.fs.memfs.Base()) + t.config.ResolvePlaceholders(r.fs.diskfs.Base(), r.fs.memfs.Base()) err := r.resolveAddresses(r.tasks, t.config) if err != nil { @@ -435,7 +435,7 @@ func (r *restream) createTask(config *app.Config) (*task, error) { return nil, err } - t.command = r.createCommand(t.config) + t.command = t.config.CreateCommand() t.parser = r.ffmpeg.NewProcessParser(t.logger, t.id, t.reference) ffmpeg, err := r.ffmpeg.New(ffmpeg.ProcessConfig{ @@ -545,94 +545,6 @@ func (r *restream) unsetPlayoutPorts(t *task) { t.playout = nil } -func (r *restream) resolvePlaceholders(config *app.Config, basediskfs, basememfs string) { - for i, option := range config.Options { - // Replace any known placeholders - option = strings.Replace(option, "{diskfs}", basediskfs, -1) - - config.Options[i] = option - } - - // Resolving the given inputs - for i, input := range config.Input { - // Replace any known placeholders - input.ID = strings.Replace(input.ID, "{processid}", config.ID, -1) - input.ID = strings.Replace(input.ID, "{reference}", config.Reference, -1) - input.Address = strings.Replace(input.Address, "{inputid}", input.ID, -1) - input.Address = strings.Replace(input.Address, "{processid}", config.ID, -1) - input.Address = strings.Replace(input.Address, "{reference}", config.Reference, -1) - input.Address = strings.Replace(input.Address, "{diskfs}", basediskfs, -1) - input.Address = strings.Replace(input.Address, "{memfs}", basememfs, -1) - - for j, option := range input.Options { - // Replace any known placeholders - option = strings.Replace(option, "{inputid}", input.ID, -1) - option = strings.Replace(option, "{processid}", config.ID, -1) - option = strings.Replace(option, "{reference}", config.Reference, -1) - option = strings.Replace(option, "{diskfs}", basediskfs, -1) - option = strings.Replace(option, "{memfs}", basememfs, -1) - - input.Options[j] = option - } - - config.Input[i] = input - } - - // Resolving the given outputs - for i, output := range config.Output { - // Replace any known placeholders - output.ID = strings.Replace(output.ID, "{processid}", config.ID, -1) - output.Address = strings.Replace(output.Address, "{outputid}", output.ID, -1) - output.Address = strings.Replace(output.Address, "{processid}", config.ID, -1) - output.Address = strings.Replace(output.Address, "{reference}", config.Reference, -1) - output.Address = strings.Replace(output.Address, "{diskfs}", basediskfs, -1) - output.Address = strings.Replace(output.Address, "{memfs}", basememfs, -1) - - for j, option := range output.Options { - // Replace any known placeholders - option = strings.Replace(option, "{outputid}", output.ID, -1) - option = strings.Replace(option, "{processid}", config.ID, -1) - option = strings.Replace(option, "{reference}", config.Reference, -1) - option = strings.Replace(option, "{diskfs}", basediskfs, -1) - option = strings.Replace(option, "{memfs}", basememfs, -1) - - output.Options[j] = option - } - - for j, cleanup := range output.Cleanup { - // Replace any known placeholders - cleanup.Pattern = strings.Replace(cleanup.Pattern, "{outputid}", output.ID, -1) - cleanup.Pattern = strings.Replace(cleanup.Pattern, "{processid}", config.ID, -1) - cleanup.Pattern = strings.Replace(cleanup.Pattern, "{reference}", config.Reference, -1) - - output.Cleanup[j] = cleanup - } - - config.Output[i] = output - } -} - -func (r *restream) createCommand(config *app.Config) []string { - var command []string - - // Copy global options - command = append(command, config.Options...) - - for _, input := range config.Input { - // Add the resolved input to the process command - command = append(command, input.Options...) - command = append(command, "-i", input.Address) - } - - for _, output := range config.Output { - // Add the resolved output to the process command - command = append(command, output.Options...) - command = append(command, output.Address) - } - - return command -} - func (r *restream) validateConfig(config *app.Config) (bool, error) { if len(config.Input) == 0 { return false, fmt.Errorf("at least one input must be defined for the process '%s'", config.ID) @@ -1071,7 +983,7 @@ func (r *restream) reloadProcess(id string) error { t.config = t.process.Config.Clone() - r.resolvePlaceholders(t.config, r.fs.diskfs.Base(), r.fs.memfs.Base()) + t.config.ResolvePlaceholders(r.fs.diskfs.Base(), r.fs.memfs.Base()) err := r.resolveAddresses(r.tasks, t.config) if err != nil { @@ -1088,7 +1000,7 @@ func (r *restream) reloadProcess(id string) error { return err } - t.command = r.createCommand(t.config) + t.command = t.config.CreateCommand() order := "stop" if t.process.Order == "start" {