mirror of
https://github.com/datarhei/core.git
synced 2025-09-27 04:16:25 +08:00
Add optional escape character to process placeholder
If a value for a placeholder needs escaping, add the character to escape with an "^" to the name of the placeholder, e.g. {memfs^:}. This will escape all occurences of ":" in the value for {memfs} with a "\".
This commit is contained in:
@@ -2,6 +2,7 @@
|
||||
|
||||
#### Core v16.8.0 > ?
|
||||
|
||||
- Add optional escape character to process placeholder
|
||||
- Fix output address validation for tee outputs
|
||||
- Fix updating process config
|
||||
- Add experimental SRT connection stats and logs API
|
||||
|
@@ -647,8 +647,10 @@ Currently supported placeholders are:
|
||||
| `{inputid}` | Will be replaced by the ID of the input. | `input.address`, `input.options` |
|
||||
| `{outputid}` | Will be replaced by the ID of the output. | `output.address`, `output.options`, `output.cleanup.pattern` |
|
||||
|
||||
Before replacing the placeholder in the process, all references will be resolved, i.e., you can put the placeholder also in the params for an
|
||||
option.
|
||||
Before replacing the placeholders in the process config, all references (see below) will be resolved.
|
||||
|
||||
If the value that gets filled in on the place of the placeholder needs escaping, you can define the character to be escaped in the placeholder by adding it to the placeholder name and prefix it with a `^`.
|
||||
E.g. escape all `:` in the value (`http://example.com:8080`) for `{memfs}` placeholder, write `{memfs^:}`. It will then be replaced by `http\://example.com\:8080`. The escape character is always `\`.
|
||||
|
||||
### References
|
||||
|
||||
|
@@ -188,10 +188,10 @@ func (h *RestreamHandler) Update(c echo.Context) error {
|
||||
|
||||
if err := h.restream.UpdateProcess(id, config); err != nil {
|
||||
if err == restream.ErrUnknownProcess {
|
||||
return api.Err(http.StatusNotFound, "Process not found: %s", id)
|
||||
return api.Err(http.StatusNotFound, "Process not found", "%s", id)
|
||||
}
|
||||
|
||||
return api.Err(http.StatusBadRequest, "Process can't be updated: %s", err)
|
||||
return api.Err(http.StatusBadRequest, "Process can't be updated", "%s", err)
|
||||
}
|
||||
|
||||
p, _ := h.getProcess(config.ID, "config")
|
||||
|
@@ -1,6 +1,11 @@
|
||||
package app
|
||||
|
||||
import "github.com/datarhei/core/v16/process"
|
||||
import (
|
||||
"regexp"
|
||||
"strings"
|
||||
|
||||
"github.com/datarhei/core/v16/process"
|
||||
)
|
||||
|
||||
type ConfigIOCleanup struct {
|
||||
Pattern string `json:"pattern"`
|
||||
@@ -75,6 +80,120 @@ func (config *Config) Clone() *Config {
|
||||
return clone
|
||||
}
|
||||
|
||||
func replace(what, placeholder, value string) string {
|
||||
re, err := regexp.Compile(`{` + regexp.QuoteMeta(placeholder) + `(\^(.))?}`)
|
||||
if err != nil {
|
||||
return what
|
||||
}
|
||||
|
||||
innerRe := re.Copy()
|
||||
what = re.ReplaceAllStringFunc(what, func(match string) string {
|
||||
matches := innerRe.FindStringSubmatch(match)
|
||||
var v string
|
||||
|
||||
if matches[2] != "" {
|
||||
v = strings.ReplaceAll(value, matches[2], `\`+matches[2])
|
||||
} else {
|
||||
v = value
|
||||
}
|
||||
|
||||
return strings.Replace(match, match, v, 1)
|
||||
})
|
||||
|
||||
return what
|
||||
}
|
||||
|
||||
// ReplacePlaceholders replaces all placeholders in the config. The config
|
||||
// will be modified in place.
|
||||
func (config *Config) ResolvePlaceholders(basediskfs, basememfs string) {
|
||||
for i, option := range config.Options {
|
||||
// Replace any known placeholders
|
||||
option = replace(option, "diskfs", basediskfs)
|
||||
|
||||
config.Options[i] = option
|
||||
}
|
||||
|
||||
// Resolving the given inputs
|
||||
for i, input := range config.Input {
|
||||
// Replace any known placeholders
|
||||
input.ID = replace(input.ID, "processid", config.ID)
|
||||
input.ID = replace(input.ID, "reference", config.Reference)
|
||||
input.Address = replace(input.Address, "inputid", input.ID)
|
||||
input.Address = replace(input.Address, "processid", config.ID)
|
||||
input.Address = replace(input.Address, "reference", config.Reference)
|
||||
input.Address = replace(input.Address, "diskfs", basediskfs)
|
||||
input.Address = replace(input.Address, "memfs", basememfs)
|
||||
|
||||
for j, option := range input.Options {
|
||||
// Replace any known placeholders
|
||||
option = replace(option, "inputid", input.ID)
|
||||
option = replace(option, "processid", config.ID)
|
||||
option = replace(option, "reference", config.Reference)
|
||||
option = replace(option, "diskfs", basediskfs)
|
||||
option = replace(option, "memfs", basememfs)
|
||||
|
||||
input.Options[j] = option
|
||||
}
|
||||
|
||||
config.Input[i] = input
|
||||
}
|
||||
|
||||
// Resolving the given outputs
|
||||
for i, output := range config.Output {
|
||||
// Replace any known placeholders
|
||||
output.ID = replace(output.ID, "processid", config.ID)
|
||||
output.Address = replace(output.Address, "outputid", output.ID)
|
||||
output.Address = replace(output.Address, "processid", config.ID)
|
||||
output.Address = replace(output.Address, "reference", config.Reference)
|
||||
output.Address = replace(output.Address, "diskfs", basediskfs)
|
||||
output.Address = replace(output.Address, "memfs", basememfs)
|
||||
|
||||
for j, option := range output.Options {
|
||||
// Replace any known placeholders
|
||||
option = replace(option, "outputid", output.ID)
|
||||
option = replace(option, "processid", config.ID)
|
||||
option = replace(option, "reference", config.Reference)
|
||||
option = replace(option, "diskfs", basediskfs)
|
||||
option = replace(option, "memfs", basememfs)
|
||||
|
||||
output.Options[j] = option
|
||||
}
|
||||
|
||||
for j, cleanup := range output.Cleanup {
|
||||
// Replace any known placeholders
|
||||
cleanup.Pattern = replace(cleanup.Pattern, "outputid", output.ID)
|
||||
cleanup.Pattern = replace(cleanup.Pattern, "processid", config.ID)
|
||||
cleanup.Pattern = replace(cleanup.Pattern, "reference", config.Reference)
|
||||
|
||||
output.Cleanup[j] = cleanup
|
||||
}
|
||||
|
||||
config.Output[i] = output
|
||||
}
|
||||
}
|
||||
|
||||
// CreateCommand created the FFmpeg command from this config.
|
||||
func (config *Config) CreateCommand() []string {
|
||||
var command []string
|
||||
|
||||
// Copy global options
|
||||
command = append(command, config.Options...)
|
||||
|
||||
for _, input := range config.Input {
|
||||
// Add the resolved input to the process command
|
||||
command = append(command, input.Options...)
|
||||
command = append(command, "-i", input.Address)
|
||||
}
|
||||
|
||||
for _, output := range config.Output {
|
||||
// Add the resolved output to the process command
|
||||
command = append(command, output.Options...)
|
||||
command = append(command, output.Address)
|
||||
}
|
||||
|
||||
return command
|
||||
}
|
||||
|
||||
type Process struct {
|
||||
ID string `json:"id"`
|
||||
Reference string `json:"reference"`
|
||||
|
45
restream/app/process_test.go
Normal file
45
restream/app/process_test.go
Normal file
@@ -0,0 +1,45 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestReplace(t *testing.T) {
|
||||
foobar := `;:.,-_$£!^`
|
||||
|
||||
samples := [][2]string{
|
||||
{"{foobar}", foobar},
|
||||
{"{foobar^:}", `;\:.,-_$£!^`},
|
||||
{"{foobar^:}barfoo{foobar^:}", `;\:.,-_$£!^barfoo;\:.,-_$£!^`},
|
||||
{"{foobar^:.}", "{foobar^:.}"},
|
||||
{"{foobar^}", "{foobar^}"},
|
||||
{"{barfoo^:}", "{barfoo^:}"},
|
||||
{"{foobar^^}", `;:.,-_$£!\^`},
|
||||
}
|
||||
|
||||
for _, e := range samples {
|
||||
replaced := replace(e[0], "foobar", foobar)
|
||||
require.Equal(t, e[1], replaced)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCreateCommand(t *testing.T) {
|
||||
config := &Config{
|
||||
Options: []string{"-global", "global"},
|
||||
Input: []ConfigIO{
|
||||
{Address: "inputAddress", Options: []string{"-input", "inputoption"}},
|
||||
},
|
||||
Output: []ConfigIO{
|
||||
{Address: "outputAddress", Options: []string{"-output", "oututoption"}},
|
||||
},
|
||||
}
|
||||
|
||||
command := config.CreateCommand()
|
||||
require.Equal(t, []string{
|
||||
"-global", "global",
|
||||
"-input", "inputoption", "-i", "inputAddress",
|
||||
"-output", "oututoption", "outputAddress",
|
||||
}, command)
|
||||
}
|
@@ -268,7 +268,7 @@ func (r *restream) load() error {
|
||||
}
|
||||
|
||||
// Replace all placeholders in the config
|
||||
r.resolvePlaceholders(t.config, r.fs.diskfs.Base(), r.fs.memfs.Base())
|
||||
t.config.ResolvePlaceholders(r.fs.diskfs.Base(), r.fs.memfs.Base())
|
||||
|
||||
tasks[id] = t
|
||||
}
|
||||
@@ -304,7 +304,7 @@ func (r *restream) load() error {
|
||||
continue
|
||||
}
|
||||
|
||||
t.command = r.createCommand(t.config)
|
||||
t.command = t.config.CreateCommand()
|
||||
t.parser = r.ffmpeg.NewProcessParser(t.logger, t.id, t.reference)
|
||||
|
||||
ffmpeg, err := r.ffmpeg.New(ffmpeg.ProcessConfig{
|
||||
@@ -418,7 +418,7 @@ func (r *restream) createTask(config *app.Config) (*task, error) {
|
||||
logger: r.logger.WithField("id", process.ID),
|
||||
}
|
||||
|
||||
r.resolvePlaceholders(t.config, r.fs.diskfs.Base(), r.fs.memfs.Base())
|
||||
t.config.ResolvePlaceholders(r.fs.diskfs.Base(), r.fs.memfs.Base())
|
||||
|
||||
err := r.resolveAddresses(r.tasks, t.config)
|
||||
if err != nil {
|
||||
@@ -435,7 +435,7 @@ func (r *restream) createTask(config *app.Config) (*task, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
t.command = r.createCommand(t.config)
|
||||
t.command = t.config.CreateCommand()
|
||||
t.parser = r.ffmpeg.NewProcessParser(t.logger, t.id, t.reference)
|
||||
|
||||
ffmpeg, err := r.ffmpeg.New(ffmpeg.ProcessConfig{
|
||||
@@ -545,94 +545,6 @@ func (r *restream) unsetPlayoutPorts(t *task) {
|
||||
t.playout = nil
|
||||
}
|
||||
|
||||
func (r *restream) resolvePlaceholders(config *app.Config, basediskfs, basememfs string) {
|
||||
for i, option := range config.Options {
|
||||
// Replace any known placeholders
|
||||
option = strings.Replace(option, "{diskfs}", basediskfs, -1)
|
||||
|
||||
config.Options[i] = option
|
||||
}
|
||||
|
||||
// Resolving the given inputs
|
||||
for i, input := range config.Input {
|
||||
// Replace any known placeholders
|
||||
input.ID = strings.Replace(input.ID, "{processid}", config.ID, -1)
|
||||
input.ID = strings.Replace(input.ID, "{reference}", config.Reference, -1)
|
||||
input.Address = strings.Replace(input.Address, "{inputid}", input.ID, -1)
|
||||
input.Address = strings.Replace(input.Address, "{processid}", config.ID, -1)
|
||||
input.Address = strings.Replace(input.Address, "{reference}", config.Reference, -1)
|
||||
input.Address = strings.Replace(input.Address, "{diskfs}", basediskfs, -1)
|
||||
input.Address = strings.Replace(input.Address, "{memfs}", basememfs, -1)
|
||||
|
||||
for j, option := range input.Options {
|
||||
// Replace any known placeholders
|
||||
option = strings.Replace(option, "{inputid}", input.ID, -1)
|
||||
option = strings.Replace(option, "{processid}", config.ID, -1)
|
||||
option = strings.Replace(option, "{reference}", config.Reference, -1)
|
||||
option = strings.Replace(option, "{diskfs}", basediskfs, -1)
|
||||
option = strings.Replace(option, "{memfs}", basememfs, -1)
|
||||
|
||||
input.Options[j] = option
|
||||
}
|
||||
|
||||
config.Input[i] = input
|
||||
}
|
||||
|
||||
// Resolving the given outputs
|
||||
for i, output := range config.Output {
|
||||
// Replace any known placeholders
|
||||
output.ID = strings.Replace(output.ID, "{processid}", config.ID, -1)
|
||||
output.Address = strings.Replace(output.Address, "{outputid}", output.ID, -1)
|
||||
output.Address = strings.Replace(output.Address, "{processid}", config.ID, -1)
|
||||
output.Address = strings.Replace(output.Address, "{reference}", config.Reference, -1)
|
||||
output.Address = strings.Replace(output.Address, "{diskfs}", basediskfs, -1)
|
||||
output.Address = strings.Replace(output.Address, "{memfs}", basememfs, -1)
|
||||
|
||||
for j, option := range output.Options {
|
||||
// Replace any known placeholders
|
||||
option = strings.Replace(option, "{outputid}", output.ID, -1)
|
||||
option = strings.Replace(option, "{processid}", config.ID, -1)
|
||||
option = strings.Replace(option, "{reference}", config.Reference, -1)
|
||||
option = strings.Replace(option, "{diskfs}", basediskfs, -1)
|
||||
option = strings.Replace(option, "{memfs}", basememfs, -1)
|
||||
|
||||
output.Options[j] = option
|
||||
}
|
||||
|
||||
for j, cleanup := range output.Cleanup {
|
||||
// Replace any known placeholders
|
||||
cleanup.Pattern = strings.Replace(cleanup.Pattern, "{outputid}", output.ID, -1)
|
||||
cleanup.Pattern = strings.Replace(cleanup.Pattern, "{processid}", config.ID, -1)
|
||||
cleanup.Pattern = strings.Replace(cleanup.Pattern, "{reference}", config.Reference, -1)
|
||||
|
||||
output.Cleanup[j] = cleanup
|
||||
}
|
||||
|
||||
config.Output[i] = output
|
||||
}
|
||||
}
|
||||
|
||||
func (r *restream) createCommand(config *app.Config) []string {
|
||||
var command []string
|
||||
|
||||
// Copy global options
|
||||
command = append(command, config.Options...)
|
||||
|
||||
for _, input := range config.Input {
|
||||
// Add the resolved input to the process command
|
||||
command = append(command, input.Options...)
|
||||
command = append(command, "-i", input.Address)
|
||||
}
|
||||
|
||||
for _, output := range config.Output {
|
||||
// Add the resolved output to the process command
|
||||
command = append(command, output.Options...)
|
||||
command = append(command, output.Address)
|
||||
}
|
||||
|
||||
return command
|
||||
}
|
||||
|
||||
func (r *restream) validateConfig(config *app.Config) (bool, error) {
|
||||
if len(config.Input) == 0 {
|
||||
return false, fmt.Errorf("at least one input must be defined for the process '%s'", config.ID)
|
||||
@@ -1071,7 +983,7 @@ func (r *restream) reloadProcess(id string) error {
|
||||
|
||||
t.config = t.process.Config.Clone()
|
||||
|
||||
r.resolvePlaceholders(t.config, r.fs.diskfs.Base(), r.fs.memfs.Base())
|
||||
t.config.ResolvePlaceholders(r.fs.diskfs.Base(), r.fs.memfs.Base())
|
||||
|
||||
err := r.resolveAddresses(r.tasks, t.config)
|
||||
if err != nil {
|
||||
@@ -1088,7 +1000,7 @@ func (r *restream) reloadProcess(id string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
t.command = r.createCommand(t.config)
|
||||
t.command = t.config.CreateCommand()
|
||||
|
||||
order := "stop"
|
||||
if t.process.Order == "start" {
|
||||
|
Reference in New Issue
Block a user