Add test for dynamic placeholder via restream

This commit is contained in:
Ingo Oppermann
2024-10-29 17:04:07 +01:00
parent 2393dbc4c0
commit dd8906e56f
2 changed files with 90 additions and 41 deletions

View File

@@ -12,3 +12,15 @@ func New() resources.Resources {
return res
}
func NewWithLimits() resources.Resources {
res, _ := resources.New(resources.Config{
MaxCPU: 100,
MaxMemory: 100,
MaxGPU: 100,
MaxGPUMemory: 100,
PSUtil: psutil.New(1),
})
return res
}

View File

@@ -13,10 +13,11 @@ import (
"github.com/datarhei/core/v16/iam"
iamidentity "github.com/datarhei/core/v16/iam/identity"
"github.com/datarhei/core/v16/iam/policy"
"github.com/datarhei/core/v16/internal/mock/resources"
mock "github.com/datarhei/core/v16/internal/mock/resources"
"github.com/datarhei/core/v16/internal/testhelper"
"github.com/datarhei/core/v16/io/fs"
"github.com/datarhei/core/v16/net"
"github.com/datarhei/core/v16/resources"
"github.com/datarhei/core/v16/restream/app"
rfs "github.com/datarhei/core/v16/restream/fs"
"github.com/datarhei/core/v16/restream/replace"
@@ -26,13 +27,18 @@ import (
"github.com/stretchr/testify/require"
)
func getDummyRestreamer(portrange net.Portranger, validatorIn, validatorOut ffmpeg.Validator, replacer replace.Replacer) (Restreamer, error) {
func getDummyRestreamer(portrange net.Portranger, validatorIn, validatorOut ffmpeg.Validator, replacer replace.Replacer, limits bool) (Restreamer, error) {
binary, err := testhelper.BuildBinary("ffmpeg")
if err != nil {
return nil, fmt.Errorf("failed to build helper program: %w", err)
}
resources := resources.New()
var res resources.Resources
if limits {
res = mock.NewWithLimits()
} else {
res = mock.New()
}
ffmpeg, err := ffmpeg.New(ffmpeg.Config{
Binary: binary,
@@ -40,7 +46,7 @@ func getDummyRestreamer(portrange net.Portranger, validatorIn, validatorOut ffmp
Portrange: portrange,
ValidatorInput: validatorIn,
ValidatorOutput: validatorOut,
Resource: resources,
Resource: res,
})
if err != nil {
return nil, err
@@ -89,7 +95,7 @@ func getDummyRestreamer(portrange net.Portranger, validatorIn, validatorOut ffmp
Replace: replacer,
Filesystems: []fs.Filesystem{memfs},
Rewrite: rewriter,
Resources: resources,
Resources: res,
})
if err != nil {
return nil, err
@@ -136,7 +142,7 @@ func getDummyProcess() *app.Config {
}
func TestAddProcess(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil, nil)
rs, err := getDummyRestreamer(nil, nil, nil, nil, false)
require.NoError(t, err)
process := getDummyProcess()
@@ -158,7 +164,7 @@ func TestAddProcess(t *testing.T) {
}
func TestAutostartProcess(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil, nil)
rs, err := getDummyRestreamer(nil, nil, nil, nil, false)
require.NoError(t, err)
process := getDummyProcess()
@@ -175,7 +181,7 @@ func TestAutostartProcess(t *testing.T) {
}
func TestAddInvalidProcess(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil, nil)
rs, err := getDummyRestreamer(nil, nil, nil, nil, false)
require.NoError(t, err)
// Invalid process ID
@@ -243,7 +249,7 @@ func TestAddInvalidProcess(t *testing.T) {
}
func TestRemoveProcess(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil, nil)
rs, err := getDummyRestreamer(nil, nil, nil, nil, false)
require.NoError(t, err)
process := getDummyProcess()
@@ -260,7 +266,7 @@ func TestRemoveProcess(t *testing.T) {
}
func TestUpdateProcess(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil, nil)
rs, err := getDummyRestreamer(nil, nil, nil, nil, false)
require.NoError(t, err)
process1 := getDummyProcess()
@@ -311,7 +317,7 @@ func TestUpdateProcess(t *testing.T) {
}
func TestUpdateSameHashProcess(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil, nil)
rs, err := getDummyRestreamer(nil, nil, nil, nil, false)
require.NoError(t, err)
config := getDummyProcess()
@@ -340,7 +346,7 @@ func TestUpdateSameHashProcess(t *testing.T) {
}
func TestUpdateProcessLogHistoryTransfer(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil, nil)
rs, err := getDummyRestreamer(nil, nil, nil, nil, false)
require.NoError(t, err)
p := getDummyProcess()
@@ -394,7 +400,7 @@ func TestUpdateProcessLogHistoryTransfer(t *testing.T) {
}
func TestUpdateProcessMetadataTransfer(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil, nil)
rs, err := getDummyRestreamer(nil, nil, nil, nil, false)
require.NoError(t, err)
p := getDummyProcess()
@@ -429,7 +435,7 @@ func TestUpdateProcessMetadataTransfer(t *testing.T) {
}
func TestGetProcess(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil, nil)
rs, err := getDummyRestreamer(nil, nil, nil, nil, false)
require.NoError(t, err)
process1 := getDummyProcess()
@@ -496,7 +502,7 @@ func TestGetProcess(t *testing.T) {
}
func TestStartProcess(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil, nil)
rs, err := getDummyRestreamer(nil, nil, nil, nil, false)
require.NoError(t, err)
process := getDummyProcess()
@@ -522,8 +528,39 @@ func TestStartProcess(t *testing.T) {
rs.StopProcess(tid)
}
func TestStartProcessWithLimits(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil, nil, true)
require.NoError(t, err)
process := getDummyProcess()
process.LimitCPU = 1
process.LimitMemory = 1
process.LimitGPU = app.ConfigLimitGPU{
Usage: 1,
Encoder: 1,
Decoder: 1,
Memory: 1,
}
process.Options = append(process.Options, "-hwdevice", "{hwdevice}")
tid := app.ProcessID{ID: process.ID}
rs.AddProcess(process)
err = rs.StartProcess(tid)
require.Equal(t, nil, err, "should be able to start existing process")
state, _ := rs.GetProcessState(tid)
require.Equal(t, "start", state.Order, "Process should be started")
require.Equal(t, []string{
"-loglevel", "info", "-hwdevice", "0", "-f", "lavfi", "-re", "-i", "testsrc=size=1280x720:rate=25", "-codec", "copy", "-f", "null", "-",
}, state.Command)
rs.StopProcess(tid)
}
func TestProcessResources(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil, nil)
rs, err := getDummyRestreamer(nil, nil, nil, nil, false)
require.NoError(t, err)
process := getDummyProcess()
@@ -585,7 +622,7 @@ func TestProcessResources(t *testing.T) {
}
func TestStopProcess(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil, nil)
rs, err := getDummyRestreamer(nil, nil, nil, nil, false)
require.NoError(t, err)
process := getDummyProcess()
@@ -611,7 +648,7 @@ func TestStopProcess(t *testing.T) {
}
func TestRestartProcess(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil, nil)
rs, err := getDummyRestreamer(nil, nil, nil, nil, false)
require.NoError(t, err)
process := getDummyProcess()
@@ -637,7 +674,7 @@ func TestRestartProcess(t *testing.T) {
}
func TestReloadProcess(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil, nil)
rs, err := getDummyRestreamer(nil, nil, nil, nil, false)
require.NoError(t, err)
process := getDummyProcess()
@@ -669,7 +706,7 @@ func TestReloadProcess(t *testing.T) {
}
func TestParseProcessPattern(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil, nil)
rs, err := getDummyRestreamer(nil, nil, nil, nil, false)
require.NoError(t, err)
process := getDummyProcess()
@@ -692,7 +729,7 @@ func TestParseProcessPattern(t *testing.T) {
}
func TestProbeProcess(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil, nil)
rs, err := getDummyRestreamer(nil, nil, nil, nil, false)
require.NoError(t, err)
process := getDummyProcess()
@@ -702,7 +739,7 @@ func TestProbeProcess(t *testing.T) {
}
func TestProbeProcessWithReference(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil, nil)
rs, err := getDummyRestreamer(nil, nil, nil, nil, false)
require.NoError(t, err)
process := getDummyProcess()
@@ -718,7 +755,7 @@ func TestProbeProcessWithReference(t *testing.T) {
}
func TestProcessMetadata(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil, nil)
rs, err := getDummyRestreamer(nil, nil, nil, nil, false)
require.NoError(t, err)
process := getDummyProcess()
@@ -743,7 +780,7 @@ func TestProcessMetadata(t *testing.T) {
}
func TestLog(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil, nil)
rs, err := getDummyRestreamer(nil, nil, nil, nil, false)
require.NoError(t, err)
process := getDummyProcess()
@@ -782,7 +819,7 @@ func TestLog(t *testing.T) {
}
func TestLogTransfer(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil, nil)
rs, err := getDummyRestreamer(nil, nil, nil, nil, false)
require.NoError(t, err)
process := getDummyProcess()
@@ -808,7 +845,7 @@ func TestLogTransfer(t *testing.T) {
}
func TestPlayoutNoRange(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil, nil)
rs, err := getDummyRestreamer(nil, nil, nil, nil, false)
require.NoError(t, err)
process := getDummyProcess()
@@ -832,7 +869,7 @@ func TestPlayoutRange(t *testing.T) {
portrange, err := net.NewPortrange(3000, 3001)
require.NoError(t, err)
rs, err := getDummyRestreamer(portrange, nil, nil, nil)
rs, err := getDummyRestreamer(portrange, nil, nil, nil, false)
require.NoError(t, err)
process := getDummyProcess()
@@ -888,7 +925,7 @@ func TestParseAddressReference(t *testing.T) {
}
func TestAddressReference(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil, nil)
rs, err := getDummyRestreamer(nil, nil, nil, nil, false)
require.NoError(t, err)
process1 := getDummyProcess()
@@ -919,7 +956,7 @@ func TestAddressReference(t *testing.T) {
}
func TestTeeAddressReference(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil, nil)
rs, err := getDummyRestreamer(nil, nil, nil, nil, false)
require.NoError(t, err)
process1 := getDummyProcess()
@@ -965,7 +1002,7 @@ func TestTeeAddressReference(t *testing.T) {
}
func TestConfigValidation(t *testing.T) {
rsi, err := getDummyRestreamer(nil, nil, nil, nil)
rsi, err := getDummyRestreamer(nil, nil, nil, nil, false)
require.NoError(t, err)
rs := rsi.(*restream)
@@ -1013,7 +1050,7 @@ func TestConfigValidation(t *testing.T) {
}
func TestConfigValidationWithMkdir(t *testing.T) {
rsi, err := getDummyRestreamer(nil, nil, nil, nil)
rsi, err := getDummyRestreamer(nil, nil, nil, nil, false)
require.NoError(t, err)
rs := rsi.(*restream)
@@ -1057,7 +1094,7 @@ func TestConfigValidationFFmpeg(t *testing.T) {
valOut, err := ffmpeg.NewValidator([]string{"^https?://", "^rtmp://"}, nil)
require.NoError(t, err)
rsi, err := getDummyRestreamer(nil, valIn, valOut, nil)
rsi, err := getDummyRestreamer(nil, valIn, valOut, nil, false)
require.NoError(t, err)
rs := rsi.(*restream)
@@ -1083,7 +1120,7 @@ func TestConfigValidationFFmpeg(t *testing.T) {
}
func TestOutputAddressValidation(t *testing.T) {
rsi, err := getDummyRestreamer(nil, nil, nil, nil)
rsi, err := getDummyRestreamer(nil, nil, nil, nil, false)
require.NoError(t, err)
rs := rsi.(*restream)
@@ -1124,7 +1161,7 @@ func TestOutputAddressValidation(t *testing.T) {
}
func TestMetadata(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil, nil)
rs, err := getDummyRestreamer(nil, nil, nil, nil, false)
require.NoError(t, err)
process := getDummyProcess()
@@ -1422,7 +1459,7 @@ func TestProcessReplacer(t *testing.T) {
"latency": "20000", // 20 milliseconds, FFmpeg requires microseconds
})
rsi, err := getDummyRestreamer(nil, nil, nil, replacer)
rsi, err := getDummyRestreamer(nil, nil, nil, replacer, false)
require.NoError(t, err)
process := &app.Config{
@@ -1611,7 +1648,7 @@ func TestProcessReplacer(t *testing.T) {
}
func TestProcessLogPattern(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil, nil)
rs, err := getDummyRestreamer(nil, nil, nil, nil, false)
require.NoError(t, err)
process := getDummyProcess()
@@ -1642,7 +1679,7 @@ func TestProcessLogPattern(t *testing.T) {
}
func TestProcessLimit(t *testing.T) {
rsi, err := getDummyRestreamer(nil, nil, nil, nil)
rsi, err := getDummyRestreamer(nil, nil, nil, nil, false)
require.NoError(t, err)
process := getDummyProcess()
@@ -1667,7 +1704,7 @@ func TestProcessLimit(t *testing.T) {
}
func BenchmarkGetProcessIDs(b *testing.B) {
rs, err := getDummyRestreamer(nil, nil, nil, nil)
rs, err := getDummyRestreamer(nil, nil, nil, nil, false)
require.NoError(b, err)
for i := 0; i < 1000; i++ {
@@ -1688,7 +1725,7 @@ func BenchmarkGetProcessIDs(b *testing.B) {
}
func BenchmarkGetProcess(b *testing.B) {
rs, err := getDummyRestreamer(nil, nil, nil, nil)
rs, err := getDummyRestreamer(nil, nil, nil, nil, false)
require.NoError(b, err)
for i := 0; i < 1000; i++ {
@@ -1712,7 +1749,7 @@ func BenchmarkGetProcess(b *testing.B) {
}
func BenchmarkGetProcessState(b *testing.B) {
rs, err := getDummyRestreamer(nil, nil, nil, nil)
rs, err := getDummyRestreamer(nil, nil, nil, nil, false)
require.NoError(b, err)
n := 10
@@ -1745,7 +1782,7 @@ func BenchmarkGetProcessState(b *testing.B) {
}
func TestProcessCleanup(t *testing.T) {
rsi, err := getDummyRestreamer(nil, nil, nil, nil)
rsi, err := getDummyRestreamer(nil, nil, nil, nil, false)
require.NoError(t, err)
rsi.Start()