Add replacer tests, fix command in state with dynamic placeholders

This commit is contained in:
Ingo Oppermann
2024-10-29 16:43:47 +01:00
parent de9a30a108
commit 2393dbc4c0
4 changed files with 146 additions and 16 deletions

View File

@@ -114,7 +114,7 @@ func (p *mockPSUtilProcess) GPU() (*psutil.GPUInfo, error) {
MemoryUsed: 42, MemoryUsed: 42,
Usage: 5, Usage: 5,
Encoder: 9, Encoder: 9,
Decoder: 7, Decoder: 11,
}, nil }, nil
} }
func (p *mockPSUtilProcess) Cancel() {} func (p *mockPSUtilProcess) Cancel() {}

View File

@@ -467,9 +467,9 @@ func (l *limiter) collect() {
l.lastUsage.Memory.Max = l.memory.Max() l.lastUsage.Memory.Max = l.memory.Max()
l.lastUsage.GPU.Index = gindex l.lastUsage.GPU.Index = gindex
l.lastUsage.GPU.Memory.Current = l.gpu.memory.Current() * 100 l.lastUsage.GPU.Memory.Current = l.gpu.memory.Current()
l.lastUsage.GPU.Memory.Average = l.gpu.memory.Avg() * 100 l.lastUsage.GPU.Memory.Average = l.gpu.memory.Avg()
l.lastUsage.GPU.Memory.Max = l.gpu.memory.Max() * 100 l.lastUsage.GPU.Memory.Max = l.gpu.memory.Max()
l.lastUsage.GPU.Usage.Current = l.gpu.usage.Current() * 100 l.lastUsage.GPU.Usage.Current = l.gpu.usage.Current() * 100
l.lastUsage.GPU.Usage.Average = l.gpu.usage.Avg() * 100 l.lastUsage.GPU.Usage.Average = l.gpu.usage.Avg() * 100

View File

@@ -198,12 +198,13 @@ type States struct {
// Process represents a ffmpeg process // Process represents a ffmpeg process
type process struct { type process struct {
binary string binary string
args []string args []string
cmd *exec.Cmd cmdArgs []string
pid int32 cmd *exec.Cmd
stdout io.ReadCloser pid int32
state struct { stdout io.ReadCloser
state struct {
state stateType state stateType
time time.Time time time.Time
states States states States
@@ -268,6 +269,8 @@ func New(config Config) (Process, error) {
p.args = make([]string, len(config.Args)) p.args = make([]string, len(config.Args))
copy(p.args, config.Args) copy(p.args, config.Args)
p.cmdArgs = make([]string, len(config.Args))
copy(p.cmdArgs, config.Args)
// This is a loose check on purpose. If the e.g. the binary // This is a loose check on purpose. If the e.g. the binary
// doesn't exist or it is not executable, it will be // doesn't exist or it is not executable, it will be
@@ -560,8 +563,8 @@ func (p *process) Status() Status {
}, },
} }
s.CommandArgs = make([]string, len(p.args)) s.CommandArgs = make([]string, len(p.cmdArgs))
copy(s.CommandArgs, p.args) copy(s.CommandArgs, p.cmdArgs)
if order == "start" && !state.IsRunning() { if order == "start" && !state.IsRunning() {
p.reconn.lock.Lock() p.reconn.lock.Lock()
@@ -664,6 +667,8 @@ func (p *process) start() error {
return err return err
} }
p.cmdArgs = args
} }
p.cmd = exec.Command(p.binary, args...) p.cmd = exec.Command(p.binary, args...)

View File

@@ -522,6 +522,68 @@ func TestStartProcess(t *testing.T) {
rs.StopProcess(tid) rs.StopProcess(tid)
} }
func TestProcessResources(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil, nil)
require.NoError(t, err)
process := getDummyProcess()
tid := app.ProcessID{ID: process.ID}
rs.AddProcess(process)
err = rs.StartProcess(tid)
require.Equal(t, nil, err, "should be able to start existing process")
time.Sleep(2 * time.Second)
state, _ := rs.GetProcessState(tid)
require.Equal(t, "start", state.Order, "Process should be started")
require.Equal(t, app.ProcessUsage{
CPU: app.ProcessUsageCPU{
NCPU: 2,
Current: 12,
Average: 12,
Max: 12,
Limit: 0,
IsThrottling: false,
},
Memory: app.ProcessUsageMemory{
Current: 42,
Average: 42,
Max: 42,
Limit: 0,
},
GPU: app.ProcessUsageGPU{
Index: 0,
Usage: app.ProcessUsageGPUUsage{
Current: 5,
Average: 5,
Max: 5,
Limit: 0,
},
Encoder: app.ProcessUsageGPUUsage{
Current: 9,
Average: 9,
Max: 9,
Limit: 0,
},
Decoder: app.ProcessUsageGPUUsage{
Current: 11,
Average: 11,
Max: 11,
Limit: 0,
},
Memory: app.ProcessUsageGPUMemory{
Current: 42,
Average: 42,
Max: 42,
Limit: 0,
},
},
}, state.Resources)
}
func TestStopProcess(t *testing.T) { func TestStopProcess(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil, nil) rs, err := getDummyRestreamer(nil, nil, nil, nil)
require.NoError(t, err) require.NoError(t, err)
@@ -1142,7 +1204,7 @@ func TestReplacer(t *testing.T) {
Input: []app.ConfigIO{ Input: []app.ConfigIO{
{ {
ID: "in_{processid}_{reference}", ID: "in_{processid}_{reference}",
Address: "input:{inputid}_process:{processid}_reference:{reference}_diskfs:{diskfs}/disk.txt_memfs:{memfs}/mem.txt_fsdisk:{fs:disk}/fsdisk.txt_fsmem:{fs:mem}/fsmem.txt_rtmp:{rtmp,name=pmtr}_srt:{srt,name=trs}_rtmp:{rtmp,name=$inputid}", Address: "input:{inputid}_process:{processid}_reference:{reference}_diskfs:{diskfs}/disk.txt_memfs:{memfs}/mem.txt_fsdisk:{fs:disk}/fsdisk.txt_fsmem:{fs:mem}/fsmem.txt_rtmp:{rtmp,name=pmtr}_srt:{srt,name=trs}_rtmp:{rtmp,name=$inputid}_hwdevice:{hwdevice}",
Options: []string{ Options: []string{
"-f", "-f",
"lavfi", "lavfi",
@@ -1154,6 +1216,7 @@ func TestReplacer(t *testing.T) {
"memfs:{memfs}/mem.txt", "memfs:{memfs}/mem.txt",
"fsdisk:{fs:disk}/fsdisk_{date,format=%Y%m%d_%H%M%S}.txt", "fsdisk:{fs:disk}/fsdisk_{date,format=%Y%m%d_%H%M%S}.txt",
"fsmem:{fs:mem}/$inputid.txt", "fsmem:{fs:mem}/$inputid.txt",
"hwdevice:{hwdevice}",
}, },
}, },
}, },
@@ -1191,6 +1254,7 @@ func TestReplacer(t *testing.T) {
"{memfs}/foobar_in_mem.txt", "{memfs}/foobar_in_mem.txt",
"{fs:disk}/foobar_on_disk_aswell.txt", "{fs:disk}/foobar_on_disk_aswell.txt",
"{fs:mem}/foobar_in_mem_aswell.txt", "{fs:mem}/foobar_in_mem_aswell.txt",
"hwdevice:{hwdevice}",
}, },
Reconnect: true, Reconnect: true,
ReconnectDelay: 10, ReconnectDelay: 10,
@@ -1207,7 +1271,7 @@ func TestReplacer(t *testing.T) {
Input: []app.ConfigIO{ Input: []app.ConfigIO{
{ {
ID: "in_314159265359_refref", ID: "in_314159265359_refref",
Address: "input:in_314159265359_refref_process:314159265359_reference:refref_diskfs:/mnt/diskfs/disk.txt_memfs:http://localhost/mnt/memfs/mem.txt_fsdisk:/mnt/diskfs/fsdisk.txt_fsmem:http://localhost/mnt/memfs/fsmem.txt_rtmp:rtmp://localhost/app/pmtr?token=foobar_srt:srt://localhost:6000?mode=caller&transtype=live&latency=20000&streamid=trs,mode:request,token:abcfoobar&passphrase=secret_rtmp:rtmp://localhost/app/in_314159265359_refref?token=foobar", Address: "input:in_314159265359_refref_process:314159265359_reference:refref_diskfs:/mnt/diskfs/disk.txt_memfs:http://localhost/mnt/memfs/mem.txt_fsdisk:/mnt/diskfs/fsdisk.txt_fsmem:http://localhost/mnt/memfs/fsmem.txt_rtmp:rtmp://localhost/app/pmtr?token=foobar_srt:srt://localhost:6000?mode=caller&transtype=live&latency=20000&streamid=trs,mode:request,token:abcfoobar&passphrase=secret_rtmp:rtmp://localhost/app/in_314159265359_refref?token=foobar_hwdevice:{hwdevice}",
Options: []string{ Options: []string{
"-f", "-f",
"lavfi", "lavfi",
@@ -1219,6 +1283,7 @@ func TestReplacer(t *testing.T) {
"memfs:http://localhost/mnt/memfs/mem.txt", "memfs:http://localhost/mnt/memfs/mem.txt",
"fsdisk:/mnt/diskfs/fsdisk_{date,format=%Y%m%d_%H%M%S}.txt", "fsdisk:/mnt/diskfs/fsdisk_{date,format=%Y%m%d_%H%M%S}.txt",
"fsmem:http://localhost/mnt/memfs/$inputid.txt", "fsmem:http://localhost/mnt/memfs/$inputid.txt",
"hwdevice:{hwdevice}",
}, },
}, },
}, },
@@ -1256,6 +1321,7 @@ func TestReplacer(t *testing.T) {
"{memfs}/foobar_in_mem.txt", "{memfs}/foobar_in_mem.txt",
"/mnt/diskfs/foobar_on_disk_aswell.txt", "/mnt/diskfs/foobar_on_disk_aswell.txt",
"http://localhost/mnt/memfs/foobar_in_mem_aswell.txt", "http://localhost/mnt/memfs/foobar_in_mem_aswell.txt",
"hwdevice:{hwdevice}",
}, },
Reconnect: true, Reconnect: true,
ReconnectDelay: 10, ReconnectDelay: 10,
@@ -1265,12 +1331,23 @@ func TestReplacer(t *testing.T) {
require.Equal(t, wantprocess, process) require.Equal(t, wantprocess, process)
resolveDynamicPlaceholder(process, replacer, nil, nil) resolveDynamicPlaceholder(process, replacer, map[string]string{
"hwdevice": fmt.Sprintf("%d", -1),
}, nil)
wantprocess.Options = []string{
"-loglevel",
"info",
"/mnt/diskfs/foobar_on_disk.txt",
"{memfs}/foobar_in_mem.txt",
"/mnt/diskfs/foobar_on_disk_aswell.txt",
"http://localhost/mnt/memfs/foobar_in_mem_aswell.txt",
"hwdevice:-1",
}
wantprocess.Input = []app.ConfigIO{ wantprocess.Input = []app.ConfigIO{
{ {
ID: "in_314159265359_refref", ID: "in_314159265359_refref",
Address: "input:in_314159265359_refref_process:314159265359_reference:refref_diskfs:/mnt/diskfs/disk.txt_memfs:http://localhost/mnt/memfs/mem.txt_fsdisk:/mnt/diskfs/fsdisk.txt_fsmem:http://localhost/mnt/memfs/fsmem.txt_rtmp:rtmp://localhost/app/pmtr?token=foobar_srt:srt://localhost:6000?mode=caller&transtype=live&latency=20000&streamid=trs,mode:request,token:abcfoobar&passphrase=secret_rtmp:rtmp://localhost/app/in_314159265359_refref?token=foobar", Address: "input:in_314159265359_refref_process:314159265359_reference:refref_diskfs:/mnt/diskfs/disk.txt_memfs:http://localhost/mnt/memfs/mem.txt_fsdisk:/mnt/diskfs/fsdisk.txt_fsmem:http://localhost/mnt/memfs/fsmem.txt_rtmp:rtmp://localhost/app/pmtr?token=foobar_srt:srt://localhost:6000?mode=caller&transtype=live&latency=20000&streamid=trs,mode:request,token:abcfoobar&passphrase=secret_rtmp:rtmp://localhost/app/in_314159265359_refref?token=foobar_hwdevice:-1",
Options: []string{ Options: []string{
"-f", "-f",
"lavfi", "lavfi",
@@ -1282,6 +1359,7 @@ func TestReplacer(t *testing.T) {
"memfs:http://localhost/mnt/memfs/mem.txt", "memfs:http://localhost/mnt/memfs/mem.txt",
"fsdisk:/mnt/diskfs/fsdisk_20191012_072050.txt", "fsdisk:/mnt/diskfs/fsdisk_20191012_072050.txt",
"fsmem:http://localhost/mnt/memfs/$inputid.txt", "fsmem:http://localhost/mnt/memfs/$inputid.txt",
"hwdevice:-1",
}, },
}, },
} }
@@ -1365,6 +1443,7 @@ func TestProcessReplacer(t *testing.T) {
"memfs:{memfs}/mem.txt", "memfs:{memfs}/mem.txt",
"fsdisk:{fs:disk}/fsdisk_{date,format=%Y%m%d_%H%M%S}.txt", "fsdisk:{fs:disk}/fsdisk_{date,format=%Y%m%d_%H%M%S}.txt",
"fsmem:{fs:mem}/$inputid.txt", "fsmem:{fs:mem}/$inputid.txt",
"hwdevice:{hwdevice}",
}, },
}, },
}, },
@@ -1402,6 +1481,7 @@ func TestProcessReplacer(t *testing.T) {
"{memfs}/foobar_in_mem.txt", "{memfs}/foobar_in_mem.txt",
"{fs:disk}/foobar_on_disk_aswell.txt", "{fs:disk}/foobar_on_disk_aswell.txt",
"{fs:mem}/foobar_in_mem_aswell.txt", "{fs:mem}/foobar_in_mem_aswell.txt",
"hwdevice:{hwdevice}",
}, },
Reconnect: true, Reconnect: true,
ReconnectDelay: 10, ReconnectDelay: 10,
@@ -1433,6 +1513,7 @@ func TestProcessReplacer(t *testing.T) {
"memfs:http://localhost/mnt/memfs/mem.txt", "memfs:http://localhost/mnt/memfs/mem.txt",
"fsdisk:/mnt/diskfs/fsdisk_{date,format=%Y%m%d_%H%M%S}.txt", "fsdisk:/mnt/diskfs/fsdisk_{date,format=%Y%m%d_%H%M%S}.txt",
"fsmem:http://localhost/mnt/memfs/$inputid.txt", "fsmem:http://localhost/mnt/memfs/$inputid.txt",
"hwdevice:{hwdevice}",
}, },
Cleanup: []app.ConfigIOCleanup{}, Cleanup: []app.ConfigIOCleanup{},
}, },
@@ -1471,6 +1552,7 @@ func TestProcessReplacer(t *testing.T) {
"{memfs}/foobar_in_mem.txt", "{memfs}/foobar_in_mem.txt",
"/mnt/diskfs/foobar_on_disk_aswell.txt", "/mnt/diskfs/foobar_on_disk_aswell.txt",
"http://localhost/mnt/memfs/foobar_in_mem_aswell.txt", "http://localhost/mnt/memfs/foobar_in_mem_aswell.txt",
"hwdevice:{hwdevice}",
}, },
Reconnect: true, Reconnect: true,
ReconnectDelay: 10, ReconnectDelay: 10,
@@ -1483,6 +1565,49 @@ func TestProcessReplacer(t *testing.T) {
require.True(t, ok) require.True(t, ok)
require.Equal(t, process, task.config) require.Equal(t, process, task.config)
err = rsi.StartProcess(app.ProcessID{ID: "314159265359"})
require.NoError(t, err)
state, err := rsi.GetProcessState(app.ProcessID{ID: "314159265359"})
require.NoError(t, err)
require.Equal(t, []string{
"-loglevel",
"info",
"/mnt/diskfs/foobar_on_disk.txt",
"{memfs}/foobar_in_mem.txt",
"/mnt/diskfs/foobar_on_disk_aswell.txt",
"http://localhost/mnt/memfs/foobar_in_mem_aswell.txt",
"hwdevice:-1",
"-f",
"lavfi",
"-re",
"input:in_314159265359_refref",
"process:314159265359",
"reference:refref",
"diskfs:/mnt/diskfs/disk.txt",
"memfs:http://localhost/mnt/memfs/mem.txt",
"fsdisk:/mnt/diskfs/fsdisk_20191012_072050.txt",
"fsmem:http://localhost/mnt/memfs/$inputid.txt",
"hwdevice:-1",
"-i",
"input:in_314159265359_refref_process:314159265359_reference:refref_diskfs:/mnt/diskfs/disk.txt_memfs:http://localhost/mnt/memfs/mem.txt_fsdisk:/mnt/diskfs/fsdisk.txt_fsmem:http://localhost/mnt/memfs/fsmem.txt_rtmp:rtmp://localhost/app/pmtr?token=foobar_srt:srt://localhost:6000?mode=caller&transtype=live&latency=20000&streamid=trs,mode:request,token:abcfoobar&passphrase=secret_rtmp:rtmp://localhost/app/in_314159265359_refref?token=foobar",
"-codec",
"copy",
"-f",
"null",
"output:out_314159265359_refref",
"process:314159265359",
"reference:refref",
"diskfs:/mnt/diskfs/disk.txt",
"memfs:http://localhost/mnt/memfs/mem.txt",
"fsdisk:/mnt/diskfs/fsdisk.txt",
"fsmem:http://localhost/mnt/memfs/$outputid.txt",
"output:out_314159265359_refref_process:314159265359_reference:refref_diskfs:/mnt/diskfs/disk.txt_memfs:http://localhost/mnt/memfs/mem.txt_fsdisk:/mnt/diskfs/fsdisk.txt_fsmem:http://localhost/mnt/memfs/fsmem.txt_rtmp:rtmp://localhost/app/314159265359?token=foobar_srt:srt://localhost:6000?mode=caller&transtype=live&latency=42&streamid=refref,mode:publish,token:abcfoobar&passphrase=secret_rtmp:rtmp://localhost/app/out_314159265359_refref?token=foobar",
}, state.Command)
rsi.StopProcess(app.ProcessID{ID: "314159265359"})
} }
func TestProcessLogPattern(t *testing.T) { func TestProcessLogPattern(t *testing.T) {