From f59a63320c687002d6907f0b64ec02d5fa70a75e Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Thu, 1 Jun 2023 21:24:33 +0200 Subject: [PATCH] Allow to pass metadata with process config, non-cluster only --- http/api/process.go | 37 +++---- http/handler/api/cluster.go | 4 +- http/handler/api/restream.go | 12 ++- http/handler/api/restream_test.go | 169 +++++++++++++++++++++++++++--- http/handler/api/widget_test.go | 4 +- restream/restream.go | 5 + restream/restream_test.go | 35 +++++++ 7 files changed, 228 insertions(+), 38 deletions(-) diff --git a/http/api/process.go b/http/api/process.go index 107dbfbf..9bb4dfdf 100644 --- a/http/api/process.go +++ b/http/api/process.go @@ -44,26 +44,27 @@ type ProcessConfigLimits struct { // ProcessConfig represents the configuration of an ffmpeg process type ProcessConfig struct { - ID string `json:"id"` - Owner string `json:"owner"` - Domain string `json:"domain"` - Type string `json:"type" validate:"oneof='ffmpeg' ''" jsonschema:"enum=ffmpeg,enum="` - Reference string `json:"reference"` - Input []ProcessConfigIO `json:"input" validate:"required"` - Output []ProcessConfigIO `json:"output" validate:"required"` - Options []string `json:"options"` - Reconnect bool `json:"reconnect"` - ReconnectDelay uint64 `json:"reconnect_delay_seconds" format:"uint64"` - Autostart bool `json:"autostart"` - StaleTimeout uint64 `json:"stale_timeout_seconds" format:"uint64"` - Timeout uint64 `json:"runtime_duration_seconds" format:"uint64"` - Scheduler string `json:"scheduler"` - LogPatterns []string `json:"log_patterns"` - Limits ProcessConfigLimits `json:"limits"` + ID string `json:"id"` + Owner string `json:"owner"` + Domain string `json:"domain"` + Type string `json:"type" validate:"oneof='ffmpeg' ''" jsonschema:"enum=ffmpeg,enum="` + Reference string `json:"reference"` + Input []ProcessConfigIO `json:"input" validate:"required"` + Output []ProcessConfigIO `json:"output" validate:"required"` + Options []string `json:"options"` + Reconnect bool `json:"reconnect"` + ReconnectDelay uint64 `json:"reconnect_delay_seconds" format:"uint64"` + Autostart bool `json:"autostart"` + StaleTimeout uint64 `json:"stale_timeout_seconds" format:"uint64"` + Timeout uint64 `json:"runtime_duration_seconds" format:"uint64"` + Scheduler string `json:"scheduler"` + LogPatterns []string `json:"log_patterns"` + Limits ProcessConfigLimits `json:"limits"` + Metadata map[string]interface{} `json:"metadata,omitempty"` } // Marshal converts a process config in API representation to a restreamer process config -func (cfg *ProcessConfig) Marshal() *app.Config { +func (cfg *ProcessConfig) Marshal() (*app.Config, map[string]interface{}) { p := &app.Config{ ID: cfg.ID, Owner: cfg.Owner, @@ -116,7 +117,7 @@ func (cfg *ProcessConfig) Marshal() *app.Config { p.LogPatterns = make([]string, len(cfg.LogPatterns)) copy(p.LogPatterns, cfg.LogPatterns) - return p + return p, cfg.Metadata } func (cfg *ProcessConfig) generateInputOutputIDs(ioconfig []ProcessConfigIO) { diff --git a/http/handler/api/cluster.go b/http/handler/api/cluster.go index 92eff58d..6a756a21 100644 --- a/http/handler/api/cluster.go +++ b/http/handler/api/cluster.go @@ -404,7 +404,7 @@ func (h *ClusterHandler) AddProcess(c echo.Context) error { return api.Err(http.StatusBadRequest, "At least one input and one output need to be defined") } - config := process.Marshal() + config, _ := process.Marshal() if err := h.cluster.AddProcess("", config); err != nil { return api.Err(http.StatusBadRequest, "Invalid process config", "%s", err.Error()) @@ -468,7 +468,7 @@ func (h *ClusterHandler) UpdateProcess(c echo.Context) error { } } - config := process.Marshal() + config, _ := process.Marshal() if err := h.cluster.UpdateProcess("", id, config); err != nil { if err == restream.ErrUnknownProcess { diff --git a/http/handler/api/restream.go b/http/handler/api/restream.go index 7dd4ab97..50a87e93 100644 --- a/http/handler/api/restream.go +++ b/http/handler/api/restream.go @@ -76,7 +76,7 @@ func (h *RestreamHandler) Add(c echo.Context) error { return api.Err(http.StatusBadRequest, "At least one input and one output need to be defined") } - config := process.Marshal() + config, metadata := process.Marshal() if err := h.restream.AddProcess(config); err != nil { return api.Err(http.StatusBadRequest, "Invalid process config", "%s", err.Error()) @@ -87,6 +87,10 @@ func (h *RestreamHandler) Add(c echo.Context) error { Domain: config.Domain, } + for key, data := range metadata { + h.restream.SetProcessMetadata(tid, key, data) + } + p, _ := h.getProcess(tid, "config") return c.JSON(http.StatusOK, p.Config) @@ -297,7 +301,7 @@ func (h *RestreamHandler) Update(c echo.Context) error { } } - config := process.Marshal() + config, metadata := process.Marshal() tid = restream.TaskID{ ID: id, @@ -317,6 +321,10 @@ func (h *RestreamHandler) Update(c echo.Context) error { Domain: config.Domain, } + for key, data := range metadata { + h.restream.SetProcessMetadata(tid, key, data) + } + p, _ := h.getProcess(tid, "config") return c.JSON(http.StatusOK, p.Config) diff --git a/http/handler/api/restream_test.go b/http/handler/api/restream_test.go index e6cac35a..e3202bf2 100644 --- a/http/handler/api/restream_test.go +++ b/http/handler/api/restream_test.go @@ -129,6 +129,39 @@ func TestAddProcess(t *testing.T) { mock.Validate(t, &api.ProcessConfig{}, response.Data) } +func TestAddProcessWithMetadata(t *testing.T) { + router, err := getDummyRestreamRouter() + require.NoError(t, err) + + data := bytes.Buffer{} + _, err = data.ReadFrom(mock.Read(t, "./fixtures/addProcess.json")) + require.NoError(t, err) + + process := api.ProcessConfig{} + err = json.Unmarshal(data.Bytes(), &process) + require.NoError(t, err) + + process.Metadata = map[string]interface{}{ + "foo": "bar", + } + + encoded, err := json.Marshal(&process) + require.NoError(t, err) + + data.Reset() + _, err = data.Write(encoded) + require.NoError(t, err) + + response := mock.Request(t, http.StatusOK, router, "POST", "/", &data) + + mock.Validate(t, &api.ProcessConfig{}, response.Data) + + response = mock.Request(t, http.StatusOK, router, "GET", "/"+process.ID+"/metadata", nil) + require.Equal(t, map[string]interface{}{ + "foo": "bar", + }, response.Data) +} + func TestUpdateProcessInvalid(t *testing.T) { router, err := getDummyRestreamRouter() require.NoError(t, err) @@ -171,28 +204,73 @@ func TestUpdateReplaceProcess(t *testing.T) { mock.Validate(t, &api.ProcessConfig{}, response.Data) - update := bytes.Buffer{} - _, err = update.ReadFrom(mock.Read(t, "./fixtures/addProcess.json")) - require.NoError(t, err) + data = mock.Read(t, "./fixtures/addProcess.json") - proc := api.ProcessConfig{} - err = json.Unmarshal(update.Bytes(), &proc) - require.NoError(t, err) - - encoded, err := json.Marshal(&proc) - require.NoError(t, err) - - update.Reset() - _, err = update.Write(encoded) - require.NoError(t, err) - - response = mock.Request(t, http.StatusOK, router, "PUT", "/test", &update) + response = mock.Request(t, http.StatusOK, router, "PUT", "/test", data) mock.Validate(t, &api.ProcessConfig{}, response.Data) mock.Request(t, http.StatusOK, router, "GET", "/test", nil) } +func TestUpdateReplaceProcessWithMetadata(t *testing.T) { + router, err := getDummyRestreamRouter() + require.NoError(t, err) + + data := bytes.Buffer{} + _, err = data.ReadFrom(mock.Read(t, "./fixtures/addProcess.json")) + require.NoError(t, err) + + process := api.ProcessConfig{} + err = json.Unmarshal(data.Bytes(), &process) + require.NoError(t, err) + + process.Metadata = map[string]interface{}{ + "foo": "bar", + } + + encoded, err := json.Marshal(&process) + require.NoError(t, err) + + data.Reset() + _, err = data.Write(encoded) + require.NoError(t, err) + + response := mock.Request(t, http.StatusOK, router, "POST", "/", &data) + + mock.Validate(t, &api.ProcessConfig{}, response.Data) + + data.Reset() + _, err = data.ReadFrom(mock.Read(t, "./fixtures/addProcess.json")) + require.NoError(t, err) + + process = api.ProcessConfig{} + err = json.Unmarshal(data.Bytes(), &process) + require.NoError(t, err) + + process.Metadata = map[string]interface{}{ + "foo": "baz", + } + + encoded, err = json.Marshal(&process) + require.NoError(t, err) + + data.Reset() + _, err = data.Write(encoded) + require.NoError(t, err) + + response = mock.Request(t, http.StatusOK, router, "PUT", "/test", &data) + + mock.Validate(t, &api.ProcessConfig{}, response.Data) + + mock.Request(t, http.StatusOK, router, "GET", "/test", nil) + + response = mock.Request(t, http.StatusOK, router, "GET", "/"+process.ID+"/metadata", nil) + require.Equal(t, map[string]interface{}{ + "foo": "baz", + }, response.Data) +} + func TestUpdateNewProcess(t *testing.T) { router, err := getDummyRestreamRouter() require.NoError(t, err) @@ -228,6 +306,67 @@ func TestUpdateNewProcess(t *testing.T) { mock.Request(t, http.StatusOK, router, "GET", "/test2", nil) } +func TestUpdateNewProcessWithMetadata(t *testing.T) { + router, err := getDummyRestreamRouter() + require.NoError(t, err) + + data := bytes.Buffer{} + _, err = data.ReadFrom(mock.Read(t, "./fixtures/addProcess.json")) + require.NoError(t, err) + + process := api.ProcessConfig{} + err = json.Unmarshal(data.Bytes(), &process) + require.NoError(t, err) + + process.Metadata = map[string]interface{}{ + "foo": "bar", + } + + encoded, err := json.Marshal(&process) + require.NoError(t, err) + + data.Reset() + _, err = data.Write(encoded) + require.NoError(t, err) + + response := mock.Request(t, http.StatusOK, router, "POST", "/", &data) + + mock.Validate(t, &api.ProcessConfig{}, response.Data) + + data.Reset() + _, err = data.ReadFrom(mock.Read(t, "./fixtures/addProcess.json")) + require.NoError(t, err) + + process = api.ProcessConfig{} + err = json.Unmarshal(data.Bytes(), &process) + require.NoError(t, err) + + process.ID = "test2" + process.Metadata = map[string]interface{}{ + "bar": "foo", + } + + encoded, err = json.Marshal(&process) + require.NoError(t, err) + + data.Reset() + _, err = data.Write(encoded) + require.NoError(t, err) + + response = mock.Request(t, http.StatusOK, router, "PUT", "/test", &data) + + mock.Validate(t, &api.ProcessConfig{}, response.Data) + + mock.Request(t, http.StatusNotFound, router, "GET", "/test", nil) + mock.Request(t, http.StatusOK, router, "GET", "/test2", nil) + + response = mock.Request(t, http.StatusOK, router, "GET", "/"+process.ID+"/metadata", nil) + require.Equal(t, map[string]interface{}{ + "foo": "bar", + "bar": "foo", + }, response.Data) +} + func TestUpdateNonExistentProcess(t *testing.T) { router, err := getDummyRestreamRouter() require.NoError(t, err) diff --git a/http/handler/api/widget_test.go b/http/handler/api/widget_test.go index 83c5ba5e..69505872 100644 --- a/http/handler/api/widget_test.go +++ b/http/handler/api/widget_test.go @@ -50,7 +50,9 @@ func TestWidget(t *testing.T) { err = json.Unmarshal(data, &process) require.NoError(t, err) - err = rs.AddProcess(process.Marshal()) + config, _ := process.Marshal() + + err = rs.AddProcess(config) require.NoError(t, err) response := mock.Request(t, http.StatusOK, router, "GET", "/"+process.ID, nil) diff --git a/restream/restream.go b/restream/restream.go index ff745888..40284a43 100644 --- a/restream/restream.go +++ b/restream/restream.go @@ -1220,8 +1220,13 @@ func (r *restream) UpdateProcess(id TaskID, config *app.Config) error { // This would require a major version jump //t.process.CreatedAt = task.process.CreatedAt t.process.UpdatedAt = time.Now().Unix() + + // Transfer the report history to the new process task.parser.TransferReportHistory(t.parser) + // Transfer the metadata to the new process + t.metadata = task.metadata + r.tasks[tid] = t // set filesystem cleanup rules diff --git a/restream/restream_test.go b/restream/restream_test.go index d36f8efe..1d8ddcc5 100644 --- a/restream/restream_test.go +++ b/restream/restream_test.go @@ -385,6 +385,41 @@ func TestUpdateProcessLogHistoryTransfer(t *testing.T) { require.NoError(t, err) } +func TestUpdateProcessMetadataTransfer(t *testing.T) { + rs, err := getDummyRestreamer(nil, nil, nil, nil) + require.NoError(t, err) + + p := getDummyProcess() + require.NotNil(t, p) + p.ID = "process1" + + tid1 := TaskID{ID: p.ID} + + err = rs.AddProcess(p) + require.Equal(t, nil, err) + + err = rs.SetProcessMetadata(tid1, "foo", "bar") + require.Equal(t, nil, err) + + p = getDummyProcess() + require.NotNil(t, p) + + p.ID = "process2" + err = rs.UpdateProcess(tid1, p) + require.NoError(t, err) + + tid2 := TaskID{ID: p.ID} + + _, err = rs.GetProcess(tid2) + require.NoError(t, err) + + metadata, err := rs.GetProcessMetadata(tid2, "") + require.NoError(t, err) + require.Equal(t, map[string]interface{}{ + "foo": "bar", + }, metadata) +} + func TestGetProcess(t *testing.T) { rs, err := getDummyRestreamer(nil, nil, nil, nil) require.NoError(t, err)