mirror of
				https://github.com/datarhei/core.git
				synced 2025-11-01 03:42:51 +08:00 
			
		
		
		
	Merge branch 'clusteriam' into vod
This commit is contained in:
		| @@ -7,6 +7,9 @@ import ( | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/datarhei/core/v16/ffmpeg" | ||||
| 	"github.com/datarhei/core/v16/iam" | ||||
| 	iamaccess "github.com/datarhei/core/v16/iam/access" | ||||
| 	iamidentity "github.com/datarhei/core/v16/iam/identity" | ||||
| 	"github.com/datarhei/core/v16/internal/testhelper" | ||||
| 	"github.com/datarhei/core/v16/io/fs" | ||||
| 	"github.com/datarhei/core/v16/net" | ||||
| @@ -14,6 +17,7 @@ import ( | ||||
| 	"github.com/datarhei/core/v16/restream/app" | ||||
| 	rfs "github.com/datarhei/core/v16/restream/fs" | ||||
| 	"github.com/datarhei/core/v16/restream/replace" | ||||
| 	"github.com/datarhei/core/v16/restream/rewrite" | ||||
| 	"github.com/lestrrat-go/strftime" | ||||
|  | ||||
| 	"github.com/stretchr/testify/require" | ||||
| @@ -41,10 +45,43 @@ func getDummyRestreamer(portrange net.Portranger, validatorIn, validatorOut ffmp | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	policyAdapter, err := iamaccess.NewJSONAdapter(memfs, "./policy.json", nil) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	identityAdapter, err := iamidentity.NewJSONAdapter(memfs, "./users.json", nil) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	iam, err := iam.New(iam.Config{ | ||||
| 		PolicyAdapter:   policyAdapter, | ||||
| 		IdentityAdapter: identityAdapter, | ||||
| 		Superuser: iamidentity.User{ | ||||
| 			Name: "foobar", | ||||
| 		}, | ||||
| 		JWTRealm:  "", | ||||
| 		JWTSecret: "", | ||||
| 		Logger:    nil, | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	iam.AddPolicy("$anon", "$none", "process:*", []string{"CREATE", "GET", "DELETE", "UPDATE", "COMMAND", "PROBE", "METADATA", "PLAYOUT"}) | ||||
|  | ||||
| 	rewriter, err := rewrite.New(rewrite.Config{}) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	rs, err := New(Config{ | ||||
| 		FFmpeg:      ffmpeg, | ||||
| 		Replace:     replacer, | ||||
| 		Filesystems: []fs.Filesystem{memfs}, | ||||
| 		Rewrite:     rewriter, | ||||
| 		IAM:         iam, | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| @@ -97,16 +134,18 @@ func TestAddProcess(t *testing.T) { | ||||
| 	process := getDummyProcess() | ||||
| 	require.NotNil(t, process) | ||||
|  | ||||
| 	_, err = rs.GetProcess(process.ID) | ||||
| 	require.NotEqual(t, nil, err, "Unset process found (%s)", process.ID) | ||||
| 	tid := TaskID{ID: process.ID} | ||||
|  | ||||
| 	_, err = rs.GetProcess(tid) | ||||
| 	require.Equal(t, ErrUnknownProcess, err) | ||||
|  | ||||
| 	err = rs.AddProcess(process) | ||||
| 	require.Equal(t, nil, err, "Failed to add process (%s)", err) | ||||
|  | ||||
| 	_, err = rs.GetProcess(process.ID) | ||||
| 	_, err = rs.GetProcess(tid) | ||||
| 	require.Equal(t, nil, err, "Set process not found (%s)", process.ID) | ||||
|  | ||||
| 	state, _ := rs.GetProcessState(process.ID) | ||||
| 	state, _ := rs.GetProcessState(tid) | ||||
| 	require.Equal(t, "stop", state.Order, "Process should be stopped") | ||||
| } | ||||
|  | ||||
| @@ -117,12 +156,14 @@ func TestAutostartProcess(t *testing.T) { | ||||
| 	process := getDummyProcess() | ||||
| 	process.Autostart = true | ||||
|  | ||||
| 	tid := TaskID{ID: process.ID} | ||||
|  | ||||
| 	rs.AddProcess(process) | ||||
|  | ||||
| 	state, _ := rs.GetProcessState(process.ID) | ||||
| 	state, _ := rs.GetProcessState(tid) | ||||
| 	require.Equal(t, "start", state.Order, "Process should be started") | ||||
|  | ||||
| 	rs.StopProcess(process.ID) | ||||
| 	rs.StopProcess(tid) | ||||
| } | ||||
|  | ||||
| func TestAddInvalidProcess(t *testing.T) { | ||||
| @@ -198,14 +239,15 @@ func TestRemoveProcess(t *testing.T) { | ||||
| 	require.NoError(t, err) | ||||
|  | ||||
| 	process := getDummyProcess() | ||||
| 	tid := TaskID{ID: process.ID} | ||||
|  | ||||
| 	err = rs.AddProcess(process) | ||||
| 	require.Equal(t, nil, err, "Failed to add process (%s)", err) | ||||
|  | ||||
| 	err = rs.DeleteProcess(process.ID) | ||||
| 	err = rs.DeleteProcess(tid) | ||||
| 	require.Equal(t, nil, err, "Set process not found (%s)", process.ID) | ||||
|  | ||||
| 	_, err = rs.GetProcess(process.ID) | ||||
| 	_, err = rs.GetProcess(tid) | ||||
| 	require.NotEqual(t, nil, err, "Unset process found (%s)", process.ID) | ||||
| } | ||||
|  | ||||
| @@ -216,10 +258,12 @@ func TestUpdateProcess(t *testing.T) { | ||||
| 	process1 := getDummyProcess() | ||||
| 	require.NotNil(t, process1) | ||||
| 	process1.ID = "process1" | ||||
| 	tid1 := TaskID{ID: process1.ID} | ||||
|  | ||||
| 	process2 := getDummyProcess() | ||||
| 	require.NotNil(t, process2) | ||||
| 	process2.ID = "process2" | ||||
| 	tid2 := TaskID{ID: process2.ID} | ||||
|  | ||||
| 	err = rs.AddProcess(process1) | ||||
| 	require.Equal(t, nil, err) | ||||
| @@ -227,7 +271,7 @@ func TestUpdateProcess(t *testing.T) { | ||||
| 	err = rs.AddProcess(process2) | ||||
| 	require.Equal(t, nil, err) | ||||
|  | ||||
| 	process, err := rs.GetProcess(process2.ID) | ||||
| 	process, err := rs.GetProcess(tid2) | ||||
| 	require.NoError(t, err) | ||||
|  | ||||
| 	createdAt := process.CreatedAt | ||||
| @@ -238,18 +282,20 @@ func TestUpdateProcess(t *testing.T) { | ||||
| 	process3 := getDummyProcess() | ||||
| 	require.NotNil(t, process3) | ||||
| 	process3.ID = "process2" | ||||
| 	tid3 := TaskID{ID: process3.ID} | ||||
|  | ||||
| 	err = rs.UpdateProcess("process1", process3) | ||||
| 	err = rs.UpdateProcess(tid1, process3) | ||||
| 	require.Error(t, err) | ||||
|  | ||||
| 	process3.ID = "process3" | ||||
| 	err = rs.UpdateProcess("process1", process3) | ||||
| 	tid3.ID = process3.ID | ||||
| 	err = rs.UpdateProcess(tid1, process3) | ||||
| 	require.NoError(t, err) | ||||
|  | ||||
| 	_, err = rs.GetProcess(process1.ID) | ||||
| 	_, err = rs.GetProcess(tid1) | ||||
| 	require.Error(t, err) | ||||
|  | ||||
| 	process, err = rs.GetProcess(process3.ID) | ||||
| 	process, err = rs.GetProcess(tid3) | ||||
| 	require.NoError(t, err) | ||||
|  | ||||
| 	require.NotEqual(t, createdAt, process.CreatedAt) // this should be equal, but will require a major version jump | ||||
| @@ -264,17 +310,19 @@ func TestUpdateProcessLogHistoryTransfer(t *testing.T) { | ||||
| 	require.NotNil(t, p) | ||||
| 	p.ID = "process1" | ||||
|  | ||||
| 	tid1 := TaskID{ID: p.ID} | ||||
|  | ||||
| 	err = rs.AddProcess(p) | ||||
| 	require.Equal(t, nil, err) | ||||
|  | ||||
| 	rs.StartProcess(p.ID) | ||||
| 	rs.StartProcess(tid1) | ||||
|  | ||||
| 	require.Eventually(t, func() bool { | ||||
| 		state, _ := rs.GetProcessState(p.ID) | ||||
| 		state, _ := rs.GetProcessState(tid1) | ||||
| 		return state.State == "running" | ||||
| 	}, 10*time.Second, time.Second) | ||||
|  | ||||
| 	log, err := rs.GetProcessLog(p.ID) | ||||
| 	log, err := rs.GetProcessLog(tid1) | ||||
| 	require.NoError(t, err) | ||||
|  | ||||
| 	require.Equal(t, 0, len(log.History)) | ||||
| @@ -283,26 +331,28 @@ func TestUpdateProcessLogHistoryTransfer(t *testing.T) { | ||||
| 	require.NotNil(t, p) | ||||
|  | ||||
| 	p.ID = "process2" | ||||
| 	err = rs.UpdateProcess("process1", p) | ||||
| 	err = rs.UpdateProcess(tid1, p) | ||||
| 	require.NoError(t, err) | ||||
|  | ||||
| 	_, err = rs.GetProcess(p.ID) | ||||
| 	tid2 := TaskID{ID: p.ID} | ||||
|  | ||||
| 	_, err = rs.GetProcess(tid2) | ||||
| 	require.NoError(t, err) | ||||
|  | ||||
| 	state, _ := rs.GetProcessState(p.ID) | ||||
| 	state, _ := rs.GetProcessState(tid2) | ||||
| 	require.Equal(t, "start", state.Order) | ||||
|  | ||||
| 	require.Eventually(t, func() bool { | ||||
| 		state, _ := rs.GetProcessState(p.ID) | ||||
| 		state, _ := rs.GetProcessState(tid2) | ||||
| 		return state.State == "running" | ||||
| 	}, 10*time.Second, time.Second) | ||||
|  | ||||
| 	log, err = rs.GetProcessLog(p.ID) | ||||
| 	log, err = rs.GetProcessLog(tid2) | ||||
| 	require.NoError(t, err) | ||||
|  | ||||
| 	require.Equal(t, 1, len(log.History)) | ||||
|  | ||||
| 	err = rs.StopProcess(p.ID) | ||||
| 	err = rs.StopProcess(tid2) | ||||
| 	require.NoError(t, err) | ||||
| } | ||||
|  | ||||
| @@ -313,51 +363,64 @@ func TestGetProcess(t *testing.T) { | ||||
| 	process1 := getDummyProcess() | ||||
| 	process1.ID = "foo_aaa_1" | ||||
| 	process1.Reference = "foo_aaa_1" | ||||
| 	tid1 := TaskID{ID: process1.ID} | ||||
| 	process2 := getDummyProcess() | ||||
| 	process2.ID = "bar_bbb_2" | ||||
| 	process2.Reference = "bar_bbb_2" | ||||
| 	tid2 := TaskID{ID: process2.ID} | ||||
| 	process3 := getDummyProcess() | ||||
| 	process3.ID = "foo_ccc_3" | ||||
| 	process3.Reference = "foo_ccc_3" | ||||
| 	tid3 := TaskID{ID: process3.ID} | ||||
| 	process4 := getDummyProcess() | ||||
| 	process4.ID = "bar_ddd_4" | ||||
| 	process4.Reference = "bar_ddd_4" | ||||
| 	tid4 := TaskID{ID: process4.ID} | ||||
|  | ||||
| 	rs.AddProcess(process1) | ||||
| 	rs.AddProcess(process2) | ||||
| 	rs.AddProcess(process3) | ||||
| 	rs.AddProcess(process4) | ||||
|  | ||||
| 	_, err = rs.GetProcess(process1.ID) | ||||
| 	_, err = rs.GetProcess(tid1) | ||||
| 	require.Equal(t, nil, err) | ||||
|  | ||||
| 	list := rs.GetProcessIDs("", "") | ||||
| 	_, err = rs.GetProcess(tid2) | ||||
| 	require.Equal(t, nil, err) | ||||
|  | ||||
| 	_, err = rs.GetProcess(tid3) | ||||
| 	require.Equal(t, nil, err) | ||||
|  | ||||
| 	_, err = rs.GetProcess(tid4) | ||||
| 	require.Equal(t, nil, err) | ||||
|  | ||||
| 	list := rs.GetProcessIDs("", "", "", "") | ||||
| 	require.Len(t, list, 4) | ||||
| 	require.ElementsMatch(t, []string{"foo_aaa_1", "bar_bbb_2", "foo_ccc_3", "bar_ddd_4"}, list) | ||||
| 	require.ElementsMatch(t, []TaskID{{ID: "foo_aaa_1"}, {ID: "bar_bbb_2"}, {ID: "foo_ccc_3"}, {ID: "bar_ddd_4"}}, list) | ||||
|  | ||||
| 	list = rs.GetProcessIDs("foo_*", "") | ||||
| 	list = rs.GetProcessIDs("foo_*", "", "", "") | ||||
| 	require.Len(t, list, 2) | ||||
| 	require.ElementsMatch(t, []string{"foo_aaa_1", "foo_ccc_3"}, list) | ||||
| 	require.ElementsMatch(t, []TaskID{{ID: "foo_aaa_1"}, {ID: "foo_ccc_3"}}, list) | ||||
|  | ||||
| 	list = rs.GetProcessIDs("bar_*", "") | ||||
| 	list = rs.GetProcessIDs("bar_*", "", "", "") | ||||
| 	require.Len(t, list, 2) | ||||
| 	require.ElementsMatch(t, []string{"bar_bbb_2", "bar_ddd_4"}, list) | ||||
| 	require.ElementsMatch(t, []TaskID{{ID: "bar_bbb_2"}, {ID: "bar_ddd_4"}}, list) | ||||
|  | ||||
| 	list = rs.GetProcessIDs("*_bbb_*", "") | ||||
| 	list = rs.GetProcessIDs("*_bbb_*", "", "", "") | ||||
| 	require.Len(t, list, 1) | ||||
| 	require.ElementsMatch(t, []string{"bar_bbb_2"}, list) | ||||
| 	require.ElementsMatch(t, []TaskID{{ID: "bar_bbb_2"}}, list) | ||||
|  | ||||
| 	list = rs.GetProcessIDs("", "foo_*") | ||||
| 	list = rs.GetProcessIDs("", "foo_*", "", "") | ||||
| 	require.Len(t, list, 2) | ||||
| 	require.ElementsMatch(t, []string{"foo_aaa_1", "foo_ccc_3"}, list) | ||||
| 	require.ElementsMatch(t, []TaskID{{ID: "foo_aaa_1"}, {ID: "foo_ccc_3"}}, list) | ||||
|  | ||||
| 	list = rs.GetProcessIDs("", "bar_*") | ||||
| 	list = rs.GetProcessIDs("", "bar_*", "", "") | ||||
| 	require.Len(t, list, 2) | ||||
| 	require.ElementsMatch(t, []string{"bar_bbb_2", "bar_ddd_4"}, list) | ||||
| 	require.ElementsMatch(t, []TaskID{{ID: "bar_bbb_2"}, {ID: "bar_ddd_4"}}, list) | ||||
|  | ||||
| 	list = rs.GetProcessIDs("", "*_bbb_*") | ||||
| 	list = rs.GetProcessIDs("", "*_bbb_*", "", "") | ||||
| 	require.Len(t, list, 1) | ||||
| 	require.ElementsMatch(t, []string{"bar_bbb_2"}, list) | ||||
| 	require.ElementsMatch(t, []TaskID{{ID: "bar_bbb_2"}}, list) | ||||
| } | ||||
|  | ||||
| func TestStartProcess(t *testing.T) { | ||||
| @@ -365,25 +428,26 @@ func TestStartProcess(t *testing.T) { | ||||
| 	require.NoError(t, err) | ||||
|  | ||||
| 	process := getDummyProcess() | ||||
| 	tid := TaskID{ID: process.ID} | ||||
|  | ||||
| 	rs.AddProcess(process) | ||||
|  | ||||
| 	err = rs.StartProcess("foobar") | ||||
| 	err = rs.StartProcess(TaskID{ID: "foobar"}) | ||||
| 	require.NotEqual(t, nil, err, "shouldn't be able to start non-existing process") | ||||
|  | ||||
| 	err = rs.StartProcess(process.ID) | ||||
| 	err = rs.StartProcess(tid) | ||||
| 	require.Equal(t, nil, err, "should be able to start existing process") | ||||
|  | ||||
| 	state, _ := rs.GetProcessState(process.ID) | ||||
| 	state, _ := rs.GetProcessState(tid) | ||||
| 	require.Equal(t, "start", state.Order, "Process should be started") | ||||
|  | ||||
| 	err = rs.StartProcess(process.ID) | ||||
| 	err = rs.StartProcess(tid) | ||||
| 	require.Equal(t, nil, err, "should be able to start already running process") | ||||
|  | ||||
| 	state, _ = rs.GetProcessState(process.ID) | ||||
| 	state, _ = rs.GetProcessState(tid) | ||||
| 	require.Equal(t, "start", state.Order, "Process should be started") | ||||
|  | ||||
| 	rs.StopProcess(process.ID) | ||||
| 	rs.StopProcess(tid) | ||||
| } | ||||
|  | ||||
| func TestStopProcess(t *testing.T) { | ||||
| @@ -391,23 +455,24 @@ func TestStopProcess(t *testing.T) { | ||||
| 	require.NoError(t, err) | ||||
|  | ||||
| 	process := getDummyProcess() | ||||
| 	tid := TaskID{ID: process.ID} | ||||
|  | ||||
| 	rs.AddProcess(process) | ||||
| 	rs.StartProcess(process.ID) | ||||
| 	rs.StartProcess(tid) | ||||
|  | ||||
| 	err = rs.StopProcess("foobar") | ||||
| 	err = rs.StopProcess(TaskID{ID: "foobar"}) | ||||
| 	require.NotEqual(t, nil, err, "shouldn't be able to stop non-existing process") | ||||
|  | ||||
| 	err = rs.StopProcess(process.ID) | ||||
| 	err = rs.StopProcess(tid) | ||||
| 	require.Equal(t, nil, err, "should be able to stop existing running process") | ||||
|  | ||||
| 	state, _ := rs.GetProcessState(process.ID) | ||||
| 	state, _ := rs.GetProcessState(tid) | ||||
| 	require.Equal(t, "stop", state.Order, "Process should be stopped") | ||||
|  | ||||
| 	err = rs.StopProcess(process.ID) | ||||
| 	err = rs.StopProcess(tid) | ||||
| 	require.Equal(t, nil, err, "should be able to stop already stopped process") | ||||
|  | ||||
| 	state, _ = rs.GetProcessState(process.ID) | ||||
| 	state, _ = rs.GetProcessState(tid) | ||||
| 	require.Equal(t, "stop", state.Order, "Process should be stopped") | ||||
| } | ||||
|  | ||||
| @@ -416,24 +481,25 @@ func TestRestartProcess(t *testing.T) { | ||||
| 	require.NoError(t, err) | ||||
|  | ||||
| 	process := getDummyProcess() | ||||
| 	tid := TaskID{ID: process.ID} | ||||
|  | ||||
| 	rs.AddProcess(process) | ||||
|  | ||||
| 	err = rs.RestartProcess("foobar") | ||||
| 	err = rs.RestartProcess(TaskID{ID: "foobar"}) | ||||
| 	require.NotEqual(t, nil, err, "shouldn't be able to restart non-existing process") | ||||
|  | ||||
| 	err = rs.RestartProcess(process.ID) | ||||
| 	err = rs.RestartProcess(tid) | ||||
| 	require.Equal(t, nil, err, "should be able to restart existing stopped process") | ||||
|  | ||||
| 	state, _ := rs.GetProcessState(process.ID) | ||||
| 	state, _ := rs.GetProcessState(tid) | ||||
| 	require.Equal(t, "stop", state.Order, "Process should be stopped") | ||||
|  | ||||
| 	rs.StartProcess(process.ID) | ||||
| 	rs.StartProcess(tid) | ||||
|  | ||||
| 	state, _ = rs.GetProcessState(process.ID) | ||||
| 	state, _ = rs.GetProcessState(tid) | ||||
| 	require.Equal(t, "start", state.Order, "Process should be started") | ||||
|  | ||||
| 	rs.StopProcess(process.ID) | ||||
| 	rs.StopProcess(tid) | ||||
| } | ||||
|  | ||||
| func TestReloadProcess(t *testing.T) { | ||||
| @@ -441,30 +507,31 @@ func TestReloadProcess(t *testing.T) { | ||||
| 	require.NoError(t, err) | ||||
|  | ||||
| 	process := getDummyProcess() | ||||
| 	tid := TaskID{ID: process.ID} | ||||
|  | ||||
| 	rs.AddProcess(process) | ||||
|  | ||||
| 	err = rs.ReloadProcess("foobar") | ||||
| 	err = rs.ReloadProcess(TaskID{ID: "foobar"}) | ||||
| 	require.NotEqual(t, nil, err, "shouldn't be able to reload non-existing process") | ||||
|  | ||||
| 	err = rs.ReloadProcess(process.ID) | ||||
| 	err = rs.ReloadProcess(tid) | ||||
| 	require.Equal(t, nil, err, "should be able to reload existing stopped process") | ||||
|  | ||||
| 	state, _ := rs.GetProcessState(process.ID) | ||||
| 	state, _ := rs.GetProcessState(tid) | ||||
| 	require.Equal(t, "stop", state.Order, "Process should be stopped") | ||||
|  | ||||
| 	rs.StartProcess(process.ID) | ||||
| 	rs.StartProcess(tid) | ||||
|  | ||||
| 	state, _ = rs.GetProcessState(process.ID) | ||||
| 	state, _ = rs.GetProcessState(tid) | ||||
| 	require.Equal(t, "start", state.Order, "Process should be started") | ||||
|  | ||||
| 	err = rs.ReloadProcess(process.ID) | ||||
| 	err = rs.ReloadProcess(tid) | ||||
| 	require.Equal(t, nil, err, "should be able to reload existing process") | ||||
|  | ||||
| 	state, _ = rs.GetProcessState(process.ID) | ||||
| 	state, _ = rs.GetProcessState(tid) | ||||
| 	require.Equal(t, "start", state.Order, "Process should be started") | ||||
|  | ||||
| 	rs.StopProcess(process.ID) | ||||
| 	rs.StopProcess(tid) | ||||
| } | ||||
|  | ||||
| func TestParseProcessPattern(t *testing.T) { | ||||
| @@ -474,14 +541,16 @@ func TestParseProcessPattern(t *testing.T) { | ||||
| 	process := getDummyProcess() | ||||
| 	process.LogPatterns = []string{"libx264"} | ||||
|  | ||||
| 	tid := TaskID{ID: process.ID} | ||||
|  | ||||
| 	rs.AddProcess(process) | ||||
| 	rs.StartProcess(process.ID) | ||||
| 	rs.StartProcess(tid) | ||||
|  | ||||
| 	time.Sleep(3 * time.Second) | ||||
|  | ||||
| 	rs.StopProcess(process.ID) | ||||
| 	rs.StopProcess(tid) | ||||
|  | ||||
| 	log, err := rs.GetProcessLog(process.ID) | ||||
| 	log, err := rs.GetProcessLog(tid) | ||||
| 	require.NoError(t, err) | ||||
|  | ||||
| 	require.Equal(t, 1, len(log.History)) | ||||
| @@ -493,10 +562,11 @@ func TestProbeProcess(t *testing.T) { | ||||
| 	require.NoError(t, err) | ||||
|  | ||||
| 	process := getDummyProcess() | ||||
| 	tid := TaskID{ID: process.ID} | ||||
|  | ||||
| 	rs.AddProcess(process) | ||||
|  | ||||
| 	probe := rs.ProbeWithTimeout(process.ID, 5*time.Second) | ||||
| 	probe := rs.ProbeWithTimeout(tid, 5*time.Second) | ||||
|  | ||||
| 	require.Equal(t, 3, len(probe.Streams)) | ||||
| } | ||||
| @@ -506,15 +576,19 @@ func TestProcessMetadata(t *testing.T) { | ||||
| 	require.NoError(t, err) | ||||
|  | ||||
| 	process := getDummyProcess() | ||||
| 	tid := TaskID{ID: process.ID} | ||||
|  | ||||
| 	rs.AddProcess(process) | ||||
|  | ||||
| 	data, _ := rs.GetProcessMetadata(process.ID, "foobar") | ||||
| 	data, err := rs.GetProcessMetadata(tid, "foobar") | ||||
| 	require.Equal(t, ErrMetadataKeyNotFound, err) | ||||
| 	require.Equal(t, nil, data, "nothing should be stored under the key") | ||||
|  | ||||
| 	rs.SetProcessMetadata(process.ID, "foobar", process) | ||||
| 	err = rs.SetProcessMetadata(tid, "foobar", process) | ||||
| 	require.NoError(t, err) | ||||
|  | ||||
| 	data, _ = rs.GetProcessMetadata(process.ID, "foobar") | ||||
| 	data, err = rs.GetProcessMetadata(tid, "foobar") | ||||
| 	require.NoError(t, err) | ||||
| 	require.NotEqual(t, nil, data, "there should be something stored under the key") | ||||
|  | ||||
| 	p := data.(*app.Config) | ||||
| @@ -527,33 +601,34 @@ func TestLog(t *testing.T) { | ||||
| 	require.NoError(t, err) | ||||
|  | ||||
| 	process := getDummyProcess() | ||||
| 	tid := TaskID{ID: process.ID} | ||||
|  | ||||
| 	rs.AddProcess(process) | ||||
|  | ||||
| 	_, err = rs.GetProcessLog("foobar") | ||||
| 	_, err = rs.GetProcessLog(TaskID{ID: "foobar"}) | ||||
| 	require.Error(t, err) | ||||
|  | ||||
| 	log, err := rs.GetProcessLog(process.ID) | ||||
| 	log, err := rs.GetProcessLog(tid) | ||||
| 	require.NoError(t, err) | ||||
| 	require.Equal(t, 0, len(log.Prelude)) | ||||
| 	require.Equal(t, 0, len(log.Log)) | ||||
| 	require.Equal(t, 0, len(log.Matches)) | ||||
| 	require.Equal(t, 0, len(log.History)) | ||||
|  | ||||
| 	rs.StartProcess(process.ID) | ||||
| 	rs.StartProcess(tid) | ||||
|  | ||||
| 	time.Sleep(3 * time.Second) | ||||
|  | ||||
| 	log, _ = rs.GetProcessLog(process.ID) | ||||
| 	log, _ = rs.GetProcessLog(tid) | ||||
|  | ||||
| 	require.NotEqual(t, 0, len(log.Prelude)) | ||||
| 	require.NotEqual(t, 0, len(log.Log)) | ||||
| 	require.Equal(t, 0, len(log.Matches)) | ||||
| 	require.Equal(t, 0, len(log.History)) | ||||
|  | ||||
| 	rs.StopProcess(process.ID) | ||||
| 	rs.StopProcess(tid) | ||||
|  | ||||
| 	log, _ = rs.GetProcessLog(process.ID) | ||||
| 	log, _ = rs.GetProcessLog(tid) | ||||
|  | ||||
| 	require.Equal(t, 0, len(log.Prelude)) | ||||
| 	require.Equal(t, 0, len(log.Log)) | ||||
| @@ -565,22 +640,23 @@ func TestLogTransfer(t *testing.T) { | ||||
| 	require.NoError(t, err) | ||||
|  | ||||
| 	process := getDummyProcess() | ||||
| 	tid := TaskID{ID: process.ID} | ||||
|  | ||||
| 	err = rs.AddProcess(process) | ||||
| 	require.NoError(t, err) | ||||
|  | ||||
| 	rs.StartProcess(process.ID) | ||||
| 	rs.StartProcess(tid) | ||||
| 	time.Sleep(3 * time.Second) | ||||
| 	rs.StopProcess(process.ID) | ||||
| 	rs.StopProcess(tid) | ||||
|  | ||||
| 	log, _ := rs.GetProcessLog(process.ID) | ||||
| 	log, _ := rs.GetProcessLog(tid) | ||||
|  | ||||
| 	require.Equal(t, 1, len(log.History)) | ||||
|  | ||||
| 	err = rs.UpdateProcess(process.ID, process) | ||||
| 	err = rs.UpdateProcess(tid, process) | ||||
| 	require.NoError(t, err) | ||||
|  | ||||
| 	log, _ = rs.GetProcessLog(process.ID) | ||||
| 	log, _ = rs.GetProcessLog(tid) | ||||
|  | ||||
| 	require.Equal(t, 1, len(log.History)) | ||||
| } | ||||
| @@ -590,18 +666,19 @@ func TestPlayoutNoRange(t *testing.T) { | ||||
| 	require.NoError(t, err) | ||||
|  | ||||
| 	process := getDummyProcess() | ||||
| 	tid := TaskID{ID: process.ID} | ||||
|  | ||||
| 	process.Input[0].Address = "playout:" + process.Input[0].Address | ||||
|  | ||||
| 	rs.AddProcess(process) | ||||
|  | ||||
| 	_, err = rs.GetPlayout("foobar", process.Input[0].ID) | ||||
| 	require.NotEqual(t, nil, err, "playout of non-existing process should error") | ||||
| 	_, err = rs.GetPlayout(TaskID{ID: "foobar"}, process.Input[0].ID) | ||||
| 	require.Equal(t, ErrUnknownProcess, err) | ||||
|  | ||||
| 	_, err = rs.GetPlayout(process.ID, "foobar") | ||||
| 	_, err = rs.GetPlayout(tid, "foobar") | ||||
| 	require.NotEqual(t, nil, err, "playout of non-existing input should error") | ||||
|  | ||||
| 	addr, _ := rs.GetPlayout(process.ID, process.Input[0].ID) | ||||
| 	addr, _ := rs.GetPlayout(tid, process.Input[0].ID) | ||||
| 	require.Equal(t, 0, len(addr), "the playout address should be empty if no port range is given") | ||||
| } | ||||
|  | ||||
| @@ -613,23 +690,57 @@ func TestPlayoutRange(t *testing.T) { | ||||
| 	require.NoError(t, err) | ||||
|  | ||||
| 	process := getDummyProcess() | ||||
| 	tid := TaskID{ID: process.ID} | ||||
|  | ||||
| 	process.Input[0].Address = "playout:" + process.Input[0].Address | ||||
|  | ||||
| 	rs.AddProcess(process) | ||||
|  | ||||
| 	_, err = rs.GetPlayout("foobar", process.Input[0].ID) | ||||
| 	require.NotEqual(t, nil, err, "playout of non-existing process should error") | ||||
| 	_, err = rs.GetPlayout(TaskID{ID: "foobar"}, process.Input[0].ID) | ||||
| 	require.Equal(t, ErrUnknownProcess, err) | ||||
|  | ||||
| 	_, err = rs.GetPlayout(process.ID, "foobar") | ||||
| 	_, err = rs.GetPlayout(tid, "foobar") | ||||
| 	require.NotEqual(t, nil, err, "playout of non-existing input should error") | ||||
|  | ||||
| 	addr, err := rs.GetPlayout(process.ID, process.Input[0].ID) | ||||
| 	addr, err := rs.GetPlayout(tid, process.Input[0].ID) | ||||
| 	require.NoError(t, err) | ||||
| 	require.NotEqual(t, 0, len(addr), "the playout address should not be empty if a port range is given") | ||||
| 	require.Equal(t, "127.0.0.1:3000", addr, "the playout address should be 127.0.0.1:3000") | ||||
| } | ||||
|  | ||||
| func TestParseAddressReference(t *testing.T) { | ||||
| 	matches, err := parseAddressReference("foobar") | ||||
| 	require.NoError(t, err) | ||||
| 	require.Equal(t, "foobar", matches["address"]) | ||||
|  | ||||
| 	_, err = parseAddressReference("#foobar") | ||||
| 	require.Error(t, err) | ||||
|  | ||||
| 	_, err = parseAddressReference("#foobar:nothing=foo") | ||||
| 	require.Error(t, err) | ||||
|  | ||||
| 	matches, err = parseAddressReference("#foobar:output=foo") | ||||
| 	require.NoError(t, err) | ||||
| 	require.Equal(t, "foobar", matches["id"]) | ||||
| 	require.Equal(t, "foo", matches["output"]) | ||||
|  | ||||
| 	matches, err = parseAddressReference("#foobar:domain=foo") | ||||
| 	require.NoError(t, err) | ||||
| 	require.Equal(t, "foobar", matches["id"]) | ||||
| 	require.Equal(t, "foo", matches["domain"]) | ||||
|  | ||||
| 	matches, err = parseAddressReference("#foobar:nothing=foo:output=bar") | ||||
| 	require.NoError(t, err) | ||||
| 	require.Equal(t, "foobar:nothing=foo", matches["id"]) | ||||
| 	require.Equal(t, "bar", matches["output"]) | ||||
|  | ||||
| 	matches, err = parseAddressReference("#foobar:output=foo:domain=bar") | ||||
| 	require.NoError(t, err) | ||||
| 	require.Equal(t, "foobar", matches["id"]) | ||||
| 	require.Equal(t, "foo", matches["output"]) | ||||
| 	require.Equal(t, "bar", matches["domain"]) | ||||
| } | ||||
|  | ||||
| func TestAddressReference(t *testing.T) { | ||||
| 	rs, err := getDummyRestreamer(nil, nil, nil, nil) | ||||
| 	require.NoError(t, err) | ||||
| @@ -637,10 +748,9 @@ func TestAddressReference(t *testing.T) { | ||||
| 	process1 := getDummyProcess() | ||||
| 	process2 := getDummyProcess() | ||||
|  | ||||
| 	process2.ID = "process2" | ||||
|  | ||||
| 	rs.AddProcess(process1) | ||||
|  | ||||
| 	process2.ID = "process2" | ||||
| 	process2.Input[0].Address = "#process:foobar=out" | ||||
|  | ||||
| 	err = rs.AddProcess(process2) | ||||
| @@ -662,6 +772,44 @@ func TestAddressReference(t *testing.T) { | ||||
| 	require.Equal(t, nil, err, "should resolve reference") | ||||
| } | ||||
|  | ||||
| func TestTeeAddressReference(t *testing.T) { | ||||
| 	rs, err := getDummyRestreamer(nil, nil, nil, nil) | ||||
| 	require.NoError(t, err) | ||||
|  | ||||
| 	process1 := getDummyProcess() | ||||
| 	process2 := getDummyProcess() | ||||
| 	process3 := getDummyProcess() | ||||
| 	process4 := getDummyProcess() | ||||
|  | ||||
| 	process1.Output[0].Address = "[f=hls]http://example.com/live.m3u8|[f=flv]rtmp://example.com/live.stream?token=123" | ||||
| 	process2.ID = "process2" | ||||
| 	process3.ID = "process3" | ||||
| 	process4.ID = "process4" | ||||
|  | ||||
| 	rs.AddProcess(process1) | ||||
|  | ||||
| 	process2.Input[0].Address = "#process:output=out" | ||||
|  | ||||
| 	err = rs.AddProcess(process2) | ||||
| 	require.Equal(t, nil, err, "should resolve reference") | ||||
|  | ||||
| 	process3.Input[0].Address = "#process:output=out:source=hls" | ||||
|  | ||||
| 	err = rs.AddProcess(process3) | ||||
| 	require.Equal(t, nil, err, "should resolve reference") | ||||
|  | ||||
| 	process4.Input[0].Address = "#process:output=out:source=rtmp" | ||||
|  | ||||
| 	err = rs.AddProcess(process4) | ||||
| 	require.Equal(t, nil, err, "should resolve reference") | ||||
|  | ||||
| 	r := rs.(*restream) | ||||
|  | ||||
| 	require.Equal(t, "http://example.com/live.m3u8", r.tasks[TaskID{ID: "process2"}].config.Input[0].Address) | ||||
| 	require.Equal(t, "http://example.com/live.m3u8", r.tasks[TaskID{ID: "process3"}].config.Input[0].Address) | ||||
| 	require.Equal(t, "rtmp://example.com/live.stream?token=123", r.tasks[TaskID{ID: "process4"}].config.Input[0].Address) | ||||
| } | ||||
|  | ||||
| func TestConfigValidation(t *testing.T) { | ||||
| 	rsi, err := getDummyRestreamer(nil, nil, nil, nil) | ||||
| 	require.NoError(t, err) | ||||
| @@ -1237,7 +1385,10 @@ func TestProcessReplacer(t *testing.T) { | ||||
| 		LogPatterns:    []string{}, | ||||
| 	} | ||||
|  | ||||
| 	require.Equal(t, process, rs.tasks["314159265359"].config) | ||||
| 	task, ok := rs.tasks[TaskID{ID: "314159265359"}] | ||||
| 	require.True(t, ok) | ||||
|  | ||||
| 	require.Equal(t, process, task.config) | ||||
| } | ||||
|  | ||||
| func TestProcessLogPattern(t *testing.T) { | ||||
| @@ -1251,21 +1402,23 @@ func TestProcessLogPattern(t *testing.T) { | ||||
| 	process.Autostart = false | ||||
| 	process.Reconnect = true | ||||
|  | ||||
| 	tid := TaskID{ID: process.ID} | ||||
|  | ||||
| 	err = rs.AddProcess(process) | ||||
| 	require.NoError(t, err) | ||||
|  | ||||
| 	err = rs.StartProcess("process") | ||||
| 	err = rs.StartProcess(tid) | ||||
| 	require.NoError(t, err) | ||||
|  | ||||
| 	time.Sleep(5 * time.Second) | ||||
|  | ||||
| 	log, err := rs.GetProcessLog("process") | ||||
| 	log, err := rs.GetProcessLog(tid) | ||||
| 	require.NoError(t, err) | ||||
|  | ||||
| 	require.Equal(t, 1, len(log.Matches)) | ||||
| 	require.Equal(t, "[libx264 @ 0x7fa96a800600] using cpu capabilities: MMX2 SSE2Fast SSSE3 SSE4.2 AVX FMA3 BMI2 AVX2", log.Matches[0]) | ||||
|  | ||||
| 	err = rs.StopProcess("process") | ||||
| 	err = rs.StopProcess(tid) | ||||
| 	require.NoError(t, err) | ||||
| } | ||||
|  | ||||
| @@ -1283,7 +1436,7 @@ func TestProcessLimit(t *testing.T) { | ||||
|  | ||||
| 	rs := rsi.(*restream) | ||||
|  | ||||
| 	task, ok := rs.tasks[process.ID] | ||||
| 	task, ok := rs.tasks[TaskID{ID: process.ID}] | ||||
| 	require.True(t, ok) | ||||
|  | ||||
| 	status := task.ffmpeg.Status() | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Ingo Oppermann
					Ingo Oppermann