From dd8906e56f3611fc4afde0fb80db2c512e3dbeda Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Tue, 29 Oct 2024 17:04:07 +0100 Subject: [PATCH] Add test for dynamic placeholder via restream --- internal/mock/resources/resources.go | 12 +++ restream/core_test.go | 119 ++++++++++++++++++--------- 2 files changed, 90 insertions(+), 41 deletions(-) diff --git a/internal/mock/resources/resources.go b/internal/mock/resources/resources.go index 84f050f5..127e870d 100644 --- a/internal/mock/resources/resources.go +++ b/internal/mock/resources/resources.go @@ -12,3 +12,15 @@ func New() resources.Resources { return res } + +func NewWithLimits() resources.Resources { + res, _ := resources.New(resources.Config{ + MaxCPU: 100, + MaxMemory: 100, + MaxGPU: 100, + MaxGPUMemory: 100, + PSUtil: psutil.New(1), + }) + + return res +} diff --git a/restream/core_test.go b/restream/core_test.go index 3c8dfd02..306f4cf0 100644 --- a/restream/core_test.go +++ b/restream/core_test.go @@ -13,10 +13,11 @@ import ( "github.com/datarhei/core/v16/iam" iamidentity "github.com/datarhei/core/v16/iam/identity" "github.com/datarhei/core/v16/iam/policy" - "github.com/datarhei/core/v16/internal/mock/resources" + mock "github.com/datarhei/core/v16/internal/mock/resources" "github.com/datarhei/core/v16/internal/testhelper" "github.com/datarhei/core/v16/io/fs" "github.com/datarhei/core/v16/net" + "github.com/datarhei/core/v16/resources" "github.com/datarhei/core/v16/restream/app" rfs "github.com/datarhei/core/v16/restream/fs" "github.com/datarhei/core/v16/restream/replace" @@ -26,13 +27,18 @@ import ( "github.com/stretchr/testify/require" ) -func getDummyRestreamer(portrange net.Portranger, validatorIn, validatorOut ffmpeg.Validator, replacer replace.Replacer) (Restreamer, error) { +func getDummyRestreamer(portrange net.Portranger, validatorIn, validatorOut ffmpeg.Validator, replacer replace.Replacer, limits bool) (Restreamer, error) { binary, err := testhelper.BuildBinary("ffmpeg") if err != nil { return nil, fmt.Errorf("failed to build helper program: %w", err) } - resources := resources.New() + var res resources.Resources + if limits { + res = mock.NewWithLimits() + } else { + res = mock.New() + } ffmpeg, err := ffmpeg.New(ffmpeg.Config{ Binary: binary, @@ -40,7 +46,7 @@ func getDummyRestreamer(portrange net.Portranger, validatorIn, validatorOut ffmp Portrange: portrange, ValidatorInput: validatorIn, ValidatorOutput: validatorOut, - Resource: resources, + Resource: res, }) if err != nil { return nil, err @@ -89,7 +95,7 @@ func getDummyRestreamer(portrange net.Portranger, validatorIn, validatorOut ffmp Replace: replacer, Filesystems: []fs.Filesystem{memfs}, Rewrite: rewriter, - Resources: resources, + Resources: res, }) if err != nil { return nil, err @@ -136,7 +142,7 @@ func getDummyProcess() *app.Config { } func TestAddProcess(t *testing.T) { - rs, err := getDummyRestreamer(nil, nil, nil, nil) + rs, err := getDummyRestreamer(nil, nil, nil, nil, false) require.NoError(t, err) process := getDummyProcess() @@ -158,7 +164,7 @@ func TestAddProcess(t *testing.T) { } func TestAutostartProcess(t *testing.T) { - rs, err := getDummyRestreamer(nil, nil, nil, nil) + rs, err := getDummyRestreamer(nil, nil, nil, nil, false) require.NoError(t, err) process := getDummyProcess() @@ -175,7 +181,7 @@ func TestAutostartProcess(t *testing.T) { } func TestAddInvalidProcess(t *testing.T) { - rs, err := getDummyRestreamer(nil, nil, nil, nil) + rs, err := getDummyRestreamer(nil, nil, nil, nil, false) require.NoError(t, err) // Invalid process ID @@ -243,7 +249,7 @@ func TestAddInvalidProcess(t *testing.T) { } func TestRemoveProcess(t *testing.T) { - rs, err := getDummyRestreamer(nil, nil, nil, nil) + rs, err := getDummyRestreamer(nil, nil, nil, nil, false) require.NoError(t, err) process := getDummyProcess() @@ -260,7 +266,7 @@ func TestRemoveProcess(t *testing.T) { } func TestUpdateProcess(t *testing.T) { - rs, err := getDummyRestreamer(nil, nil, nil, nil) + rs, err := getDummyRestreamer(nil, nil, nil, nil, false) require.NoError(t, err) process1 := getDummyProcess() @@ -311,7 +317,7 @@ func TestUpdateProcess(t *testing.T) { } func TestUpdateSameHashProcess(t *testing.T) { - rs, err := getDummyRestreamer(nil, nil, nil, nil) + rs, err := getDummyRestreamer(nil, nil, nil, nil, false) require.NoError(t, err) config := getDummyProcess() @@ -340,7 +346,7 @@ func TestUpdateSameHashProcess(t *testing.T) { } func TestUpdateProcessLogHistoryTransfer(t *testing.T) { - rs, err := getDummyRestreamer(nil, nil, nil, nil) + rs, err := getDummyRestreamer(nil, nil, nil, nil, false) require.NoError(t, err) p := getDummyProcess() @@ -394,7 +400,7 @@ func TestUpdateProcessLogHistoryTransfer(t *testing.T) { } func TestUpdateProcessMetadataTransfer(t *testing.T) { - rs, err := getDummyRestreamer(nil, nil, nil, nil) + rs, err := getDummyRestreamer(nil, nil, nil, nil, false) require.NoError(t, err) p := getDummyProcess() @@ -429,7 +435,7 @@ func TestUpdateProcessMetadataTransfer(t *testing.T) { } func TestGetProcess(t *testing.T) { - rs, err := getDummyRestreamer(nil, nil, nil, nil) + rs, err := getDummyRestreamer(nil, nil, nil, nil, false) require.NoError(t, err) process1 := getDummyProcess() @@ -496,7 +502,7 @@ func TestGetProcess(t *testing.T) { } func TestStartProcess(t *testing.T) { - rs, err := getDummyRestreamer(nil, nil, nil, nil) + rs, err := getDummyRestreamer(nil, nil, nil, nil, false) require.NoError(t, err) process := getDummyProcess() @@ -522,8 +528,39 @@ func TestStartProcess(t *testing.T) { rs.StopProcess(tid) } +func TestStartProcessWithLimits(t *testing.T) { + rs, err := getDummyRestreamer(nil, nil, nil, nil, true) + require.NoError(t, err) + + process := getDummyProcess() + process.LimitCPU = 1 + process.LimitMemory = 1 + process.LimitGPU = app.ConfigLimitGPU{ + Usage: 1, + Encoder: 1, + Decoder: 1, + Memory: 1, + } + process.Options = append(process.Options, "-hwdevice", "{hwdevice}") + tid := app.ProcessID{ID: process.ID} + + rs.AddProcess(process) + + err = rs.StartProcess(tid) + require.Equal(t, nil, err, "should be able to start existing process") + + state, _ := rs.GetProcessState(tid) + require.Equal(t, "start", state.Order, "Process should be started") + + require.Equal(t, []string{ + "-loglevel", "info", "-hwdevice", "0", "-f", "lavfi", "-re", "-i", "testsrc=size=1280x720:rate=25", "-codec", "copy", "-f", "null", "-", + }, state.Command) + + rs.StopProcess(tid) +} + func TestProcessResources(t *testing.T) { - rs, err := getDummyRestreamer(nil, nil, nil, nil) + rs, err := getDummyRestreamer(nil, nil, nil, nil, false) require.NoError(t, err) process := getDummyProcess() @@ -585,7 +622,7 @@ func TestProcessResources(t *testing.T) { } func TestStopProcess(t *testing.T) { - rs, err := getDummyRestreamer(nil, nil, nil, nil) + rs, err := getDummyRestreamer(nil, nil, nil, nil, false) require.NoError(t, err) process := getDummyProcess() @@ -611,7 +648,7 @@ func TestStopProcess(t *testing.T) { } func TestRestartProcess(t *testing.T) { - rs, err := getDummyRestreamer(nil, nil, nil, nil) + rs, err := getDummyRestreamer(nil, nil, nil, nil, false) require.NoError(t, err) process := getDummyProcess() @@ -637,7 +674,7 @@ func TestRestartProcess(t *testing.T) { } func TestReloadProcess(t *testing.T) { - rs, err := getDummyRestreamer(nil, nil, nil, nil) + rs, err := getDummyRestreamer(nil, nil, nil, nil, false) require.NoError(t, err) process := getDummyProcess() @@ -669,7 +706,7 @@ func TestReloadProcess(t *testing.T) { } func TestParseProcessPattern(t *testing.T) { - rs, err := getDummyRestreamer(nil, nil, nil, nil) + rs, err := getDummyRestreamer(nil, nil, nil, nil, false) require.NoError(t, err) process := getDummyProcess() @@ -692,7 +729,7 @@ func TestParseProcessPattern(t *testing.T) { } func TestProbeProcess(t *testing.T) { - rs, err := getDummyRestreamer(nil, nil, nil, nil) + rs, err := getDummyRestreamer(nil, nil, nil, nil, false) require.NoError(t, err) process := getDummyProcess() @@ -702,7 +739,7 @@ func TestProbeProcess(t *testing.T) { } func TestProbeProcessWithReference(t *testing.T) { - rs, err := getDummyRestreamer(nil, nil, nil, nil) + rs, err := getDummyRestreamer(nil, nil, nil, nil, false) require.NoError(t, err) process := getDummyProcess() @@ -718,7 +755,7 @@ func TestProbeProcessWithReference(t *testing.T) { } func TestProcessMetadata(t *testing.T) { - rs, err := getDummyRestreamer(nil, nil, nil, nil) + rs, err := getDummyRestreamer(nil, nil, nil, nil, false) require.NoError(t, err) process := getDummyProcess() @@ -743,7 +780,7 @@ func TestProcessMetadata(t *testing.T) { } func TestLog(t *testing.T) { - rs, err := getDummyRestreamer(nil, nil, nil, nil) + rs, err := getDummyRestreamer(nil, nil, nil, nil, false) require.NoError(t, err) process := getDummyProcess() @@ -782,7 +819,7 @@ func TestLog(t *testing.T) { } func TestLogTransfer(t *testing.T) { - rs, err := getDummyRestreamer(nil, nil, nil, nil) + rs, err := getDummyRestreamer(nil, nil, nil, nil, false) require.NoError(t, err) process := getDummyProcess() @@ -808,7 +845,7 @@ func TestLogTransfer(t *testing.T) { } func TestPlayoutNoRange(t *testing.T) { - rs, err := getDummyRestreamer(nil, nil, nil, nil) + rs, err := getDummyRestreamer(nil, nil, nil, nil, false) require.NoError(t, err) process := getDummyProcess() @@ -832,7 +869,7 @@ func TestPlayoutRange(t *testing.T) { portrange, err := net.NewPortrange(3000, 3001) require.NoError(t, err) - rs, err := getDummyRestreamer(portrange, nil, nil, nil) + rs, err := getDummyRestreamer(portrange, nil, nil, nil, false) require.NoError(t, err) process := getDummyProcess() @@ -888,7 +925,7 @@ func TestParseAddressReference(t *testing.T) { } func TestAddressReference(t *testing.T) { - rs, err := getDummyRestreamer(nil, nil, nil, nil) + rs, err := getDummyRestreamer(nil, nil, nil, nil, false) require.NoError(t, err) process1 := getDummyProcess() @@ -919,7 +956,7 @@ func TestAddressReference(t *testing.T) { } func TestTeeAddressReference(t *testing.T) { - rs, err := getDummyRestreamer(nil, nil, nil, nil) + rs, err := getDummyRestreamer(nil, nil, nil, nil, false) require.NoError(t, err) process1 := getDummyProcess() @@ -965,7 +1002,7 @@ func TestTeeAddressReference(t *testing.T) { } func TestConfigValidation(t *testing.T) { - rsi, err := getDummyRestreamer(nil, nil, nil, nil) + rsi, err := getDummyRestreamer(nil, nil, nil, nil, false) require.NoError(t, err) rs := rsi.(*restream) @@ -1013,7 +1050,7 @@ func TestConfigValidation(t *testing.T) { } func TestConfigValidationWithMkdir(t *testing.T) { - rsi, err := getDummyRestreamer(nil, nil, nil, nil) + rsi, err := getDummyRestreamer(nil, nil, nil, nil, false) require.NoError(t, err) rs := rsi.(*restream) @@ -1057,7 +1094,7 @@ func TestConfigValidationFFmpeg(t *testing.T) { valOut, err := ffmpeg.NewValidator([]string{"^https?://", "^rtmp://"}, nil) require.NoError(t, err) - rsi, err := getDummyRestreamer(nil, valIn, valOut, nil) + rsi, err := getDummyRestreamer(nil, valIn, valOut, nil, false) require.NoError(t, err) rs := rsi.(*restream) @@ -1083,7 +1120,7 @@ func TestConfigValidationFFmpeg(t *testing.T) { } func TestOutputAddressValidation(t *testing.T) { - rsi, err := getDummyRestreamer(nil, nil, nil, nil) + rsi, err := getDummyRestreamer(nil, nil, nil, nil, false) require.NoError(t, err) rs := rsi.(*restream) @@ -1124,7 +1161,7 @@ func TestOutputAddressValidation(t *testing.T) { } func TestMetadata(t *testing.T) { - rs, err := getDummyRestreamer(nil, nil, nil, nil) + rs, err := getDummyRestreamer(nil, nil, nil, nil, false) require.NoError(t, err) process := getDummyProcess() @@ -1422,7 +1459,7 @@ func TestProcessReplacer(t *testing.T) { "latency": "20000", // 20 milliseconds, FFmpeg requires microseconds }) - rsi, err := getDummyRestreamer(nil, nil, nil, replacer) + rsi, err := getDummyRestreamer(nil, nil, nil, replacer, false) require.NoError(t, err) process := &app.Config{ @@ -1611,7 +1648,7 @@ func TestProcessReplacer(t *testing.T) { } func TestProcessLogPattern(t *testing.T) { - rs, err := getDummyRestreamer(nil, nil, nil, nil) + rs, err := getDummyRestreamer(nil, nil, nil, nil, false) require.NoError(t, err) process := getDummyProcess() @@ -1642,7 +1679,7 @@ func TestProcessLogPattern(t *testing.T) { } func TestProcessLimit(t *testing.T) { - rsi, err := getDummyRestreamer(nil, nil, nil, nil) + rsi, err := getDummyRestreamer(nil, nil, nil, nil, false) require.NoError(t, err) process := getDummyProcess() @@ -1667,7 +1704,7 @@ func TestProcessLimit(t *testing.T) { } func BenchmarkGetProcessIDs(b *testing.B) { - rs, err := getDummyRestreamer(nil, nil, nil, nil) + rs, err := getDummyRestreamer(nil, nil, nil, nil, false) require.NoError(b, err) for i := 0; i < 1000; i++ { @@ -1688,7 +1725,7 @@ func BenchmarkGetProcessIDs(b *testing.B) { } func BenchmarkGetProcess(b *testing.B) { - rs, err := getDummyRestreamer(nil, nil, nil, nil) + rs, err := getDummyRestreamer(nil, nil, nil, nil, false) require.NoError(b, err) for i := 0; i < 1000; i++ { @@ -1712,7 +1749,7 @@ func BenchmarkGetProcess(b *testing.B) { } func BenchmarkGetProcessState(b *testing.B) { - rs, err := getDummyRestreamer(nil, nil, nil, nil) + rs, err := getDummyRestreamer(nil, nil, nil, nil, false) require.NoError(b, err) n := 10 @@ -1745,7 +1782,7 @@ func BenchmarkGetProcessState(b *testing.B) { } func TestProcessCleanup(t *testing.T) { - rsi, err := getDummyRestreamer(nil, nil, nil, nil) + rsi, err := getDummyRestreamer(nil, nil, nil, nil, false) require.NoError(t, err) rsi.Start()