diff --git a/cluster/docs/ClusterAPI_docs.go b/cluster/docs/ClusterAPI_docs.go index 87e6deae..7a2121cc 100644 --- a/cluster/docs/ClusterAPI_docs.go +++ b/cluster/docs/ClusterAPI_docs.go @@ -1289,6 +1289,11 @@ const docTemplateClusterAPI = `{ "type": "string" } }, + "startup_timeout_sec": { + "description": "seconds", + "type": "integer", + "format": "int64" + }, "sync_interval_sec": { "description": "seconds", "type": "integer", @@ -1745,6 +1750,9 @@ const docTemplateClusterAPI = `{ }, "key_file": { "type": "string" + }, + "staging": { + "type": "boolean" } } }, diff --git a/cluster/docs/ClusterAPI_swagger.json b/cluster/docs/ClusterAPI_swagger.json index 2ac8d8e2..042d9af8 100644 --- a/cluster/docs/ClusterAPI_swagger.json +++ b/cluster/docs/ClusterAPI_swagger.json @@ -1281,6 +1281,11 @@ "type": "string" } }, + "startup_timeout_sec": { + "description": "seconds", + "type": "integer", + "format": "int64" + }, "sync_interval_sec": { "description": "seconds", "type": "integer", @@ -1737,6 +1742,9 @@ }, "key_file": { "type": "string" + }, + "staging": { + "type": "boolean" } } }, diff --git a/cluster/docs/ClusterAPI_swagger.yaml b/cluster/docs/ClusterAPI_swagger.yaml index 35f60151..2a39a7fb 100644 --- a/cluster/docs/ClusterAPI_swagger.yaml +++ b/cluster/docs/ClusterAPI_swagger.yaml @@ -243,6 +243,10 @@ definitions: items: type: string type: array + startup_timeout_sec: + description: seconds + format: int64 + type: integer sync_interval_sec: description: seconds format: int64 @@ -552,6 +556,8 @@ definitions: type: boolean key_file: type: string + staging: + type: boolean type: object update_check: type: boolean diff --git a/cluster/leader.go b/cluster/leader.go index 82d4d565..50da474c 100644 --- a/cluster/leader.go +++ b/cluster/leader.go @@ -1,7 +1,9 @@ package cluster import ( + "bytes" "context" + "encoding/json" "errors" "fmt" "sort" @@ -571,7 +573,7 @@ func (c *cluster) doSynchronize(emergency bool) { opStack, _, reality := synchronize(wish, want, have, nodesMap, c.nodeRecoverTimeout) - if !emergency { + if !emergency && len(opStack) != 0 { cmd := &store.Command{ Operation: store.OpSetProcessNodeMap, Data: store.CommandSetProcessNodeMap{ @@ -606,6 +608,55 @@ func (c *cluster) doRebalance(emergency bool) { c.applyOpStack(opStack) } +// isMetadataUpdateRequired compares two metadata. It relies on the documents property that json.Marshal +// sorts the map keys prior encoding. +func isMetadataUpdateRequired(wantMap map[string]interface{}, haveMap map[string]interface{}) (bool, map[string]interface{}) { + hasChanges := false + changeMap := map[string]interface{}{} + + haveMapKeys := map[string]struct{}{} + + for key := range haveMap { + haveMapKeys[key] = struct{}{} + } + + for key, wantMapValue := range wantMap { + haveMapValue, ok := haveMap[key] + if !ok { + // A key in map1 exists, that doesn't exist in map2, we need to update + hasChanges = true + } + + // Compare the values + changesData, err := json.Marshal(wantMapValue) + if err != nil { + continue + } + + completeData, err := json.Marshal(haveMapValue) + if err != nil { + continue + } + + if !bytes.Equal(changesData, completeData) { + // The values are not equal, we need to update + hasChanges = true + } + + delete(haveMapKeys, key) + + changeMap[key] = wantMapValue + } + + for key := range haveMapKeys { + // If there keys in map2 that are not in map1, we have to update + hasChanges = true + changeMap[key] = nil + } + + return hasChanges, changeMap +} + // synchronize returns a list of operations in order to adjust the "have" list to the "want" list // with taking the available resources on each node into account. func synchronize(wish map[string]string, want []store.Process, have []proxy.Process, nodes map[string]proxy.NodeAbout, nodeRecoverTimeout time.Duration) ([]interface{}, map[string]proxy.NodeResources, map[string]string) { @@ -654,13 +705,15 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc continue } else { - // The process is on the wantMap. Update the process if the configuration differ. - if !wantP.Config.Equal(haveP.Config) { + // The process is on the wantMap. Update the process if the configuration and/or metadata differ. + hasConfigChanges := !wantP.Config.Equal(haveP.Config) + hasMetadataChanges, metadata := isMetadataUpdateRequired(wantP.Metadata, haveP.Metadata) + if hasConfigChanges || hasMetadataChanges { opStack = append(opStack, processOpUpdate{ nodeid: haveP.NodeID, processid: haveP.Config.ProcessID(), config: wantP.Config, - metadata: wantP.Metadata, + metadata: metadata, }) } } diff --git a/cluster/leader_test.go b/cluster/leader_test.go index d7d5aea6..ee54ebb5 100644 --- a/cluster/leader_test.go +++ b/cluster/leader_test.go @@ -566,6 +566,236 @@ func TestSynchronizeAddRemove(t *testing.T) { }, reality) } +func TestSynchronizeNoUpdate(t *testing.T) { + wish := map[string]string{ + "foobar@": "node1", + } + + want := []store.Process{ + { + UpdatedAt: time.Now(), + Config: &app.Config{ + ID: "foobar", + LimitCPU: 10, + LimitMemory: 5, + Reference: "baz", + }, + Order: "start", + }, + } + + have := []proxy.Process{ + { + NodeID: "node1", + Order: "start", + State: "running", + CPU: 12, + Mem: 5, + Runtime: 42, + Config: &app.Config{ + ID: "foobar", + LimitCPU: 10, + LimitMemory: 5, + Reference: "baz", + }, + }, + } + + nodes := map[string]proxy.NodeAbout{ + "node1": { + LastContact: time.Now(), + Resources: proxy.NodeResources{ + NCPU: 1, + CPU: 7, + Mem: 35, + CPULimit: 90, + MemLimit: 90, + }, + }, + "node2": { + LastContact: time.Now(), + Resources: proxy.NodeResources{ + NCPU: 1, + CPU: 85, + Mem: 65, + CPULimit: 90, + MemLimit: 90, + }, + }, + } + + stack, _, reality := synchronize(wish, want, have, nodes, 2*time.Minute) + + require.Empty(t, stack) + + require.Equal(t, map[string]string{ + "foobar@": "node1", + }, reality) +} + +func TestSynchronizeUpdate(t *testing.T) { + wish := map[string]string{ + "foobar@": "node1", + } + + want := []store.Process{ + { + UpdatedAt: time.Now(), + Config: &app.Config{ + ID: "foobar", + LimitCPU: 10, + LimitMemory: 5, + Reference: "baz", + }, + Order: "start", + }, + } + + have := []proxy.Process{ + { + NodeID: "node1", + Order: "start", + State: "running", + CPU: 12, + Mem: 5, + Runtime: 42, + Config: &app.Config{ + ID: "foobar", + LimitCPU: 10, + LimitMemory: 5, + Reference: "boz", + }, + }, + } + + nodes := map[string]proxy.NodeAbout{ + "node1": { + LastContact: time.Now(), + Resources: proxy.NodeResources{ + NCPU: 1, + CPU: 7, + Mem: 35, + CPULimit: 90, + MemLimit: 90, + }, + }, + "node2": { + LastContact: time.Now(), + Resources: proxy.NodeResources{ + NCPU: 1, + CPU: 85, + Mem: 65, + CPULimit: 90, + MemLimit: 90, + }, + }, + } + + stack, _, reality := synchronize(wish, want, have, nodes, 2*time.Minute) + + require.Equal(t, []interface{}{ + processOpUpdate{ + nodeid: "node1", + processid: app.ProcessID{ID: "foobar"}, + config: &app.Config{ + ID: "foobar", + LimitCPU: 10, + LimitMemory: 5, + Reference: "baz", + }, + metadata: nil, + }, + }, stack) + + require.Equal(t, map[string]string{ + "foobar@": "node1", + }, reality) +} + +func TestSynchronizeUpdateMetadata(t *testing.T) { + wish := map[string]string{ + "foobar@": "node1", + } + + want := []store.Process{ + { + UpdatedAt: time.Now(), + Config: &app.Config{ + ID: "foobar", + LimitCPU: 10, + LimitMemory: 5, + Reference: "boz", + }, + Order: "start", + Metadata: map[string]interface{}{ + "foo": "bar", + }, + }, + } + + have := []proxy.Process{ + { + NodeID: "node1", + Order: "start", + State: "running", + CPU: 12, + Mem: 5, + Runtime: 42, + Config: &app.Config{ + ID: "foobar", + LimitCPU: 10, + LimitMemory: 5, + Reference: "boz", + }, + }, + } + + nodes := map[string]proxy.NodeAbout{ + "node1": { + LastContact: time.Now(), + Resources: proxy.NodeResources{ + NCPU: 1, + CPU: 7, + Mem: 35, + CPULimit: 90, + MemLimit: 90, + }, + }, + "node2": { + LastContact: time.Now(), + Resources: proxy.NodeResources{ + NCPU: 1, + CPU: 85, + Mem: 65, + CPULimit: 90, + MemLimit: 90, + }, + }, + } + + stack, _, reality := synchronize(wish, want, have, nodes, 2*time.Minute) + + require.Equal(t, []interface{}{ + processOpUpdate{ + nodeid: "node1", + processid: app.ProcessID{ID: "foobar"}, + config: &app.Config{ + ID: "foobar", + LimitCPU: 10, + LimitMemory: 5, + Reference: "boz", + }, + metadata: map[string]interface{}{ + "foo": "bar", + }, + }, + }, stack) + + require.Equal(t, map[string]string{ + "foobar@": "node1", + }, reality) +} + func TestSynchronizeWaitDisconnectedNode(t *testing.T) { wish := map[string]string{ "foobar1@": "node1", @@ -1569,3 +1799,57 @@ func TestCreateReferenceAffinityNodeMap(t *testing.T) { }, }, affinityMap) } + +func TestIsMetadataUpdateRequired(t *testing.T) { + want1 := map[string]interface{}{ + "foo": "boz", + "sum": "sum", + "sim": []string{"id", "sam"}, + } + + have := map[string]interface{}{ + "sim": []string{"id", "sam"}, + "foo": "boz", + "sum": "sum", + } + + changes, _ := isMetadataUpdateRequired(want1, have) + require.False(t, changes) + + want2 := map[string]interface{}{ + "sim": []string{"id", "sam"}, + "foo": "boz", + } + + changes, metadata := isMetadataUpdateRequired(want2, have) + require.True(t, changes) + require.Equal(t, map[string]interface{}{ + "sim": []string{"id", "sam"}, + "foo": "boz", + "sum": nil, + }, metadata) + + want3 := map[string]interface{}{ + "sim": []string{"id", "sim"}, + "foo": "boz", + "sum": "sum", + } + + changes, metadata = isMetadataUpdateRequired(want3, have) + require.True(t, changes) + require.Equal(t, map[string]interface{}{ + "sim": []string{"id", "sim"}, + "foo": "boz", + "sum": "sum", + }, metadata) + + want4 := map[string]interface{}{} + + changes, metadata = isMetadataUpdateRequired(want4, have) + require.True(t, changes) + require.Equal(t, map[string]interface{}{ + "sim": nil, + "foo": nil, + "sum": nil, + }, metadata) +} diff --git a/cluster/proxy/node.go b/cluster/proxy/node.go index afc754a0..38b3920d 100644 --- a/cluster/proxy/node.go +++ b/cluster/proxy/node.go @@ -891,7 +891,9 @@ func (n *node) ProcessList(options ProcessListOptions) ([]clientapi.Process, err } func (n *node) ProxyProcessList() ([]Process, error) { - list, err := n.ProcessList(ProcessListOptions{}) + list, err := n.ProcessList(ProcessListOptions{ + Filter: []string{"config", "state", "metadata"}, + }) if err != nil { return nil, err } diff --git a/cluster/store/store.go b/cluster/store/store.go index 3aaf4136..023816f7 100644 --- a/cluster/store/store.go +++ b/cluster/store/store.go @@ -801,6 +801,7 @@ func (s *store) GetProcess(id app.ProcessID) (Process, error) { CreatedAt: process.CreatedAt, UpdatedAt: process.UpdatedAt, Config: process.Config.Clone(), + Order: process.Order, Metadata: process.Metadata, }, nil } diff --git a/docs/docs.go b/docs/docs.go index 5da4011d..be2331b0 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -245,14 +245,14 @@ const docTemplate = `{ "ApiKeyAuth": [] } ], - "description": "List of processes in the cluster", + "description": "List of processes in the cluster DB", "produces": [ "application/json" ], "tags": [ "v16.?.?" ], - "summary": "List of processes in the cluster", + "summary": "List of processes in the cluster DB", "operationId": "cluster-3-db-list-processes", "responses": { "200": { @@ -267,6 +267,47 @@ const docTemplate = `{ } } }, + "/api/v3/cluster/db/process/:id": { + "get": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "Get a process in the cluster DB", + "produces": [ + "application/json" + ], + "tags": [ + "v16.?.?" + ], + "summary": "Get a process in the cluster DB", + "operationId": "cluster-3-db-get-process", + "parameters": [ + { + "type": "string", + "description": "Process ID", + "name": "id", + "in": "path", + "required": true + }, + { + "type": "string", + "description": "Domain to act on", + "name": "domain", + "in": "query" + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/api.Process" + } + } + } + } + }, "/api/v3/cluster/db/user": { "get": { "security": [ @@ -1216,6 +1257,12 @@ const docTemplate = `{ "in": "path", "required": true }, + { + "type": "string", + "description": "Domain to act on", + "name": "domain", + "in": "query" + }, { "description": "Process config", "name": "config", @@ -4517,6 +4564,11 @@ const docTemplate = `{ "type": "string" } }, + "startup_timeout_sec": { + "description": "seconds", + "type": "integer", + "format": "int64" + }, "sync_interval_sec": { "description": "seconds", "type": "integer", @@ -4973,6 +5025,9 @@ const docTemplate = `{ }, "key_file": { "type": "string" + }, + "staging": { + "type": "boolean" } } }, @@ -6736,6 +6791,11 @@ const docTemplate = `{ "type": "string" } }, + "startup_timeout_sec": { + "description": "seconds", + "type": "integer", + "format": "int64" + }, "sync_interval_sec": { "description": "seconds", "type": "integer", @@ -7192,6 +7252,9 @@ const docTemplate = `{ }, "key_file": { "type": "string" + }, + "staging": { + "type": "boolean" } } }, diff --git a/docs/swagger.json b/docs/swagger.json index 00969ca0..9fa69897 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -237,14 +237,14 @@ "ApiKeyAuth": [] } ], - "description": "List of processes in the cluster", + "description": "List of processes in the cluster DB", "produces": [ "application/json" ], "tags": [ "v16.?.?" ], - "summary": "List of processes in the cluster", + "summary": "List of processes in the cluster DB", "operationId": "cluster-3-db-list-processes", "responses": { "200": { @@ -259,6 +259,47 @@ } } }, + "/api/v3/cluster/db/process/:id": { + "get": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "Get a process in the cluster DB", + "produces": [ + "application/json" + ], + "tags": [ + "v16.?.?" + ], + "summary": "Get a process in the cluster DB", + "operationId": "cluster-3-db-get-process", + "parameters": [ + { + "type": "string", + "description": "Process ID", + "name": "id", + "in": "path", + "required": true + }, + { + "type": "string", + "description": "Domain to act on", + "name": "domain", + "in": "query" + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/api.Process" + } + } + } + } + }, "/api/v3/cluster/db/user": { "get": { "security": [ @@ -1208,6 +1249,12 @@ "in": "path", "required": true }, + { + "type": "string", + "description": "Domain to act on", + "name": "domain", + "in": "query" + }, { "description": "Process config", "name": "config", @@ -4509,6 +4556,11 @@ "type": "string" } }, + "startup_timeout_sec": { + "description": "seconds", + "type": "integer", + "format": "int64" + }, "sync_interval_sec": { "description": "seconds", "type": "integer", @@ -4965,6 +5017,9 @@ }, "key_file": { "type": "string" + }, + "staging": { + "type": "boolean" } } }, @@ -6728,6 +6783,11 @@ "type": "string" } }, + "startup_timeout_sec": { + "description": "seconds", + "type": "integer", + "format": "int64" + }, "sync_interval_sec": { "description": "seconds", "type": "integer", @@ -7184,6 +7244,9 @@ }, "key_file": { "type": "string" + }, + "staging": { + "type": "boolean" } } }, diff --git a/docs/swagger.yaml b/docs/swagger.yaml index cf42dc9a..41f85ff8 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -282,6 +282,10 @@ definitions: items: type: string type: array + startup_timeout_sec: + description: seconds + format: int64 + type: integer sync_interval_sec: description: seconds format: int64 @@ -591,6 +595,8 @@ definitions: type: boolean key_file: type: string + staging: + type: boolean type: object update_check: type: boolean @@ -1848,6 +1854,10 @@ definitions: items: type: string type: array + startup_timeout_sec: + description: seconds + format: int64 + type: integer sync_interval_sec: description: seconds format: int64 @@ -2157,6 +2167,8 @@ definitions: type: boolean key_file: type: string + staging: + type: boolean type: object update_check: type: boolean @@ -2532,7 +2544,7 @@ paths: - v16.?.? /api/v3/cluster/db/process: get: - description: List of processes in the cluster + description: List of processes in the cluster DB operationId: cluster-3-db-list-processes produces: - application/json @@ -2545,7 +2557,33 @@ paths: type: array security: - ApiKeyAuth: [] - summary: List of processes in the cluster + summary: List of processes in the cluster DB + tags: + - v16.?.? + /api/v3/cluster/db/process/:id: + get: + description: Get a process in the cluster DB + operationId: cluster-3-db-get-process + parameters: + - description: Process ID + in: path + name: id + required: true + type: string + - description: Domain to act on + in: query + name: domain + type: string + produces: + - application/json + responses: + "200": + description: OK + schema: + $ref: '#/definitions/api.Process' + security: + - ApiKeyAuth: [] + summary: Get a process in the cluster DB tags: - v16.?.? /api/v3/cluster/db/user: @@ -3197,6 +3235,10 @@ paths: name: id required: true type: string + - description: Domain to act on + in: query + name: domain + type: string - description: Process config in: body name: config diff --git a/http/api/process.go b/http/api/process.go index 4c2726bc..58e4bd68 100644 --- a/http/api/process.go +++ b/http/api/process.go @@ -65,7 +65,7 @@ type ProcessConfig struct { Metadata map[string]interface{} `json:"metadata,omitempty"` } -// Marshal converts a process config in API representation to a restreamer process config +// Marshal converts a process config in API representation to a restreamer process config and metadata func (cfg *ProcessConfig) Marshal() (*app.Config, map[string]interface{}) { p := &app.Config{ ID: cfg.ID, diff --git a/http/handler/api/cluster.go b/http/handler/api/cluster.go index 316dfdb3..351c912f 100644 --- a/http/handler/api/cluster.go +++ b/http/handler/api/cluster.go @@ -209,6 +209,10 @@ func (h *ClusterHandler) GetAllNodesProcess(c echo.Context) error { return api.Err(http.StatusNotFound, "", "Unknown process ID: %s", id) } + if procs[0].Domain != domain { + return api.Err(http.StatusNotFound, "", "Unknown process ID: %s", id) + } + return c.JSON(http.StatusOK, procs[0]) } @@ -417,8 +421,8 @@ func (h *ClusterHandler) ListNodeProcesses(c echo.Context) error { } // ListStoreProcesses returns the list of processes stored in the DB of the cluster -// @Summary List of processes in the cluster -// @Description List of processes in the cluster +// @Summary List of processes in the cluster DB +// @Description List of processes in the cluster DB // @Tags v16.?.? // @ID cluster-3-db-list-processes // @Produce json @@ -427,14 +431,13 @@ func (h *ClusterHandler) ListNodeProcesses(c echo.Context) error { // @Router /api/v3/cluster/db/process [get] func (h *ClusterHandler) ListStoreProcesses(c echo.Context) error { ctxuser := util.DefaultContext(c, "user", "") - domain := util.DefaultQuery(c, "domain", "") procs := h.cluster.ListProcesses() processes := []api.Process{} for _, p := range procs { - if !h.iam.Enforce(ctxuser, domain, "process:"+p.Config.ID, "read") { + if !h.iam.Enforce(ctxuser, p.Config.Domain, "process:"+p.Config.ID, "read") { continue } @@ -464,6 +467,59 @@ func (h *ClusterHandler) ListStoreProcesses(c echo.Context) error { return c.JSON(http.StatusOK, processes) } +// GerStoreProcess returns a process stored in the DB of the cluster +// @Summary Get a process in the cluster DB +// @Description Get a process in the cluster DB +// @Tags v16.?.? +// @ID cluster-3-db-get-process +// @Produce json +// @Param id path string true "Process ID" +// @Param domain query string false "Domain to act on" +// @Success 200 {object} api.Process +// @Security ApiKeyAuth +// @Router /api/v3/cluster/db/process/:id [get] +func (h *ClusterHandler) GetStoreProcess(c echo.Context) error { + ctxuser := util.DefaultContext(c, "user", "") + domain := util.DefaultQuery(c, "domain", "") + id := util.PathParam(c, "id") + + pid := app.ProcessID{ + ID: id, + Domain: domain, + } + + if !h.iam.Enforce(ctxuser, domain, "process:"+id, "read") { + return api.Err(http.StatusForbidden, "", "API user %s is not allowed to read this process", ctxuser) + } + + p, err := h.cluster.GetProcess(pid) + if err != nil { + return api.Err(http.StatusNotFound, "", "process not found: %s in domain '%s'", pid.ID, pid.Domain) + } + + process := api.Process{ + ID: p.Config.ID, + Owner: p.Config.Owner, + Domain: p.Config.Domain, + Type: "ffmpeg", + Reference: p.Config.Reference, + CreatedAt: p.CreatedAt.Unix(), + UpdatedAt: p.UpdatedAt.Unix(), + Metadata: p.Metadata, + } + + config := &api.ProcessConfig{} + config.Unmarshal(p.Config) + + process.Config = config + + process.State = &api.ProcessState{ + Order: p.Order, + } + + return c.JSON(http.StatusOK, process) +} + // Add adds a new process to the cluster // @Summary Add a new process // @Description Add a new FFmpeg process @@ -531,6 +587,7 @@ func (h *ClusterHandler) AddProcess(c echo.Context) error { // @Accept json // @Produce json // @Param id path string true "Process ID" +// @Param domain query string false "Domain to act on" // @Param config body api.ProcessConfig true "Process config" // @Success 200 {object} api.ProcessConfig // @Failure 400 {object} api.Error @@ -560,7 +617,7 @@ func (h *ClusterHandler) UpdateProcess(c echo.Context) error { current, err := h.cluster.GetProcess(pid) if err != nil { - return api.Err(http.StatusNotFound, "", "process not found: %s", id) + return api.Err(http.StatusNotFound, "", "process not found: %s in domain '%s'", pid.ID, pid.Domain) } // Prefill the config with the current values @@ -584,7 +641,7 @@ func (h *ClusterHandler) UpdateProcess(c echo.Context) error { if err := h.cluster.UpdateProcess("", pid, config); err != nil { if err == restream.ErrUnknownProcess { - return api.Err(http.StatusNotFound, "", "process not found: %s", id) + return api.Err(http.StatusNotFound, "", "process not found: %s in domain '%s'", pid.ID, pid.Domain) } return api.Err(http.StatusBadRequest, "", "process can't be updated: %s", err.Error()) diff --git a/http/server.go b/http/server.go index 5a5fa4f6..b7d8dfed 100644 --- a/http/server.go +++ b/http/server.go @@ -693,6 +693,7 @@ func (s *server) setRoutesV3(v3 *echo.Group) { v3.GET("/cluster/snapshot", s.v3handler.cluster.GetSnapshot) v3.GET("/cluster/db/process", s.v3handler.cluster.ListStoreProcesses) + v3.GET("/cluster/db/process/:id", s.v3handler.cluster.GetStoreProcess) v3.GET("/cluster/db/user", s.v3handler.cluster.ListStoreIdentities) v3.GET("/cluster/db/user/:name", s.v3handler.cluster.ListStoreIdentity) v3.GET("/cluster/db/policies", s.v3handler.cluster.ListStorePolicies) diff --git a/restream/app/process.go b/restream/app/process.go index 3f435481..8663a3da 100644 --- a/restream/app/process.go +++ b/restream/app/process.go @@ -150,7 +150,6 @@ func (config *Config) Hash() []byte { b.WriteString(config.Reference) b.WriteString(config.Owner) b.WriteString(config.Domain) - b.WriteString(config.FFVersion) b.WriteString(config.Scheduler) b.WriteString(strings.Join(config.Options, ",")) b.WriteString(strings.Join(config.LogPatterns, ",")) diff --git a/restream/app/process_test.go b/restream/app/process_test.go index 17bfa6bb..61f6ca1f 100644 --- a/restream/app/process_test.go +++ b/restream/app/process_test.go @@ -50,7 +50,7 @@ func TestConfigHash(t *testing.T) { hash1 := config.Hash() - require.Equal(t, []byte{0x23, 0x5d, 0xcc, 0x36, 0x77, 0xa1, 0x49, 0x7c, 0xcd, 0x8a, 0x72, 0x6a, 0x6c, 0xa2, 0xc3, 0x24}, hash1) + require.Equal(t, []byte{0x7e, 0xae, 0x5b, 0xc3, 0xad, 0xe3, 0x9a, 0xfc, 0xd3, 0x49, 0x15, 0x28, 0x93, 0x17, 0xc5, 0xbf}, hash1) config.Reconnect = false diff --git a/restream/restream.go b/restream/restream.go index eac3aab8..b270e081 100644 --- a/restream/restream.go +++ b/restream/restream.go @@ -1175,16 +1175,16 @@ func (r *restream) UpdateProcess(id app.ProcessID, config *app.Config) error { return ErrUnknownProcess } + // If the new config has the same hash as the current config, do nothing. + if task.process.Config.Equal(config) { + return nil + } + t, err := r.createTask(config) if err != nil { return err } - // If the new config has the same hash as the current config, do nothing. - if task.config.Equal(t.config) { - return nil - } - tid := t.ID() if !tid.Equal(id) {