Add tests

This commit is contained in:
Ingo Oppermann
2023-01-23 11:42:17 +01:00
parent 0147651de6
commit 505fbff03f
3 changed files with 305 additions and 33 deletions

View File

@@ -69,7 +69,7 @@ func (r *replacer) RegisterTemplateFunc(placeholder string, templateFn TemplateF
}
}
func (r *replacer) Replace(str, placeholder, value string, vars map[string]string, config *app.Config, kind string) string {
func (r *replacer) Replace(str, placeholder, value string, vars map[string]string, config *app.Config, section string) string {
str = r.re.ReplaceAllStringFunc(str, func(match string) string {
matches := r.re.FindStringSubmatch(match)
@@ -93,7 +93,7 @@ func (r *replacer) Replace(str, placeholder, value string, vars map[string]strin
}
}
v = tmpl.fn(config, kind)
v = tmpl.fn(config, section)
v = r.compileTemplate(v, matches[3], vars, tmpl.defaults)
if len(matches[2]) != 0 {

View File

@@ -1456,11 +1456,12 @@ func resolvePlaceholders(config *app.Config, r replace.Replacer) {
// Resolving the given inputs
for i, input := range config.Input {
vars["inputid"] = input.ID
// Replace any known placeholders
input.ID = r.Replace(input.ID, "processid", config.ID, nil, nil, "input")
input.ID = r.Replace(input.ID, "reference", config.Reference, nil, nil, "input")
vars["inputid"] = input.ID
input.Address = r.Replace(input.Address, "inputid", input.ID, nil, nil, "input")
input.Address = r.Replace(input.Address, "processid", config.ID, nil, nil, "input")
input.Address = r.Replace(input.Address, "reference", config.Reference, nil, nil, "input")
@@ -1489,10 +1490,12 @@ func resolvePlaceholders(config *app.Config, r replace.Replacer) {
// Resolving the given outputs
for i, output := range config.Output {
vars["outputid"] = output.ID
// Replace any known placeholders
output.ID = r.Replace(output.ID, "processid", config.ID, nil, nil, "output")
output.ID = r.Replace(output.ID, "reference", config.Reference, nil, nil, "output")
vars["outputid"] = output.ID
output.Address = r.Replace(output.Address, "outputid", output.ID, nil, nil, "output")
output.Address = r.Replace(output.Address, "processid", config.ID, nil, nil, "output")
output.Address = r.Replace(output.Address, "reference", config.Reference, nil, nil, "output")

View File

@@ -9,11 +9,12 @@ import (
"github.com/datarhei/core/v16/internal/testhelper"
"github.com/datarhei/core/v16/net"
"github.com/datarhei/core/v16/restream/app"
"github.com/datarhei/core/v16/restream/replace"
"github.com/stretchr/testify/require"
)
func getDummyRestreamer(portrange net.Portranger, validatorIn, validatorOut ffmpeg.Validator) (Restreamer, error) {
func getDummyRestreamer(portrange net.Portranger, validatorIn, validatorOut ffmpeg.Validator, replacer replace.Replacer) (Restreamer, error) {
binary, err := testhelper.BuildBinary("ffmpeg", "../internal/testhelper")
if err != nil {
return nil, fmt.Errorf("failed to build helper program: %w", err)
@@ -30,7 +31,8 @@ func getDummyRestreamer(portrange net.Portranger, validatorIn, validatorOut ffmp
}
rs, err := New(Config{
FFmpeg: ffmpeg,
FFmpeg: ffmpeg,
Replace: replacer,
})
if err != nil {
return nil, err
@@ -77,7 +79,7 @@ func getDummyProcess() *app.Config {
}
func TestAddProcess(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil)
rs, err := getDummyRestreamer(nil, nil, nil, nil)
require.NoError(t, err)
process := getDummyProcess()
@@ -97,7 +99,7 @@ func TestAddProcess(t *testing.T) {
}
func TestAutostartProcess(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil)
rs, err := getDummyRestreamer(nil, nil, nil, nil)
require.NoError(t, err)
process := getDummyProcess()
@@ -112,7 +114,7 @@ func TestAutostartProcess(t *testing.T) {
}
func TestAddInvalidProcess(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil)
rs, err := getDummyRestreamer(nil, nil, nil, nil)
require.NoError(t, err)
// Invalid process ID
@@ -180,7 +182,7 @@ func TestAddInvalidProcess(t *testing.T) {
}
func TestRemoveProcess(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil)
rs, err := getDummyRestreamer(nil, nil, nil, nil)
require.NoError(t, err)
process := getDummyProcess()
@@ -195,24 +197,98 @@ func TestRemoveProcess(t *testing.T) {
require.NotEqual(t, nil, err, "Unset process found (%s)", process.ID)
}
func TestGetProcess(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil)
func TestUpdateProcess(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil, nil)
require.NoError(t, err)
process := getDummyProcess()
process1 := getDummyProcess()
require.NotNil(t, process1)
process1.ID = "process1"
rs.AddProcess(process)
process2 := getDummyProcess()
require.NotNil(t, process2)
process2.ID = "process2"
_, err = rs.GetProcess(process.ID)
require.Equal(t, nil, err, "Process not found (%s)", process.ID)
err = rs.AddProcess(process1)
require.Equal(t, nil, err)
err = rs.AddProcess(process2)
require.Equal(t, nil, err)
process3 := getDummyProcess()
require.NotNil(t, process3)
process3.ID = "process2"
err = rs.UpdateProcess("process1", process3)
require.Error(t, err)
process3.ID = "process3"
err = rs.UpdateProcess("process1", process3)
require.NoError(t, err)
_, err = rs.GetProcess(process1.ID)
require.Error(t, err)
_, err = rs.GetProcess(process3.ID)
require.NoError(t, err)
}
func TestGetProcess(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil, nil)
require.NoError(t, err)
process1 := getDummyProcess()
process1.ID = "foo_aaa_1"
process1.Reference = "foo_aaa_1"
process2 := getDummyProcess()
process2.ID = "bar_bbb_2"
process2.Reference = "bar_bbb_2"
process3 := getDummyProcess()
process3.ID = "foo_ccc_3"
process3.Reference = "foo_ccc_3"
process4 := getDummyProcess()
process4.ID = "bar_ddd_4"
process4.Reference = "bar_ddd_4"
rs.AddProcess(process1)
rs.AddProcess(process2)
rs.AddProcess(process3)
rs.AddProcess(process4)
_, err = rs.GetProcess(process1.ID)
require.Equal(t, nil, err)
list := rs.GetProcessIDs("", "")
require.Len(t, list, 1, "expected 1 process")
require.Equal(t, process.ID, list[0], "expected same process ID")
require.Len(t, list, 4)
require.ElementsMatch(t, []string{"foo_aaa_1", "bar_bbb_2", "foo_ccc_3", "bar_ddd_4"}, list)
list = rs.GetProcessIDs("foo_*", "")
require.Len(t, list, 2)
require.ElementsMatch(t, []string{"foo_aaa_1", "foo_ccc_3"}, list)
list = rs.GetProcessIDs("bar_*", "")
require.Len(t, list, 2)
require.ElementsMatch(t, []string{"bar_bbb_2", "bar_ddd_4"}, list)
list = rs.GetProcessIDs("*_bbb_*", "")
require.Len(t, list, 1)
require.ElementsMatch(t, []string{"bar_bbb_2"}, list)
list = rs.GetProcessIDs("", "foo_*")
require.Len(t, list, 2)
require.ElementsMatch(t, []string{"foo_aaa_1", "foo_ccc_3"}, list)
list = rs.GetProcessIDs("", "bar_*")
require.Len(t, list, 2)
require.ElementsMatch(t, []string{"bar_bbb_2", "bar_ddd_4"}, list)
list = rs.GetProcessIDs("", "*_bbb_*")
require.Len(t, list, 1)
require.ElementsMatch(t, []string{"bar_bbb_2"}, list)
}
func TestStartProcess(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil)
rs, err := getDummyRestreamer(nil, nil, nil, nil)
require.NoError(t, err)
process := getDummyProcess()
@@ -238,7 +314,7 @@ func TestStartProcess(t *testing.T) {
}
func TestStopProcess(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil)
rs, err := getDummyRestreamer(nil, nil, nil, nil)
require.NoError(t, err)
process := getDummyProcess()
@@ -263,7 +339,7 @@ func TestStopProcess(t *testing.T) {
}
func TestRestartProcess(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil)
rs, err := getDummyRestreamer(nil, nil, nil, nil)
require.NoError(t, err)
process := getDummyProcess()
@@ -288,7 +364,7 @@ func TestRestartProcess(t *testing.T) {
}
func TestReloadProcess(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil)
rs, err := getDummyRestreamer(nil, nil, nil, nil)
require.NoError(t, err)
process := getDummyProcess()
@@ -318,8 +394,8 @@ func TestReloadProcess(t *testing.T) {
rs.StopProcess(process.ID)
}
func TestProcessData(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil)
func TestProcessMetadata(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil, nil)
require.NoError(t, err)
process := getDummyProcess()
@@ -340,7 +416,7 @@ func TestProcessData(t *testing.T) {
}
func TestLog(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil)
rs, err := getDummyRestreamer(nil, nil, nil, nil)
require.NoError(t, err)
process := getDummyProcess()
@@ -373,7 +449,7 @@ func TestLog(t *testing.T) {
}
func TestPlayoutNoRange(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil)
rs, err := getDummyRestreamer(nil, nil, nil, nil)
require.NoError(t, err)
process := getDummyProcess()
@@ -396,7 +472,7 @@ func TestPlayoutRange(t *testing.T) {
portrange, err := net.NewPortrange(3000, 3001)
require.NoError(t, err)
rs, err := getDummyRestreamer(portrange, nil, nil)
rs, err := getDummyRestreamer(portrange, nil, nil, nil)
require.NoError(t, err)
process := getDummyProcess()
@@ -417,7 +493,7 @@ func TestPlayoutRange(t *testing.T) {
}
func TestAddressReference(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil)
rs, err := getDummyRestreamer(nil, nil, nil, nil)
require.NoError(t, err)
process1 := getDummyProcess()
@@ -449,7 +525,7 @@ func TestAddressReference(t *testing.T) {
}
func TestConfigValidation(t *testing.T) {
rsi, err := getDummyRestreamer(nil, nil, nil)
rsi, err := getDummyRestreamer(nil, nil, nil, nil)
require.NoError(t, err)
rs := rsi.(*restream)
@@ -496,7 +572,7 @@ func TestConfigValidationFFmpeg(t *testing.T) {
valOut, err := ffmpeg.NewValidator([]string{"^https?://", "^rtmp://"}, nil)
require.NoError(t, err)
rsi, err := getDummyRestreamer(nil, valIn, valOut)
rsi, err := getDummyRestreamer(nil, valIn, valOut, nil)
require.NoError(t, err)
rs := rsi.(*restream)
@@ -522,7 +598,7 @@ func TestConfigValidationFFmpeg(t *testing.T) {
}
func TestOutputAddressValidation(t *testing.T) {
rsi, err := getDummyRestreamer(nil, nil, nil)
rsi, err := getDummyRestreamer(nil, nil, nil, nil)
require.NoError(t, err)
rs := rsi.(*restream)
@@ -561,3 +637,196 @@ func TestOutputAddressValidation(t *testing.T) {
require.Equal(t, r.path, path)
}
}
func TestMetadata(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil, nil)
require.NoError(t, err)
process := getDummyProcess()
data, _ := rs.GetMetadata("foobar")
require.Equal(t, nil, data, "nothing should be stored under the key")
rs.SetMetadata("foobar", process)
data, _ = rs.GetMetadata("foobar")
require.NotEqual(t, nil, data, "there should be something stored under the key")
p := data.(*app.Config)
require.Equal(t, process.ID, p.ID, "failed to retrieve stored data")
}
func TestReplacer(t *testing.T) {
replacer := replace.New()
replacer.RegisterTemplateFunc("diskfs", func(config *app.Config, section string) string {
return "/mnt/diskfs"
}, nil)
replacer.RegisterTemplateFunc("fs:disk", func(config *app.Config, section string) string {
return "/mnt/diskfs"
}, nil)
replacer.RegisterTemplateFunc("memfs", func(config *app.Config, section string) string {
return "http://localhost/mnt/memfs"
}, nil)
replacer.RegisterTemplateFunc("fs:mem", func(config *app.Config, section string) string {
return "http://localhost/mnt/memfs"
}, nil)
replacer.RegisterTemplateFunc("rtmp", func(config *app.Config, section string) string {
return "rtmp://localhost/app/{name}?token=foobar"
}, nil)
replacer.RegisterTemplateFunc("srt", func(config *app.Config, section string) string {
template := "srt://localhost:6000?mode=caller&transtype=live&latency={latency}&streamid={name}"
if section == "output" {
template += ",mode:publish"
} else {
template += ",mode:request"
}
template += ",token:abcfoobar&passphrase=secret"
return template
}, map[string]string{
"latency": "20000", // 20 milliseconds, FFmpeg requires microseconds
})
rsi, err := getDummyRestreamer(nil, nil, nil, replacer)
require.NoError(t, err)
process := &app.Config{
ID: "314159265359",
Reference: "refref",
Input: []app.ConfigIO{
{
ID: "in_{processid}_{reference}",
Address: "input:{inputid}_process:{processid}_reference:{reference}_diskfs:{diskfs}/disk.txt_memfs:{memfs}/mem.txt_fsdisk:{fs:disk}/fsdisk.txt_fsmem:{fs:mem}/fsmem.txt_rtmp:{rtmp,name=pmtr}_srt:{srt,name=trs}_rtmp:{rtmp,name=$inputid}",
Options: []string{
"-f",
"lavfi",
"-re",
"input:{inputid}",
"process:{processid}",
"reference:{reference}",
"diskfs:{diskfs}/disk.txt",
"memfs:{memfs}/mem.txt",
"fsdisk:{fs:disk}/fsdisk.txt",
"fsmem:{fs:mem}/$inputid.txt",
},
},
},
Output: []app.ConfigIO{
{
ID: "out_{processid}_{reference}",
Address: "output:{outputid}_process:{processid}_reference:{reference}_diskfs:{diskfs}/disk.txt_memfs:{memfs}/mem.txt_fsdisk:{fs:disk}/fsdisk.txt_fsmem:{fs:mem}/fsmem.txt_rtmp:{rtmp,name=$processid}_srt:{srt,name=$reference,latency=42}_rtmp:{rtmp,name=$outputid}",
Options: []string{
"-codec",
"copy",
"-f",
"null",
"output:{outputid}",
"process:{processid}",
"reference:{reference}",
"diskfs:{diskfs}/disk.txt",
"memfs:{memfs}/mem.txt",
"fsdisk:{fs:disk}/fsdisk.txt",
"fsmem:{fs:mem}/$outputid.txt",
},
Cleanup: []app.ConfigIOCleanup{
{
Pattern: "pattern_{outputid}_{processid}_{reference}_{rtmp,name=$outputid}",
MaxFiles: 0,
MaxFileAge: 0,
PurgeOnDelete: false,
},
},
},
},
Options: []string{
"-loglevel",
"info",
"{diskfs}/foobar_on_disk.txt",
"{memfs}/foobar_in_mem.txt",
"{fs:disk}/foobar_on_disk_aswell.txt",
"{fs:mem}/foobar_in_mem_aswell.txt",
},
Reconnect: true,
ReconnectDelay: 10,
Autostart: false,
StaleTimeout: 0,
}
err = rsi.AddProcess(process)
require.NoError(t, err)
rs := rsi.(*restream)
process = &app.Config{
ID: "314159265359",
Reference: "refref",
FFVersion: "^4.0.2",
Input: []app.ConfigIO{
{
ID: "in_314159265359_refref",
Address: "input:in_314159265359_refref_process:314159265359_reference:refref_diskfs:/mnt/diskfs/disk.txt_memfs:http://localhost/mnt/memfs/mem.txt_fsdisk:/mnt/diskfs/fsdisk.txt_fsmem:http://localhost/mnt/memfs/fsmem.txt_rtmp:rtmp://localhost/app/pmtr?token=foobar_srt:srt://localhost:6000?mode=caller&transtype=live&latency=20000&streamid=trs,mode:request,token:abcfoobar&passphrase=secret_rtmp:rtmp://localhost/app/in_314159265359_refref?token=foobar",
Options: []string{
"-f",
"lavfi",
"-re",
"input:in_314159265359_refref",
"process:314159265359",
"reference:refref",
"diskfs:/mnt/diskfs/disk.txt",
"memfs:http://localhost/mnt/memfs/mem.txt",
"fsdisk:/mnt/diskfs/fsdisk.txt",
"fsmem:http://localhost/mnt/memfs/$inputid.txt",
},
Cleanup: []app.ConfigIOCleanup{},
},
},
Output: []app.ConfigIO{
{
ID: "out_314159265359_refref",
Address: "output:out_314159265359_refref_process:314159265359_reference:refref_diskfs:/mnt/diskfs/disk.txt_memfs:http://localhost/mnt/memfs/mem.txt_fsdisk:/mnt/diskfs/fsdisk.txt_fsmem:http://localhost/mnt/memfs/fsmem.txt_rtmp:rtmp://localhost/app/314159265359?token=foobar_srt:srt://localhost:6000?mode=caller&transtype=live&latency=42&streamid=refref,mode:publish,token:abcfoobar&passphrase=secret_rtmp:rtmp://localhost/app/out_314159265359_refref?token=foobar",
Options: []string{
"-codec",
"copy",
"-f",
"null",
"output:out_314159265359_refref",
"process:314159265359",
"reference:refref",
"diskfs:/mnt/diskfs/disk.txt",
"memfs:http://localhost/mnt/memfs/mem.txt",
"fsdisk:/mnt/diskfs/fsdisk.txt",
"fsmem:http://localhost/mnt/memfs/$outputid.txt",
},
Cleanup: []app.ConfigIOCleanup{
{
Pattern: "pattern_out_314159265359_refref_314159265359_refref_{rtmp,name=$outputid}",
MaxFiles: 0,
MaxFileAge: 0,
PurgeOnDelete: false,
},
},
},
},
Options: []string{
"-loglevel",
"info",
"/mnt/diskfs/foobar_on_disk.txt",
"{memfs}/foobar_in_mem.txt",
"/mnt/diskfs/foobar_on_disk_aswell.txt",
"http://localhost/mnt/memfs/foobar_in_mem_aswell.txt",
},
Reconnect: true,
ReconnectDelay: 10,
Autostart: false,
StaleTimeout: 0,
}
require.Equal(t, process, rs.tasks["314159265359"].config)
}