diff --git a/cluster/api.go b/cluster/api.go
index 74990ebd..04c47acd 100644
--- a/cluster/api.go
+++ b/cluster/api.go
@@ -15,6 +15,7 @@ package cluster
 import (
 	"context"
+	"errors"
 	"fmt"
 	"io/fs"
 	"net/http"
@@ -127,6 +128,8 @@ func NewAPI(config APIConfig) (API, error) {
 	a.router.GET("/v1/kv/:key", a.GetKV)
 	a.router.DELETE("/v1/kv/:key", a.UnsetKV)
 
+	a.router.PUT("/v1/node/:id/state", a.SetNodeState)
+
 	a.router.GET("/v1/core", a.CoreAPIAddress)
 	a.router.GET("/v1/core/config", a.CoreConfig)
 	a.router.GET("/v1/core/skills", a.CoreSkills)
@@ -920,6 +923,55 @@ func (a *api) GetKV(c echo.Context) error {
 	return c.JSON(http.StatusOK, res)
 }
 
+// SetNodeState sets the state of a node
+// @Summary Set the state of a node
+// @Description Set the state of a node
+// @Tags v1.0.0
+// @ID cluster-1-node-set-state
+// @Produce json
+// @Param data body client.SetNodeStateRequest true "Set node state request"
+// @Param X-Cluster-Origin header string false "Origin ID of request"
+// @Success 200 {string} string
+// @Failure 400 {object} Error
+// @Failure 500 {object} Error
+// @Failure 508 {object} Error
+// @Router /v1/node/{id}/state [put]
+func (a *api) SetNodeState(c echo.Context) error {
+	nodeid := util.PathParam(c, "id")
+
+	r := client.SetNodeStateRequest{}
+
+	if err := util.ShouldBindJSON(c, &r); err != nil {
+		return Err(http.StatusBadRequest, "", "invalid JSON: %s", err.Error())
+	}
+
+	origin := c.Request().Header.Get("X-Cluster-Origin")
+
+	if origin == a.id {
+		return Err(http.StatusLoopDetected, "", "breaking circuit")
+	}
+
+	a.logger.Debug().WithFields(log.Fields{
+		"node":  nodeid,
+		"state": r.State,
+	}).Log("Set node state")
+
+	err := a.cluster.SetNodeState(origin, nodeid, r.State)
+	if err != nil {
+		a.logger.Debug().WithError(err).WithFields(log.Fields{
+			"node":  nodeid,
+			"state": r.State,
+		}).Log("Unable to set state")
+
+		if errors.Is(err, ErrUnsupportedNodeState) {
+			return Err(http.StatusBadRequest, "", "%s: %s", err.Error(), r.State)
+		}
+		return Err(http.StatusInternalServerError, "", "unable to set state: %s", err.Error())
+	}
+
+	return c.JSON(http.StatusOK, "OK")
+}
+
 // Error represents an error response of the API
 type Error struct {
 	Code int `json:"code" jsonschema:"required" format:"int"`
diff --git a/cluster/client/client.go b/cluster/client/client.go
index 41ffda60..0dd417cb 100644
--- a/cluster/client/client.go
+++ b/cluster/client/client.go
@@ -70,6 +70,10 @@ type GetKVResponse struct {
 	UpdatedAt time.Time `json:"updated_at"`
 }
 
+type SetNodeStateRequest struct {
+	State string `json:"state"`
+}
+
 type APIClient struct {
 	Address string
 	Client  *http.Client
@@ -333,6 +337,17 @@ func (c *APIClient) GetKV(origin string, key string) (string, time.Time, error)
 	return res.Value, res.UpdatedAt, nil
 }
 
+func (c *APIClient) SetNodeState(origin string, nodeid string, r SetNodeStateRequest) error {
+	data, err := json.Marshal(r)
+	if err != nil {
+		return err
+	}
+
+	_, err = c.call(http.MethodPut, "/v1/node/"+url.PathEscape(nodeid)+"/state", "application/json", bytes.NewReader(data), origin)
+
+	return err
+}
+
 func (c *APIClient) Snapshot(origin string) (io.ReadCloser, error) {
 	return c.stream(http.MethodGet, "/v1/snapshot", "", nil, origin)
 }
diff --git a/cluster/cluster.go b/cluster/cluster.go
index ed642826..4866bb86 100644
--- a/cluster/cluster.go
+++ b/cluster/cluster.go
@@ -90,6 +90,9 @@ type Cluster interface {
 	GetKV(origin, key string, stale bool) (string, time.Time, error)
 	ListKV(prefix string) map[string]store.Value
 
+	ListNodes() map[string]store.Node
+	SetNodeState(origin, id, state string) error
+
 	ProxyReader() proxy.ProxyReader
 	CertManager() autocert.Manager
 }
@@ -1476,6 +1479,8 @@ func (c *cluster) About() (ClusterAbout, error) {
 		}
 	}
 
+	storeNodes := c.ListNodes()
+
 	c.nodesLock.RLock()
 	for id, node := range c.nodes {
 		nodeAbout := node.About()
@@ -1515,6 +1520,12 @@ func (c *cluster) About() (ClusterAbout, error) {
 			node.Leader = s.Leader
 		}
 
+		if storeNode, hasStoreNode := storeNodes[id]; hasStoreNode {
+			if storeNode.State == "maintenance" {
+				node.Status = storeNode.State
+			}
+		}
+
 		about.Nodes = append(about.Nodes, node)
 	}
 	c.nodesLock.RUnlock()
diff --git a/cluster/docs/ClusterAPI_docs.go b/cluster/docs/ClusterAPI_docs.go
index 7689e9a5..90240e9d 100644
--- a/cluster/docs/ClusterAPI_docs.go
+++ b/cluster/docs/ClusterAPI_docs.go
@@ -626,6 +626,62 @@ const docTemplateClusterAPI = `{
                 }
             }
        },
+        "/v1/node/{id}/state": {
+            "put": {
+                "description": "Set the state of a node",
+                "produces": [
+                    "application/json"
+                ],
+                "tags": [
+                    "v1.0.0"
+                ],
+                "summary": "Set the state of a node",
+                "operationId": "cluster-1-node-set-state",
+                "parameters": [
+                    {
+                        "description": "Set node state request",
+                        "name": "data",
+                        "in": "body",
+                        "required": true,
+                        "schema": {
+                            "$ref": "#/definitions/client.SetNodeStateRequest"
+                        }
+                    },
+                    {
+                        "type": "string",
+                        "description": "Origin ID of request",
+                        "name": "X-Cluster-Origin",
+                        "in": "header"
+                    }
+                ],
+                "responses": {
+                    "200": {
+                        "description": "OK",
+                        "schema": {
+                            "type": "string"
+                        }
+                    },
+                    "400": {
+                        "description": "Bad Request",
+                        "schema": {
+                            "$ref": "#/definitions/cluster.Error"
+                        }
+                    },
+                    "500": {
+                        "description": "Internal Server Error",
+                        "schema": {
+                            "$ref": "#/definitions/cluster.Error"
+                        }
+                    },
+                    "508": {
+                        "description": "Loop Detected",
+                        "schema": {
+                            "$ref": "#/definitions/cluster.Error"
+                        }
+                    }
+                }
+            }
+        },
         "/v1/process": {
             "post": {
                 "description": "Add a process to the cluster DB",
@@ -1347,6 +1403,14 @@ const docTemplateClusterAPI = `{
                 }
             }
        },
+        "client.SetNodeStateRequest": {
+            "type": "object",
+            "properties": {
+                "state": {
+                    "type": "string"
+                }
+            }
+        },
         "client.SetPoliciesRequest": {
             "type": "object",
             "properties": {
diff --git a/cluster/docs/ClusterAPI_swagger.json b/cluster/docs/ClusterAPI_swagger.json
index 3e0f98e8..711c3c88 100644
--- a/cluster/docs/ClusterAPI_swagger.json
+++ b/cluster/docs/ClusterAPI_swagger.json
@@ -618,6 +618,62 @@
                 }
             }
        },
+        "/v1/node/{id}/state": {
+            "put": {
+                "description": "Set the state of a node",
+                "produces": [
+                    "application/json"
+                ],
+                "tags": [
+                    "v1.0.0"
+                ],
+                "summary": "Set the state of a node",
+                "operationId": "cluster-1-node-set-state",
+                "parameters": [
+                    {
+                        "description": "Set node state request",
+                        "name": "data",
+                        "in": "body",
+                        "required": true,
+                        "schema": {
+                            "$ref": "#/definitions/client.SetNodeStateRequest"
+                        }
+                    },
+                    {
+                        "type": "string",
+                        "description": "Origin ID of request",
+                        "name": "X-Cluster-Origin",
+                        "in": "header"
+                    }
+                ],
+                "responses": {
+                    "200": {
+                        "description": "OK",
+                        "schema": {
+                            "type": "string"
+                        }
+                    },
+                    "400": {
+                        "description": "Bad Request",
+                        "schema": {
+                            "$ref": "#/definitions/cluster.Error"
+                        }
+                    },
+                    "500": {
+                        "description": "Internal Server Error",
+                        "schema": {
+                            "$ref": "#/definitions/cluster.Error"
+                        }
+                    },
+                    "508": {
+                        "description": "Loop Detected",
+                        "schema": {
+                            "$ref": "#/definitions/cluster.Error"
+                        }
+                    }
+                }
+            }
+        },
         "/v1/process": {
             "post": {
                 "description": "Add a process to the cluster DB",
@@ -1339,6 +1395,14 @@
                 }
             }
        },
+        "client.SetNodeStateRequest": {
+            "type": "object",
+            "properties": {
+                "state": {
+                    "type": "string"
+                }
+            }
+        },
         "client.SetPoliciesRequest": {
             "type": "object",
             "properties": {
diff --git a/cluster/docs/ClusterAPI_swagger.yaml b/cluster/docs/ClusterAPI_swagger.yaml
index 9b679bf0..e9052e6e 100644
--- a/cluster/docs/ClusterAPI_swagger.yaml
+++ b/cluster/docs/ClusterAPI_swagger.yaml
@@ -136,6 +136,11 @@ definitions:
     value:
       type: string
     type: object
+  client.SetNodeStateRequest:
+    properties:
+      state:
+        type: string
+    type: object
   client.SetPoliciesRequest:
     properties:
       policies:
@@ -1275,6 +1280,43 @@ paths:
       summary: Remove a lock
       tags:
       - v1.0.0
+  /v1/node/{id}/state:
+    put:
+      description: Set the state of a node
+      operationId: cluster-1-node-set-state
+      parameters:
+      - description: Set node state request
+        in: body
+        name: data
+        required: true
+        schema:
+          $ref: '#/definitions/client.SetNodeStateRequest'
+      - description: Origin ID of request
+        in: header
+        name: X-Cluster-Origin
+        type: string
+      produces:
+      - application/json
+      responses:
+        "200":
+          description: OK
+          schema:
+            type: string
+        "400":
+          description: Bad Request
+          schema:
+            $ref: '#/definitions/cluster.Error'
+        "500":
+          description: Internal Server Error
+          schema:
+            $ref: '#/definitions/cluster.Error'
+        "508":
+          description: Loop Detected
+          schema:
+            $ref: '#/definitions/cluster.Error'
+      summary: Set the state of a node
+      tags:
+      - v1.0.0
   /v1/process:
     post:
       consumes:
diff --git a/cluster/forwarder/forwarder.go b/cluster/forwarder/forwarder.go
index 1d8e45be..cac84815 100644
--- a/cluster/forwarder/forwarder.go
+++ b/cluster/forwarder/forwarder.go
@@ -41,6 +41,8 @@ type Forwarder interface {
 	SetKV(origin, key, value string) error
 	UnsetKV(origin, key string) error
 	GetKV(origin, key string) (string, time.Time, error)
+
+	SetNodeState(origin, nodeid, state string) error
 }
 
 type forwarder struct {
@@ -383,3 +385,19 @@ func (f *forwarder) GetKV(origin, key string) (string, time.Time, error) {
 
 	return client.GetKV(origin, key)
 }
+
+func (f *forwarder) SetNodeState(origin, nodeid, state string) error {
+	if origin == "" {
+		origin = f.id
+	}
+
+	r := apiclient.SetNodeStateRequest{
+		State: state,
+	}
+
+	f.lock.RLock()
+	client := f.client
+	f.lock.RUnlock()
+
+	return client.SetNodeState(origin, nodeid, r)
+}
diff --git a/cluster/leader.go b/cluster/leader.go
index c9ba5825..e27f96f5 100644
--- a/cluster/leader.go
+++ b/cluster/leader.go
@@ -400,6 +400,7 @@ func (c *cluster) clearLocks(ctx context.Context, interval time.Duration) {
 
 var errNotEnoughResourcesForDeployment = errors.New("no node with enough resources for deployment is available")
 var errNotEnoughResourcesForRebalancing = errors.New("no node with enough resources for rebalancing is available")
+var errNotEnoughResourcesForRelocating = errors.New("no node with enough resources for relocating is available")
 var errNoLimitsDefined = errors.New("this process has no limits defined")
 
 type processOpDelete struct {
@@ -672,6 +673,7 @@ func (c *cluster) applyOpStack(stack []interface{}, term uint64) []processOpErro
 func (c *cluster) doSynchronize(emergency bool, term uint64) {
 	wish := c.store.GetProcessNodeMap()
 	want := c.store.ListProcesses()
+	storeNodes := c.store.ListNodes()
 	have := c.proxy.ListProxyProcesses()
 	nodes := c.proxy.ListNodes()
 
@@ -683,6 +685,11 @@ func (c *cluster) doSynchronize(emergency bool, term uint64) {
 
 	for _, node := range nodes {
 		about := node.About()
+
+		if storeNode, hasStoreNode := storeNodes[about.ID]; hasStoreNode {
+			about.State = storeNode.State
+		}
+
 		nodesMap[about.ID] = about
 	}
 
@@ -752,6 +759,7 @@ func (c *cluster) doRebalance(emergency bool, term uint64) {
 
 	logger.Debug().WithField("emergency", emergency).Log("Rebalancing")
 
+	storeNodes := c.store.ListNodes()
 	have := c.proxy.ListProxyProcesses()
 	nodes := c.proxy.ListNodes()
 
@@ -759,6 +767,11 @@ func (c *cluster) doRebalance(emergency bool, term uint64) {
 
 	for _, node := range nodes {
 		about := node.About()
+
+		if storeNode, hasStoreNode := storeNodes[about.ID]; hasStoreNode {
+			about.State = storeNode.State
+		}
+
 		nodesMap[about.ID] = about
 	}
 
@@ -815,6 +828,7 @@ func (c *cluster) doRelocate(emergency bool, term uint64) {
 	logger.Debug().WithField("emergency", emergency).Log("Relocating")
 
 	relocateMap := c.store.GetProcessRelocateMap()
+	storeNodes := c.store.ListNodes()
 	have := c.proxy.ListProxyProcesses()
 	nodes := c.proxy.ListNodes()
 
@@ -822,6 +836,11 @@ func (c *cluster) doRelocate(emergency bool, term uint64) {
 
 	for _, node := range nodes {
 		about := node.About()
+
+		if storeNode, hasStoreNode := storeNodes[about.ID]; hasStoreNode {
+			about.State = storeNode.State
+		}
+
 		nodesMap[about.ID] = about
 	}
 
@@ -933,10 +952,7 @@ func isMetadataUpdateRequired(wantMap map[string]interface{}, haveMap map[string
 // synchronize returns a list of operations in order to adjust the "have" list to the "want" list
 // with taking the available resources on each node into account.
 func synchronize(wish map[string]string, want []store.Process, have []proxy.Process, nodes map[string]proxy.NodeAbout, nodeRecoverTimeout time.Duration) ([]interface{}, map[string]proxy.NodeResources, map[string]string) {
-	resources := map[string]proxy.NodeResources{}
-	for nodeid, about := range nodes {
-		resources[nodeid] = about.Resources
-	}
+	resources := NewResources(nodes)
 
 	// A map same as wish, but reflecting the actual situation.
 	reality := map[string]string{}
@@ -967,12 +983,7 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc
 				processid: haveP.Config.ProcessID(),
 			})
 
-			r, ok := resources[haveP.NodeID]
-			if ok {
-				r.CPU -= haveP.CPU
-				r.Mem -= haveP.Mem
-				resources[haveP.NodeID] = r
-			}
+			resources.Remove(haveP.NodeID, haveP.CPU, haveP.Mem)
 
 			continue
 		}
@@ -1007,12 +1018,7 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc
 			})
 
 			// Release the resources.
-			r, ok := resources[haveP.NodeID]
-			if ok {
-				r.CPU -= haveP.CPU
-				r.Mem -= haveP.Mem
-				resources[haveP.NodeID] = r
-			}
+			resources.Remove(haveP.NodeID, haveP.CPU, haveP.Mem)
 		}
 	}
 
@@ -1022,56 +1028,50 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc
 	for _, haveP := range wantOrderStart {
 		nodeid := haveP.NodeID
 
-		r, ok := resources[nodeid]
-		if ok {
-			// Consume the resources.
-			r.CPU += haveP.Config.LimitCPU
-			r.Mem += haveP.Config.LimitMemory
-			resources[nodeid] = r
+		resources.Add(nodeid, haveP.Config.LimitCPU, haveP.Config.LimitMemory)
 
-			// TODO: check if the current node has actually enough resources available,
-			// otherwise it needs to be moved somewhere else. If the node doesn't
-			// have enough resources available, the process will be prevented
-			// from starting.
+		// TODO: check if the current node has actually enough resources available,
+		// otherwise it needs to be moved somewhere else. If the node doesn't
+		// have enough resources available, the process will be prevented
+		// from starting.
 
-			/*
-				if hasNodeEnoughResources(r, haveP.Config.LimitCPU, haveP.Config.LimitMemory) {
-					// Consume the resources
+		/*
+			if hasNodeEnoughResources(r, haveP.Config.LimitCPU, haveP.Config.LimitMemory) {
+				// Consume the resources
+				r.CPU += haveP.Config.LimitCPU
+				r.Mem += haveP.Config.LimitMemory
+				resources[nodeid] = r
+			} else {
+				nodeid = findBestNodeForProcess(resources, haveP.Config.LimitCPU, haveP.Config.LimitMemory)
+				if len(nodeid) == 0 {
+					// Start it anyways and let it run into an error
+					opStack = append(opStack, processOpStart{
+						nodeid:    nodeid,
+						processid: haveP.Config.ProcessID(),
+					})
+
+					continue
+				}
+
+				if nodeid != haveP.NodeID {
+					opStack = append(opStack, processOpMove{
+						fromNodeid: haveP.NodeID,
+						toNodeid:   nodeid,
+						config:     haveP.Config,
+						metadata:   haveP.Metadata,
+						order:      haveP.Order,
+					})
+				}
+
+				// Consume the resources
+				r, ok := resources[nodeid]
+				if ok {
 					r.CPU += haveP.Config.LimitCPU
 					r.Mem += haveP.Config.LimitMemory
 					resources[nodeid] = r
-				} else {
-					nodeid = findBestNodeForProcess(resources, haveP.Config.LimitCPU, haveP.Config.LimitMemory)
-					if len(nodeid) == 0 {
-						// Start it anyways and let it run into an error
-						opStack = append(opStack, processOpStart{
-							nodeid:    nodeid,
-							processid: haveP.Config.ProcessID(),
-						})
-
-						continue
-					}
-
-					if nodeid != haveP.NodeID {
-						opStack = append(opStack, processOpMove{
-							fromNodeid: haveP.NodeID,
-							toNodeid:   nodeid,
-							config:     haveP.Config,
-							metadata:   haveP.Metadata,
-							order:      haveP.Order,
-						})
-					}
-
-					// Consume the resources
-					r, ok := resources[nodeid]
-					if ok {
-						r.CPU += haveP.Config.LimitCPU
-						r.Mem += haveP.Config.LimitMemory
-						resources[nodeid] = r
-					}
 				}
-			*/
-		}
+			}
+		*/
 
 		opStack = append(opStack, processOpStart{
 			nodeid:    nodeid,
 			processid: haveP.Config.ProcessID(),
 		})
@@ -1140,7 +1140,7 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc
 		// Try to add the process to a node where other processes with the same reference currently reside.
 		raNodes := haveReferenceAffinity.Nodes(wantP.Config.Reference, wantP.Config.Domain)
 		for _, raNodeid := range raNodes {
-			if hasNodeEnoughResources(resources[raNodeid], wantP.Config.LimitCPU, wantP.Config.LimitMemory) {
+			if resources.HasNodeEnough(raNodeid, wantP.Config.LimitCPU, wantP.Config.LimitMemory) {
 				nodeid = raNodeid
 				break
 			}
@@ -1148,7 +1148,7 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc
 
 		// Find the node with the most resources available.
 		if len(nodeid) == 0 {
-			nodes := findBestNodesForProcess(resources, wantP.Config.LimitCPU, wantP.Config.LimitMemory)
+			nodes := resources.FindBestNodes(wantP.Config.LimitCPU, wantP.Config.LimitMemory)
 			if len(nodes) > 0 {
 				nodeid = nodes[0]
 			}
@@ -1163,12 +1163,7 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc
 			})
 
 			// Consume the resources
-			r, ok := resources[nodeid]
-			if ok {
-				r.CPU += wantP.Config.LimitCPU
-				r.Mem += wantP.Config.LimitMemory
-				resources[nodeid] = r
-			}
+			resources.Add(nodeid, wantP.Config.LimitCPU, wantP.Config.LimitMemory)
 
 			reality[pid] = nodeid
 
@@ -1181,26 +1176,56 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc
 		}
 	}
 
-	return opStack, resources, reality
+	return opStack, resources.Map(), reality
 }
 
-// hasNodeEnoughResources returns whether a node has enough resources available for the
+type resources struct {
+	nodes   map[string]proxy.NodeResources
+	blocked map[string]struct{}
+}
+
+func NewResources(nodes map[string]proxy.NodeAbout) *resources {
+	r := &resources{
+		nodes:   map[string]proxy.NodeResources{},
+		blocked: map[string]struct{}{},
+	}
+
+	for nodeid, about := range nodes {
+		r.nodes[nodeid] = about.Resources
+		if about.State != "connected" {
+			r.blocked[nodeid] = struct{}{}
+		}
+	}
+
+	return r
+}
+
+// HasNodeEnough returns whether a node has enough resources available for the
 // requested cpu and memory consumption.
-func hasNodeEnoughResources(r proxy.NodeResources, cpu float64, mem uint64) bool {
-	if r.CPU+cpu < r.CPULimit && r.Mem+mem < r.MemLimit && !r.IsThrottling {
+func (r *resources) HasNodeEnough(nodeid string, cpu float64, mem uint64) bool {
+	res, hasNode := r.nodes[nodeid]
+	if !hasNode {
+		return false
+	}
+
+	if _, isBlocked := r.blocked[nodeid]; isBlocked {
+		return false
+	}
+
+	if res.CPU+cpu < res.CPULimit && res.Mem+mem < res.MemLimit && !res.IsThrottling {
 		return true
 	}
 
 	return false
 }
 
-// findBestNodeForProcess returns an array of nodeids that can fit the requested cpu and memory requirements. If no
+// FindBestNodes returns an array of nodeids that can fit the requested cpu and memory requirements. If no
 // such node is available, an empty array is returned. The array is sorted by the most suitable node first.
-func findBestNodesForProcess(resources map[string]proxy.NodeResources, cpu float64, mem uint64) []string {
+func (r *resources) FindBestNodes(cpu float64, mem uint64) []string {
 	nodes := []string{}
 
-	for id, r := range resources {
-		if hasNodeEnoughResources(r, cpu, mem) {
+	for id := range r.nodes {
+		if r.HasNodeEnough(id, cpu, mem) {
 			nodes = append(nodes, id)
 		}
 	}
@@ -1208,16 +1233,57 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc
 	sort.SliceStable(nodes, func(i, j int) bool {
 		nodeA, nodeB := nodes[i], nodes[j]
 
-		if resources[nodeA].CPU < resources[nodeB].CPU && resources[nodeA].Mem <= resources[nodeB].Mem {
-			return true
+		if r.nodes[nodeA].CPU != r.nodes[nodeB].CPU {
+			return r.nodes[nodeA].CPU < r.nodes[nodeB].CPU
 		}
 
-		return false
+		return r.nodes[nodeA].Mem < r.nodes[nodeB].Mem
 	})
 
 	return nodes
 }
 
+// Add adds the given cpu and memory utilization to the node's resources.
+func (r *resources) Add(nodeid string, cpu float64, mem uint64) {
+	res, hasRes := r.nodes[nodeid]
+	if !hasRes {
+		return
+	}
+
+	res.CPU += cpu
+	res.Mem += mem
+	r.nodes[nodeid] = res
+}
+
+// Remove subtracts the given cpu and memory utilization from the node's resources.
+func (r *resources) Remove(nodeid string, cpu float64, mem uint64) {
+	res, hasRes := r.nodes[nodeid]
+	if !hasRes {
+		return
+	}
+
+	res.CPU -= cpu
+	if res.CPU < 0 {
+		res.CPU = 0
+	}
+	if mem >= res.Mem {
+		res.Mem = 0
+	} else {
+		res.Mem -= mem
+	}
+	r.nodes[nodeid] = res
+}
+
+// Move shifts the given cpu and memory utilization from the source node to the target node.
+func (r *resources) Move(target, source string, cpu float64, mem uint64) {
+	r.Add(target, cpu, mem)
+	r.Remove(source, cpu, mem)
+}
+
+func (r *resources) Map() map[string]proxy.NodeResources {
+	return r.nodes
+}
+
 type referenceAffinityNodeCount struct {
 	nodeid string
 	count  uint64
@@ -1394,10 +1460,7 @@ func (ra *referenceAffinity) Move(reference, domain, fromnodeid, tonodeid string
 
 // rebalance returns a list of operations that will move running processes away from nodes that are overloaded.
 func rebalance(have []proxy.Process, nodes map[string]proxy.NodeAbout) ([]interface{}, map[string]proxy.NodeResources) {
-	resources := map[string]proxy.NodeResources{}
-	for nodeid, about := range nodes {
-		resources[nodeid] = about.Resources
-	}
+	resources := NewResources(nodes)
 
 	// Group all running processes by node and sort them by their runtime in ascending order.
 	nodeProcessMap := createNodeProcessMap(have)
@@ -1444,8 +1507,7 @@ func rebalance(have []proxy.Process, nodes map[string]proxy.NodeAbout) ([]interf
 				continue
 			}
 
-			r := resources[raNodeid]
-			if hasNodeEnoughResources(r, p.CPU, p.Mem) {
+			if resources.HasNodeEnough(raNodeid, p.CPU, p.Mem) {
 				availableNodeid = raNodeid
 				break
 			}
@@ -1454,7 +1516,7 @@ func rebalance(have []proxy.Process, nodes map[string]proxy.NodeAbout) ([]interf
 
 		// Find the best node with enough resources available.
 		if len(availableNodeid) == 0 {
-			nodes := findBestNodesForProcess(resources, p.CPU, p.Mem)
+			nodes := resources.FindBestNodes(p.CPU, p.Mem)
 			for _, nodeid := range nodes {
 				if nodeid == overloadedNodeid {
 					continue
@@ -1487,7 +1549,7 @@ func rebalance(have []proxy.Process, nodes map[string]proxy.NodeAbout) ([]interf
 			processes[i] = p
 
 			// Adjust the resources.
-			resources = adjustResources(resources, availableNodeid, overloadedNodeid, p.CPU, p.Mem)
+			resources.Move(availableNodeid, overloadedNodeid, p.CPU, p.Mem)
 
 			// Adjust the reference affinity.
 			haveReferenceAffinity.Move(p.Config.Reference, p.Config.Domain, overloadedNodeid, availableNodeid)
@@ -1497,15 +1559,12 @@ func rebalance(have []proxy.Process, nodes map[string]proxy.NodeAbout) ([]interf
 		}
 	}
 
-	return opStack, resources
+	return opStack, resources.Map()
 }
 
 // relocate returns a list of operations that will move deployed processes to different nodes.
 func relocate(have []proxy.Process, nodes map[string]proxy.NodeAbout, relocateMap map[string]string) ([]interface{}, map[string]proxy.NodeResources, []string) {
-	resources := map[string]proxy.NodeResources{}
-	for nodeid, about := range nodes {
-		resources[nodeid] = about.Resources
-	}
+	resources := NewResources(nodes)
 
 	relocatedProcessIDs := []string{}
 
@@ -1542,7 +1601,7 @@ func relocate(have []proxy.Process, nodes map[string]proxy.NodeAbout, relocateMa
 		if len(targetNodeid) != 0 {
 			_, hasNode := nodes[targetNodeid]
 
-			if !hasNode || !hasNodeEnoughResources(nodes[targetNodeid].Resources, process.CPU, process.Mem) {
+			if !hasNode || !resources.HasNodeEnough(targetNodeid, process.CPU, process.Mem) {
 				targetNodeid = ""
 			}
 		}
@@ -1558,8 +1617,7 @@ func relocate(have []proxy.Process, nodes map[string]proxy.NodeAbout, relocateMa
 				continue
 			}
 
-			r := resources[raNodeid]
-			if hasNodeEnoughResources(r, process.CPU, process.Mem) {
+			if resources.HasNodeEnough(raNodeid, process.CPU, process.Mem) {
 				targetNodeid = raNodeid
 				break
 			}
@@ -1568,7 +1626,7 @@ func relocate(have []proxy.Process, nodes map[string]proxy.NodeAbout, relocateMa
 
 		// Find the best node with enough resources available.
 		if len(targetNodeid) == 0 {
-			nodes := findBestNodesForProcess(resources, process.CPU, process.Mem)
+			nodes := resources.FindBestNodes(process.CPU, process.Mem)
 			for _, nodeid := range nodes {
 				if nodeid == sourceNodeid {
 					continue
@@ -1584,7 +1642,7 @@ func relocate(have []proxy.Process, nodes map[string]proxy.NodeAbout, relocateMa
 			opStack = append(opStack, processOpSkip{
 				nodeid:    sourceNodeid,
 				processid: process.Config.ProcessID(),
-				err:       errNotEnoughResourcesForRebalancing,
+				err:       errNotEnoughResourcesForRelocating,
 			})
 			continue
 		}
@@ -1599,7 +1657,7 @@ func relocate(have []proxy.Process, nodes map[string]proxy.NodeAbout, relocateMa
 		})
 
 		// Adjust the resources.
-		resources = adjustResources(resources, targetNodeid, sourceNodeid, process.CPU, process.Mem)
+		resources.Move(targetNodeid, sourceNodeid, process.CPU, process.Mem)
 
 		// Adjust the reference affinity.
 		haveReferenceAffinity.Move(process.Config.Reference, process.Config.Domain, sourceNodeid, targetNodeid)
@@ -1607,29 +1665,7 @@ func relocate(have []proxy.Process, nodes map[string]proxy.NodeAbout, relocateMa
 		relocatedProcessIDs = append(relocatedProcessIDs, processid)
 	}
 
-	return opStack, resources, relocatedProcessIDs
-}
-
-// adjustResources adjusts the resources from the target and source node according to the cpu and memory utilization.
-func adjustResources(resources map[string]proxy.NodeResources, target, source string, cpu float64, mem uint64) map[string]proxy.NodeResources {
-	r := resources[target]
-	r.CPU += cpu
-	r.Mem += mem
-	resources[target] = r
-
-	r = resources[source]
-	r.CPU -= cpu
-	if r.CPU < 0 {
-		r.CPU = 0
-	}
-	if mem >= r.Mem {
-		r.Mem = 0
-	} else {
-		r.Mem -= mem
-	}
-	resources[source] = r
-
-	return resources
+	return opStack, resources.Map(), relocatedProcessIDs
 }
 
 // createNodeProcessMap takes a list of processes and groups them by the nodeid they
diff --git a/cluster/leader_test.go b/cluster/leader_test.go
index c69a3b6b..1669afb2 100644
--- a/cluster/leader_test.go
+++ b/cluster/leader_test.go
@@ -2299,16 +2299,127 @@ func TestFindBestNodesForProcess(t *testing.T) {
 		},
 	}
 
-	resources := map[string]proxy.NodeResources{}
-	for nodeid, about := range nodes {
-		resources[nodeid] = about.Resources
-	}
+	resources := NewResources(nodes)
 
-	list := findBestNodesForProcess(resources, 35, 20)
+	list := resources.FindBestNodes(35, 20)
 
 	require.Equal(t, []string{"node3", "node2", "node1"}, list)
 }
 
+func TestFindBestNodesForProcess2(t *testing.T) {
+	resources := NewResources(nil)
+	resources.nodes = map[string]proxy.NodeResources{
+		"node1": {
+			CPULimit:     104.50000000000001,
+			CPU:          29.725299999999997,
+			IsThrottling: false,
+			MemLimit:     1051931443,
+			Mem:          212262912,
+			NCPU:         1.1,
+		},
+		"node2": {
+			CPULimit:     104.50000000000001,
+			CPU:          53.576600000000006,
+			IsThrottling: false,
+			MemLimit:     1051931443,
+			Mem:          805830656,
+			NCPU:         1.1,
+		},
+		"node3": {
+			CPULimit:     104.50000000000001,
+			CPU:          33.99000000000001,
+			IsThrottling: false,
+			MemLimit:     1051931443,
+			Mem:          190910464,
+			NCPU:         1.1,
+		},
+		"node4": {
+			CPULimit:     104.50000000000001,
+			CPU:          31.291700000000006,
+			IsThrottling: false,
+			MemLimit:     1051931443,
+			Mem:          129310720,
+			NCPU:         1.1,
+		},
+		"node5": {
+			CPULimit:     104.50000000000001,
+			CPU:          30.634999999999994,
+			IsThrottling: false,
+			MemLimit:     1051931443,
+			Mem:          159158272,
+			NCPU:         1.1,
+		},
+		"node6": {
+			CPULimit:     104.50000000000001,
+			CPU:          40.368900000000004,
+			IsThrottling: false,
+			MemLimit:     1051931443,
+			Mem:          212189184,
+			NCPU:         1.1,
+		},
+		"node7": {
+			CPULimit:     104.50000000000001,
+			CPU:          25.469399999999997,
+			IsThrottling: false,
+			MemLimit:     1051931443,
+			Mem:          206098432,
+			NCPU:         1.1,
+		},
+		"node8": {
+			CPULimit:     104.50000000000001,
+			CPU:          22.180400000000002,
+			IsThrottling: false,
+			MemLimit:     1051931443,
+			Mem:          144138240,
+			NCPU:         1.1,
+		},
+		"node9": {
+			CPULimit:     104.50000000000001,
+			CPU:          62.6714,
+			IsThrottling: true,
+			MemLimit:     1051931443,
+			Mem:          978501632,
+			NCPU:         1.1,
+		},
+		"node10": {
+			CPULimit:     104.50000000000001,
+			CPU:          18.7748,
+			IsThrottling: false,
+			MemLimit:     1051931443,
+			Mem:          142430208,
+			NCPU:         1.1,
+		},
+		"node11": {
+			CPULimit:     104.50000000000001,
+			CPU:          43.807500000000005,
+			IsThrottling: false,
+			MemLimit:     1051931443,
+			Mem:          368091136,
+			NCPU:         1.1,
+		},
+		"node12": {
+			CPULimit:     104.50000000000001,
+			CPU:          31.067299999999996,
+			IsThrottling: false,
+			MemLimit:     1051931443,
+			Mem:          149897216,
+			NCPU:         1.1,
+		},
+		"node13": {
+			CPULimit:     104.50000000000001,
+			CPU:          35.93480000000001,
+			IsThrottling: false,
+			MemLimit:     1051931443,
+			Mem:          194408448,
+			NCPU:         1.1,
+		},
+	}
+
+	list := resources.FindBestNodes(4.0, 45*1024*1024)
+
+	require.Equal(t, []string{"node10", "node8", "node7", "node1", "node5", "node12", "node4", "node3", "node13", "node6", "node11", "node2"}, list)
+}
+
 func TestCreateNodeProcessMap(t *testing.T) {
 	processes := []proxy.Process{
 		{
diff --git a/cluster/node.go b/cluster/node.go
new file mode 100644
index 00000000..d54ce25a
--- /dev/null
+++ b/cluster/node.go
@@ -0,0 +1,37 @@
+package cluster
+
+import (
+	"errors"
+
+	"github.com/datarhei/core/v16/cluster/store"
+)
+
+func (c *cluster) ListNodes() map[string]store.Node {
+	return c.store.ListNodes()
+}
+
+var ErrUnsupportedNodeState = errors.New("unsupported node state")
+
+func (c *cluster) SetNodeState(origin, id, state string) error {
+	switch state {
+	case "online":
+	case "maintenance":
+	case "leave":
+	default:
+		return ErrUnsupportedNodeState
+	}
+
+	if !c.IsRaftLeader() {
+		return c.forwarder.SetNodeState(origin, id, state)
+	}
+
+	cmd := &store.Command{
+		Operation: store.OpSetNodeState,
+		Data: &store.CommandSetNodeState{
+			NodeID: id,
+			State:  state,
+		},
+	}
+
+	return c.applyCommand(cmd)
+}
diff --git a/cluster/store/node.go b/cluster/store/node.go
new file mode 100644
index 00000000..a362e204
--- /dev/null
+++ b/cluster/store/node.go
@@ -0,0 +1,35 @@
+package store
+
+import "time"
+
+func (s *store) setNodeState(cmd CommandSetNodeState) error {
+	s.lock.Lock()
+	defer s.lock.Unlock()
+
+	if cmd.State == "online" {
+		delete(s.data.Nodes, cmd.NodeID)
+		return nil
+	}
+
+	node := s.data.Nodes[cmd.NodeID]
+
+	node.State = cmd.State
+	node.UpdatedAt = time.Now()
+
+	s.data.Nodes[cmd.NodeID] = node
+
+	return nil
+}
+
+func (s *store) ListNodes() map[string]Node {
+	s.lock.RLock()
+	defer s.lock.RUnlock()
+
+	m := map[string]Node{}
+
+	for id, node := range s.data.Nodes {
+		m[id] = node
+	}
+
+	return m
+}
diff --git a/cluster/store/store.go b/cluster/store/store.go
index 686da6b3..f5d6c082 100644
--- a/cluster/store/store.go
+++ b/cluster/store/store.go
@@ -35,6 +35,8 @@ type Store interface {
 
 	ListKVS(prefix string) map[string]Value
 	GetFromKVS(key string) (Value, error)
+
+	ListNodes() map[string]Node
 }
 
 type Process struct {
@@ -61,6 +63,11 @@ type Value struct {
 	UpdatedAt time.Time
 }
 
+type Node struct {
+	State     string
+	UpdatedAt time.Time
+}
+
 type Operation string
 
 const (
@@ -82,6 +89,7 @@ const (
 	OpClearLocks Operation = "clearLocks"
 	OpSetKV      Operation = "setKV"
 	OpUnsetKV    Operation = "unsetKV"
+	OpSetNodeState Operation = "setNodeState"
 )
 
 type Command struct {
@@ -168,6 +176,11 @@ type CommandUnsetKV struct {
 	Key string
 }
 
+type CommandSetNodeState struct {
+	NodeID string
+	State  string
+}
+
 type storeData struct {
 	Version uint64
 	Process map[string]Process // processid -> process
@@ -188,6 +201,8 @@ type storeData struct {
 
 	Locks map[string]time.Time
 	KVS   map[string]Value
+
+	Nodes map[string]Node
 }
 
 func (s *storeData) init() {
@@ -204,6 +219,7 @@ func (s *storeData) init() {
 	s.Policies.Policies = map[string][]access.Policy{}
 	s.Locks = map[string]time.Time{}
 	s.KVS = map[string]Value{}
+	s.Nodes = map[string]Node{}
 }
 
 // store implements a raft.FSM
@@ -430,6 +446,14 @@ func (s *store) applyCommand(c Command) error {
 		}
 
 		err = s.unsetKV(cmd)
+	case OpSetNodeState:
+		cmd := CommandSetNodeState{}
+		err = decodeCommand(&cmd, c.Data)
+		if err != nil {
+			break
+		}
+
+		err = s.setNodeState(cmd)
 	default:
 		s.logger.Warn().WithField("operation", c.Operation).Log("Unknown operation")
 		err = fmt.Errorf("unknown operation: %s", c.Operation)
diff --git a/docs/docs.go b/docs/docs.go
index b83d6569..5f9ab7e8 100644
--- a/docs/docs.go
+++ b/docs/docs.go
@@ -209,6 +209,35 @@ const docTemplate = `{
                 }
             }
        },
+        "/api/v3/cluster/db/node": {
+            "get": {
+                "security": [
+                    {
+                        "ApiKeyAuth": []
+                    }
+                ],
+                "description": "List of nodes in the cluster DB",
+                "produces": [
+                    "application/json"
+                ],
+                "tags": [
"v16.?.?" + ], + "summary": "List nodes in the cluster DB", + "operationId": "cluster-3-db-list-nodes", + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "array", + "items": { + "$ref": "#/definitions/api.ClusterStoreNode" + } + } + } + } + } + }, "/api/v3/cluster/db/policies": { "get": { "security": [ @@ -1381,6 +1410,107 @@ const docTemplate = `{ } } }, + "/api/v3/cluster/node/{id}/state": { + "get": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "Get the state of a node with the given ID", + "produces": [ + "application/json" + ], + "tags": [ + "v16.?.?" + ], + "summary": "Get the state of a node with the given ID", + "operationId": "cluster-3-get-node-state", + "parameters": [ + { + "type": "string", + "description": "Node ID", + "name": "id", + "in": "path", + "required": true + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/api.ClusterNodeState" + } + }, + "404": { + "description": "Not Found", + "schema": { + "$ref": "#/definitions/api.Error" + } + } + } + }, + "put": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "Set the state of a node with the given ID", + "produces": [ + "application/json" + ], + "tags": [ + "v16.?.?" + ], + "summary": "Set the state of a node with the given ID", + "operationId": "cluster-3-set-node-state", + "parameters": [ + { + "type": "string", + "description": "Node ID", + "name": "id", + "in": "path", + "required": true + }, + { + "description": "State", + "name": "config", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/api.ClusterNodeState" + } + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "string" + } + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/api.Error" + } + }, + "404": { + "description": "Not Found", + "schema": { + "$ref": "#/definitions/api.Error" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/api.Error" + } + } + } + } + }, "/api/v3/cluster/node/{id}/version": { "get": { "security": [ @@ -5135,6 +5265,22 @@ const docTemplate = `{ } } }, + "api.ClusterNodeState": { + "type": "object", + "required": [ + "state" + ], + "properties": { + "state": { + "type": "string", + "enum": [ + "online", + "maintenance", + "leave" + ] + } + } + }, "api.ClusterProcessMap": { "type": "object", "additionalProperties": { @@ -5179,6 +5325,20 @@ const docTemplate = `{ } } }, + "api.ClusterStoreNode": { + "type": "object", + "properties": { + "id": { + "type": "string" + }, + "state": { + "type": "string" + }, + "updated_at": { + "type": "string" + } + } + }, "api.Command": { "type": "object", "required": [ diff --git a/docs/swagger.json b/docs/swagger.json index f542859c..06ac2d39 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -201,6 +201,35 @@ } } }, + "/api/v3/cluster/db/node": { + "get": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "List of nodes in the cluster DB", + "produces": [ + "application/json" + ], + "tags": [ + "v16.?.?" 
+ ], + "summary": "List nodes in the cluster DB", + "operationId": "cluster-3-db-list-nodes", + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "array", + "items": { + "$ref": "#/definitions/api.ClusterStoreNode" + } + } + } + } + } + }, "/api/v3/cluster/db/policies": { "get": { "security": [ @@ -1373,6 +1402,107 @@ } } }, + "/api/v3/cluster/node/{id}/state": { + "get": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "Get the state of a node with the given ID", + "produces": [ + "application/json" + ], + "tags": [ + "v16.?.?" + ], + "summary": "Get the state of a node with the given ID", + "operationId": "cluster-3-get-node-state", + "parameters": [ + { + "type": "string", + "description": "Node ID", + "name": "id", + "in": "path", + "required": true + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/api.ClusterNodeState" + } + }, + "404": { + "description": "Not Found", + "schema": { + "$ref": "#/definitions/api.Error" + } + } + } + }, + "put": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "Set the state of a node with the given ID", + "produces": [ + "application/json" + ], + "tags": [ + "v16.?.?" + ], + "summary": "Set the state of a node with the given ID", + "operationId": "cluster-3-set-node-state", + "parameters": [ + { + "type": "string", + "description": "Node ID", + "name": "id", + "in": "path", + "required": true + }, + { + "description": "State", + "name": "config", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/api.ClusterNodeState" + } + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "string" + } + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/api.Error" + } + }, + "404": { + "description": "Not Found", + "schema": { + "$ref": "#/definitions/api.Error" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/api.Error" + } + } + } + } + }, "/api/v3/cluster/node/{id}/version": { "get": { "security": [ @@ -5127,6 +5257,22 @@ } } }, + "api.ClusterNodeState": { + "type": "object", + "required": [ + "state" + ], + "properties": { + "state": { + "type": "string", + "enum": [ + "online", + "maintenance", + "leave" + ] + } + } + }, "api.ClusterProcessMap": { "type": "object", "additionalProperties": { @@ -5171,6 +5317,20 @@ } } }, + "api.ClusterStoreNode": { + "type": "object", + "properties": { + "id": { + "type": "string" + }, + "state": { + "type": "string" + }, + "updated_at": { + "type": "string" + } + } + }, "api.Command": { "type": "object", "required": [ diff --git a/docs/swagger.yaml b/docs/swagger.yaml index 39105241..25f3cb48 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -207,6 +207,17 @@ definitions: ncpu: type: number type: object + api.ClusterNodeState: + properties: + state: + enum: + - online + - maintenance + - leave + type: string + required: + - state + type: object api.ClusterProcessMap: additionalProperties: type: string @@ -236,6 +247,15 @@ definitions: state: type: string type: object + api.ClusterStoreNode: + properties: + id: + type: string + state: + type: string + updated_at: + type: string + type: object api.Command: properties: command: @@ -2705,6 +2725,24 @@ paths: summary: List locks in the cluster DB tags: - v16.?.? 
+ /api/v3/cluster/db/node: + get: + description: List of nodes in the cluster DB + operationId: cluster-3-db-list-nodes + produces: + - application/json + responses: + "200": + description: OK + schema: + items: + $ref: '#/definitions/api.ClusterStoreNode' + type: array + security: + - ApiKeyAuth: [] + summary: List nodes in the cluster DB + tags: + - v16.?.? /api/v3/cluster/db/policies: get: description: List of policies in the cluster @@ -3467,6 +3505,71 @@ paths: summary: List of processes in the cluster on a node tags: - v16.?.? + /api/v3/cluster/node/{id}/state: + get: + description: Get the state of a node with the given ID + operationId: cluster-3-get-node-state + parameters: + - description: Node ID + in: path + name: id + required: true + type: string + produces: + - application/json + responses: + "200": + description: OK + schema: + $ref: '#/definitions/api.ClusterNodeState' + "404": + description: Not Found + schema: + $ref: '#/definitions/api.Error' + security: + - ApiKeyAuth: [] + summary: Get the state of a node with the given ID + tags: + - v16.?.? + put: + description: Set the state of a node with the given ID + operationId: cluster-3-set-node-state + parameters: + - description: Node ID + in: path + name: id + required: true + type: string + - description: State + in: body + name: config + required: true + schema: + $ref: '#/definitions/api.ClusterNodeState' + produces: + - application/json + responses: + "200": + description: OK + schema: + type: string + "400": + description: Bad Request + schema: + $ref: '#/definitions/api.Error' + "404": + description: Not Found + schema: + $ref: '#/definitions/api.Error' + "500": + description: Internal Server Error + schema: + $ref: '#/definitions/api.Error' + security: + - ApiKeyAuth: [] + summary: Set the state of a node with the given ID + tags: + - v16.?.? 
   /api/v3/cluster/node/{id}/version:
     get:
       description: List a proxy node by its ID
diff --git a/http/api/cluster.go b/http/api/cluster.go
index 67a8d45c..29a896b3 100644
--- a/http/api/cluster.go
+++ b/http/api/cluster.go
@@ -25,6 +25,10 @@ type ClusterNode struct {
 	Resources ClusterNodeResources `json:"resources"`
 }
 
+type ClusterNodeState struct {
+	State string `json:"state" validate:"required" enums:"online,maintenance,leave" jsonschema:"enum=online,enum=maintenance,enum=leave"`
+}
+
 type ClusterNodeCore struct {
 	Address string `json:"address"`
 	Status  string `json:"status"`
@@ -94,3 +98,9 @@ type ClusterProcessReallocate struct {
 	TargetNodeID string      `json:"target_node_id"`
 	Processes    []ProcessID `json:"process_ids"`
 }
+
+type ClusterStoreNode struct {
+	ID        string    `json:"id"`
+	State     string    `json:"state"`
+	UpdatedAt time.Time `json:"updated_at"`
+}
diff --git a/http/handler/api/cluster_node.go b/http/handler/api/cluster_node.go
index d01c67f1..1ef0f2b0 100644
--- a/http/handler/api/cluster_node.go
+++ b/http/handler/api/cluster_node.go
@@ -1,12 +1,14 @@
 package api
 
 import (
+	"errors"
 	"net/http"
 	"sort"
 	"strings"
 	"time"
 
 	clientapi "github.com/datarhei/core-client-go/v16/api"
+	"github.com/datarhei/core/v16/cluster"
 	"github.com/datarhei/core/v16/cluster/proxy"
 	"github.com/datarhei/core/v16/http/api"
 	"github.com/datarhei/core/v16/http/handler/util"
@@ -26,9 +28,17 @@ import (
 func (h *ClusterHandler) GetNodes(c echo.Context) error {
 	about, _ := h.cluster.About()
 
+	nodes := h.cluster.ListNodes()
+
 	list := []api.ClusterNode{}
 
 	for _, node := range about.Nodes {
+		if dbnode, hasNode := nodes[node.ID]; hasNode {
+			if dbnode.State == "maintenance" {
+				node.Status = dbnode.State
+			}
+		}
+
 		list = append(list, h.marshalClusterNode(node))
 	}
 
@@ -51,11 +61,19 @@ func (h *ClusterHandler) GetNode(c echo.Context) error {
 
 	about, _ := h.cluster.About()
 
+	nodes := h.cluster.ListNodes()
+
 	for _, node := range about.Nodes {
 		if node.ID != id {
 			continue
 		}
 
+		if dbnode, hasNode := nodes[node.ID]; hasNode {
+			if dbnode.State == "maintenance" {
+				node.Status = dbnode.State
+			}
+		}
+
 		return c.JSON(http.StatusOK, h.marshalClusterNode(node))
 	}
 
@@ -365,3 +383,108 @@ func (h *ClusterHandler) ListNodeProcesses(c echo.Context) error {
 
 	return c.JSON(http.StatusOK, processes)
 }
+
+// GetNodeState returns the state of a node with the given ID
+// @Summary Get the state of a node with the given ID
+// @Description Get the state of a node with the given ID
+// @Tags v16.?.?
+// @ID cluster-3-get-node-state
+// @Produce json
+// @Param id path string true "Node ID"
+// @Success 200 {object} api.ClusterNodeState
+// @Failure 404 {object} api.Error
+// @Security ApiKeyAuth
+// @Router /api/v3/cluster/node/{id}/state [get]
+func (h *ClusterHandler) GetNodeState(c echo.Context) error {
+	id := util.PathParam(c, "id")
+
+	about, _ := h.cluster.About()
+
+	state := ""
+	for _, node := range about.Nodes {
+		if node.ID != id {
+			continue
+		}
+
+		state = node.Status
+		break
+	}
+
+	if len(state) == 0 {
+		return api.Err(http.StatusNotFound, "", "node not found")
+	}
+
+	nodes := h.cluster.ListNodes()
+	if node, hasNode := nodes[id]; hasNode {
+		if node.State == "maintenance" {
+			state = node.State
+		}
+	}
+
+	return c.JSON(http.StatusOK, api.ClusterNodeState{
+		State: state,
+	})
+}
+
+// SetNodeState sets the state of a node with the given ID
+// @Summary Set the state of a node with the given ID
+// @Description Set the state of a node with the given ID
+// @Tags v16.?.?
+// @ID cluster-3-set-node-state
+// @Produce json
+// @Param id path string true "Node ID"
+// @Param config body api.ClusterNodeState true "State"
+// @Success 200 {string} string
+// @Failure 400 {object} api.Error
+// @Failure 404 {object} api.Error
+// @Failure 500 {object} api.Error
+// @Security ApiKeyAuth
+// @Router /api/v3/cluster/node/{id}/state [put]
+func (h *ClusterHandler) SetNodeState(c echo.Context) error {
+	id := util.PathParam(c, "id")
+
+	about, _ := h.cluster.About()
+
+	found := false
+	for _, node := range about.Nodes {
+		if node.ID != id {
+			continue
+		}
+
+		found = true
+		break
+	}
+
+	if !found {
+		return api.Err(http.StatusNotFound, "", "node not found")
+	}
+
+	state := api.ClusterNodeState{}
+
+	if err := util.ShouldBindJSON(c, &state); err != nil {
+		return api.Err(http.StatusBadRequest, "", "invalid JSON: %s", err.Error())
+	}
+
+	if state.State == "leave" {
+		err := h.cluster.Leave("", id)
+		if err != nil {
+			if errors.Is(err, cluster.ErrUnknownNode) {
+				return api.Err(http.StatusNotFound, "", "node not found")
+			}
+
+			return api.Err(http.StatusInternalServerError, "", "%s", err.Error())
+		}
+
+		return c.JSON(http.StatusOK, "OK")
+	}
+
+	err := h.cluster.SetNodeState("", id, state.State)
+	if err != nil {
+		if errors.Is(err, cluster.ErrUnsupportedNodeState) {
+			return api.Err(http.StatusBadRequest, "", "%s", err.Error())
+		}
+		return api.Err(http.StatusInternalServerError, "", "%s", err.Error())
+	}
+
+	return c.JSON(http.StatusOK, "OK")
+}
diff --git a/http/handler/api/cluster_store.go b/http/handler/api/cluster_store.go
index d2a56798..5c7d121b 100644
--- a/http/handler/api/cluster_store.go
+++ b/http/handler/api/cluster_store.go
@@ -259,3 +259,28 @@ func (h *ClusterHandler) ListStoreKV(c echo.Context) error {
 
 	return c.JSON(http.StatusOK, kvs)
 }
+
+// ListStoreNodes returns the list of stored node metadata
+// @Summary List nodes in the cluster DB
+// @Description List of nodes in the cluster DB
+// @Tags v16.?.?
+// @ID cluster-3-db-list-nodes
+// @Produce json
+// @Success 200 {array} api.ClusterStoreNode
+// @Security ApiKeyAuth
+// @Router /api/v3/cluster/db/node [get]
+func (h *ClusterHandler) ListStoreNodes(c echo.Context) error {
+	clusternodes := h.cluster.ListNodes()
+
+	nodes := []api.ClusterStoreNode{}
+
+	for nodeid, v := range clusternodes {
+		nodes = append(nodes, api.ClusterStoreNode{
+			ID:        nodeid,
+			State:     v.State,
+			UpdatedAt: v.UpdatedAt,
+		})
+	}
+
+	return c.JSON(http.StatusOK, nodes)
+}
diff --git a/http/server.go b/http/server.go
index da3c18e4..b30782ca 100644
--- a/http/server.go
+++ b/http/server.go
@@ -736,6 +736,7 @@ func (s *server) setRoutesV3(v3 *echo.Group) {
 	v3.GET("/cluster/db/locks", s.v3handler.cluster.ListStoreLocks)
 	v3.GET("/cluster/db/kv", s.v3handler.cluster.ListStoreKV)
 	v3.GET("/cluster/db/map/process", s.v3handler.cluster.GetStoreProcessNodeMap)
+	v3.GET("/cluster/db/node", s.v3handler.cluster.ListStoreNodes)
 
 	v3.GET("/cluster/iam/user", s.v3handler.cluster.ListIdentities)
 	v3.GET("/cluster/iam/user/:name", s.v3handler.cluster.ListIdentity)
@@ -753,6 +754,7 @@ func (s *server) setRoutesV3(v3 *echo.Group) {
 	v3.GET("/cluster/node/:id/fs/:storage/*", s.v3handler.cluster.NodeFSGetFile)
 	v3.GET("/cluster/node/:id/process", s.v3handler.cluster.ListNodeProcesses)
 	v3.GET("/cluster/node/:id/version", s.v3handler.cluster.GetNodeVersion)
+	v3.GET("/cluster/node/:id/state", s.v3handler.cluster.GetNodeState)
 
 	v3.GET("/cluster/fs/:storage", s.v3handler.cluster.ListFiles)
 
@@ -772,6 +774,7 @@ func (s *server) setRoutesV3(v3 *echo.Group) {
 
 	v3.DELETE("/cluster/node/:id/fs/:storage/*", s.v3handler.cluster.NodeFSDeleteFile)
 	v3.PUT("/cluster/node/:id/fs/:storage/*", s.v3handler.cluster.NodeFSPutFile)
+	v3.PUT("/cluster/node/:id/state", s.v3handler.cluster.SetNodeState)
 
 	v3.PUT("/cluster/iam/reload", s.v3handler.cluster.ReloadIAM)
 	v3.POST("/cluster/iam/user", s.v3handler.cluster.AddIdentity)
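
For orientation, a minimal sketch of exercising the new v3 endpoint end to end. This is not part of the patch: the address, node ID, and token are placeholders, and the body is the api.ClusterNodeState payload defined above; the responses mirror the documented 200/400/404/500 codes.

	package main

	import (
		"bytes"
		"fmt"
		"net/http"
	)

	func main() {
		// api.ClusterNodeState payload; "online", "maintenance", and "leave" are the accepted states.
		body := bytes.NewReader([]byte(`{"state":"maintenance"}`))

		// Placeholder address and node ID; adjust for a real deployment.
		req, err := http.NewRequest(http.MethodPut, "http://localhost:8080/api/v3/cluster/node/node3/state", body)
		if err != nil {
			panic(err)
		}
		req.Header.Set("Content-Type", "application/json")
		req.Header.Set("Authorization", "Bearer <token>") // placeholder credential

		res, err := http.DefaultClient.Do(req)
		if err != nil {
			panic(err)
		}
		defer res.Body.Close()

		// Expect 200 with "OK"; 400 for an unsupported state, 404 for an unknown node.
		fmt.Println(res.Status)
	}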