diff --git a/cluster/api.go b/cluster/api.go index 072ca475..74990ebd 100644 --- a/cluster/api.go +++ b/cluster/api.go @@ -113,6 +113,8 @@ func NewAPI(config APIConfig) (API, error) { a.router.PUT("/v1/process/:id/command", a.SetProcessCommand) a.router.PUT("/v1/process/:id/metadata/:key", a.SetProcessMetadata) + a.router.PUT("/v1/relocate", a.RelocateProcesses) + a.router.POST("/v1/iam/user", a.AddIdentity) a.router.PUT("/v1/iam/user/:name", a.UpdateIdentity) a.router.PUT("/v1/iam/user/:name/policies", a.SetIdentityPolicies) @@ -508,6 +510,39 @@ func (a *api) SetProcessMetadata(c echo.Context) error { return c.JSON(http.StatusOK, "OK") } +// RelocateProcesses relocates processes to another node +// @Summary Relocate processes to another node +// @Description Relocate processes to another node. +// @Tags v1.0.0 +// @ID cluster-3-relocate-processes +// @Produce json +// @Param data body client.RelocateProcessesRequest true "List of processes to relocate" +// @Success 200 {string} string +// @Failure 500 {object} Error +// @Failure 508 {object} Error +// @Router /v1/relocate [put] +func (a *api) RelocateProcesses(c echo.Context) error { + r := client.RelocateProcessesRequest{} + + if err := util.ShouldBindJSON(c, &r); err != nil { + return Err(http.StatusBadRequest, "", "invalid JSON: %s", err.Error()) + } + + origin := c.Request().Header.Get("X-Cluster-Origin") + + if origin == a.id { + return Err(http.StatusLoopDetected, "", "breaking circuit") + } + + err := a.cluster.RelocateProcesses(origin, r.Map) + if err != nil { + a.logger.Debug().WithError(err).Log("Unable to apply process relocation request") + return Err(http.StatusInternalServerError, "", "unable to apply process relocation request: %s", err.Error()) + } + + return c.JSON(http.StatusOK, "OK") +} + // AddIdentity adds an identity to the cluster DB // @Summary Add an identity // @Description Add an identity to the cluster DB diff --git a/cluster/client/client.go b/cluster/client/client.go index c1af8f5b..41ffda60 100644 --- a/cluster/client/client.go +++ b/cluster/client/client.go @@ -39,6 +39,10 @@ type SetProcessMetadataRequest struct { Metadata interface{} `json:"metadata"` } +type RelocateProcessesRequest struct { + Map map[app.ProcessID]string +} + type AddIdentityRequest struct { Identity iamidentity.User `json:"identity"` } @@ -219,6 +223,17 @@ func (c *APIClient) SetProcessMetadata(origin string, id app.ProcessID, key stri return err } +func (c *APIClient) RelocateProcesses(origin string, r RelocateProcessesRequest) error { + data, err := json.Marshal(r) + if err != nil { + return err + } + + _, err = c.call(http.MethodPut, "/v1/relocate", "application/json", bytes.NewReader(data), origin) + + return err +} + func (c *APIClient) AddIdentity(origin string, r AddIdentityRequest) error { data, err := json.Marshal(r) if err != nil { diff --git a/cluster/cluster.go b/cluster/cluster.go index ff05e38d..ed642826 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -69,6 +69,7 @@ type Cluster interface { SetProcessMetadata(origin string, id app.ProcessID, key string, data interface{}) error GetProcessMetadata(origin string, id app.ProcessID, key string) (interface{}, error) GetProcessNodeMap() map[string]string + RelocateProcesses(origin string, relocations map[app.ProcessID]string) error IAM(superuser iamidentity.User, jwtRealm, jwtSecret string) (iam.IAM, error) ListIdentities() (time.Time, []iamidentity.User) diff --git a/cluster/docs/ClusterAPI_docs.go b/cluster/docs/ClusterAPI_docs.go index 61b995ed..7689e9a5 100644 --- a/cluster/docs/ClusterAPI_docs.go +++ b/cluster/docs/ClusterAPI_docs.go @@ -918,6 +918,50 @@ const docTemplateClusterAPI = `{ } } }, + "/v1/relocate": { + "put": { + "description": "Relocate processes to another node.", + "produces": [ + "application/json" + ], + "tags": [ + "v1.0.0" + ], + "summary": "Relocate processes to another node", + "operationId": "cluster-3-relocate-processes", + "parameters": [ + { + "description": "List of processes to relocate", + "name": "data", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/client.RelocateProcessesRequest" + } + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "string" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/cluster.Error" + } + }, + "508": { + "description": "Loop Detected", + "schema": { + "$ref": "#/definitions/cluster.Error" + } + } + } + } + }, "/v1/server": { "post": { "description": "Add a new server to the cluster", @@ -1281,6 +1325,17 @@ const docTemplateClusterAPI = `{ } } }, + "client.RelocateProcessesRequest": { + "type": "object", + "properties": { + "map": { + "type": "object", + "additionalProperties": { + "type": "string" + } + } + } + }, "client.SetKVRequest": { "type": "object", "properties": { diff --git a/cluster/docs/ClusterAPI_swagger.json b/cluster/docs/ClusterAPI_swagger.json index e7dd2113..3e0f98e8 100644 --- a/cluster/docs/ClusterAPI_swagger.json +++ b/cluster/docs/ClusterAPI_swagger.json @@ -910,6 +910,50 @@ } } }, + "/v1/relocate": { + "put": { + "description": "Relocate processes to another node.", + "produces": [ + "application/json" + ], + "tags": [ + "v1.0.0" + ], + "summary": "Relocate processes to another node", + "operationId": "cluster-3-relocate-processes", + "parameters": [ + { + "description": "List of processes to relocate", + "name": "data", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/client.RelocateProcessesRequest" + } + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "string" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/cluster.Error" + } + }, + "508": { + "description": "Loop Detected", + "schema": { + "$ref": "#/definitions/cluster.Error" + } + } + } + } + }, "/v1/server": { "post": { "description": "Add a new server to the cluster", @@ -1273,6 +1317,17 @@ } } }, + "client.RelocateProcessesRequest": { + "type": "object", + "properties": { + "map": { + "type": "object", + "additionalProperties": { + "type": "string" + } + } + } + }, "client.SetKVRequest": { "type": "object", "properties": { diff --git a/cluster/docs/ClusterAPI_swagger.yaml b/cluster/docs/ClusterAPI_swagger.yaml index f31820fb..9b679bf0 100644 --- a/cluster/docs/ClusterAPI_swagger.yaml +++ b/cluster/docs/ClusterAPI_swagger.yaml @@ -122,6 +122,13 @@ definitions: valid_until: type: string type: object + client.RelocateProcessesRequest: + properties: + map: + additionalProperties: + type: string + type: object + type: object client.SetKVRequest: properties: key: @@ -1466,6 +1473,35 @@ paths: summary: Add JSON metadata with a process under the given key tags: - v1.0.0 + /v1/relocate: + put: + description: Relocate processes to another node. + operationId: cluster-3-relocate-processes + parameters: + - description: List of processes to relocate + in: body + name: data + required: true + schema: + $ref: '#/definitions/client.RelocateProcessesRequest' + produces: + - application/json + responses: + "200": + description: OK + schema: + type: string + "500": + description: Internal Server Error + schema: + $ref: '#/definitions/cluster.Error' + "508": + description: Loop Detected + schema: + $ref: '#/definitions/cluster.Error' + summary: Relocate processes to another node + tags: + - v1.0.0 /v1/server: post: consumes: diff --git a/cluster/forwarder/forwarder.go b/cluster/forwarder/forwarder.go index a998bc5a..1d8e45be 100644 --- a/cluster/forwarder/forwarder.go +++ b/cluster/forwarder/forwarder.go @@ -28,6 +28,7 @@ type Forwarder interface { RemoveProcess(origin string, id app.ProcessID) error SetProcessCommand(origin string, id app.ProcessID, command string) error SetProcessMetadata(origin string, id app.ProcessID, key string, data interface{}) error + RelocateProcesses(origin string, relocations map[app.ProcessID]string) error AddIdentity(origin string, identity iamidentity.User) error UpdateIdentity(origin, name string, identity iamidentity.User) error @@ -237,6 +238,22 @@ func (f *forwarder) RemoveProcess(origin string, id app.ProcessID) error { return client.RemoveProcess(origin, id) } +func (f *forwarder) RelocateProcesses(origin string, relocations map[app.ProcessID]string) error { + if origin == "" { + origin = f.id + } + + r := apiclient.RelocateProcessesRequest{ + Map: relocations, + } + + f.lock.RLock() + client := f.client + f.lock.RUnlock() + + return client.RelocateProcesses(origin, r) +} + func (f *forwarder) AddIdentity(origin string, identity iamidentity.User) error { if origin == "" { origin = f.id diff --git a/cluster/leader.go b/cluster/leader.go index 5b0a19fd..c9ba5825 100644 --- a/cluster/leader.go +++ b/cluster/leader.go @@ -412,6 +412,7 @@ type processOpMove struct { toNodeid string config *app.Config metadata map[string]interface{} + order string } type processOpStart struct { @@ -574,18 +575,20 @@ func (c *cluster) applyOpStack(stack []interface{}, term uint64) []processOpErro break } - err = c.proxy.CommandProcess(v.toNodeid, v.config.ProcessID(), "start") - if err != nil { - errors = append(errors, processOpError{ - processid: v.config.ProcessID(), - err: err, - }) - logger.Info().WithError(err).WithFields(log.Fields{ - "processid": v.config.ProcessID(), - "fromnodeid": v.fromNodeid, - "tonodeid": v.toNodeid, - }).Log("Moving process, starting process") - break + if v.order == "start" { + err = c.proxy.CommandProcess(v.toNodeid, v.config.ProcessID(), "start") + if err != nil { + errors = append(errors, processOpError{ + processid: v.config.ProcessID(), + err: err, + }) + logger.Info().WithError(err).WithFields(log.Fields{ + "processid": v.config.ProcessID(), + "fromnodeid": v.fromNodeid, + "tonodeid": v.toNodeid, + }).Log("Moving process, starting process") + break + } } errors = append(errors, processOpError{ @@ -1055,6 +1058,7 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc toNodeid: nodeid, config: haveP.Config, metadata: haveP.Metadata, + order: haveP.Order, }) } @@ -1475,6 +1479,7 @@ func rebalance(have []proxy.Process, nodes map[string]proxy.NodeAbout) ([]interf toNodeid: availableNodeid, config: p.Config, metadata: p.Metadata, + order: p.Order, }) // Adjust the process. @@ -1529,6 +1534,11 @@ func relocate(have []proxy.Process, nodes map[string]proxy.NodeAbout, relocateMa sourceNodeid := process.NodeID + if sourceNodeid == targetNodeid { + relocatedProcessIDs = append(relocatedProcessIDs, processid) + continue + } + if len(targetNodeid) != 0 { _, hasNode := nodes[targetNodeid] @@ -1585,6 +1595,7 @@ func relocate(have []proxy.Process, nodes map[string]proxy.NodeAbout, relocateMa toNodeid: targetNodeid, config: process.Config, metadata: process.Metadata, + order: process.Order, }) // Adjust the resources. diff --git a/cluster/process.go b/cluster/process.go index 84d6fbf8..381ae4c9 100644 --- a/cluster/process.go +++ b/cluster/process.go @@ -102,6 +102,25 @@ func (c *cluster) SetProcessCommand(origin string, id app.ProcessID, command str return c.proxy.CommandProcess(nodeid, id, command) } +func (c *cluster) RelocateProcesses(origin string, relocations map[app.ProcessID]string) error { + if ok, _ := c.IsDegraded(); ok { + return ErrDegraded + } + + if !c.IsRaftLeader() { + return c.forwarder.RelocateProcesses(origin, relocations) + } + + cmd := &store.Command{ + Operation: store.OpSetRelocateProcess, + Data: &store.CommandSetRelocateProcess{ + Map: relocations, + }, + } + + return c.applyCommand(cmd) +} + func (c *cluster) SetProcessMetadata(origin string, id app.ProcessID, key string, data interface{}) error { if ok, _ := c.IsDegraded(); ok { return ErrDegraded diff --git a/cluster/store/store.go b/cluster/store/store.go index 8628de78..686da6b3 100644 --- a/cluster/store/store.go +++ b/cluster/store/store.go @@ -196,6 +196,7 @@ func (s *storeData) init() { s.Version = 1 s.Process = map[string]Process{} s.ProcessNodeMap = map[string]string{} + s.ProcessRelocateMap = map[string]string{} s.Users.UpdatedAt = now s.Users.Users = map[string]identity.User{} s.Users.userlist = identity.NewUserList() @@ -476,6 +477,14 @@ func (s *store) Restore(snapshot io.ReadCloser) error { return err } + if data.ProcessNodeMap == nil { + data.ProcessNodeMap = map[string]string{} + } + + if data.ProcessRelocateMap == nil { + data.ProcessRelocateMap = map[string]string{} + } + for id, p := range data.Process { if p.Metadata != nil { continue diff --git a/docs/docs.go b/docs/docs.go index f296fe53..b83d6569 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -2039,6 +2039,49 @@ const docTemplate = `{ } } }, + "/api/v3/cluster/reallocation": { + "put": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "Retrieve snapshot of the cluster DB", + "produces": [ + "application/json" + ], + "tags": [ + "v16.?.?" + ], + "summary": "Retrieve snapshot of the cluster DB", + "operationId": "cluster-3-reallocation", + "parameters": [ + { + "description": "Process reallocations", + "name": "reallocations", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/api.ClusterProcessReallocate" + } + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "string" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/api.Error" + } + } + } + } + }, "/api/v3/cluster/snapshot": { "get": { "security": [ @@ -5098,6 +5141,20 @@ const docTemplate = `{ "type": "string" } }, + "api.ClusterProcessReallocate": { + "type": "object", + "properties": { + "process_ids": { + "type": "array", + "items": { + "$ref": "#/definitions/api.ProcessID" + } + }, + "target_node_id": { + "type": "string" + } + } + }, "api.ClusterRaft": { "type": "object", "properties": { @@ -6614,6 +6671,17 @@ const docTemplate = `{ } } }, + "api.ProcessID": { + "type": "object", + "properties": { + "domain": { + "type": "string" + }, + "id": { + "type": "string" + } + } + }, "api.ProcessReport": { "type": "object", "properties": { diff --git a/docs/swagger.json b/docs/swagger.json index 554a708f..f542859c 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -2031,6 +2031,49 @@ } } }, + "/api/v3/cluster/reallocation": { + "put": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "Retrieve snapshot of the cluster DB", + "produces": [ + "application/json" + ], + "tags": [ + "v16.?.?" + ], + "summary": "Retrieve snapshot of the cluster DB", + "operationId": "cluster-3-reallocation", + "parameters": [ + { + "description": "Process reallocations", + "name": "reallocations", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/api.ClusterProcessReallocate" + } + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "string" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/api.Error" + } + } + } + } + }, "/api/v3/cluster/snapshot": { "get": { "security": [ @@ -5090,6 +5133,20 @@ "type": "string" } }, + "api.ClusterProcessReallocate": { + "type": "object", + "properties": { + "process_ids": { + "type": "array", + "items": { + "$ref": "#/definitions/api.ProcessID" + } + }, + "target_node_id": { + "type": "string" + } + } + }, "api.ClusterRaft": { "type": "object", "properties": { @@ -6606,6 +6663,17 @@ } } }, + "api.ProcessID": { + "type": "object", + "properties": { + "domain": { + "type": "string" + }, + "id": { + "type": "string" + } + } + }, "api.ProcessReport": { "type": "object", "properties": { diff --git a/docs/swagger.yaml b/docs/swagger.yaml index 3afc15c0..39105241 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -211,6 +211,15 @@ definitions: additionalProperties: type: string type: object + api.ClusterProcessReallocate: + properties: + process_ids: + items: + $ref: '#/definitions/api.ProcessID' + type: array + target_node_id: + type: string + type: object api.ClusterRaft: properties: address: @@ -1231,6 +1240,13 @@ definitions: format: uint64 type: integer type: object + api.ProcessID: + properties: + domain: + type: string + id: + type: string + type: object api.ProcessReport: properties: created_at: @@ -3893,6 +3909,33 @@ paths: summary: Probe a process in the cluster tags: - v16.?.? + /api/v3/cluster/reallocation: + put: + description: Retrieve snapshot of the cluster DB + operationId: cluster-3-reallocation + parameters: + - description: Process reallocations + in: body + name: reallocations + required: true + schema: + $ref: '#/definitions/api.ClusterProcessReallocate' + produces: + - application/json + responses: + "200": + description: OK + schema: + type: string + "500": + description: Internal Server Error + schema: + $ref: '#/definitions/api.Error' + security: + - ApiKeyAuth: [] + summary: Retrieve snapshot of the cluster DB + tags: + - v16.?.? /api/v3/cluster/snapshot: get: description: Retrieve snapshot of the cluster DB diff --git a/http/api/cluster.go b/http/api/cluster.go index f5e0d12f..67a8d45c 100644 --- a/http/api/cluster.go +++ b/http/api/cluster.go @@ -89,3 +89,8 @@ type ClusterKVSValue struct { type ClusterKVS map[string]ClusterKVSValue type ClusterProcessMap map[string]string + +type ClusterProcessReallocate struct { + TargetNodeID string `json:"target_node_id"` + Processes []ProcessID `json:"process_ids"` +} diff --git a/http/api/process.go b/http/api/process.go index 61e96bce..a4b5c710 100644 --- a/http/api/process.go +++ b/http/api/process.go @@ -7,6 +7,11 @@ import ( "github.com/lithammer/shortuuid/v4" ) +type ProcessID struct { + ID string `json:"id"` + Domain string `json:"domain"` +} + // Process represents all information on a process type Process struct { ID string `json:"id" jsonschema:"minLength=1"` diff --git a/http/handler/api/cluster.go b/http/handler/api/cluster.go index d1d12289..7666650d 100644 --- a/http/handler/api/cluster.go +++ b/http/handler/api/cluster.go @@ -13,6 +13,7 @@ import ( "github.com/datarhei/core/v16/http/api" "github.com/datarhei/core/v16/http/handler/util" "github.com/datarhei/core/v16/iam" + "github.com/datarhei/core/v16/restream/app" "github.com/labstack/echo/v4" ) @@ -152,7 +153,7 @@ func (h *ClusterHandler) Healthy(c echo.Context) error { return c.JSON(http.StatusOK, !degraded) } -// Transfer the leadership to another node +// TransferLeadership transfers the leadership to another node // @Summary Transfer the leadership to another node // @Description Transfer the leadership to another node // @Tags v16.?.? @@ -228,3 +229,47 @@ func (h *ClusterHandler) GetSnapshot(c echo.Context) error { return c.Stream(http.StatusOK, "application/octet-stream", r) } + +// Reallocation issues reallocation requests of processes +// @Summary Retrieve snapshot of the cluster DB +// @Description Retrieve snapshot of the cluster DB +// @Tags v16.?.? +// @ID cluster-3-reallocation +// @Produce json +// @Param reallocations body api.ClusterProcessReallocate true "Process reallocations" +// @Success 200 {string} string +// @Failure 500 {object} api.Error +// @Security ApiKeyAuth +// @Router /api/v3/cluster/reallocation [put] +func (h *ClusterHandler) Reallocation(c echo.Context) error { + reallocations := []api.ClusterProcessReallocate{} + + if err := util.ShouldBindJSONValidation(c, &reallocations, false); err != nil { + return api.Err(http.StatusBadRequest, "", "invalid JSON: %s", err.Error()) + } + + for _, r := range reallocations { + err := c.Validate(r) + if err != nil { + return api.Err(http.StatusBadRequest, "", "invalid JSON: %s", err.Error()) + } + } + + relocations := map[app.ProcessID]string{} + + for _, r := range reallocations { + for _, p := range r.Processes { + relocations[app.ProcessID{ + ID: p.ID, + Domain: p.Domain, + }] = r.TargetNodeID + } + } + + err := h.cluster.RelocateProcesses("", relocations) + if err != nil { + return api.Err(http.StatusInternalServerError, "", "%s", err.Error()) + } + + return c.JSON(http.StatusOK, "OK") +} diff --git a/http/server.go b/http/server.go index 68f506e6..da3c18e4 100644 --- a/http/server.go +++ b/http/server.go @@ -768,6 +768,8 @@ func (s *server) setRoutesV3(v3 *echo.Group) { v3.PUT("/cluster/process/:id/command", s.v3handler.cluster.SetProcessCommand) v3.PUT("/cluster/process/:id/metadata/:key", s.v3handler.cluster.SetProcessMetadata) + v3.PUT("/cluster/reallocation", s.v3handler.cluster.Reallocation) + v3.DELETE("/cluster/node/:id/fs/:storage/*", s.v3handler.cluster.NodeFSDeleteFile) v3.PUT("/cluster/node/:id/fs/:storage/*", s.v3handler.cluster.NodeFSPutFile) diff --git a/restream/app/process.go b/restream/app/process.go index 97876821..b0e9e5f2 100644 --- a/restream/app/process.go +++ b/restream/app/process.go @@ -316,3 +316,12 @@ func (p *ProcessID) Parse(pid string) { p.ID = pid[:i] p.Domain = pid[i+1:] } + +func (p ProcessID) MarshalText() ([]byte, error) { + return []byte(p.String()), nil +} + +func (p *ProcessID) UnmarshalText(text []byte) error { + p.Parse(string(text)) + return nil +}