mirror of
https://github.com/datarhei/core.git
synced 2025-10-05 16:07:07 +08:00
Add new placeholders and parameters for placeholder
This commit is contained in:
@@ -2,6 +2,7 @@
|
|||||||
|
|
||||||
#### Core v16.8.0 > ?
|
#### Core v16.8.0 > ?
|
||||||
|
|
||||||
|
- Add new placeholders and parameters for placeholder
|
||||||
- Allow RTMP server if RTMPS server is enabled
|
- Allow RTMP server if RTMPS server is enabled
|
||||||
- Add optional escape character to process placeholder
|
- Add optional escape character to process placeholder
|
||||||
- Fix output address validation for tee outputs
|
- Fix output address validation for tee outputs
|
||||||
|
20
README.md
20
README.md
@@ -644,14 +644,16 @@ A command is defined as:
|
|||||||
|
|
||||||
Currently supported placeholders are:
|
Currently supported placeholders are:
|
||||||
|
|
||||||
| Placeholder | Description | Location |
|
| Placeholder | Description | Location |
|
||||||
| ------------- | --------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------- |
|
| ------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------ | ----------------------------------------------------------------------------------------------------------------------- |
|
||||||
| `{diskfs}` | Will be replaced by the provided `CORE_STORAGE_DISK_DIR`. | `options`, `input.address`, `input.options`, `output.address`, `output.options` |
|
| `{diskfs}` | Will be replaced by the provided `CORE_STORAGE_DISK_DIR`. | `options`, `input.address`, `input.options`, `output.address`, `output.options` |
|
||||||
| `{memfs}` | Will be replace by the base URL of the MemFS. | `input.address`, `input.options`, `output.address`, `output.options` |
|
| `{memfs}` | Will be replace by the base URL of the MemFS. | `input.address`, `input.options`, `output.address`, `output.options` |
|
||||||
| `{processid}` | Will be replaced by the ID of the process. | `input.id`, `input.address`, `input.options`, `output.id`, `output.address`, `output.options`, `output.cleanup.pattern` |
|
| `{processid}` | Will be replaced by the ID of the process. | `input.id`, `input.address`, `input.options`, `output.id`, `output.address`, `output.options`, `output.cleanup.pattern` |
|
||||||
| `{reference}` | Will be replaced by the reference of the process | `input.id`, `input.address`, `input.options`, `output.id`, `output.address`, `output.options`, `output.cleanup.pattern` |
|
| `{reference}` | Will be replaced by the reference of the process | `input.id`, `input.address`, `input.options`, `output.id`, `output.address`, `output.options`, `output.cleanup.pattern` |
|
||||||
| `{inputid}` | Will be replaced by the ID of the input. | `input.address`, `input.options` |
|
| `{inputid}` | Will be replaced by the ID of the input. | `input.address`, `input.options` |
|
||||||
| `{outputid}` | Will be replaced by the ID of the output. | `output.address`, `output.options`, `output.cleanup.pattern` |
|
| `{outputid}` | Will be replaced by the ID of the output. | `output.address`, `output.options`, `output.cleanup.pattern` |
|
||||||
|
| `{rtmp}` | Will be replaced by the internal address of the RTMP server. Requires parameter `name` (name of the stream). | `input.address`, `output.address` |
|
||||||
|
| `{srt}` | Will be replaced by the internal address of the SRT server. Requires parameter `name` (name of the stream) and `mode` (either `publish` or `request`). | `input.address`, `output.address` |
|
||||||
|
|
||||||
Before replacing the placeholders in the process config, all references (see below) will be resolved.
|
Before replacing the placeholders in the process config, all references (see below) will be resolved.
|
||||||
|
|
||||||
@@ -659,6 +661,8 @@ If the value that gets filled in on the place of the placeholder needs escaping,
|
|||||||
E.g. escape all `:` in the value (`http://example.com:8080`) for `{memfs}` placeholder, write `{memfs^:}`. It will then be replaced by `http\://example.com\:8080`. The escape character is always `\`. In
|
E.g. escape all `:` in the value (`http://example.com:8080`) for `{memfs}` placeholder, write `{memfs^:}`. It will then be replaced by `http\://example.com\:8080`. The escape character is always `\`. In
|
||||||
case there are `\` in the value, they will also get escaped. If the placeholder doesn't imply escaping, the value will be uses as-is.
|
case there are `\` in the value, they will also get escaped. If the placeholder doesn't imply escaping, the value will be uses as-is.
|
||||||
|
|
||||||
|
Add parameters to a placeholder by appending a comma separated list of key/values, e.g. `{placeholder,key1=value1,key2=value2}`. This can be combined with escaping.
|
||||||
|
|
||||||
### References
|
### References
|
||||||
|
|
||||||
The input address of a process may contain a reference to the output of another process. It has the form `#[processid]:output=[id]`.
|
The input address of a process may contain a reference to the output of another process. It has the form `#[processid]:output=[id]`.
|
||||||
|
@@ -29,6 +29,7 @@ import (
|
|||||||
"github.com/datarhei/core/v16/net"
|
"github.com/datarhei/core/v16/net"
|
||||||
"github.com/datarhei/core/v16/prometheus"
|
"github.com/datarhei/core/v16/prometheus"
|
||||||
"github.com/datarhei/core/v16/restream"
|
"github.com/datarhei/core/v16/restream"
|
||||||
|
"github.com/datarhei/core/v16/restream/replace"
|
||||||
"github.com/datarhei/core/v16/restream/store"
|
"github.com/datarhei/core/v16/restream/store"
|
||||||
"github.com/datarhei/core/v16/rtmp"
|
"github.com/datarhei/core/v16/rtmp"
|
||||||
"github.com/datarhei/core/v16/service"
|
"github.com/datarhei/core/v16/service"
|
||||||
@@ -75,6 +76,7 @@ type api struct {
|
|||||||
sidecarserver *gohttp.Server
|
sidecarserver *gohttp.Server
|
||||||
httpjwt jwt.JWT
|
httpjwt jwt.JWT
|
||||||
update update.Checker
|
update update.Checker
|
||||||
|
replacer replace.Replacer
|
||||||
|
|
||||||
errorChan chan error
|
errorChan chan error
|
||||||
|
|
||||||
@@ -439,12 +441,50 @@ func (a *api) start() error {
|
|||||||
|
|
||||||
a.ffmpeg = ffmpeg
|
a.ffmpeg = ffmpeg
|
||||||
|
|
||||||
|
a.replacer = replace.New()
|
||||||
|
|
||||||
|
{
|
||||||
|
a.replacer.RegisterTemplate("diskfs", a.diskfs.Base())
|
||||||
|
a.replacer.RegisterTemplate("memfs", a.memfs.Base())
|
||||||
|
|
||||||
|
if cfg.RTMP.Enable {
|
||||||
|
host, port, _ := gonet.SplitHostPort(cfg.RTMP.Address)
|
||||||
|
if len(host) == 0 {
|
||||||
|
host = "localhost"
|
||||||
|
}
|
||||||
|
|
||||||
|
template := "rtmp://" + host + ":" + port + cfg.RTMP.App + "/{name}"
|
||||||
|
if len(cfg.RTMP.Token) != 0 {
|
||||||
|
template += "?token=" + cfg.RTMP.Token
|
||||||
|
}
|
||||||
|
|
||||||
|
a.replacer.RegisterTemplate("rtmp", template)
|
||||||
|
}
|
||||||
|
|
||||||
|
if cfg.SRT.Enable {
|
||||||
|
host, port, _ = gonet.SplitHostPort(cfg.SRT.Address)
|
||||||
|
if len(host) == 0 {
|
||||||
|
host = "localhost"
|
||||||
|
}
|
||||||
|
|
||||||
|
template := "srt://" + host + ":" + port + "?mode=caller&transtype=live&streamid=#!:m={mode},r={name}"
|
||||||
|
if len(cfg.SRT.Token) != 0 {
|
||||||
|
template += ",token=" + cfg.SRT.Token
|
||||||
|
}
|
||||||
|
if len(cfg.SRT.Passphrase) != 0 {
|
||||||
|
template += "&passphrase=" + cfg.SRT.Passphrase
|
||||||
|
}
|
||||||
|
a.replacer.RegisterTemplate("srt", template)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
restream, err := restream.New(restream.Config{
|
restream, err := restream.New(restream.Config{
|
||||||
ID: cfg.ID,
|
ID: cfg.ID,
|
||||||
Name: cfg.Name,
|
Name: cfg.Name,
|
||||||
Store: store,
|
Store: store,
|
||||||
DiskFS: a.diskfs,
|
DiskFS: a.diskfs,
|
||||||
MemFS: a.memfs,
|
MemFS: a.memfs,
|
||||||
|
Replace: a.replacer,
|
||||||
FFmpeg: a.ffmpeg,
|
FFmpeg: a.ffmpeg,
|
||||||
MaxProcesses: cfg.FFmpeg.MaxProcesses,
|
MaxProcesses: cfg.FFmpeg.MaxProcesses,
|
||||||
Logger: a.log.logger.core.WithComponent("Process"),
|
Logger: a.log.logger.core.WithComponent("Process"),
|
||||||
@@ -940,6 +980,8 @@ func (a *api) start() error {
|
|||||||
} else {
|
} else {
|
||||||
err = nil
|
err = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sendError(err)
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -351,7 +351,7 @@ func (d *Config) init() {
|
|||||||
d.val(newBoolValue(&d.RTMP.EnableTLS, false), "rtmp.enable_tls", "CORE_RTMP_ENABLE_TLS", nil, "Enable RTMPS server instead of RTMP", false, false)
|
d.val(newBoolValue(&d.RTMP.EnableTLS, false), "rtmp.enable_tls", "CORE_RTMP_ENABLE_TLS", nil, "Enable RTMPS server instead of RTMP", false, false)
|
||||||
d.val(newAddressValue(&d.RTMP.Address, ":1935"), "rtmp.address", "CORE_RTMP_ADDRESS", nil, "RTMP server listen address", false, false)
|
d.val(newAddressValue(&d.RTMP.Address, ":1935"), "rtmp.address", "CORE_RTMP_ADDRESS", nil, "RTMP server listen address", false, false)
|
||||||
d.val(newAddressValue(&d.RTMP.AddressTLS, ":1936"), "rtmp.address_tls", "CORE_RTMP_ADDRESS_TLS", nil, "RTMPS server listen address", false, false)
|
d.val(newAddressValue(&d.RTMP.AddressTLS, ":1936"), "rtmp.address_tls", "CORE_RTMP_ADDRESS_TLS", nil, "RTMPS server listen address", false, false)
|
||||||
d.val(newStringValue(&d.RTMP.App, "/"), "rtmp.app", "CORE_RTMP_APP", nil, "RTMP app for publishing", false, false)
|
d.val(newAbsolutePathValue(&d.RTMP.App, "/"), "rtmp.app", "CORE_RTMP_APP", nil, "RTMP app for publishing", false, false)
|
||||||
d.val(newStringValue(&d.RTMP.Token, ""), "rtmp.token", "CORE_RTMP_TOKEN", nil, "RTMP token for publishing and playing", false, true)
|
d.val(newStringValue(&d.RTMP.Token, ""), "rtmp.token", "CORE_RTMP_TOKEN", nil, "RTMP token for publishing and playing", false, true)
|
||||||
|
|
||||||
// SRT
|
// SRT
|
||||||
|
@@ -8,6 +8,7 @@ import (
|
|||||||
"net/url"
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
|
"path/filepath"
|
||||||
"regexp"
|
"regexp"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
@@ -772,3 +773,35 @@ func (u *urlValue) Validate() error {
|
|||||||
func (u *urlValue) IsEmpty() bool {
|
func (u *urlValue) IsEmpty() bool {
|
||||||
return len(string(*u)) == 0
|
return len(string(*u)) == 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// absolute path
|
||||||
|
|
||||||
|
type absolutePathValue string
|
||||||
|
|
||||||
|
func newAbsolutePathValue(p *string, val string) *absolutePathValue {
|
||||||
|
*p = filepath.Clean(val)
|
||||||
|
return (*absolutePathValue)(p)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *absolutePathValue) Set(val string) error {
|
||||||
|
*s = absolutePathValue(filepath.Clean(val))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *absolutePathValue) String() string {
|
||||||
|
return string(*s)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *absolutePathValue) Validate() error {
|
||||||
|
path := string(*s)
|
||||||
|
|
||||||
|
if !filepath.IsAbs(path) {
|
||||||
|
return fmt.Errorf("%s is not an absolute path", path)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *absolutePathValue) IsEmpty() bool {
|
||||||
|
return len(string(*s)) == 0
|
||||||
|
}
|
||||||
|
@@ -1,10 +1,8 @@
|
|||||||
package app
|
package app
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"regexp"
|
|
||||||
"strings"
|
|
||||||
|
|
||||||
"github.com/datarhei/core/v16/process"
|
"github.com/datarhei/core/v16/process"
|
||||||
|
"github.com/datarhei/core/v16/restream/replace"
|
||||||
)
|
)
|
||||||
|
|
||||||
type ConfigIOCleanup struct {
|
type ConfigIOCleanup struct {
|
||||||
@@ -80,35 +78,12 @@ func (config *Config) Clone() *Config {
|
|||||||
return clone
|
return clone
|
||||||
}
|
}
|
||||||
|
|
||||||
func replace(what, placeholder, value string) string {
|
|
||||||
re, err := regexp.Compile(`{` + regexp.QuoteMeta(placeholder) + `(\^(.))?}`)
|
|
||||||
if err != nil {
|
|
||||||
return what
|
|
||||||
}
|
|
||||||
|
|
||||||
what = re.ReplaceAllStringFunc(what, func(match string) string {
|
|
||||||
matches := re.FindStringSubmatch(match)
|
|
||||||
v := value
|
|
||||||
|
|
||||||
if matches[2] != "" {
|
|
||||||
if matches[2] != `\` {
|
|
||||||
v = strings.ReplaceAll(v, `\`, `\\`)
|
|
||||||
}
|
|
||||||
v = strings.ReplaceAll(v, matches[2], `\\`+matches[2])
|
|
||||||
}
|
|
||||||
|
|
||||||
return strings.Replace(match, match, v, 1)
|
|
||||||
})
|
|
||||||
|
|
||||||
return what
|
|
||||||
}
|
|
||||||
|
|
||||||
// ReplacePlaceholders replaces all placeholders in the config. The config
|
// ReplacePlaceholders replaces all placeholders in the config. The config
|
||||||
// will be modified in place.
|
// will be modified in place.
|
||||||
func (config *Config) ResolvePlaceholders(basediskfs, basememfs string) {
|
func (config *Config) ResolvePlaceholders(r replace.Replacer) {
|
||||||
for i, option := range config.Options {
|
for i, option := range config.Options {
|
||||||
// Replace any known placeholders
|
// Replace any known placeholders
|
||||||
option = replace(option, "diskfs", basediskfs)
|
option = r.Replace(option, "diskfs", "")
|
||||||
|
|
||||||
config.Options[i] = option
|
config.Options[i] = option
|
||||||
}
|
}
|
||||||
@@ -116,21 +91,23 @@ func (config *Config) ResolvePlaceholders(basediskfs, basememfs string) {
|
|||||||
// Resolving the given inputs
|
// Resolving the given inputs
|
||||||
for i, input := range config.Input {
|
for i, input := range config.Input {
|
||||||
// Replace any known placeholders
|
// Replace any known placeholders
|
||||||
input.ID = replace(input.ID, "processid", config.ID)
|
input.ID = r.Replace(input.ID, "processid", config.ID)
|
||||||
input.ID = replace(input.ID, "reference", config.Reference)
|
input.ID = r.Replace(input.ID, "reference", config.Reference)
|
||||||
input.Address = replace(input.Address, "inputid", input.ID)
|
input.Address = r.Replace(input.Address, "inputid", input.ID)
|
||||||
input.Address = replace(input.Address, "processid", config.ID)
|
input.Address = r.Replace(input.Address, "processid", config.ID)
|
||||||
input.Address = replace(input.Address, "reference", config.Reference)
|
input.Address = r.Replace(input.Address, "reference", config.Reference)
|
||||||
input.Address = replace(input.Address, "diskfs", basediskfs)
|
input.Address = r.Replace(input.Address, "diskfs", "")
|
||||||
input.Address = replace(input.Address, "memfs", basememfs)
|
input.Address = r.Replace(input.Address, "memfs", "")
|
||||||
|
input.Address = r.Replace(input.Address, "rtmp", "")
|
||||||
|
input.Address = r.Replace(input.Address, "srt", "")
|
||||||
|
|
||||||
for j, option := range input.Options {
|
for j, option := range input.Options {
|
||||||
// Replace any known placeholders
|
// Replace any known placeholders
|
||||||
option = replace(option, "inputid", input.ID)
|
option = r.Replace(option, "inputid", input.ID)
|
||||||
option = replace(option, "processid", config.ID)
|
option = r.Replace(option, "processid", config.ID)
|
||||||
option = replace(option, "reference", config.Reference)
|
option = r.Replace(option, "reference", config.Reference)
|
||||||
option = replace(option, "diskfs", basediskfs)
|
option = r.Replace(option, "diskfs", "")
|
||||||
option = replace(option, "memfs", basememfs)
|
option = r.Replace(option, "memfs", "")
|
||||||
|
|
||||||
input.Options[j] = option
|
input.Options[j] = option
|
||||||
}
|
}
|
||||||
@@ -141,29 +118,31 @@ func (config *Config) ResolvePlaceholders(basediskfs, basememfs string) {
|
|||||||
// Resolving the given outputs
|
// Resolving the given outputs
|
||||||
for i, output := range config.Output {
|
for i, output := range config.Output {
|
||||||
// Replace any known placeholders
|
// Replace any known placeholders
|
||||||
output.ID = replace(output.ID, "processid", config.ID)
|
output.ID = r.Replace(output.ID, "processid", config.ID)
|
||||||
output.Address = replace(output.Address, "outputid", output.ID)
|
output.Address = r.Replace(output.Address, "outputid", output.ID)
|
||||||
output.Address = replace(output.Address, "processid", config.ID)
|
output.Address = r.Replace(output.Address, "processid", config.ID)
|
||||||
output.Address = replace(output.Address, "reference", config.Reference)
|
output.Address = r.Replace(output.Address, "reference", config.Reference)
|
||||||
output.Address = replace(output.Address, "diskfs", basediskfs)
|
output.Address = r.Replace(output.Address, "diskfs", "")
|
||||||
output.Address = replace(output.Address, "memfs", basememfs)
|
output.Address = r.Replace(output.Address, "memfs", "")
|
||||||
|
output.Address = r.Replace(output.Address, "rtmp", "")
|
||||||
|
output.Address = r.Replace(output.Address, "srt", "")
|
||||||
|
|
||||||
for j, option := range output.Options {
|
for j, option := range output.Options {
|
||||||
// Replace any known placeholders
|
// Replace any known placeholders
|
||||||
option = replace(option, "outputid", output.ID)
|
option = r.Replace(option, "outputid", output.ID)
|
||||||
option = replace(option, "processid", config.ID)
|
option = r.Replace(option, "processid", config.ID)
|
||||||
option = replace(option, "reference", config.Reference)
|
option = r.Replace(option, "reference", config.Reference)
|
||||||
option = replace(option, "diskfs", basediskfs)
|
option = r.Replace(option, "diskfs", "")
|
||||||
option = replace(option, "memfs", basememfs)
|
option = r.Replace(option, "memfs", "")
|
||||||
|
|
||||||
output.Options[j] = option
|
output.Options[j] = option
|
||||||
}
|
}
|
||||||
|
|
||||||
for j, cleanup := range output.Cleanup {
|
for j, cleanup := range output.Cleanup {
|
||||||
// Replace any known placeholders
|
// Replace any known placeholders
|
||||||
cleanup.Pattern = replace(cleanup.Pattern, "outputid", output.ID)
|
cleanup.Pattern = r.Replace(cleanup.Pattern, "outputid", output.ID)
|
||||||
cleanup.Pattern = replace(cleanup.Pattern, "processid", config.ID)
|
cleanup.Pattern = r.Replace(cleanup.Pattern, "processid", config.ID)
|
||||||
cleanup.Pattern = replace(cleanup.Pattern, "reference", config.Reference)
|
cleanup.Pattern = r.Replace(cleanup.Pattern, "reference", config.Reference)
|
||||||
|
|
||||||
output.Cleanup[j] = cleanup
|
output.Cleanup[j] = cleanup
|
||||||
}
|
}
|
||||||
|
@@ -6,26 +6,6 @@ import (
|
|||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestReplace(t *testing.T) {
|
|
||||||
foobar := `;:.,-_$\£!^`
|
|
||||||
|
|
||||||
samples := [][2]string{
|
|
||||||
{"{foobar}", foobar},
|
|
||||||
{"{foobar^:}", `;\:.,-_$\\£!^`},
|
|
||||||
{"{foobar^:}barfoo{foobar^:}", `;\:.,-_$\\£!^barfoo;\:.,-_$\\£!^`},
|
|
||||||
{"{foobar^:.}", "{foobar^:.}"},
|
|
||||||
{"{foobar^}", "{foobar^}"},
|
|
||||||
{"{barfoo^:}", "{barfoo^:}"},
|
|
||||||
{"{foobar^^}", `;:.,-_$\\£!\^`},
|
|
||||||
{`{foobar^\}`, `;:.,-_$\\£!^`},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, e := range samples {
|
|
||||||
replaced := replace(e[0], "foobar", foobar)
|
|
||||||
require.Equal(t, e[1], replaced, e[0])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestCreateCommand(t *testing.T) {
|
func TestCreateCommand(t *testing.T) {
|
||||||
config := &Config{
|
config := &Config{
|
||||||
Options: []string{"-global", "global"},
|
Options: []string{"-global", "global"},
|
||||||
|
138
restream/replace/replace.go
Normal file
138
restream/replace/replace.go
Normal file
@@ -0,0 +1,138 @@
|
|||||||
|
package replace
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/url"
|
||||||
|
"regexp"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Replacer interface {
|
||||||
|
// RegisterTemplate registers a template for a specific placeholder. Template
|
||||||
|
// may contain placeholders as well of the form {name}. They will be replaced
|
||||||
|
// by the parameters of the placeholder (see Replace).
|
||||||
|
RegisterTemplate(placeholder, template string)
|
||||||
|
|
||||||
|
// RegisterTemplateFunc does the same as RegisterTemplate, but the template
|
||||||
|
// is returned by the template function.
|
||||||
|
RegisterTemplateFunc(placeholder string, template func() string)
|
||||||
|
|
||||||
|
// Replace replaces all occurences of placeholder in str with value. The placeholder is of the
|
||||||
|
// form {placeholder}. It is possible to escape a characters in value with \\ by appending a ^
|
||||||
|
// and the character to escape to the placeholder name, e.g. {placeholder^:} to escape ":".
|
||||||
|
// A placeholder may also have parameters of the form {placeholder,key1=value1,key2=value2}.
|
||||||
|
// If the value has placeholders itself (see RegisterTemplate), they will be replaced by
|
||||||
|
// the value of the corresponding key in the parameters.
|
||||||
|
// If the value is an empty string, the registered templates will be searched for that
|
||||||
|
// placeholder. If no template is found, the placeholder will be replaced by the empty string.
|
||||||
|
// A placeholder name may consist on of the letters a-z.
|
||||||
|
Replace(str, placeholder, value string) string
|
||||||
|
}
|
||||||
|
|
||||||
|
type replacer struct {
|
||||||
|
templates map[string]func() string
|
||||||
|
|
||||||
|
re *regexp.Regexp
|
||||||
|
templateRe *regexp.Regexp
|
||||||
|
}
|
||||||
|
|
||||||
|
// New returns a Replacer
|
||||||
|
func New() Replacer {
|
||||||
|
r := &replacer{
|
||||||
|
templates: make(map[string]func() string),
|
||||||
|
re: regexp.MustCompile(`{([a-z]+)(?:\^(.))?(?:,(.*?))?}`),
|
||||||
|
templateRe: regexp.MustCompile(`{([a-z]+)}`),
|
||||||
|
}
|
||||||
|
|
||||||
|
return r
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *replacer) RegisterTemplate(placeholder, template string) {
|
||||||
|
r.templates[placeholder] = func() string { return template }
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *replacer) RegisterTemplateFunc(placeholder string, template func() string) {
|
||||||
|
r.templates[placeholder] = template
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *replacer) Replace(str, placeholder, value string) string {
|
||||||
|
str = r.re.ReplaceAllStringFunc(str, func(match string) string {
|
||||||
|
matches := r.re.FindStringSubmatch(match)
|
||||||
|
if matches[1] != placeholder {
|
||||||
|
return match
|
||||||
|
}
|
||||||
|
|
||||||
|
// We need a copy from the value
|
||||||
|
v := value
|
||||||
|
|
||||||
|
// Check for a registered template
|
||||||
|
if len(v) == 0 {
|
||||||
|
tmplFunc, ok := r.templates[placeholder]
|
||||||
|
if ok {
|
||||||
|
v = tmplFunc()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
v = r.compileTemplate(v, matches[3])
|
||||||
|
|
||||||
|
if len(matches[2]) != 0 {
|
||||||
|
// If there's a character to escape, we also have to escape the
|
||||||
|
// escape character, but only if it is different from the character
|
||||||
|
// to escape.
|
||||||
|
if matches[2] != "\\" {
|
||||||
|
v = strings.ReplaceAll(v, "\\", "\\\\\\")
|
||||||
|
}
|
||||||
|
v = strings.ReplaceAll(v, matches[2], "\\\\"+matches[2])
|
||||||
|
}
|
||||||
|
|
||||||
|
return strings.Replace(match, match, v, 1)
|
||||||
|
})
|
||||||
|
|
||||||
|
return str
|
||||||
|
}
|
||||||
|
|
||||||
|
// compileTemplate fills in the placeholder in the template with the values from the params
|
||||||
|
// string. The placeholders in the template are delimited by {} and their name may only
|
||||||
|
// contain the letters a-z. The params string is a comma-separated string of key=value pairs.
|
||||||
|
// Example: the template is "Hello {who}!", the params string is "who=World". The key is the
|
||||||
|
// placeholder name and will be replaced with the value. The resulting string is "Hello World!".
|
||||||
|
// If a placeholder name is not present in the params string, it will not be replaced. The key
|
||||||
|
// and values can be escaped as in net/url.QueryEscape.
|
||||||
|
func (r *replacer) compileTemplate(str, params string) string {
|
||||||
|
if len(params) == 0 {
|
||||||
|
return str
|
||||||
|
}
|
||||||
|
|
||||||
|
p := make(map[string]string)
|
||||||
|
|
||||||
|
// taken from net/url.ParseQuery
|
||||||
|
for params != "" {
|
||||||
|
var key string
|
||||||
|
key, params, _ = strings.Cut(params, ",")
|
||||||
|
if key == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
key, value, _ := strings.Cut(key, "=")
|
||||||
|
key, err := url.QueryUnescape(key)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
value, err = url.QueryUnescape(value)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
p[key] = value
|
||||||
|
}
|
||||||
|
|
||||||
|
str = r.templateRe.ReplaceAllStringFunc(str, func(match string) string {
|
||||||
|
matches := r.templateRe.FindStringSubmatch(match)
|
||||||
|
|
||||||
|
value, ok := p[matches[1]]
|
||||||
|
if !ok {
|
||||||
|
return match
|
||||||
|
}
|
||||||
|
|
||||||
|
return strings.Replace(match, matches[0], value, 1)
|
||||||
|
})
|
||||||
|
|
||||||
|
return str
|
||||||
|
}
|
64
restream/replace/replace_test.go
Normal file
64
restream/replace/replace_test.go
Normal file
@@ -0,0 +1,64 @@
|
|||||||
|
package replace
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestReplace(t *testing.T) {
|
||||||
|
foobar := ";:.,-_$\\£!^"
|
||||||
|
|
||||||
|
samples := [][2]string{
|
||||||
|
{"{foobar}", foobar},
|
||||||
|
{"{foobar^:}", ";\\\\:.,-_$\\\\\\£!^"},
|
||||||
|
{"{foobar^:}barfoo{foobar^:}", ";\\\\:.,-_$\\\\\\£!^barfoo;\\\\:.,-_$\\\\\\£!^"},
|
||||||
|
{"{foobar^:.}", "{foobar^:.}"},
|
||||||
|
{"{foobar^}", "{foobar^}"},
|
||||||
|
{"{barfoo^:}", "{barfoo^:}"},
|
||||||
|
{"{foobar^^}", ";:.,-_$\\\\\\£!\\\\^"},
|
||||||
|
{`{foobar^\}`, ";:.,-_$\\\\\\£!^"},
|
||||||
|
{`{barfoo}`, "{barfoo}"},
|
||||||
|
}
|
||||||
|
|
||||||
|
r := New()
|
||||||
|
|
||||||
|
for _, e := range samples {
|
||||||
|
replaced := r.Replace(e[0], "foobar", foobar)
|
||||||
|
require.Equal(t, e[1], replaced, e[0])
|
||||||
|
}
|
||||||
|
|
||||||
|
replaced := r.Replace("{foobar}", "foobar", "")
|
||||||
|
require.Equal(t, "", replaced)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReplaceTemplate(t *testing.T) {
|
||||||
|
r := New()
|
||||||
|
r.RegisterTemplate("foobar", "Hello {who}! {what}?")
|
||||||
|
|
||||||
|
replaced := r.Replace("{foobar,who=World}", "foobar", "")
|
||||||
|
require.Equal(t, "Hello World! {what}?", replaced)
|
||||||
|
|
||||||
|
replaced = r.Replace("{foobar,who=World,what=E%3dmc^2}", "foobar", "")
|
||||||
|
require.Equal(t, "Hello World! E=mc^2?", replaced)
|
||||||
|
|
||||||
|
replaced = r.Replace("{foobar^:,who=World,what=E%3dmc:2}", "foobar", "")
|
||||||
|
require.Equal(t, "Hello World! E=mc\\\\:2?", replaced)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReplaceCompileTemplate(t *testing.T) {
|
||||||
|
samples := [][3]string{
|
||||||
|
{"Hello {who}!", "who=World", "Hello World!"},
|
||||||
|
{"Hello {who}! {what}?", "who=World", "Hello World! {what}?"},
|
||||||
|
{"Hello {who}! {what}?", "who=World,what=Yeah", "Hello World! Yeah?"},
|
||||||
|
{"Hello {who}! {what}?", "who=World,what=", "Hello World! ?"},
|
||||||
|
{"Hello {who}!", "who=E%3dmc^2", "Hello E=mc^2!"},
|
||||||
|
}
|
||||||
|
|
||||||
|
r := New().(*replacer)
|
||||||
|
|
||||||
|
for _, e := range samples {
|
||||||
|
replaced := r.compileTemplate(e[0], e[1])
|
||||||
|
require.Equal(t, e[2], replaced, e[0])
|
||||||
|
}
|
||||||
|
}
|
@@ -21,6 +21,7 @@ import (
|
|||||||
"github.com/datarhei/core/v16/process"
|
"github.com/datarhei/core/v16/process"
|
||||||
"github.com/datarhei/core/v16/restream/app"
|
"github.com/datarhei/core/v16/restream/app"
|
||||||
rfs "github.com/datarhei/core/v16/restream/fs"
|
rfs "github.com/datarhei/core/v16/restream/fs"
|
||||||
|
"github.com/datarhei/core/v16/restream/replace"
|
||||||
"github.com/datarhei/core/v16/restream/store"
|
"github.com/datarhei/core/v16/restream/store"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -59,6 +60,7 @@ type Config struct {
|
|||||||
Store store.Store
|
Store store.Store
|
||||||
DiskFS fs.Filesystem
|
DiskFS fs.Filesystem
|
||||||
MemFS fs.Filesystem
|
MemFS fs.Filesystem
|
||||||
|
Replace replace.Replacer
|
||||||
FFmpeg ffmpeg.FFmpeg
|
FFmpeg ffmpeg.FFmpeg
|
||||||
MaxProcesses int64
|
MaxProcesses int64
|
||||||
Logger log.Logger
|
Logger log.Logger
|
||||||
@@ -92,6 +94,7 @@ type restream struct {
|
|||||||
memfs rfs.Filesystem
|
memfs rfs.Filesystem
|
||||||
stopObserver context.CancelFunc
|
stopObserver context.CancelFunc
|
||||||
}
|
}
|
||||||
|
replace replace.Replacer
|
||||||
tasks map[string]*task
|
tasks map[string]*task
|
||||||
logger log.Logger
|
logger log.Logger
|
||||||
metadata map[string]interface{}
|
metadata map[string]interface{}
|
||||||
@@ -109,6 +112,7 @@ func New(config Config) (Restreamer, error) {
|
|||||||
name: config.Name,
|
name: config.Name,
|
||||||
createdAt: time.Now(),
|
createdAt: time.Now(),
|
||||||
store: config.Store,
|
store: config.Store,
|
||||||
|
replace: config.Replace,
|
||||||
logger: config.Logger,
|
logger: config.Logger,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -142,6 +146,10 @@ func New(config Config) (Restreamer, error) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if r.replace == nil {
|
||||||
|
r.replace = replace.New()
|
||||||
|
}
|
||||||
|
|
||||||
r.ffmpeg = config.FFmpeg
|
r.ffmpeg = config.FFmpeg
|
||||||
if r.ffmpeg == nil {
|
if r.ffmpeg == nil {
|
||||||
return nil, fmt.Errorf("ffmpeg must be provided")
|
return nil, fmt.Errorf("ffmpeg must be provided")
|
||||||
@@ -268,7 +276,7 @@ func (r *restream) load() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Replace all placeholders in the config
|
// Replace all placeholders in the config
|
||||||
t.config.ResolvePlaceholders(r.fs.diskfs.Base(), r.fs.memfs.Base())
|
t.config.ResolvePlaceholders(r.replace)
|
||||||
|
|
||||||
tasks[id] = t
|
tasks[id] = t
|
||||||
}
|
}
|
||||||
@@ -418,7 +426,7 @@ func (r *restream) createTask(config *app.Config) (*task, error) {
|
|||||||
logger: r.logger.WithField("id", process.ID),
|
logger: r.logger.WithField("id", process.ID),
|
||||||
}
|
}
|
||||||
|
|
||||||
t.config.ResolvePlaceholders(r.fs.diskfs.Base(), r.fs.memfs.Base())
|
t.config.ResolvePlaceholders(r.replace)
|
||||||
|
|
||||||
err := r.resolveAddresses(r.tasks, t.config)
|
err := r.resolveAddresses(r.tasks, t.config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -983,7 +991,7 @@ func (r *restream) reloadProcess(id string) error {
|
|||||||
|
|
||||||
t.config = t.process.Config.Clone()
|
t.config = t.process.Config.Clone()
|
||||||
|
|
||||||
t.config.ResolvePlaceholders(r.fs.diskfs.Base(), r.fs.memfs.Base())
|
t.config.ResolvePlaceholders(r.replace)
|
||||||
|
|
||||||
err := r.resolveAddresses(r.tasks, t.config)
|
err := r.resolveAddresses(r.tasks, t.config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
Reference in New Issue
Block a user