diff --git a/internal/mock/psutil/psutil.go b/internal/mock/psutil/psutil.go index 942bf405..c0f37048 100644 --- a/internal/mock/psutil/psutil.go +++ b/internal/mock/psutil/psutil.go @@ -114,7 +114,7 @@ func (p *mockPSUtilProcess) GPU() (*psutil.GPUInfo, error) { MemoryUsed: 42, Usage: 5, Encoder: 9, - Decoder: 7, + Decoder: 11, }, nil } func (p *mockPSUtilProcess) Cancel() {} diff --git a/process/limiter.go b/process/limiter.go index b30d4731..dfe90d60 100644 --- a/process/limiter.go +++ b/process/limiter.go @@ -467,9 +467,9 @@ func (l *limiter) collect() { l.lastUsage.Memory.Max = l.memory.Max() l.lastUsage.GPU.Index = gindex - l.lastUsage.GPU.Memory.Current = l.gpu.memory.Current() * 100 - l.lastUsage.GPU.Memory.Average = l.gpu.memory.Avg() * 100 - l.lastUsage.GPU.Memory.Max = l.gpu.memory.Max() * 100 + l.lastUsage.GPU.Memory.Current = l.gpu.memory.Current() + l.lastUsage.GPU.Memory.Average = l.gpu.memory.Avg() + l.lastUsage.GPU.Memory.Max = l.gpu.memory.Max() l.lastUsage.GPU.Usage.Current = l.gpu.usage.Current() * 100 l.lastUsage.GPU.Usage.Average = l.gpu.usage.Avg() * 100 diff --git a/process/process.go b/process/process.go index be0a9854..1f757d74 100644 --- a/process/process.go +++ b/process/process.go @@ -198,12 +198,13 @@ type States struct { // Process represents a ffmpeg process type process struct { - binary string - args []string - cmd *exec.Cmd - pid int32 - stdout io.ReadCloser - state struct { + binary string + args []string + cmdArgs []string + cmd *exec.Cmd + pid int32 + stdout io.ReadCloser + state struct { state stateType time time.Time states States @@ -268,6 +269,8 @@ func New(config Config) (Process, error) { p.args = make([]string, len(config.Args)) copy(p.args, config.Args) + p.cmdArgs = make([]string, len(config.Args)) + copy(p.cmdArgs, config.Args) // This is a loose check on purpose. If the e.g. the binary // doesn't exist or it is not executable, it will be @@ -560,8 +563,8 @@ func (p *process) Status() Status { }, } - s.CommandArgs = make([]string, len(p.args)) - copy(s.CommandArgs, p.args) + s.CommandArgs = make([]string, len(p.cmdArgs)) + copy(s.CommandArgs, p.cmdArgs) if order == "start" && !state.IsRunning() { p.reconn.lock.Lock() @@ -664,6 +667,8 @@ func (p *process) start() error { return err } + + p.cmdArgs = args } p.cmd = exec.Command(p.binary, args...) diff --git a/restream/core_test.go b/restream/core_test.go index eb05d878..3c8dfd02 100644 --- a/restream/core_test.go +++ b/restream/core_test.go @@ -522,6 +522,68 @@ func TestStartProcess(t *testing.T) { rs.StopProcess(tid) } +func TestProcessResources(t *testing.T) { + rs, err := getDummyRestreamer(nil, nil, nil, nil) + require.NoError(t, err) + + process := getDummyProcess() + tid := app.ProcessID{ID: process.ID} + + rs.AddProcess(process) + + err = rs.StartProcess(tid) + require.Equal(t, nil, err, "should be able to start existing process") + + time.Sleep(2 * time.Second) + + state, _ := rs.GetProcessState(tid) + require.Equal(t, "start", state.Order, "Process should be started") + + require.Equal(t, app.ProcessUsage{ + CPU: app.ProcessUsageCPU{ + NCPU: 2, + Current: 12, + Average: 12, + Max: 12, + Limit: 0, + IsThrottling: false, + }, + Memory: app.ProcessUsageMemory{ + Current: 42, + Average: 42, + Max: 42, + Limit: 0, + }, + GPU: app.ProcessUsageGPU{ + Index: 0, + Usage: app.ProcessUsageGPUUsage{ + Current: 5, + Average: 5, + Max: 5, + Limit: 0, + }, + Encoder: app.ProcessUsageGPUUsage{ + Current: 9, + Average: 9, + Max: 9, + Limit: 0, + }, + Decoder: app.ProcessUsageGPUUsage{ + Current: 11, + Average: 11, + Max: 11, + Limit: 0, + }, + Memory: app.ProcessUsageGPUMemory{ + Current: 42, + Average: 42, + Max: 42, + Limit: 0, + }, + }, + }, state.Resources) +} + func TestStopProcess(t *testing.T) { rs, err := getDummyRestreamer(nil, nil, nil, nil) require.NoError(t, err) @@ -1142,7 +1204,7 @@ func TestReplacer(t *testing.T) { Input: []app.ConfigIO{ { ID: "in_{processid}_{reference}", - Address: "input:{inputid}_process:{processid}_reference:{reference}_diskfs:{diskfs}/disk.txt_memfs:{memfs}/mem.txt_fsdisk:{fs:disk}/fsdisk.txt_fsmem:{fs:mem}/fsmem.txt_rtmp:{rtmp,name=pmtr}_srt:{srt,name=trs}_rtmp:{rtmp,name=$inputid}", + Address: "input:{inputid}_process:{processid}_reference:{reference}_diskfs:{diskfs}/disk.txt_memfs:{memfs}/mem.txt_fsdisk:{fs:disk}/fsdisk.txt_fsmem:{fs:mem}/fsmem.txt_rtmp:{rtmp,name=pmtr}_srt:{srt,name=trs}_rtmp:{rtmp,name=$inputid}_hwdevice:{hwdevice}", Options: []string{ "-f", "lavfi", @@ -1154,6 +1216,7 @@ func TestReplacer(t *testing.T) { "memfs:{memfs}/mem.txt", "fsdisk:{fs:disk}/fsdisk_{date,format=%Y%m%d_%H%M%S}.txt", "fsmem:{fs:mem}/$inputid.txt", + "hwdevice:{hwdevice}", }, }, }, @@ -1191,6 +1254,7 @@ func TestReplacer(t *testing.T) { "{memfs}/foobar_in_mem.txt", "{fs:disk}/foobar_on_disk_aswell.txt", "{fs:mem}/foobar_in_mem_aswell.txt", + "hwdevice:{hwdevice}", }, Reconnect: true, ReconnectDelay: 10, @@ -1207,7 +1271,7 @@ func TestReplacer(t *testing.T) { Input: []app.ConfigIO{ { ID: "in_314159265359_refref", - Address: "input:in_314159265359_refref_process:314159265359_reference:refref_diskfs:/mnt/diskfs/disk.txt_memfs:http://localhost/mnt/memfs/mem.txt_fsdisk:/mnt/diskfs/fsdisk.txt_fsmem:http://localhost/mnt/memfs/fsmem.txt_rtmp:rtmp://localhost/app/pmtr?token=foobar_srt:srt://localhost:6000?mode=caller&transtype=live&latency=20000&streamid=trs,mode:request,token:abcfoobar&passphrase=secret_rtmp:rtmp://localhost/app/in_314159265359_refref?token=foobar", + Address: "input:in_314159265359_refref_process:314159265359_reference:refref_diskfs:/mnt/diskfs/disk.txt_memfs:http://localhost/mnt/memfs/mem.txt_fsdisk:/mnt/diskfs/fsdisk.txt_fsmem:http://localhost/mnt/memfs/fsmem.txt_rtmp:rtmp://localhost/app/pmtr?token=foobar_srt:srt://localhost:6000?mode=caller&transtype=live&latency=20000&streamid=trs,mode:request,token:abcfoobar&passphrase=secret_rtmp:rtmp://localhost/app/in_314159265359_refref?token=foobar_hwdevice:{hwdevice}", Options: []string{ "-f", "lavfi", @@ -1219,6 +1283,7 @@ func TestReplacer(t *testing.T) { "memfs:http://localhost/mnt/memfs/mem.txt", "fsdisk:/mnt/diskfs/fsdisk_{date,format=%Y%m%d_%H%M%S}.txt", "fsmem:http://localhost/mnt/memfs/$inputid.txt", + "hwdevice:{hwdevice}", }, }, }, @@ -1256,6 +1321,7 @@ func TestReplacer(t *testing.T) { "{memfs}/foobar_in_mem.txt", "/mnt/diskfs/foobar_on_disk_aswell.txt", "http://localhost/mnt/memfs/foobar_in_mem_aswell.txt", + "hwdevice:{hwdevice}", }, Reconnect: true, ReconnectDelay: 10, @@ -1265,12 +1331,23 @@ func TestReplacer(t *testing.T) { require.Equal(t, wantprocess, process) - resolveDynamicPlaceholder(process, replacer, nil, nil) + resolveDynamicPlaceholder(process, replacer, map[string]string{ + "hwdevice": fmt.Sprintf("%d", -1), + }, nil) + wantprocess.Options = []string{ + "-loglevel", + "info", + "/mnt/diskfs/foobar_on_disk.txt", + "{memfs}/foobar_in_mem.txt", + "/mnt/diskfs/foobar_on_disk_aswell.txt", + "http://localhost/mnt/memfs/foobar_in_mem_aswell.txt", + "hwdevice:-1", + } wantprocess.Input = []app.ConfigIO{ { ID: "in_314159265359_refref", - Address: "input:in_314159265359_refref_process:314159265359_reference:refref_diskfs:/mnt/diskfs/disk.txt_memfs:http://localhost/mnt/memfs/mem.txt_fsdisk:/mnt/diskfs/fsdisk.txt_fsmem:http://localhost/mnt/memfs/fsmem.txt_rtmp:rtmp://localhost/app/pmtr?token=foobar_srt:srt://localhost:6000?mode=caller&transtype=live&latency=20000&streamid=trs,mode:request,token:abcfoobar&passphrase=secret_rtmp:rtmp://localhost/app/in_314159265359_refref?token=foobar", + Address: "input:in_314159265359_refref_process:314159265359_reference:refref_diskfs:/mnt/diskfs/disk.txt_memfs:http://localhost/mnt/memfs/mem.txt_fsdisk:/mnt/diskfs/fsdisk.txt_fsmem:http://localhost/mnt/memfs/fsmem.txt_rtmp:rtmp://localhost/app/pmtr?token=foobar_srt:srt://localhost:6000?mode=caller&transtype=live&latency=20000&streamid=trs,mode:request,token:abcfoobar&passphrase=secret_rtmp:rtmp://localhost/app/in_314159265359_refref?token=foobar_hwdevice:-1", Options: []string{ "-f", "lavfi", @@ -1282,6 +1359,7 @@ func TestReplacer(t *testing.T) { "memfs:http://localhost/mnt/memfs/mem.txt", "fsdisk:/mnt/diskfs/fsdisk_20191012_072050.txt", "fsmem:http://localhost/mnt/memfs/$inputid.txt", + "hwdevice:-1", }, }, } @@ -1365,6 +1443,7 @@ func TestProcessReplacer(t *testing.T) { "memfs:{memfs}/mem.txt", "fsdisk:{fs:disk}/fsdisk_{date,format=%Y%m%d_%H%M%S}.txt", "fsmem:{fs:mem}/$inputid.txt", + "hwdevice:{hwdevice}", }, }, }, @@ -1402,6 +1481,7 @@ func TestProcessReplacer(t *testing.T) { "{memfs}/foobar_in_mem.txt", "{fs:disk}/foobar_on_disk_aswell.txt", "{fs:mem}/foobar_in_mem_aswell.txt", + "hwdevice:{hwdevice}", }, Reconnect: true, ReconnectDelay: 10, @@ -1433,6 +1513,7 @@ func TestProcessReplacer(t *testing.T) { "memfs:http://localhost/mnt/memfs/mem.txt", "fsdisk:/mnt/diskfs/fsdisk_{date,format=%Y%m%d_%H%M%S}.txt", "fsmem:http://localhost/mnt/memfs/$inputid.txt", + "hwdevice:{hwdevice}", }, Cleanup: []app.ConfigIOCleanup{}, }, @@ -1471,6 +1552,7 @@ func TestProcessReplacer(t *testing.T) { "{memfs}/foobar_in_mem.txt", "/mnt/diskfs/foobar_on_disk_aswell.txt", "http://localhost/mnt/memfs/foobar_in_mem_aswell.txt", + "hwdevice:{hwdevice}", }, Reconnect: true, ReconnectDelay: 10, @@ -1483,6 +1565,49 @@ func TestProcessReplacer(t *testing.T) { require.True(t, ok) require.Equal(t, process, task.config) + + err = rsi.StartProcess(app.ProcessID{ID: "314159265359"}) + require.NoError(t, err) + + state, err := rsi.GetProcessState(app.ProcessID{ID: "314159265359"}) + require.NoError(t, err) + + require.Equal(t, []string{ + "-loglevel", + "info", + "/mnt/diskfs/foobar_on_disk.txt", + "{memfs}/foobar_in_mem.txt", + "/mnt/diskfs/foobar_on_disk_aswell.txt", + "http://localhost/mnt/memfs/foobar_in_mem_aswell.txt", + "hwdevice:-1", + "-f", + "lavfi", + "-re", + "input:in_314159265359_refref", + "process:314159265359", + "reference:refref", + "diskfs:/mnt/diskfs/disk.txt", + "memfs:http://localhost/mnt/memfs/mem.txt", + "fsdisk:/mnt/diskfs/fsdisk_20191012_072050.txt", + "fsmem:http://localhost/mnt/memfs/$inputid.txt", + "hwdevice:-1", + "-i", + "input:in_314159265359_refref_process:314159265359_reference:refref_diskfs:/mnt/diskfs/disk.txt_memfs:http://localhost/mnt/memfs/mem.txt_fsdisk:/mnt/diskfs/fsdisk.txt_fsmem:http://localhost/mnt/memfs/fsmem.txt_rtmp:rtmp://localhost/app/pmtr?token=foobar_srt:srt://localhost:6000?mode=caller&transtype=live&latency=20000&streamid=trs,mode:request,token:abcfoobar&passphrase=secret_rtmp:rtmp://localhost/app/in_314159265359_refref?token=foobar", + "-codec", + "copy", + "-f", + "null", + "output:out_314159265359_refref", + "process:314159265359", + "reference:refref", + "diskfs:/mnt/diskfs/disk.txt", + "memfs:http://localhost/mnt/memfs/mem.txt", + "fsdisk:/mnt/diskfs/fsdisk.txt", + "fsmem:http://localhost/mnt/memfs/$outputid.txt", + "output:out_314159265359_refref_process:314159265359_reference:refref_diskfs:/mnt/diskfs/disk.txt_memfs:http://localhost/mnt/memfs/mem.txt_fsdisk:/mnt/diskfs/fsdisk.txt_fsmem:http://localhost/mnt/memfs/fsmem.txt_rtmp:rtmp://localhost/app/314159265359?token=foobar_srt:srt://localhost:6000?mode=caller&transtype=live&latency=42&streamid=refref,mode:publish,token:abcfoobar&passphrase=secret_rtmp:rtmp://localhost/app/out_314159265359_refref?token=foobar", + }, state.Command) + + rsi.StopProcess(app.ProcessID{ID: "314159265359"}) } func TestProcessLogPattern(t *testing.T) {