From 879819f10f60a9c3bf736cdc9bb7c20b890bb4f4 Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Mon, 22 Jul 2024 16:58:57 +0200 Subject: [PATCH] Retrieve current process from leader, clone metadata, introduce new state 'deploying' --- cluster/api.go | 40 ++++++ cluster/client/client.go | 6 + cluster/client/{proces.go => process.go} | 17 +++ cluster/cluster.go | 1 + cluster/forwarder/process.go | 15 +++ cluster/leader_rebalance.go | 2 +- cluster/leader_relocate.go | 2 +- cluster/leader_synchronize.go | 2 +- cluster/process.go | 17 ++- cluster/store/process.go | 19 ++- cluster/store/process_test.go | 32 ++--- cluster/store/store.go | 2 +- http/api/process.go | 111 ++++++++++++++++ http/handler/api/cluster_process.go | 152 +++------------------- http/handler/api/cluster_store.go | 8 +- http/handler/api/fixtures/addProcess.json | 3 +- http/handler/api/process.go | 36 ++--- restream/app/process.go | 2 +- restream/task.go | 7 +- 19 files changed, 282 insertions(+), 192 deletions(-) rename cluster/client/{proces.go => process.go} (79%) diff --git a/cluster/api.go b/cluster/api.go index 4a2328b4..bd3c5ca6 100644 --- a/cluster/api.go +++ b/cluster/api.go @@ -113,6 +113,7 @@ func NewAPI(config APIConfig) (API, error) { a.router.GET("/v1/snaphot", a.Snapshot) a.router.POST("/v1/process", a.ProcessAdd) + a.router.GET("/v1/process/:id", a.ProcessGet) a.router.DELETE("/v1/process/:id", a.ProcessRemove) a.router.PUT("/v1/process/:id", a.ProcessUpdate) a.router.PUT("/v1/process/:id/command", a.ProcessSetCommand) @@ -387,6 +388,45 @@ func (a *api) ProcessAdd(c echo.Context) error { return c.JSON(http.StatusOK, "OK") } +// ProcessGet gets a process from the cluster DB +// @Summary Get a process +// @Description Get a process from the cluster DB +// @Tags v1.0.0 +// @ID cluster-1-get-process +// @Produce json +// @Param id path string true "Process ID" +// @Param domain query string false "Domain to act on" +// @Param X-Cluster-Origin header string false "Origin ID of request" +// @Success 200 {string} string +// @Failure 404 {object} Error +// @Failure 500 {object} Error +// @Failure 508 {object} Error +// @Router /v1/process/{id} [get] +func (a *api) ProcessGet(c echo.Context) error { + id := util.PathParam(c, "id") + domain := util.DefaultQuery(c, "domain", "") + + origin := c.Request().Header.Get("X-Cluster-Origin") + + if origin == a.id { + return Err(http.StatusLoopDetected, "", "breaking circuit") + } + + pid := app.ProcessID{ID: id, Domain: domain} + + process, nodeid, err := a.cluster.Store().ProcessGet(pid) + if err != nil { + return ErrFromClusterError(err) + } + + res := client.GetProcessResponse{ + Process: process, + NodeID: nodeid, + } + + return c.JSON(http.StatusOK, res) +} + // ProcessRemove removes a process from the cluster DB // @Summary Remove a process // @Description Remove a process from the cluster DB diff --git a/cluster/client/client.go b/cluster/client/client.go index a5df0f49..c5b1b98f 100644 --- a/cluster/client/client.go +++ b/cluster/client/client.go @@ -9,6 +9,7 @@ import ( "strings" "time" + "github.com/datarhei/core/v16/cluster/store" "github.com/datarhei/core/v16/config" "github.com/datarhei/core/v16/encoding/json" "github.com/datarhei/core/v16/ffmpeg/skills" @@ -26,6 +27,11 @@ type AddProcessRequest struct { Config app.Config `json:"config"` } +type GetProcessResponse struct { + Process store.Process `json:"process"` + NodeID string `json:"nodeid"` +} + type UpdateProcessRequest struct { Config app.Config `json:"config"` } diff --git a/cluster/client/proces.go b/cluster/client/process.go similarity index 79% rename from cluster/client/proces.go rename to cluster/client/process.go index aee4dd14..b76429cc 100644 --- a/cluster/client/proces.go +++ b/cluster/client/process.go @@ -5,6 +5,7 @@ import ( "net/http" "net/url" + "github.com/datarhei/core/v16/cluster/store" "github.com/datarhei/core/v16/encoding/json" "github.com/datarhei/core/v16/restream/app" ) @@ -20,6 +21,22 @@ func (c *APIClient) ProcessAdd(origin string, r AddProcessRequest) error { return err } +func (c APIClient) ProcessGet(origin string, id app.ProcessID) (store.Process, string, error) { + res := GetProcessResponse{} + + data, err := c.call(http.MethodGet, "/v1/process/"+url.PathEscape(id.ID)+"?domain="+url.QueryEscape(id.Domain), "application/json", nil, origin) + if err != nil { + return store.Process{}, "", err + } + + err = json.Unmarshal(data, &res) + if err != nil { + return store.Process{}, "", err + } + + return res.Process, res.NodeID, nil +} + func (c *APIClient) ProcessRemove(origin string, id app.ProcessID) error { _, err := c.call(http.MethodDelete, "/v1/process/"+url.PathEscape(id.ID)+"?domain="+url.QueryEscape(id.Domain), "application/json", nil, origin) diff --git a/cluster/cluster.go b/cluster/cluster.go index b0b11fdd..eeb825ce 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -58,6 +58,7 @@ type Cluster interface { HasRaftLeader() bool ProcessAdd(origin string, config *app.Config) error + ProcessGet(origin string, id app.ProcessID, stale bool) (store.Process, string, error) ProcessRemove(origin string, id app.ProcessID) error ProcessUpdate(origin string, id app.ProcessID, config *app.Config) error ProcessSetCommand(origin string, id app.ProcessID, order string) error diff --git a/cluster/forwarder/process.go b/cluster/forwarder/process.go index 12a36392..acf6b997 100644 --- a/cluster/forwarder/process.go +++ b/cluster/forwarder/process.go @@ -2,6 +2,7 @@ package forwarder import ( apiclient "github.com/datarhei/core/v16/cluster/client" + "github.com/datarhei/core/v16/cluster/store" "github.com/datarhei/core/v16/restream/app" ) @@ -21,6 +22,20 @@ func (f *Forwarder) ProcessAdd(origin string, config *app.Config) error { return reconstructError(client.ProcessAdd(origin, r)) } +func (f *Forwarder) ProcessGet(origin string, id app.ProcessID) (store.Process, string, error) { + if origin == "" { + origin = f.ID + } + + f.lock.RLock() + client := f.client + f.lock.RUnlock() + + process, nodeid, err := client.ProcessGet(origin, id) + + return process, nodeid, reconstructError(err) +} + func (f *Forwarder) ProcessUpdate(origin string, id app.ProcessID, config *app.Config) error { if origin == "" { origin = f.ID diff --git a/cluster/leader_rebalance.go b/cluster/leader_rebalance.go index 6080777e..c583f1ac 100644 --- a/cluster/leader_rebalance.go +++ b/cluster/leader_rebalance.go @@ -41,7 +41,7 @@ func (c *cluster) doRebalance(emergency bool, term uint64) { for _, e := range errors { // Only apply the command if the error is different. - process, err := c.store.ProcessGet(e.processid) + process, _, err := c.store.ProcessGet(e.processid) if err != nil { continue } diff --git a/cluster/leader_relocate.go b/cluster/leader_relocate.go index 99493213..285a087f 100644 --- a/cluster/leader_relocate.go +++ b/cluster/leader_relocate.go @@ -43,7 +43,7 @@ func (c *cluster) doRelocate(emergency bool, term uint64) { for _, e := range errors { // Only apply the command if the error is different. - process, err := c.store.ProcessGet(e.processid) + process, _, err := c.store.ProcessGet(e.processid) if err != nil { continue } diff --git a/cluster/leader_synchronize.go b/cluster/leader_synchronize.go index 0ddd5ac4..a043d823 100644 --- a/cluster/leader_synchronize.go +++ b/cluster/leader_synchronize.go @@ -55,7 +55,7 @@ func (c *cluster) doSynchronize(emergency bool, term uint64) { if !emergency { for _, e := range errors { // Only apply the command if the error is different. - process, err := c.store.ProcessGet(e.processid) + process, _, err := c.store.ProcessGet(e.processid) if err != nil { continue } diff --git a/cluster/process.go b/cluster/process.go index e7d3c5f4..c67f0de2 100644 --- a/cluster/process.go +++ b/cluster/process.go @@ -22,6 +22,21 @@ func (c *cluster) ProcessAdd(origin string, config *app.Config) error { return c.applyCommand(cmd) } +func (c *cluster) ProcessGet(origin string, id app.ProcessID, stale bool) (store.Process, string, error) { + if !stale { + if !c.IsRaftLeader() { + return c.forwarder.ProcessGet(origin, id) + } + } + + process, nodeid, err := c.store.ProcessGet(id) + if err != nil { + return store.Process{}, "", err + } + + return process, nodeid, nil +} + func (c *cluster) ProcessRemove(origin string, id app.ProcessID) error { if !c.IsRaftLeader() { return c.forwarder.ProcessRemove(origin, id) @@ -111,7 +126,7 @@ func (c *cluster) ProcessSetMetadata(origin string, id app.ProcessID, key string } func (c *cluster) ProcessGetMetadata(origin string, id app.ProcessID, key string) (interface{}, error) { - p, err := c.store.ProcessGet(id) + p, _, err := c.store.ProcessGet(id) if err != nil { return nil, err } diff --git a/cluster/store/process.go b/cluster/store/process.go index d7066e13..d04b7129 100644 --- a/cluster/store/process.go +++ b/cluster/store/process.go @@ -2,6 +2,7 @@ package store import ( "fmt" + "maps" "time" "github.com/datarhei/core/v16/restream/app" @@ -219,36 +220,32 @@ func (s *store) ProcessList() []Process { return processes } -func (s *store) ProcessGet(id app.ProcessID) (Process, error) { +func (s *store) ProcessGet(id app.ProcessID) (Process, string, error) { s.lock.RLock() defer s.lock.RUnlock() process, ok := s.data.Process[id.String()] if !ok { - return Process{}, fmt.Errorf("not found%w", ErrNotFound) + return Process{}, "", fmt.Errorf("not found%w", ErrNotFound) } + nodeid := s.data.ProcessNodeMap[id.String()] + return Process{ CreatedAt: process.CreatedAt, UpdatedAt: process.UpdatedAt, Config: process.Config.Clone(), Order: process.Order, - Metadata: process.Metadata, + Metadata: maps.Clone(process.Metadata), Error: process.Error, - }, nil + }, nodeid, nil } func (s *store) ProcessGetNodeMap() map[string]string { s.lock.RLock() defer s.lock.RUnlock() - m := map[string]string{} - - for key, value := range s.data.ProcessNodeMap { - m[key] = value - } - - return m + return maps.Clone(s.data.ProcessNodeMap) } func (s *store) ProcessGetNode(id app.ProcessID) (string, error) { diff --git a/cluster/store/process_test.go b/cluster/store/process_test.go index 03a33587..8d790cd2 100644 --- a/cluster/store/process_test.go +++ b/cluster/store/process_test.go @@ -301,13 +301,13 @@ func TestUpdateProcess(t *testing.T) { require.NoError(t, err) require.Equal(t, 2, len(s.data.Process)) - _, err = s.ProcessGet(config1.ProcessID()) + _, _, err = s.ProcessGet(config1.ProcessID()) require.Error(t, err) - _, err = s.ProcessGet(config2.ProcessID()) + _, _, err = s.ProcessGet(config2.ProcessID()) require.NoError(t, err) - _, err = s.ProcessGet(config.ProcessID()) + _, _, err = s.ProcessGet(config.ProcessID()) require.NoError(t, err) } @@ -330,7 +330,7 @@ func TestSetProcessOrderCommand(t *testing.T) { require.NoError(t, err) require.NotEmpty(t, s.data.Process) - p, err := s.ProcessGet(config.ProcessID()) + p, _, err := s.ProcessGet(config.ProcessID()) require.NoError(t, err) require.Equal(t, "stop", p.Order) @@ -343,7 +343,7 @@ func TestSetProcessOrderCommand(t *testing.T) { }) require.NoError(t, err) - p, err = s.ProcessGet(config.ProcessID()) + p, _, err = s.ProcessGet(config.ProcessID()) require.NoError(t, err) require.Equal(t, "start", p.Order) } @@ -382,7 +382,7 @@ func TestSetProcessOrder(t *testing.T) { }) require.NoError(t, err) - p, err := s.ProcessGet(config.ProcessID()) + p, _, err := s.ProcessGet(config.ProcessID()) require.NoError(t, err) require.Equal(t, "stop", p.Order) @@ -392,7 +392,7 @@ func TestSetProcessOrder(t *testing.T) { }) require.NoError(t, err) - p, err = s.ProcessGet(config.ProcessID()) + p, _, err = s.ProcessGet(config.ProcessID()) require.NoError(t, err) require.Equal(t, "start", p.Order) } @@ -416,7 +416,7 @@ func TestSetProcessMetadataCommand(t *testing.T) { require.NoError(t, err) require.NotEmpty(t, s.data.Process) - p, err := s.ProcessGet(config.ProcessID()) + p, _, err := s.ProcessGet(config.ProcessID()) require.NoError(t, err) require.Empty(t, p.Metadata) @@ -432,7 +432,7 @@ func TestSetProcessMetadataCommand(t *testing.T) { }) require.NoError(t, err) - p, err = s.ProcessGet(config.ProcessID()) + p, _, err = s.ProcessGet(config.ProcessID()) require.NoError(t, err) require.NotEmpty(t, p.Metadata) @@ -477,7 +477,7 @@ func TestSetProcessMetadata(t *testing.T) { }) require.NoError(t, err) - p, err := s.ProcessGet(config.ProcessID()) + p, _, err := s.ProcessGet(config.ProcessID()) require.NoError(t, err) require.NotEmpty(t, p.Metadata) @@ -492,7 +492,7 @@ func TestSetProcessMetadata(t *testing.T) { }) require.NoError(t, err) - p, err = s.ProcessGet(config.ProcessID()) + p, _, err = s.ProcessGet(config.ProcessID()) require.NoError(t, err) require.NotEmpty(t, p.Metadata) @@ -506,7 +506,7 @@ func TestSetProcessMetadata(t *testing.T) { }) require.NoError(t, err) - p, err = s.ProcessGet(config.ProcessID()) + p, _, err = s.ProcessGet(config.ProcessID()) require.NoError(t, err) require.NotEmpty(t, p.Metadata) @@ -533,7 +533,7 @@ func TestSetProcessErrorCommand(t *testing.T) { require.NoError(t, err) require.NotEmpty(t, s.data.Process) - p, err := s.ProcessGet(config.ProcessID()) + p, _, err := s.ProcessGet(config.ProcessID()) require.NoError(t, err) require.Equal(t, "", p.Error) @@ -546,7 +546,7 @@ func TestSetProcessErrorCommand(t *testing.T) { }) require.NoError(t, err) - p, err = s.ProcessGet(config.ProcessID()) + p, _, err = s.ProcessGet(config.ProcessID()) require.NoError(t, err) require.Equal(t, "foobar", p.Error) } @@ -585,7 +585,7 @@ func TestSetProcessError(t *testing.T) { }) require.NoError(t, err) - p, err := s.ProcessGet(config.ProcessID()) + p, _, err := s.ProcessGet(config.ProcessID()) require.NoError(t, err) require.Equal(t, "", p.Error) @@ -595,7 +595,7 @@ func TestSetProcessError(t *testing.T) { }) require.NoError(t, err) - p, err = s.ProcessGet(config.ProcessID()) + p, _, err = s.ProcessGet(config.ProcessID()) require.NoError(t, err) require.Equal(t, "foobar", p.Error) } diff --git a/cluster/store/store.go b/cluster/store/store.go index 40973111..d26b7e13 100644 --- a/cluster/store/store.go +++ b/cluster/store/store.go @@ -21,7 +21,7 @@ type Store interface { OnApply(func(op Operation)) ProcessList() []Process - ProcessGet(id app.ProcessID) (Process, error) + ProcessGet(id app.ProcessID) (Process, string, error) ProcessGetNode(id app.ProcessID) (string, error) ProcessGetNodeMap() map[string]string ProcessGetRelocateMap() map[string]string diff --git a/http/api/process.go b/http/api/process.go index 265789a6..baf87707 100644 --- a/http/api/process.go +++ b/http/api/process.go @@ -1,6 +1,9 @@ package api import ( + "strconv" + + "github.com/datarhei/core/v16/cluster/store" "github.com/datarhei/core/v16/encoding/json" "github.com/datarhei/core/v16/restream/app" @@ -28,6 +31,114 @@ type Process struct { Metadata Metadata `json:"metadata,omitempty"` } +func (p *Process) Unmarshal(ap *app.Process, ac *app.Config, as *app.State, ar *app.Report, am interface{}) { + p.ID = ap.ID + p.Owner = ap.Owner + p.Domain = ap.Domain + p.Reference = ap.Reference + p.Type = "ffmpeg" + p.CreatedAt = ap.CreatedAt + p.UpdatedAt = ap.UpdatedAt + + p.Config = nil + if ac != nil { + p.Config = &ProcessConfig{} + p.Config.Unmarshal(ap.Config, nil) + } + + p.State = nil + if as != nil { + p.State = &ProcessState{} + p.State.Unmarshal(as) + } + + p.Report = nil + if ar != nil { + p.Report = &ProcessReport{} + p.Report.Unmarshal(ar) + } + + p.Metadata = nil + if am != nil { + p.Metadata = NewMetadata(am) + } +} + +func (p *Process) UnmarshalStore(s store.Process, config, state, report, metadata bool) { + p.ID = s.Config.ID + p.Owner = s.Config.Owner + p.Domain = s.Config.Domain + p.Type = "ffmpeg" + p.Reference = s.Config.Reference + p.CreatedAt = s.CreatedAt.Unix() + p.UpdatedAt = s.UpdatedAt.Unix() + + p.Metadata = nil + if metadata { + p.Metadata = s.Metadata + } + + p.Config = nil + if config { + config := &ProcessConfig{} + config.Unmarshal(s.Config, s.Metadata) + + p.Config = config + } + + p.State = nil + if state { + p.State = &ProcessState{ + Order: s.Order, + LastLog: s.Error, + Resources: ProcessUsage{ + CPU: ProcessUsageCPU{ + NCPU: json.ToNumber(1), + Limit: json.ToNumber(s.Config.LimitCPU), + }, + Memory: ProcessUsageMemory{ + Limit: s.Config.LimitMemory, + }, + }, + Command: []string{}, + Progress: &Progress{ + Input: []ProgressIO{}, + Output: []ProgressIO{}, + Mapping: StreamMapping{ + Graphs: []GraphElement{}, + Mapping: []GraphMapping{}, + }, + }, + } + + if len(s.Error) != 0 { + p.State.State = "failed" + } else { + p.State.State = "deploying" + } + } + + if report { + p.Report = &ProcessReport{ + ProcessReportEntry: ProcessReportEntry{ + CreatedAt: s.CreatedAt.Unix(), + Prelude: []string{}, + Log: [][2]string{}, + Matches: []string{}, + }, + } + + if len(s.Error) != 0 { + p.Report.Prelude = []string{s.Error} + p.Report.Log = [][2]string{ + {strconv.FormatInt(s.CreatedAt.Unix(), 10), s.Error}, + } + //process.Report.ExitedAt = p.CreatedAt.Unix() + //process.Report.ExitState = "failed" + } + } +} + // ProcessConfigIO represents an input or output of an ffmpeg process config type ProcessConfigIO struct { ID string `json:"id"` diff --git a/http/handler/api/cluster_process.go b/http/handler/api/cluster_process.go index 8c531810..11b3894d 100644 --- a/http/handler/api/cluster_process.go +++ b/http/handler/api/cluster_process.go @@ -1,15 +1,12 @@ package api import ( - "bytes" "fmt" "net/http" - "strconv" "strings" "github.com/datarhei/core/v16/cluster/node" "github.com/datarhei/core/v16/cluster/store" - "github.com/datarhei/core/v16/encoding/json" "github.com/datarhei/core/v16/glob" "github.com/datarhei/core/v16/http/api" "github.com/datarhei/core/v16/http/handler/util" @@ -89,53 +86,16 @@ func (h *ClusterHandler) ProcessList(c echo.Context) error { continue } - process := h.convertStoreProcessToAPIProcess(p, filter) + process := api.Process{} + process.UnmarshalStore(p, filter.config, filter.state, filter.report, filter.metadata) missing = append(missing, process) } } - // We're doing some byte-wrangling here because the processes from the nodes - // are of type clientapi.Process, the missing processes are from type api.Process. - // They are actually the same and converting them is cumbersome. That's why - // we're doing the JSON marshalling here and appending these two slices is done - // in JSON representation. + processes = append(processes, missing...) - data, err := json.Marshal(processes) - if err != nil { - return api.Err(http.StatusInternalServerError, "", err.Error()) - } - - buf := &bytes.Buffer{} - - if len(missing) != 0 { - reallyData, err := json.Marshal(missing) - if err != nil { - return api.Err(http.StatusInternalServerError, "", err.Error()) - } - - i := bytes.LastIndexByte(data, ']') - if i == -1 { - return api.Err(http.StatusInternalServerError, "", "no valid JSON") - } - - if len(processes) != 0 { - data[i] = ',' - } else { - data[i] = ' ' - } - buf.Write(data) - - i = bytes.IndexByte(reallyData, '[') - if i == -1 { - return api.Err(http.StatusInternalServerError, "", "no valid JSON") - } - buf.Write(reallyData[i+1:]) - } else { - buf.Write(data) - } - - return c.Stream(http.StatusOK, "application/json", buf) + return c.JSON(http.StatusOK, processes) } func (h *ClusterHandler) getFilteredStoreProcesses(processes []store.Process, wantids []string, _, reference, idpattern, refpattern, ownerpattern, domainpattern string) []store.Process { @@ -224,74 +184,6 @@ func (h *ClusterHandler) getFilteredStoreProcesses(processes []store.Process, wa return final } -func (h *ClusterHandler) convertStoreProcessToAPIProcess(p store.Process, filter filter) api.Process { - process := api.Process{ - ID: p.Config.ID, - Owner: p.Config.Owner, - Domain: p.Config.Domain, - Type: "ffmpeg", - Reference: p.Config.Reference, - CreatedAt: p.CreatedAt.Unix(), - UpdatedAt: p.UpdatedAt.Unix(), - } - - if filter.metadata { - process.Metadata = p.Metadata - } - - if filter.config { - config := &api.ProcessConfig{} - config.Unmarshal(p.Config, p.Metadata) - - process.Config = config - } - - if filter.state { - process.State = &api.ProcessState{ - Order: p.Order, - LastLog: p.Error, - Resources: api.ProcessUsage{ - CPU: api.ProcessUsageCPU{ - NCPU: json.ToNumber(1), - Limit: json.ToNumber(p.Config.LimitCPU), - }, - Memory: api.ProcessUsageMemory{ - Limit: p.Config.LimitMemory, - }, - }, - Command: []string{}, - } - - if len(p.Error) != 0 { - process.State.State = "failed" - } else { - process.State.State = "finished" - } - } - - if filter.report { - process.Report = &api.ProcessReport{ - ProcessReportEntry: api.ProcessReportEntry{ - CreatedAt: p.CreatedAt.Unix(), - Prelude: []string{}, - Log: [][2]string{}, - Matches: []string{}, - }, - } - - if len(p.Error) != 0 { - process.Report.Prelude = []string{p.Error} - process.Report.Log = [][2]string{ - {strconv.FormatInt(p.CreatedAt.Unix(), 10), p.Error}, - } - //process.Report.ExitedAt = p.CreatedAt.Unix() - //process.Report.ExitState = "failed" - } - } - - return process -} - // ProcessGet returns the process with the given ID whereever it's running on the cluster // @Summary List a process by its ID // @Description List a process by its ID. Use the filter parameter to specifiy the level of detail of the output. @@ -316,29 +208,27 @@ func (h *ClusterHandler) ProcessGet(c echo.Context) error { return api.Err(http.StatusForbidden, "") } - procs := h.proxy.ProcessList(node.ProcessListOptions{ - ID: []string{id}, - Filter: filter.Slice(), - Domain: domain, - }) + pid := app.NewProcessID(id, domain) - if len(procs) == 0 { - // Check the store in the cluster for an undeployed process - p, err := h.cluster.Store().ProcessGet(app.NewProcessID(id, domain)) + // Check the store for the process + // TODO: should check the leader because in larger cluster it needs time to get to all followers + p, nodeid, err := h.cluster.ProcessGet("", pid, false) + if err != nil { + return api.Err(http.StatusNotFound, "", "process not found: %s in domain '%s'", pid.ID, pid.Domain) + } + + process := api.Process{} + process.UnmarshalStore(p, filter.config, filter.state, filter.report, filter.metadata) + + // Get the actual process data + if len(nodeid) != 0 { + process, err = h.proxy.ProcessGet(nodeid, pid, filter.Slice()) if err != nil { - return api.Err(http.StatusNotFound, "", "Unknown process ID: %s", id) + return api.Err(http.StatusNotFound, "", "process not found: %s in domain '%s'", pid.ID, pid.Domain) } - - process := h.convertStoreProcessToAPIProcess(p, filter) - - return c.JSON(http.StatusOK, process) } - if procs[0].Domain != domain { - return api.Err(http.StatusNotFound, "", "Unknown process ID: %s", id) - } - - return c.JSON(http.StatusOK, procs[0]) + return c.JSON(http.StatusOK, process) } // Add adds a new process to the cluster @@ -436,7 +326,7 @@ func (h *ClusterHandler) ProcessUpdate(c echo.Context) error { pid := process.ProcessID() - current, err := h.cluster.Store().ProcessGet(pid) + current, _, err := h.cluster.ProcessGet("", pid, false) if err != nil { return api.Err(http.StatusNotFound, "", "process not found: %s in domain '%s'", pid.ID, pid.Domain) } diff --git a/http/handler/api/cluster_store.go b/http/handler/api/cluster_store.go index 8d0c7349..59eeadae 100644 --- a/http/handler/api/cluster_store.go +++ b/http/handler/api/cluster_store.go @@ -33,7 +33,8 @@ func (h *ClusterHandler) StoreListProcesses(c echo.Context) error { continue } - process := h.convertStoreProcessToAPIProcess(p, newFilter("")) + process := api.Process{} + process.UnmarshalStore(p, true, true, true, true) processes = append(processes, process) } @@ -66,12 +67,13 @@ func (h *ClusterHandler) StoreGetProcess(c echo.Context) error { return api.Err(http.StatusForbidden, "", "API user %s is not allowed to read this process", ctxuser) } - p, err := h.cluster.Store().ProcessGet(pid) + p, _, err := h.cluster.Store().ProcessGet(pid) if err != nil { return api.Err(http.StatusNotFound, "", "process not found: %s in domain '%s'", pid.ID, pid.Domain) } - process := h.convertStoreProcessToAPIProcess(p, newFilter("")) + process := api.Process{} + process.UnmarshalStore(p, true, true, true, true) return c.JSON(http.StatusOK, process) } diff --git a/http/handler/api/fixtures/addProcess.json b/http/handler/api/fixtures/addProcess.json index b967e245..73994f03 100644 --- a/http/handler/api/fixtures/addProcess.json +++ b/http/handler/api/fixtures/addProcess.json @@ -25,8 +25,9 @@ ] } ], + "log_patterns": [], "autostart": false, "reconnect": true, "reconnect_delay_seconds": 10, "stale_timeout_seconds": 10 -} +} \ No newline at end of file diff --git a/http/handler/api/process.go b/http/handler/api/process.go index dc3f5cc4..70d381da 100644 --- a/http/handler/api/process.go +++ b/http/handler/api/process.go @@ -1046,50 +1046,40 @@ func (h *ProcessHandler) getProcess(id app.ProcessID, filter filter) (api.Proces return api.Process{}, err } - info := api.Process{ - ID: process.ID, - Owner: process.Owner, - Domain: process.Domain, - Reference: process.Reference, - Type: "ffmpeg", - CoreID: h.restream.ID(), - CreatedAt: process.CreatedAt, - UpdatedAt: process.UpdatedAt, - } + var config *app.Config + var state *app.State + var report *app.Report + var metadata interface{} if filter.config { - info.Config = &api.ProcessConfig{} - info.Config.Unmarshal(process.Config, nil) + config = process.Config } if filter.state { - state, err := h.restream.GetProcessState(id) + state, err = h.restream.GetProcessState(id) if err != nil { return api.Process{}, err } - - info.State = &api.ProcessState{} - info.State.Unmarshal(state) } if filter.report { - log, err := h.restream.GetProcessReport(id) + report, err = h.restream.GetProcessReport(id) if err != nil { return api.Process{}, err } - - info.Report = &api.ProcessReport{} - info.Report.Unmarshal(log) } if filter.metadata { - data, err := h.restream.GetProcessMetadata(id, "") + metadata, err = h.restream.GetProcessMetadata(id, "") if err != nil { return api.Process{}, err } - - info.Metadata = api.NewMetadata(data) } + info := api.Process{ + CoreID: h.restream.ID(), + } + info.Unmarshal(process, config, state, report, metadata) + return info, nil } diff --git a/restream/app/process.go b/restream/app/process.go index 6dfebab7..8d58d746 100644 --- a/restream/app/process.go +++ b/restream/app/process.go @@ -81,7 +81,7 @@ type Config struct { StaleTimeout uint64 // seconds Timeout uint64 // seconds Scheduler string // crontab pattern or RFC3339 timestamp - LogPatterns []string // will we interpreted as regular expressions + LogPatterns []string // will be interpreted as regular expressions LimitCPU float64 // percent LimitMemory uint64 // bytes LimitWaitFor uint64 // seconds diff --git a/restream/task.go b/restream/task.go index fd8a1830..611702ef 100644 --- a/restream/task.go +++ b/restream/task.go @@ -2,6 +2,7 @@ package restream import ( "errors" + "maps" "time" "github.com/datarhei/core/v16/ffmpeg/parse" @@ -393,7 +394,11 @@ func (t *task) GetMetadata(key string) (interface{}, error) { defer t.lock.RUnlock(token) if len(key) == 0 { - return t.metadata, nil + if t.metadata == nil { + return nil, nil + } + + return maps.Clone(t.metadata), nil } if t.metadata == nil {