Add API for setting node status, respect it in leader tasks

This commit is contained in:
Ingo Oppermann
2024-06-24 16:50:15 +02:00
parent 166e313642
commit c032cdf5c7
19 changed files with 1219 additions and 126 deletions

View File

@@ -15,6 +15,7 @@ package cluster
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"io/fs" "io/fs"
"net/http" "net/http"
@@ -127,6 +128,8 @@ func NewAPI(config APIConfig) (API, error) {
a.router.GET("/v1/kv/:key", a.GetKV) a.router.GET("/v1/kv/:key", a.GetKV)
a.router.DELETE("/v1/kv/:key", a.UnsetKV) a.router.DELETE("/v1/kv/:key", a.UnsetKV)
a.router.PUT("/v1/node/:id/state", a.SetNodeState)
a.router.GET("/v1/core", a.CoreAPIAddress) a.router.GET("/v1/core", a.CoreAPIAddress)
a.router.GET("/v1/core/config", a.CoreConfig) a.router.GET("/v1/core/config", a.CoreConfig)
a.router.GET("/v1/core/skills", a.CoreSkills) a.router.GET("/v1/core/skills", a.CoreSkills)
@@ -920,6 +923,55 @@ func (a *api) GetKV(c echo.Context) error {
return c.JSON(http.StatusOK, res) return c.JSON(http.StatusOK, res)
} }
// SetNodeState sets a state for a node
// @Summary Set a state for a node
// @Description Set a state for a node
// @Tags v1.0.0
// @ID cluster-1-node-set-state
// @Accept json
// @Produce json
// @Param id path string true "Node ID"
// @Param data body client.SetNodeStateRequest true "Set node state request"
// @Param X-Cluster-Origin header string false "Origin ID of request"
// @Success 200 {string} string
// @Failure 400 {object} Error
// @Failure 500 {object} Error
// @Failure 508 {object} Error
// @Router /v1/node/{id}/state [put]
func (a *api) SetNodeState(c echo.Context) error {
	nodeid := util.PathParam(c, "id")

	r := client.SetNodeStateRequest{}

	if err := util.ShouldBindJSON(c, &r); err != nil {
		return Err(http.StatusBadRequest, "", "invalid JSON: %s", err.Error())
	}

	// Break forwarding loops: if the request originated from this node,
	// it has already traveled the full circle through the cluster.
	origin := c.Request().Header.Get("X-Cluster-Origin")

	if origin == a.id {
		return Err(http.StatusLoopDetected, "", "breaking circuit")
	}

	a.logger.Debug().WithFields(log.Fields{
		"node":  nodeid,
		"state": r.State,
	}).Log("Set node state")

	err := a.cluster.SetNodeState(origin, nodeid, r.State)
	if err != nil {
		a.logger.Debug().WithError(err).WithFields(log.Fields{
			"node":  nodeid,
			"state": r.State,
		}).Log("Unable to set state")

		// An unknown state is a client error, everything else is internal.
		if errors.Is(err, ErrUnsupportedNodeState) {
			return Err(http.StatusBadRequest, "", "%s: %s", err.Error(), r.State)
		}

		return Err(http.StatusInternalServerError, "", "unable to set state: %s", err.Error())
	}

	return c.JSON(http.StatusOK, "OK")
}
// Error represents an error response of the API // Error represents an error response of the API
type Error struct { type Error struct {
Code int `json:"code" jsonschema:"required" format:"int"` Code int `json:"code" jsonschema:"required" format:"int"`

View File

@@ -70,6 +70,10 @@ type GetKVResponse struct {
UpdatedAt time.Time `json:"updated_at"` UpdatedAt time.Time `json:"updated_at"`
} }
// SetNodeStateRequest is the request body for setting the state of a node
// via PUT /v1/node/{id}/state.
type SetNodeStateRequest struct {
	State string `json:"state"`
}
type APIClient struct { type APIClient struct {
Address string Address string
Client *http.Client Client *http.Client
@@ -333,6 +337,17 @@ func (c *APIClient) GetKV(origin string, key string) (string, time.Time, error)
return res.Value, res.UpdatedAt, nil return res.Value, res.UpdatedAt, nil
} }
// SetNodeState sends a request to set the state of the node with the given ID.
func (c *APIClient) SetNodeState(origin string, nodeid string, r SetNodeStateRequest) error {
	body, err := json.Marshal(r)
	if err != nil {
		return err
	}

	path := "/v1/node/" + url.PathEscape(nodeid) + "/state"

	_, err = c.call(http.MethodPut, path, "application/json", bytes.NewReader(body), origin)

	return err
}
func (c *APIClient) Snapshot(origin string) (io.ReadCloser, error) { func (c *APIClient) Snapshot(origin string) (io.ReadCloser, error) {
return c.stream(http.MethodGet, "/v1/snapshot", "", nil, origin) return c.stream(http.MethodGet, "/v1/snapshot", "", nil, origin)
} }

View File

@@ -90,6 +90,9 @@ type Cluster interface {
GetKV(origin, key string, stale bool) (string, time.Time, error) GetKV(origin, key string, stale bool) (string, time.Time, error)
ListKV(prefix string) map[string]store.Value ListKV(prefix string) map[string]store.Value
ListNodes() map[string]store.Node
SetNodeState(origin, id, state string) error
ProxyReader() proxy.ProxyReader ProxyReader() proxy.ProxyReader
CertManager() autocert.Manager CertManager() autocert.Manager
} }
@@ -1476,6 +1479,8 @@ func (c *cluster) About() (ClusterAbout, error) {
} }
} }
storeNodes := c.ListNodes()
c.nodesLock.RLock() c.nodesLock.RLock()
for id, node := range c.nodes { for id, node := range c.nodes {
nodeAbout := node.About() nodeAbout := node.About()
@@ -1515,6 +1520,12 @@ func (c *cluster) About() (ClusterAbout, error) {
node.Leader = s.Leader node.Leader = s.Leader
} }
if storeNode, hasStoreNode := storeNodes[id]; hasStoreNode {
if storeNode.State == "maintenance" {
node.Status = storeNode.State
}
}
about.Nodes = append(about.Nodes, node) about.Nodes = append(about.Nodes, node)
} }
c.nodesLock.RUnlock() c.nodesLock.RUnlock()

View File

@@ -626,6 +626,62 @@ const docTemplateClusterAPI = `{
} }
} }
}, },
"/v1/node/{id}/state": {
            "put": {
"description": "Set a state for a node",
"produces": [
"application/json"
],
"tags": [
"v1.0.0"
],
"summary": "Set a state for a node",
"operationId": "cluster-1-node-set-state",
"parameters": [
{
"description": "Set node state request",
"name": "data",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/client.SetNodeStateRequest"
}
},
{
"type": "string",
"description": "Origin ID of request",
"name": "X-Cluster-Origin",
"in": "header"
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "string"
}
},
"400": {
"description": "Bad Request",
"schema": {
"$ref": "#/definitions/cluster.Error"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/cluster.Error"
}
},
"508": {
"description": "Loop Detected",
"schema": {
"$ref": "#/definitions/cluster.Error"
}
}
}
}
},
"/v1/process": { "/v1/process": {
"post": { "post": {
"description": "Add a process to the cluster DB", "description": "Add a process to the cluster DB",
@@ -1347,6 +1403,14 @@ const docTemplateClusterAPI = `{
} }
} }
}, },
"client.SetNodeStateRequest": {
"type": "object",
"properties": {
"state": {
"type": "string"
}
}
},
"client.SetPoliciesRequest": { "client.SetPoliciesRequest": {
"type": "object", "type": "object",
"properties": { "properties": {

View File

@@ -618,6 +618,62 @@
} }
} }
}, },
"/v1/node/{id}/state": {
            "put": {
"description": "Set a state for a node",
"produces": [
"application/json"
],
"tags": [
"v1.0.0"
],
"summary": "Set a state for a node",
"operationId": "cluster-1-node-set-state",
"parameters": [
{
"description": "Set node state request",
"name": "data",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/client.SetNodeStateRequest"
}
},
{
"type": "string",
"description": "Origin ID of request",
"name": "X-Cluster-Origin",
"in": "header"
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "string"
}
},
"400": {
"description": "Bad Request",
"schema": {
"$ref": "#/definitions/cluster.Error"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/cluster.Error"
}
},
"508": {
"description": "Loop Detected",
"schema": {
"$ref": "#/definitions/cluster.Error"
}
}
}
}
},
"/v1/process": { "/v1/process": {
"post": { "post": {
"description": "Add a process to the cluster DB", "description": "Add a process to the cluster DB",
@@ -1339,6 +1395,14 @@
} }
} }
}, },
"client.SetNodeStateRequest": {
"type": "object",
"properties": {
"state": {
"type": "string"
}
}
},
"client.SetPoliciesRequest": { "client.SetPoliciesRequest": {
"type": "object", "type": "object",
"properties": { "properties": {

View File

@@ -136,6 +136,11 @@ definitions:
value: value:
type: string type: string
type: object type: object
client.SetNodeStateRequest:
properties:
state:
type: string
type: object
client.SetPoliciesRequest: client.SetPoliciesRequest:
properties: properties:
policies: policies:
@@ -1275,6 +1280,43 @@ paths:
summary: Remove a lock summary: Remove a lock
tags: tags:
- v1.0.0 - v1.0.0
/v1/node/{id}/state:
    put:
description: Set a state for a node
operationId: cluster-1-node-set-state
parameters:
- description: Set node state request
in: body
name: data
required: true
schema:
$ref: '#/definitions/client.SetNodeStateRequest'
- description: Origin ID of request
in: header
name: X-Cluster-Origin
type: string
produces:
- application/json
responses:
"200":
description: OK
schema:
type: string
"400":
description: Bad Request
schema:
$ref: '#/definitions/cluster.Error'
"500":
description: Internal Server Error
schema:
$ref: '#/definitions/cluster.Error'
"508":
description: Loop Detected
schema:
$ref: '#/definitions/cluster.Error'
summary: Set a state for a node
tags:
- v1.0.0
/v1/process: /v1/process:
post: post:
consumes: consumes:

View File

@@ -41,6 +41,8 @@ type Forwarder interface {
SetKV(origin, key, value string) error SetKV(origin, key, value string) error
UnsetKV(origin, key string) error UnsetKV(origin, key string) error
GetKV(origin, key string) (string, time.Time, error) GetKV(origin, key string) (string, time.Time, error)
SetNodeState(origin, nodeid, state string) error
} }
type forwarder struct { type forwarder struct {
@@ -383,3 +385,19 @@ func (f *forwarder) GetKV(origin, key string) (string, time.Time, error) {
return client.GetKV(origin, key) return client.GetKV(origin, key)
} }
// SetNodeState forwards a node state change to the current leader. If no
// origin is given, this node is recorded as the origin of the request.
func (f *forwarder) SetNodeState(origin, nodeid, state string) error {
	if len(origin) == 0 {
		origin = f.id
	}

	f.lock.RLock()
	leader := f.client
	f.lock.RUnlock()

	return leader.SetNodeState(origin, nodeid, apiclient.SetNodeStateRequest{
		State: state,
	})
}

View File

@@ -400,6 +400,7 @@ func (c *cluster) clearLocks(ctx context.Context, interval time.Duration) {
var errNotEnoughResourcesForDeployment = errors.New("no node with enough resources for deployment is available") var errNotEnoughResourcesForDeployment = errors.New("no node with enough resources for deployment is available")
var errNotEnoughResourcesForRebalancing = errors.New("no node with enough resources for rebalancing is available") var errNotEnoughResourcesForRebalancing = errors.New("no node with enough resources for rebalancing is available")
var errNotEnoughResourcesForRelocating = errors.New("no node with enough resources for relocating is available")
var errNoLimitsDefined = errors.New("this process has no limits defined") var errNoLimitsDefined = errors.New("this process has no limits defined")
type processOpDelete struct { type processOpDelete struct {
@@ -672,6 +673,7 @@ func (c *cluster) applyOpStack(stack []interface{}, term uint64) []processOpErro
func (c *cluster) doSynchronize(emergency bool, term uint64) { func (c *cluster) doSynchronize(emergency bool, term uint64) {
wish := c.store.GetProcessNodeMap() wish := c.store.GetProcessNodeMap()
want := c.store.ListProcesses() want := c.store.ListProcesses()
storeNodes := c.store.ListNodes()
have := c.proxy.ListProxyProcesses() have := c.proxy.ListProxyProcesses()
nodes := c.proxy.ListNodes() nodes := c.proxy.ListNodes()
@@ -683,6 +685,11 @@ func (c *cluster) doSynchronize(emergency bool, term uint64) {
for _, node := range nodes { for _, node := range nodes {
about := node.About() about := node.About()
if storeNode, hasStoreNode := storeNodes[about.ID]; hasStoreNode {
about.State = storeNode.State
}
nodesMap[about.ID] = about nodesMap[about.ID] = about
} }
@@ -752,6 +759,7 @@ func (c *cluster) doRebalance(emergency bool, term uint64) {
logger.Debug().WithField("emergency", emergency).Log("Rebalancing") logger.Debug().WithField("emergency", emergency).Log("Rebalancing")
storeNodes := c.store.ListNodes()
have := c.proxy.ListProxyProcesses() have := c.proxy.ListProxyProcesses()
nodes := c.proxy.ListNodes() nodes := c.proxy.ListNodes()
@@ -759,6 +767,11 @@ func (c *cluster) doRebalance(emergency bool, term uint64) {
for _, node := range nodes { for _, node := range nodes {
about := node.About() about := node.About()
if storeNode, hasStoreNode := storeNodes[about.ID]; hasStoreNode {
about.State = storeNode.State
}
nodesMap[about.ID] = about nodesMap[about.ID] = about
} }
@@ -815,6 +828,7 @@ func (c *cluster) doRelocate(emergency bool, term uint64) {
logger.Debug().WithField("emergency", emergency).Log("Relocating") logger.Debug().WithField("emergency", emergency).Log("Relocating")
relocateMap := c.store.GetProcessRelocateMap() relocateMap := c.store.GetProcessRelocateMap()
storeNodes := c.store.ListNodes()
have := c.proxy.ListProxyProcesses() have := c.proxy.ListProxyProcesses()
nodes := c.proxy.ListNodes() nodes := c.proxy.ListNodes()
@@ -822,6 +836,11 @@ func (c *cluster) doRelocate(emergency bool, term uint64) {
for _, node := range nodes { for _, node := range nodes {
about := node.About() about := node.About()
if storeNode, hasStoreNode := storeNodes[about.ID]; hasStoreNode {
about.State = storeNode.State
}
nodesMap[about.ID] = about nodesMap[about.ID] = about
} }
@@ -933,10 +952,7 @@ func isMetadataUpdateRequired(wantMap map[string]interface{}, haveMap map[string
// synchronize returns a list of operations in order to adjust the "have" list to the "want" list // synchronize returns a list of operations in order to adjust the "have" list to the "want" list
// with taking the available resources on each node into account. // with taking the available resources on each node into account.
func synchronize(wish map[string]string, want []store.Process, have []proxy.Process, nodes map[string]proxy.NodeAbout, nodeRecoverTimeout time.Duration) ([]interface{}, map[string]proxy.NodeResources, map[string]string) { func synchronize(wish map[string]string, want []store.Process, have []proxy.Process, nodes map[string]proxy.NodeAbout, nodeRecoverTimeout time.Duration) ([]interface{}, map[string]proxy.NodeResources, map[string]string) {
resources := map[string]proxy.NodeResources{} resources := NewResources(nodes)
for nodeid, about := range nodes {
resources[nodeid] = about.Resources
}
// A map same as wish, but reflecting the actual situation. // A map same as wish, but reflecting the actual situation.
reality := map[string]string{} reality := map[string]string{}
@@ -967,12 +983,7 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc
processid: haveP.Config.ProcessID(), processid: haveP.Config.ProcessID(),
}) })
r, ok := resources[haveP.NodeID] resources.Remove(haveP.NodeID, haveP.CPU, haveP.Mem)
if ok {
r.CPU -= haveP.CPU
r.Mem -= haveP.Mem
resources[haveP.NodeID] = r
}
continue continue
} }
@@ -1007,12 +1018,7 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc
}) })
// Release the resources. // Release the resources.
r, ok := resources[haveP.NodeID] resources.Remove(haveP.NodeID, haveP.CPU, haveP.Mem)
if ok {
r.CPU -= haveP.CPU
r.Mem -= haveP.Mem
resources[haveP.NodeID] = r
}
} }
} }
@@ -1022,12 +1028,7 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc
for _, haveP := range wantOrderStart { for _, haveP := range wantOrderStart {
nodeid := haveP.NodeID nodeid := haveP.NodeID
r, ok := resources[nodeid] resources.Add(nodeid, haveP.Config.LimitCPU, haveP.Config.LimitMemory)
if ok {
// Consume the resources.
r.CPU += haveP.Config.LimitCPU
r.Mem += haveP.Config.LimitMemory
resources[nodeid] = r
// TODO: check if the current node has actually enough resources available, // TODO: check if the current node has actually enough resources available,
// otherwise it needs to be moved somewhere else. If the node doesn't // otherwise it needs to be moved somewhere else. If the node doesn't
@@ -1071,7 +1072,6 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc
} }
} }
*/ */
}
opStack = append(opStack, processOpStart{ opStack = append(opStack, processOpStart{
nodeid: nodeid, nodeid: nodeid,
@@ -1140,7 +1140,7 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc
// Try to add the process to a node where other processes with the same reference currently reside. // Try to add the process to a node where other processes with the same reference currently reside.
raNodes := haveReferenceAffinity.Nodes(wantP.Config.Reference, wantP.Config.Domain) raNodes := haveReferenceAffinity.Nodes(wantP.Config.Reference, wantP.Config.Domain)
for _, raNodeid := range raNodes { for _, raNodeid := range raNodes {
if hasNodeEnoughResources(resources[raNodeid], wantP.Config.LimitCPU, wantP.Config.LimitMemory) { if resources.HasNodeEnough(raNodeid, wantP.Config.LimitCPU, wantP.Config.LimitMemory) {
nodeid = raNodeid nodeid = raNodeid
break break
} }
@@ -1148,7 +1148,7 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc
// Find the node with the most resources available. // Find the node with the most resources available.
if len(nodeid) == 0 { if len(nodeid) == 0 {
nodes := findBestNodesForProcess(resources, wantP.Config.LimitCPU, wantP.Config.LimitMemory) nodes := resources.FindBestNodes(wantP.Config.LimitCPU, wantP.Config.LimitMemory)
if len(nodes) > 0 { if len(nodes) > 0 {
nodeid = nodes[0] nodeid = nodes[0]
} }
@@ -1163,12 +1163,7 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc
}) })
// Consume the resources // Consume the resources
r, ok := resources[nodeid] resources.Add(nodeid, wantP.Config.LimitCPU, wantP.Config.LimitMemory)
if ok {
r.CPU += wantP.Config.LimitCPU
r.Mem += wantP.Config.LimitMemory
resources[nodeid] = r
}
reality[pid] = nodeid reality[pid] = nodeid
@@ -1181,26 +1176,56 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc
} }
} }
return opStack, resources, reality return opStack, resources.Map(), reality
} }
// hasNodeEnoughResources returns whether a node has enough resources available for the type resources struct {
nodes map[string]proxy.NodeResources
blocked map[string]struct{}
}
func NewResources(nodes map[string]proxy.NodeAbout) *resources {
r := &resources{
nodes: map[string]proxy.NodeResources{},
blocked: map[string]struct{}{},
}
for nodeid, about := range nodes {
r.nodes[nodeid] = about.Resources
if about.State != "connected" {
r.blocked[nodeid] = struct{}{}
}
}
return r
}
// HasNodeEnough returns whether a node has enough resources available for the
// requested cpu and memory consumption. // requested cpu and memory consumption.
func hasNodeEnoughResources(r proxy.NodeResources, cpu float64, mem uint64) bool { func (r *resources) HasNodeEnough(nodeid string, cpu float64, mem uint64) bool {
if r.CPU+cpu < r.CPULimit && r.Mem+mem < r.MemLimit && !r.IsThrottling { res, hasNode := r.nodes[nodeid]
if !hasNode {
return false
}
if _, hasNode := r.blocked[nodeid]; hasNode {
return false
}
if res.CPU+cpu < res.CPULimit && res.Mem+mem < res.MemLimit && !res.IsThrottling {
return true return true
} }
return false return false
} }
// findBestNodeForProcess returns an array of nodeids that can fit the requested cpu and memory requirements. If no // FindBestNodes returns an array of nodeids that can fit the requested cpu and memory requirements. If no
// such node is available, an empty array is returned. The array is sorted by the most suitable node first. // such node is available, an empty array is returned. The array is sorted by the most suitable node first.
func findBestNodesForProcess(resources map[string]proxy.NodeResources, cpu float64, mem uint64) []string { func (r *resources) FindBestNodes(cpu float64, mem uint64) []string {
nodes := []string{} nodes := []string{}
for id, r := range resources { for id := range r.nodes {
if hasNodeEnoughResources(r, cpu, mem) { if r.HasNodeEnough(id, cpu, mem) {
nodes = append(nodes, id) nodes = append(nodes, id)
} }
} }
@@ -1208,16 +1233,57 @@ func findBestNodesForProcess(resources map[string]proxy.NodeResources, cpu float
sort.SliceStable(nodes, func(i, j int) bool { sort.SliceStable(nodes, func(i, j int) bool {
nodeA, nodeB := nodes[i], nodes[j] nodeA, nodeB := nodes[i], nodes[j]
if resources[nodeA].CPU < resources[nodeB].CPU && resources[nodeA].Mem <= resources[nodeB].Mem { if r.nodes[nodeA].CPU != r.nodes[nodeB].CPU {
return true return r.nodes[nodeA].CPU < r.nodes[nodeB].CPU
} }
return false return r.nodes[nodeA].Mem <= r.nodes[nodeB].Mem
}) })
return nodes return nodes
} }
// Add adds the resources of the node according to the cpu and memory utilization.
func (r *resources) Add(nodeid string, cpu float64, mem uint64) {
res, hasRes := r.nodes[nodeid]
if !hasRes {
return
}
res.CPU += cpu
res.Mem += mem
r.nodes[nodeid] = res
}
// Remove subtracts the resources from the node according to the cpu and memory utilization.
func (r *resources) Remove(nodeid string, cpu float64, mem uint64) {
res, hasRes := r.nodes[nodeid]
if !hasRes {
return
}
res.CPU -= cpu
if res.CPU < 0 {
res.CPU = 0
}
if mem >= res.Mem {
res.Mem = 0
} else {
res.Mem -= mem
}
r.nodes[nodeid] = res
}
// Move adjusts the resources from the target and source node according to the cpu and memory utilization.
func (r *resources) Move(target, source string, cpu float64, mem uint64) {
r.Add(target, cpu, mem)
r.Remove(source, cpu, mem)
}
func (r *resources) Map() map[string]proxy.NodeResources {
return r.nodes
}
type referenceAffinityNodeCount struct { type referenceAffinityNodeCount struct {
nodeid string nodeid string
count uint64 count uint64
@@ -1394,10 +1460,7 @@ func (ra *referenceAffinity) Move(reference, domain, fromnodeid, tonodeid string
// rebalance returns a list of operations that will move running processes away from nodes that are overloaded. // rebalance returns a list of operations that will move running processes away from nodes that are overloaded.
func rebalance(have []proxy.Process, nodes map[string]proxy.NodeAbout) ([]interface{}, map[string]proxy.NodeResources) { func rebalance(have []proxy.Process, nodes map[string]proxy.NodeAbout) ([]interface{}, map[string]proxy.NodeResources) {
resources := map[string]proxy.NodeResources{} resources := NewResources(nodes)
for nodeid, about := range nodes {
resources[nodeid] = about.Resources
}
// Group all running processes by node and sort them by their runtime in ascending order. // Group all running processes by node and sort them by their runtime in ascending order.
nodeProcessMap := createNodeProcessMap(have) nodeProcessMap := createNodeProcessMap(have)
@@ -1444,8 +1507,7 @@ func rebalance(have []proxy.Process, nodes map[string]proxy.NodeAbout) ([]interf
continue continue
} }
r := resources[raNodeid] if resources.HasNodeEnough(raNodeid, p.CPU, p.Mem) {
if hasNodeEnoughResources(r, p.CPU, p.Mem) {
availableNodeid = raNodeid availableNodeid = raNodeid
break break
} }
@@ -1454,7 +1516,7 @@ func rebalance(have []proxy.Process, nodes map[string]proxy.NodeAbout) ([]interf
// Find the best node with enough resources available. // Find the best node with enough resources available.
if len(availableNodeid) == 0 { if len(availableNodeid) == 0 {
nodes := findBestNodesForProcess(resources, p.CPU, p.Mem) nodes := resources.FindBestNodes(p.CPU, p.Mem)
for _, nodeid := range nodes { for _, nodeid := range nodes {
if nodeid == overloadedNodeid { if nodeid == overloadedNodeid {
continue continue
@@ -1487,7 +1549,7 @@ func rebalance(have []proxy.Process, nodes map[string]proxy.NodeAbout) ([]interf
processes[i] = p processes[i] = p
// Adjust the resources. // Adjust the resources.
resources = adjustResources(resources, availableNodeid, overloadedNodeid, p.CPU, p.Mem) resources.Move(availableNodeid, overloadedNodeid, p.CPU, p.Mem)
// Adjust the reference affinity. // Adjust the reference affinity.
haveReferenceAffinity.Move(p.Config.Reference, p.Config.Domain, overloadedNodeid, availableNodeid) haveReferenceAffinity.Move(p.Config.Reference, p.Config.Domain, overloadedNodeid, availableNodeid)
@@ -1497,15 +1559,12 @@ func rebalance(have []proxy.Process, nodes map[string]proxy.NodeAbout) ([]interf
} }
} }
return opStack, resources return opStack, resources.Map()
} }
// relocate returns a list of operations that will move deployed processes to different nodes. // relocate returns a list of operations that will move deployed processes to different nodes.
func relocate(have []proxy.Process, nodes map[string]proxy.NodeAbout, relocateMap map[string]string) ([]interface{}, map[string]proxy.NodeResources, []string) { func relocate(have []proxy.Process, nodes map[string]proxy.NodeAbout, relocateMap map[string]string) ([]interface{}, map[string]proxy.NodeResources, []string) {
resources := map[string]proxy.NodeResources{} resources := NewResources(nodes)
for nodeid, about := range nodes {
resources[nodeid] = about.Resources
}
relocatedProcessIDs := []string{} relocatedProcessIDs := []string{}
@@ -1542,7 +1601,7 @@ func relocate(have []proxy.Process, nodes map[string]proxy.NodeAbout, relocateMa
if len(targetNodeid) != 0 { if len(targetNodeid) != 0 {
_, hasNode := nodes[targetNodeid] _, hasNode := nodes[targetNodeid]
if !hasNode || !hasNodeEnoughResources(nodes[targetNodeid].Resources, process.CPU, process.Mem) { if !hasNode || !resources.HasNodeEnough(targetNodeid, process.CPU, process.Mem) {
targetNodeid = "" targetNodeid = ""
} }
} }
@@ -1558,8 +1617,7 @@ func relocate(have []proxy.Process, nodes map[string]proxy.NodeAbout, relocateMa
continue continue
} }
r := resources[raNodeid] if resources.HasNodeEnough(raNodeid, process.CPU, process.Mem) {
if hasNodeEnoughResources(r, process.CPU, process.Mem) {
targetNodeid = raNodeid targetNodeid = raNodeid
break break
} }
@@ -1568,7 +1626,7 @@ func relocate(have []proxy.Process, nodes map[string]proxy.NodeAbout, relocateMa
// Find the best node with enough resources available. // Find the best node with enough resources available.
if len(targetNodeid) == 0 { if len(targetNodeid) == 0 {
nodes := findBestNodesForProcess(resources, process.CPU, process.Mem) nodes := resources.FindBestNodes(process.CPU, process.Mem)
for _, nodeid := range nodes { for _, nodeid := range nodes {
if nodeid == sourceNodeid { if nodeid == sourceNodeid {
continue continue
@@ -1584,7 +1642,7 @@ func relocate(have []proxy.Process, nodes map[string]proxy.NodeAbout, relocateMa
opStack = append(opStack, processOpSkip{ opStack = append(opStack, processOpSkip{
nodeid: sourceNodeid, nodeid: sourceNodeid,
processid: process.Config.ProcessID(), processid: process.Config.ProcessID(),
err: errNotEnoughResourcesForRebalancing, err: errNotEnoughResourcesForRelocating,
}) })
continue continue
} }
@@ -1599,7 +1657,7 @@ func relocate(have []proxy.Process, nodes map[string]proxy.NodeAbout, relocateMa
}) })
// Adjust the resources. // Adjust the resources.
resources = adjustResources(resources, targetNodeid, sourceNodeid, process.CPU, process.Mem) resources.Move(targetNodeid, sourceNodeid, process.CPU, process.Mem)
// Adjust the reference affinity. // Adjust the reference affinity.
haveReferenceAffinity.Move(process.Config.Reference, process.Config.Domain, sourceNodeid, targetNodeid) haveReferenceAffinity.Move(process.Config.Reference, process.Config.Domain, sourceNodeid, targetNodeid)
@@ -1607,29 +1665,7 @@ func relocate(have []proxy.Process, nodes map[string]proxy.NodeAbout, relocateMa
relocatedProcessIDs = append(relocatedProcessIDs, processid) relocatedProcessIDs = append(relocatedProcessIDs, processid)
} }
return opStack, resources, relocatedProcessIDs return opStack, resources.Map(), relocatedProcessIDs
}
// adjustResources adjusts the resources from the target and source node according to the cpu and memory utilization.
func adjustResources(resources map[string]proxy.NodeResources, target, source string, cpu float64, mem uint64) map[string]proxy.NodeResources {
r := resources[target]
r.CPU += cpu
r.Mem += mem
resources[target] = r
r = resources[source]
r.CPU -= cpu
if r.CPU < 0 {
r.CPU = 0
}
if mem >= r.Mem {
r.Mem = 0
} else {
r.Mem -= mem
}
resources[source] = r
return resources
} }
// createNodeProcessMap takes a list of processes and groups them by the nodeid they // createNodeProcessMap takes a list of processes and groups them by the nodeid they

View File

@@ -2299,16 +2299,127 @@ func TestFindBestNodesForProcess(t *testing.T) {
}, },
} }
resources := map[string]proxy.NodeResources{} resources := NewResources(nodes)
for nodeid, about := range nodes {
resources[nodeid] = about.Resources
}
list := findBestNodesForProcess(resources, 35, 20) list := resources.FindBestNodes(35, 20)
require.Equal(t, []string{"node3", "node2", "node1"}, list) require.Equal(t, []string{"node3", "node2", "node1"}, list)
} }
// TestFindBestNodesForProcess2 verifies FindBestNodes with a realistic set of
// 13 nodes. The nodes map is assigned directly (bypassing NewResources'
// state check), so no node is treated as blocked. node9 is throttling and
// must therefore be excluded; the remaining nodes are expected sorted by
// ascending CPU usage.
func TestFindBestNodesForProcess2(t *testing.T) {
	resources := NewResources(nil)
	resources.nodes = map[string]proxy.NodeResources{
		"node1": {
			CPULimit:     104.50000000000001,
			CPU:          29.725299999999997,
			IsThrottling: false,
			MemLimit:     1051931443,
			Mem:          212262912,
			NCPU:         1.1,
		},
		"node2": {
			CPULimit:     104.50000000000001,
			CPU:          53.576600000000006,
			IsThrottling: false,
			MemLimit:     1051931443,
			Mem:          805830656,
			NCPU:         1.1,
		},
		"node3": {
			CPULimit:     104.50000000000001,
			CPU:          33.99000000000001,
			IsThrottling: false,
			MemLimit:     1051931443,
			Mem:          190910464,
			NCPU:         1.1,
		},
		"node4": {
			CPULimit:     104.50000000000001,
			CPU:          31.291700000000006,
			IsThrottling: false,
			MemLimit:     1051931443,
			Mem:          129310720,
			NCPU:         1.1,
		},
		"node5": {
			CPULimit:     104.50000000000001,
			CPU:          30.634999999999994,
			IsThrottling: false,
			MemLimit:     1051931443,
			Mem:          159158272,
			NCPU:         1.1,
		},
		"node6": {
			CPULimit:     104.50000000000001,
			CPU:          40.368900000000004,
			IsThrottling: false,
			MemLimit:     1051931443,
			Mem:          212189184,
			NCPU:         1.1,
		},
		"node7": {
			CPULimit:     104.50000000000001,
			CPU:          25.469399999999997,
			IsThrottling: false,
			MemLimit:     1051931443,
			Mem:          206098432,
			NCPU:         1.1,
		},
		"node8": {
			CPULimit:     104.50000000000001,
			CPU:          22.180400000000002,
			IsThrottling: false,
			MemLimit:     1051931443,
			Mem:          144138240,
			NCPU:         1.1,
		},
		// node9 is the only throttling node; it must not show up in the result.
		"node9": {
			CPULimit:     104.50000000000001,
			CPU:          62.6714,
			IsThrottling: true,
			MemLimit:     1051931443,
			Mem:          978501632,
			NCPU:         1.1,
		},
		"node10": {
			CPULimit:     104.50000000000001,
			CPU:          18.7748,
			IsThrottling: false,
			MemLimit:     1051931443,
			Mem:          142430208,
			NCPU:         1.1,
		},
		"node11": {
			CPULimit:     104.50000000000001,
			CPU:          43.807500000000005,
			IsThrottling: false,
			MemLimit:     1051931443,
			Mem:          368091136,
			NCPU:         1.1,
		},
		"node12": {
			CPULimit:     104.50000000000001,
			CPU:          31.067299999999996,
			IsThrottling: false,
			MemLimit:     1051931443,
			Mem:          149897216,
			NCPU:         1.1,
		},
		"node13": {
			CPULimit:     104.50000000000001,
			CPU:          35.93480000000001,
			IsThrottling: false,
			MemLimit:     1051931443,
			Mem:          194408448,
			NCPU:         1.1,
		},
	}

	list := resources.FindBestNodes(4.0, 45*1024*1024)

	require.Equal(t, []string{"node10", "node8", "node7", "node1", "node5", "node12", "node4", "node3", "node13", "node6", "node11", "node2"}, list)
}
func TestCreateNodeProcessMap(t *testing.T) { func TestCreateNodeProcessMap(t *testing.T) {
processes := []proxy.Process{ processes := []proxy.Process{
{ {

37
cluster/node.go Normal file
View File

@@ -0,0 +1,37 @@
package cluster
import (
"errors"
"github.com/datarhei/core/v16/cluster/store"
)
// ListNodes returns the node entries currently held in the cluster store,
// keyed by node ID.
func (c *cluster) ListNodes() map[string]store.Node {
	return c.store.ListNodes()
}
var ErrUnsupportedNodeState = errors.New("unsupported node state")
// SetNodeState sets a state for the node with the given ID. Only the states
// "online", "maintenance", and "leave" are accepted; anything else results
// in ErrUnsupportedNodeState. If this node is not the raft leader, the
// request is forwarded to the leader; otherwise the state change is applied
// to the store.
func (c *cluster) SetNodeState(origin, id, state string) error {
	// Validate the requested state before touching the store or the network.
	switch state {
	case "online", "maintenance", "leave":
	default:
		return ErrUnsupportedNodeState
	}

	if !c.IsRaftLeader() {
		return c.forwarder.SetNodeState(origin, id, state)
	}

	cmd := &store.Command{
		Operation: store.OpSetNodeState,
		Data: &store.CommandSetNodeState{
			NodeID: id,
			State:  state,
		},
	}

	return c.applyCommand(cmd)
}

35
cluster/store/node.go Normal file
View File

@@ -0,0 +1,35 @@
package store
import "time"
// setNodeState applies a CommandSetNodeState to the store. "online" is the
// implicit default state and is represented by removing the node's entry;
// any other state is recorded together with the time of the change.
func (s *store) setNodeState(cmd CommandSetNodeState) error {
	s.lock.Lock()
	defer s.lock.Unlock()

	switch cmd.State {
	case "online":
		delete(s.data.Nodes, cmd.NodeID)
	default:
		entry := s.data.Nodes[cmd.NodeID]
		entry.State = cmd.State
		entry.UpdatedAt = time.Now()

		s.data.Nodes[cmd.NodeID] = entry
	}

	return nil
}
// ListNodes returns a copy of the stored node entries, keyed by node ID.
func (s *store) ListNodes() map[string]Node {
	s.lock.RLock()
	defer s.lock.RUnlock()

	nodes := make(map[string]Node, len(s.data.Nodes))

	for id, entry := range s.data.Nodes {
		nodes[id] = entry
	}

	return nodes
}

View File

@@ -35,6 +35,8 @@ type Store interface {
ListKVS(prefix string) map[string]Value ListKVS(prefix string) map[string]Value
GetFromKVS(key string) (Value, error) GetFromKVS(key string) (Value, error)
ListNodes() map[string]Node
} }
type Process struct { type Process struct {
@@ -61,6 +63,11 @@ type Value struct {
UpdatedAt time.Time UpdatedAt time.Time
} }
type Node struct {
State string
UpdatedAt time.Time
}
type Operation string type Operation string
const ( const (
@@ -82,6 +89,7 @@ const (
OpClearLocks Operation = "clearLocks" OpClearLocks Operation = "clearLocks"
OpSetKV Operation = "setKV" OpSetKV Operation = "setKV"
OpUnsetKV Operation = "unsetKV" OpUnsetKV Operation = "unsetKV"
OpSetNodeState Operation = "setNodeState"
) )
type Command struct { type Command struct {
@@ -168,6 +176,11 @@ type CommandUnsetKV struct {
Key string Key string
} }
type CommandSetNodeState struct {
NodeID string
State string
}
type storeData struct { type storeData struct {
Version uint64 Version uint64
Process map[string]Process // processid -> process Process map[string]Process // processid -> process
@@ -188,6 +201,8 @@ type storeData struct {
Locks map[string]time.Time Locks map[string]time.Time
KVS map[string]Value KVS map[string]Value
Nodes map[string]Node
} }
func (s *storeData) init() { func (s *storeData) init() {
@@ -204,6 +219,7 @@ func (s *storeData) init() {
s.Policies.Policies = map[string][]access.Policy{} s.Policies.Policies = map[string][]access.Policy{}
s.Locks = map[string]time.Time{} s.Locks = map[string]time.Time{}
s.KVS = map[string]Value{} s.KVS = map[string]Value{}
s.Nodes = map[string]Node{}
} }
// store implements a raft.FSM // store implements a raft.FSM
@@ -430,6 +446,14 @@ func (s *store) applyCommand(c Command) error {
} }
err = s.unsetKV(cmd) err = s.unsetKV(cmd)
case OpSetNodeState:
cmd := CommandSetNodeState{}
err = decodeCommand(&cmd, c.Data)
if err != nil {
break
}
err = s.setNodeState(cmd)
default: default:
s.logger.Warn().WithField("operation", c.Operation).Log("Unknown operation") s.logger.Warn().WithField("operation", c.Operation).Log("Unknown operation")
err = fmt.Errorf("unknown operation: %s", c.Operation) err = fmt.Errorf("unknown operation: %s", c.Operation)

View File

@@ -209,6 +209,35 @@ const docTemplate = `{
} }
} }
}, },
"/api/v3/cluster/db/node": {
"get": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "List of nodes in the cluster DB",
"produces": [
"application/json"
],
"tags": [
"v16.?.?"
],
"summary": "List nodes in the cluster DB",
"operationId": "cluster-3-db-list-nodes",
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "array",
"items": {
"$ref": "#/definitions/api.ClusterStoreNode"
}
}
}
}
}
},
"/api/v3/cluster/db/policies": { "/api/v3/cluster/db/policies": {
"get": { "get": {
"security": [ "security": [
@@ -1381,6 +1410,107 @@ const docTemplate = `{
} }
} }
}, },
"/api/v3/cluster/node/{id}/state": {
"get": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "Get the state of a node with the given ID",
"produces": [
"application/json"
],
"tags": [
"v16.?.?"
],
"summary": "Get the state of a node with the given ID",
"operationId": "cluster-3-get-node-state",
"parameters": [
{
"type": "string",
"description": "Node ID",
"name": "id",
"in": "path",
"required": true
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/api.ClusterNodeState"
}
},
"404": {
"description": "Not Found",
"schema": {
"$ref": "#/definitions/api.Error"
}
}
}
},
"put": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "Set the state of a node with the given ID",
"produces": [
"application/json"
],
"tags": [
"v16.?.?"
],
"summary": "Set the state of a node with the given ID",
"operationId": "cluster-3-set-node-state",
"parameters": [
{
"type": "string",
"description": "Node ID",
"name": "id",
"in": "path",
"required": true
},
{
"description": "State",
"name": "config",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/api.ClusterNodeState"
}
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "string"
}
},
"400": {
"description": "Bad Request",
"schema": {
"$ref": "#/definitions/api.Error"
}
},
"404": {
"description": "Not Found",
"schema": {
"$ref": "#/definitions/api.Error"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/api.Error"
}
}
}
}
},
"/api/v3/cluster/node/{id}/version": { "/api/v3/cluster/node/{id}/version": {
"get": { "get": {
"security": [ "security": [
@@ -5135,6 +5265,22 @@ const docTemplate = `{
} }
} }
}, },
"api.ClusterNodeState": {
"type": "object",
"required": [
"state"
],
"properties": {
"state": {
"type": "string",
"enum": [
"online",
"maintenance",
"leave"
]
}
}
},
"api.ClusterProcessMap": { "api.ClusterProcessMap": {
"type": "object", "type": "object",
"additionalProperties": { "additionalProperties": {
@@ -5179,6 +5325,20 @@ const docTemplate = `{
} }
} }
}, },
"api.ClusterStoreNode": {
"type": "object",
"properties": {
"id": {
"type": "string"
},
"state": {
"type": "string"
},
"updated_at": {
"type": "string"
}
}
},
"api.Command": { "api.Command": {
"type": "object", "type": "object",
"required": [ "required": [

View File

@@ -201,6 +201,35 @@
} }
} }
}, },
"/api/v3/cluster/db/node": {
"get": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "List of nodes in the cluster DB",
"produces": [
"application/json"
],
"tags": [
"v16.?.?"
],
"summary": "List nodes in the cluster DB",
"operationId": "cluster-3-db-list-nodes",
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "array",
"items": {
"$ref": "#/definitions/api.ClusterStoreNode"
}
}
}
}
}
},
"/api/v3/cluster/db/policies": { "/api/v3/cluster/db/policies": {
"get": { "get": {
"security": [ "security": [
@@ -1373,6 +1402,107 @@
} }
} }
}, },
"/api/v3/cluster/node/{id}/state": {
"get": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "Get the state of a node with the given ID",
"produces": [
"application/json"
],
"tags": [
"v16.?.?"
],
"summary": "Get the state of a node with the given ID",
"operationId": "cluster-3-get-node-state",
"parameters": [
{
"type": "string",
"description": "Node ID",
"name": "id",
"in": "path",
"required": true
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/api.ClusterNodeState"
}
},
"404": {
"description": "Not Found",
"schema": {
"$ref": "#/definitions/api.Error"
}
}
}
},
"put": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "Set the state of a node with the given ID",
"produces": [
"application/json"
],
"tags": [
"v16.?.?"
],
"summary": "Set the state of a node with the given ID",
"operationId": "cluster-3-set-node-state",
"parameters": [
{
"type": "string",
"description": "Node ID",
"name": "id",
"in": "path",
"required": true
},
{
"description": "State",
"name": "config",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/api.ClusterNodeState"
}
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "string"
}
},
"400": {
"description": "Bad Request",
"schema": {
"$ref": "#/definitions/api.Error"
}
},
"404": {
"description": "Not Found",
"schema": {
"$ref": "#/definitions/api.Error"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/api.Error"
}
}
}
}
},
"/api/v3/cluster/node/{id}/version": { "/api/v3/cluster/node/{id}/version": {
"get": { "get": {
"security": [ "security": [
@@ -5127,6 +5257,22 @@
} }
} }
}, },
"api.ClusterNodeState": {
"type": "object",
"required": [
"state"
],
"properties": {
"state": {
"type": "string",
"enum": [
"online",
"maintenance",
"leave"
]
}
}
},
"api.ClusterProcessMap": { "api.ClusterProcessMap": {
"type": "object", "type": "object",
"additionalProperties": { "additionalProperties": {
@@ -5171,6 +5317,20 @@
} }
} }
}, },
"api.ClusterStoreNode": {
"type": "object",
"properties": {
"id": {
"type": "string"
},
"state": {
"type": "string"
},
"updated_at": {
"type": "string"
}
}
},
"api.Command": { "api.Command": {
"type": "object", "type": "object",
"required": [ "required": [

View File

@@ -207,6 +207,17 @@ definitions:
ncpu: ncpu:
type: number type: number
type: object type: object
api.ClusterNodeState:
properties:
state:
enum:
- online
- maintenance
- leave
type: string
required:
- state
type: object
api.ClusterProcessMap: api.ClusterProcessMap:
additionalProperties: additionalProperties:
type: string type: string
@@ -236,6 +247,15 @@ definitions:
state: state:
type: string type: string
type: object type: object
api.ClusterStoreNode:
properties:
id:
type: string
state:
type: string
updated_at:
type: string
type: object
api.Command: api.Command:
properties: properties:
command: command:
@@ -2705,6 +2725,24 @@ paths:
summary: List locks in the cluster DB summary: List locks in the cluster DB
tags: tags:
- v16.?.? - v16.?.?
/api/v3/cluster/db/node:
get:
description: List of nodes in the cluster DB
operationId: cluster-3-db-list-nodes
produces:
- application/json
responses:
"200":
description: OK
schema:
items:
$ref: '#/definitions/api.ClusterStoreNode'
type: array
security:
- ApiKeyAuth: []
summary: List nodes in the cluster DB
tags:
- v16.?.?
/api/v3/cluster/db/policies: /api/v3/cluster/db/policies:
get: get:
description: List of policies in the cluster description: List of policies in the cluster
@@ -3467,6 +3505,71 @@ paths:
summary: List of processes in the cluster on a node summary: List of processes in the cluster on a node
tags: tags:
- v16.?.? - v16.?.?
/api/v3/cluster/node/{id}/state:
get:
description: Get the state of a node with the given ID
operationId: cluster-3-get-node-state
parameters:
- description: Node ID
in: path
name: id
required: true
type: string
produces:
- application/json
responses:
"200":
description: OK
schema:
$ref: '#/definitions/api.ClusterNodeState'
"404":
description: Not Found
schema:
$ref: '#/definitions/api.Error'
security:
- ApiKeyAuth: []
summary: Get the state of a node with the given ID
tags:
- v16.?.?
put:
description: Set the state of a node with the given ID
operationId: cluster-3-set-node-state
parameters:
- description: Node ID
in: path
name: id
required: true
type: string
- description: State
in: body
name: config
required: true
schema:
$ref: '#/definitions/api.ClusterNodeState'
produces:
- application/json
responses:
"200":
description: OK
schema:
type: string
"400":
description: Bad Request
schema:
$ref: '#/definitions/api.Error'
"404":
description: Not Found
schema:
$ref: '#/definitions/api.Error'
"500":
description: Internal Server Error
schema:
$ref: '#/definitions/api.Error'
security:
- ApiKeyAuth: []
summary: Set the state of a node with the given ID
tags:
- v16.?.?
/api/v3/cluster/node/{id}/version: /api/v3/cluster/node/{id}/version:
get: get:
description: List a proxy node by its ID description: List a proxy node by its ID

View File

@@ -25,6 +25,10 @@ type ClusterNode struct {
Resources ClusterNodeResources `json:"resources"` Resources ClusterNodeResources `json:"resources"`
} }
type ClusterNodeState struct {
State string `json:"state" validate:"required" enums:"online,maintenance,leave" jsonschema:"enum=online,enum=maintenance,enum=leave"`
}
type ClusterNodeCore struct { type ClusterNodeCore struct {
Address string `json:"address"` Address string `json:"address"`
Status string `json:"status"` Status string `json:"status"`
@@ -94,3 +98,9 @@ type ClusterProcessReallocate struct {
TargetNodeID string `json:"target_node_id"` TargetNodeID string `json:"target_node_id"`
Processes []ProcessID `json:"process_ids"` Processes []ProcessID `json:"process_ids"`
} }
type ClusterStoreNode struct {
ID string `json:"id"`
State string `json:"state"`
UpdatedAt time.Time `json:"updated_at"`
}

View File

@@ -1,12 +1,14 @@
package api package api
import ( import (
"errors"
"net/http" "net/http"
"sort" "sort"
"strings" "strings"
"time" "time"
clientapi "github.com/datarhei/core-client-go/v16/api" clientapi "github.com/datarhei/core-client-go/v16/api"
"github.com/datarhei/core/v16/cluster"
"github.com/datarhei/core/v16/cluster/proxy" "github.com/datarhei/core/v16/cluster/proxy"
"github.com/datarhei/core/v16/http/api" "github.com/datarhei/core/v16/http/api"
"github.com/datarhei/core/v16/http/handler/util" "github.com/datarhei/core/v16/http/handler/util"
@@ -26,9 +28,17 @@ import (
func (h *ClusterHandler) GetNodes(c echo.Context) error { func (h *ClusterHandler) GetNodes(c echo.Context) error {
about, _ := h.cluster.About() about, _ := h.cluster.About()
nodes := h.cluster.ListNodes()
list := []api.ClusterNode{} list := []api.ClusterNode{}
for _, node := range about.Nodes { for _, node := range about.Nodes {
if dbnode, hasNode := nodes[node.ID]; hasNode {
if dbnode.State == "maintenance" {
node.Status = dbnode.State
}
}
list = append(list, h.marshalClusterNode(node)) list = append(list, h.marshalClusterNode(node))
} }
@@ -51,11 +61,19 @@ func (h *ClusterHandler) GetNode(c echo.Context) error {
about, _ := h.cluster.About() about, _ := h.cluster.About()
nodes := h.cluster.ListNodes()
for _, node := range about.Nodes { for _, node := range about.Nodes {
if node.ID != id { if node.ID != id {
continue continue
} }
if dbnode, hasNode := nodes[node.ID]; hasNode {
if dbnode.State == "maintenance" {
node.Status = dbnode.State
}
}
return c.JSON(http.StatusOK, h.marshalClusterNode(node)) return c.JSON(http.StatusOK, h.marshalClusterNode(node))
} }
@@ -365,3 +383,108 @@ func (h *ClusterHandler) ListNodeProcesses(c echo.Context) error {
return c.JSON(http.StatusOK, processes) return c.JSON(http.StatusOK, processes)
} }
// GetNodeState returns the state of a node with the given ID
// @Summary Get the state of a node with the given ID
// @Description Get the state of a node with the given ID
// @Tags v16.?.?
// @ID cluster-3-get-node-state
// @Produce json
// @Param id path string true "Node ID"
// @Success 200 {object} api.ClusterNodeState
// @Failure 404 {object} api.Error
// @Security ApiKeyAuth
// @Router /api/v3/cluster/node/{id}/state [get]
func (h *ClusterHandler) GetNodeState(c echo.Context) error {
	id := util.PathParam(c, "id")

	about, _ := h.cluster.About()

	// Start from the live status reported by the cluster.
	state := ""
	for _, node := range about.Nodes {
		if node.ID == id {
			state = node.Status
			break
		}
	}

	if state == "" {
		return api.Err(http.StatusNotFound, "", "node not found")
	}

	// A stored "maintenance" state overrides the live status.
	if dbnode, hasNode := h.cluster.ListNodes()[id]; hasNode && dbnode.State == "maintenance" {
		state = dbnode.State
	}

	return c.JSON(http.StatusOK, api.ClusterNodeState{
		State: state,
	})
}
// SetNodeState sets the state of a node with the given ID
// @Summary Set the state of a node with the given ID
// @Description Set the state of a node with the given ID
// @Tags v16.?.?
// @ID cluster-3-set-node-state
// @Produce json
// @Param id path string true "Node ID"
// @Param config body api.ClusterNodeState true "State"
// @Success 200 {string} string
// @Failure 400 {object} api.Error
// @Failure 404 {object} api.Error
// @Failure 500 {object} api.Error
// @Security ApiKeyAuth
// @Router /api/v3/cluster/node/{id}/state [put]
func (h *ClusterHandler) SetNodeState(c echo.Context) error {
	id := util.PathParam(c, "id")

	about, _ := h.cluster.About()

	// Reject requests for IDs the cluster doesn't know about.
	known := false
	for _, node := range about.Nodes {
		if node.ID == id {
			known = true
			break
		}
	}

	if !known {
		return api.Err(http.StatusNotFound, "", "node not found")
	}

	state := api.ClusterNodeState{}

	if err := util.ShouldBindJSON(c, &state); err != nil {
		return api.Err(http.StatusBadRequest, "", "invalid JSON: %s", err.Error())
	}

	if state.State == "leave" {
		// Leaving is handled by cluster membership, not the node state store.
		if err := h.cluster.Leave("", id); err != nil {
			if errors.Is(err, cluster.ErrUnknownNode) {
				return api.Err(http.StatusNotFound, "", "node not found")
			}
			return api.Err(http.StatusInternalServerError, "", "%s", err.Error())
		}

		return c.JSON(http.StatusOK, "OK")
	}

	if err := h.cluster.SetNodeState("", id, state.State); err != nil {
		if errors.Is(err, cluster.ErrUnsupportedNodeState) {
			return api.Err(http.StatusBadRequest, "", "%s", err.Error())
		}
		return api.Err(http.StatusInternalServerError, "", "%s", err.Error())
	}

	return c.JSON(http.StatusOK, "OK")
}

View File

@@ -259,3 +259,28 @@ func (h *ClusterHandler) ListStoreKV(c echo.Context) error {
return c.JSON(http.StatusOK, kvs) return c.JSON(http.StatusOK, kvs)
} }
// ListStoreNodes returns the list of stored node metadata
// @Summary List nodes in the cluster DB
// @Description List of nodes in the cluster DB
// @Tags v16.?.?
// @ID cluster-3-db-list-nodes
// @Produce json
// @Success 200 {array} api.ClusterStoreNode
// @Security ApiKeyAuth
// @Router /api/v3/cluster/db/node [get]
func (h *ClusterHandler) ListStoreNodes(c echo.Context) error {
	clusternodes := h.cluster.ListNodes()

	// Pre-size the response slice; the final length is known up front.
	nodes := make([]api.ClusterStoreNode, 0, len(clusternodes))

	for nodeid, v := range clusternodes {
		nodes = append(nodes, api.ClusterStoreNode{
			ID:        nodeid,
			State:     v.State,
			UpdatedAt: v.UpdatedAt,
		})
	}

	return c.JSON(http.StatusOK, nodes)
}

View File

@@ -736,6 +736,7 @@ func (s *server) setRoutesV3(v3 *echo.Group) {
v3.GET("/cluster/db/locks", s.v3handler.cluster.ListStoreLocks) v3.GET("/cluster/db/locks", s.v3handler.cluster.ListStoreLocks)
v3.GET("/cluster/db/kv", s.v3handler.cluster.ListStoreKV) v3.GET("/cluster/db/kv", s.v3handler.cluster.ListStoreKV)
v3.GET("/cluster/db/map/process", s.v3handler.cluster.GetStoreProcessNodeMap) v3.GET("/cluster/db/map/process", s.v3handler.cluster.GetStoreProcessNodeMap)
v3.GET("/cluster/db/node", s.v3handler.cluster.ListStoreNodes)
v3.GET("/cluster/iam/user", s.v3handler.cluster.ListIdentities) v3.GET("/cluster/iam/user", s.v3handler.cluster.ListIdentities)
v3.GET("/cluster/iam/user/:name", s.v3handler.cluster.ListIdentity) v3.GET("/cluster/iam/user/:name", s.v3handler.cluster.ListIdentity)
@@ -753,6 +754,7 @@ func (s *server) setRoutesV3(v3 *echo.Group) {
v3.GET("/cluster/node/:id/fs/:storage/*", s.v3handler.cluster.NodeFSGetFile) v3.GET("/cluster/node/:id/fs/:storage/*", s.v3handler.cluster.NodeFSGetFile)
v3.GET("/cluster/node/:id/process", s.v3handler.cluster.ListNodeProcesses) v3.GET("/cluster/node/:id/process", s.v3handler.cluster.ListNodeProcesses)
v3.GET("/cluster/node/:id/version", s.v3handler.cluster.GetNodeVersion) v3.GET("/cluster/node/:id/version", s.v3handler.cluster.GetNodeVersion)
v3.GET("/cluster/node/:id/state", s.v3handler.cluster.GetNodeState)
v3.GET("/cluster/fs/:storage", s.v3handler.cluster.ListFiles) v3.GET("/cluster/fs/:storage", s.v3handler.cluster.ListFiles)
@@ -772,6 +774,7 @@ func (s *server) setRoutesV3(v3 *echo.Group) {
v3.DELETE("/cluster/node/:id/fs/:storage/*", s.v3handler.cluster.NodeFSDeleteFile) v3.DELETE("/cluster/node/:id/fs/:storage/*", s.v3handler.cluster.NodeFSDeleteFile)
v3.PUT("/cluster/node/:id/fs/:storage/*", s.v3handler.cluster.NodeFSPutFile) v3.PUT("/cluster/node/:id/fs/:storage/*", s.v3handler.cluster.NodeFSPutFile)
v3.PUT("/cluster/node/:id/state", s.v3handler.cluster.SetNodeState)
v3.PUT("/cluster/iam/reload", s.v3handler.cluster.ReloadIAM) v3.PUT("/cluster/iam/reload", s.v3handler.cluster.ReloadIAM)
v3.POST("/cluster/iam/user", s.v3handler.cluster.AddIdentity) v3.POST("/cluster/iam/user", s.v3handler.cluster.AddIdentity)