diff --git a/CHANGELOG.md b/CHANGELOG.md index c4d0803f..04f30c88 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ #### Core v16.8.0 > ? +- Fix updating process config - Add experimental SRT connection stats and logs - Hide /config/reload endpoint in reade-only mode - Add SRT server (datarhei/gosrt) diff --git a/http/handler/api/restream.go b/http/handler/api/restream.go index 5a16ad8e..a72fb4c8 100644 --- a/http/handler/api/restream.go +++ b/http/handler/api/restream.go @@ -60,7 +60,9 @@ func (h *RestreamHandler) Add(c echo.Context) error { return api.Err(http.StatusBadRequest, "Invalid process config", "%s", err.Error()) } - return c.JSON(http.StatusOK, process) + p, _ := h.getProcess(config.ID, "config") + + return c.JSON(http.StatusOK, p.Config) } // GetAll returns all known processes @@ -182,40 +184,15 @@ func (h *RestreamHandler) Update(c echo.Context) error { return api.Err(http.StatusBadRequest, "Invalid JSON", "%s", err) } - if process.Type != "ffmpeg" { - return api.Err(http.StatusBadRequest, "Unsupported process type", "Supported process types are: ffmpeg") - } - - if len(process.Input) == 0 && len(process.Output) == 0 { - return api.Err(http.StatusBadRequest, "At least one input and one output need to be defined") - } - config := process.Marshal() - fstate, err := h.restream.GetProcessState(id) - if err != nil { - return api.Err(http.StatusNotFound, "Unknown process ID", "%s", err) + if err := h.restream.UpdateProcess(id, config); err != nil { + return api.Err(http.StatusBadRequest, "Process can't be updated", "%s", err) } - order := fstate.Order + p, _ := h.getProcess(config.ID, "config") - if err := h.restream.StopProcess(id); err != nil { - return api.Err(http.StatusNotFound, "Unknown process ID", "%s", err) - } - - if err := h.restream.DeleteProcess(id); err != nil { - return api.Err(http.StatusBadRequest, "Process can't be deleted", "%s", err) - } - - if err := h.restream.AddProcess(config); err != nil { - return api.Err(http.StatusBadRequest, "Invalid process config", "%s", err) - } - - if order == "start" { - h.restream.StartProcess(process.ID) - } - - return c.JSON(http.StatusOK, process) + return c.JSON(http.StatusOK, p.Config) } // Command issues a command to a process diff --git a/http/handler/api/restream_test.go b/http/handler/api/restream_test.go index 5d7ac3d9..713a4f10 100644 --- a/http/handler/api/restream_test.go +++ b/http/handler/api/restream_test.go @@ -1,6 +1,8 @@ package api import ( + "bytes" + "encoding/json" "net/http" "testing" @@ -76,6 +78,74 @@ func TestAddProcess(t *testing.T) { mock.Validate(t, &api.ProcessConfig{}, response.Data) } +func TestUpdateProcessInvalid(t *testing.T) { + router, err := getDummyRestreamRouter() + require.NoError(t, err) + + data := mock.Read(t, "./fixtures/addProcess.json") + + response := mock.Request(t, http.StatusOK, router, "POST", "/", data) + + mock.Validate(t, &api.ProcessConfig{}, response.Data) + + update := bytes.Buffer{} + _, err = update.ReadFrom(mock.Read(t, "./fixtures/addProcess.json")) + require.NoError(t, err) + + proc := api.ProcessConfig{} + err = json.Unmarshal(update.Bytes(), &proc) + require.NoError(t, err) + + // invalid address + proc.Output[0].Address = "" + + encoded, err := json.Marshal(&proc) + require.NoError(t, err) + + update.Reset() + _, err = update.Write(encoded) + require.NoError(t, err) + + mock.Request(t, http.StatusBadRequest, router, "PUT", "/"+proc.ID, &update) + mock.Request(t, http.StatusOK, router, "GET", "/"+proc.ID, nil) +} + +func TestUpdateProcess(t *testing.T) { + router, err := getDummyRestreamRouter() + require.NoError(t, err) + + data := mock.Read(t, "./fixtures/addProcess.json") + + response := mock.Request(t, http.StatusOK, router, "POST", "/", data) + + mock.Validate(t, &api.ProcessConfig{}, response.Data) + + update := bytes.Buffer{} + _, err = update.ReadFrom(mock.Read(t, "./fixtures/addProcess.json")) + require.NoError(t, err) + + proc := api.ProcessConfig{} + err = json.Unmarshal(update.Bytes(), &proc) + require.NoError(t, err) + + // invalid address + proc.ID = "test2" + + encoded, err := json.Marshal(&proc) + require.NoError(t, err) + + update.Reset() + _, err = update.Write(encoded) + require.NoError(t, err) + + response = mock.Request(t, http.StatusOK, router, "PUT", "/test", &update) + + mock.Validate(t, &api.ProcessConfig{}, response.Data) + + mock.Request(t, http.StatusNotFound, router, "GET", "/test", nil) + mock.Request(t, http.StatusOK, router, "GET", "/test2", nil) +} + func TestRemoveUnknownProcess(t *testing.T) { router, err := getDummyRestreamRouter() require.NoError(t, err) diff --git a/restream/restream.go b/restream/restream.go index a31c009b..4c749ea3 100644 --- a/restream/restream.go +++ b/restream/restream.go @@ -33,6 +33,7 @@ type Restreamer interface { AddProcess(config *app.Config) error // add a new process GetProcessIDs() []string // get a list of all process IDs DeleteProcess(id string) error // delete a process + UpdateProcess(id string, config *app.Config) error // update a process StartProcess(id string) error // start a process StopProcess(id string) error // stop a process RestartProcess(id string) error // restart a process @@ -823,6 +824,51 @@ func (r *restream) resolveAddress(tasks map[string]*task, id, address string) (s return address, fmt.Errorf("the process '%s' has no outputs with the ID '%s' (%s)", matches[1], matches[2], address) } +func (r *restream) UpdateProcess(id string, config *app.Config) error { + r.lock.Lock() + defer r.lock.Unlock() + + t, err := r.createTask(config) + if err != nil { + return err + } + + task, ok := r.tasks[id] + if !ok { + return fmt.Errorf("unknown process ID (%s)", id) + } + + t.process.Order = task.process.Order + + if id != t.id { + _, ok := r.tasks[t.id] + if ok { + return fmt.Errorf("the process ID '%s' already exists", t.id) + } + } + + if err := r.stopProcess(id); err != nil { + return err + } + + if err := r.deleteProcess(id); err != nil { + return err + } + + r.tasks[t.id] = t + + // set filesystem cleanup rules + r.setCleanup(t.id, t.config) + + if t.process.Order == "start" { + r.startProcess(t.id) + } + + r.save() + + return nil +} + func (r *restream) GetProcessIDs() []string { r.lock.RLock() defer r.lock.RUnlock()