mirror of
https://github.com/datarhei/core.git
synced 2025-12-24 13:07:56 +08:00
Allow cluster leave endpoint to remove any node in the cluster
This commit is contained in:
@@ -1719,15 +1719,35 @@ func (a *api) start(ctx context.Context) error {
|
||||
|
||||
// Start the cluster
|
||||
if a.cluster != nil {
|
||||
ctx, cancel := context.WithTimeout(ctx, time.Duration(cfg.Cluster.StartupTimeout)*time.Second)
|
||||
defer cancel()
|
||||
wgStart.Add(1)
|
||||
a.wgStop.Add(1)
|
||||
|
||||
err := a.cluster.Start(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to start cluster: %w", err)
|
||||
}
|
||||
go func() {
|
||||
logger := a.log.logger.core
|
||||
|
||||
var err error
|
||||
|
||||
defer func() {
|
||||
logger.Info().Log("Cluster exited")
|
||||
a.wgStop.Done()
|
||||
}()
|
||||
|
||||
ctx, cancel := context.WithTimeout(ctx, time.Duration(cfg.Cluster.StartupTimeout)*time.Second)
|
||||
defer cancel()
|
||||
|
||||
wgStart.Done()
|
||||
|
||||
err = a.cluster.Start(ctx)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("cluster failed: %w", err)
|
||||
}
|
||||
|
||||
sendError(err)
|
||||
}()
|
||||
}
|
||||
|
||||
wgStart.Wait()
|
||||
|
||||
// Start the service
|
||||
if a.service != nil {
|
||||
a.service.Start()
|
||||
|
||||
@@ -432,6 +432,8 @@ func (c *cluster) Start(ctx context.Context) error {
|
||||
return fmt.Errorf("failed to setup cluster: %w", err)
|
||||
}
|
||||
|
||||
<-c.shutdownCh
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -1014,6 +1016,13 @@ func (c *cluster) trackNodeChanges() {
|
||||
}
|
||||
|
||||
delete(c.nodes, id)
|
||||
/*
|
||||
if id == c.nodeID {
|
||||
c.logger.Warn().WithField("id", id).Log("This node left the cluster. Shutting down.")
|
||||
// We got removed from the cluster, shutdown
|
||||
c.Shutdown()
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
c.nodesLock.Unlock()
|
||||
@@ -1307,6 +1316,27 @@ func (c *cluster) trackLeaderChanges() {
|
||||
c.hasRaftLeader = true
|
||||
}
|
||||
c.leaderLock.Unlock()
|
||||
|
||||
servers, err := c.raft.Servers()
|
||||
if err != nil {
|
||||
c.logger.Error().WithError(err).Log("Raft configuration")
|
||||
break
|
||||
}
|
||||
|
||||
isNodeInCluster := false
|
||||
for _, server := range servers {
|
||||
if c.nodeID == server.ID {
|
||||
isNodeInCluster = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !isNodeInCluster {
|
||||
// We're not anymore part of the cluster, shutdown
|
||||
c.logger.Warn().WithField("id", c.nodeID).Log("This node left the cluster. Shutting down.")
|
||||
c.Shutdown()
|
||||
}
|
||||
|
||||
case <-c.shutdownCh:
|
||||
return
|
||||
}
|
||||
|
||||
19
docs/docs.go
19
docs/docs.go
@@ -860,6 +860,17 @@ const docTemplate = `{
|
||||
],
|
||||
"summary": "Leave the cluster gracefully",
|
||||
"operationId": "cluster-3-leave",
|
||||
"parameters": [
|
||||
{
|
||||
"description": "Node ID",
|
||||
"name": "nodeid",
|
||||
"in": "body",
|
||||
"required": true,
|
||||
"schema": {
|
||||
"$ref": "#/definitions/api.ClusterNodeID"
|
||||
}
|
||||
}
|
||||
],
|
||||
"responses": {
|
||||
"200": {
|
||||
"description": "OK",
|
||||
@@ -5043,6 +5054,14 @@ const docTemplate = `{
|
||||
}
|
||||
}
|
||||
},
|
||||
"api.ClusterNodeID": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"id": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
},
|
||||
"api.ClusterNodeResources": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
|
||||
@@ -852,6 +852,17 @@
|
||||
],
|
||||
"summary": "Leave the cluster gracefully",
|
||||
"operationId": "cluster-3-leave",
|
||||
"parameters": [
|
||||
{
|
||||
"description": "Node ID",
|
||||
"name": "nodeid",
|
||||
"in": "body",
|
||||
"required": true,
|
||||
"schema": {
|
||||
"$ref": "#/definitions/api.ClusterNodeID"
|
||||
}
|
||||
}
|
||||
],
|
||||
"responses": {
|
||||
"200": {
|
||||
"description": "OK",
|
||||
@@ -5035,6 +5046,14 @@
|
||||
}
|
||||
}
|
||||
},
|
||||
"api.ClusterNodeID": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"id": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
},
|
||||
"api.ClusterNodeResources": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
|
||||
@@ -181,6 +181,11 @@ definitions:
|
||||
description: unix timestamp
|
||||
type: integer
|
||||
type: object
|
||||
api.ClusterNodeID:
|
||||
properties:
|
||||
id:
|
||||
type: string
|
||||
type: object
|
||||
api.ClusterNodeResources:
|
||||
properties:
|
||||
cpu_limit:
|
||||
@@ -3093,6 +3098,13 @@ paths:
|
||||
put:
|
||||
description: Leave the cluster gracefully
|
||||
operationId: cluster-3-leave
|
||||
parameters:
|
||||
- description: Node ID
|
||||
in: body
|
||||
name: nodeid
|
||||
required: true
|
||||
schema:
|
||||
$ref: '#/definitions/api.ClusterNodeID'
|
||||
produces:
|
||||
- application/json
|
||||
responses:
|
||||
|
||||
@@ -4,6 +4,10 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
type ClusterNodeID struct {
|
||||
ID string `json:"id"`
|
||||
}
|
||||
|
||||
type ClusterNode struct {
|
||||
ID string `json:"id"`
|
||||
Name string `json:"name"`
|
||||
|
||||
@@ -175,13 +175,13 @@ func (h *ClusterHandler) TransferLeadership(c echo.Context) error {
|
||||
// @Tags v16.?.?
|
||||
// @ID cluster-3-leave
|
||||
// @Produce json
|
||||
// @Param nodeid body string true "Node ID"
|
||||
// @Param nodeid body api.ClusterNodeID true "Node ID"
|
||||
// @Success 200 {string} string
|
||||
// @Failure 500 {object} api.Error
|
||||
// @Security ApiKeyAuth
|
||||
// @Router /api/v3/cluster/leave [put]
|
||||
func (h *ClusterHandler) Leave(c echo.Context) error {
|
||||
nodeid := ""
|
||||
nodeid := api.ClusterNodeID{}
|
||||
|
||||
req := c.Request()
|
||||
|
||||
@@ -196,8 +196,7 @@ func (h *ClusterHandler) Leave(c echo.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
h.cluster.Leave("", nodeid)
|
||||
//h.cluster.Shutdown()
|
||||
h.cluster.Leave("", nodeid.ID)
|
||||
|
||||
return c.JSON(http.StatusOK, "OK")
|
||||
}
|
||||
|
||||
@@ -319,6 +319,10 @@ func (e *Event) WithFields(f Fields) Logger {
|
||||
}
|
||||
|
||||
func (e *Event) WithError(err error) Logger {
|
||||
if err == nil {
|
||||
return e
|
||||
}
|
||||
|
||||
return e.WithFields(Fields{
|
||||
"error": err,
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user