Add /cluster/transfer/:id endpoint to transfer leadership to another node

This commit is contained in:
Ingo Oppermann
2023-07-11 14:26:54 +02:00
parent d04254c891
commit 0f06b8b5a0
14 changed files with 338 additions and 12 deletions

View File

@@ -103,6 +103,8 @@ func NewAPI(config APIConfig) (API, error) {
a.router.POST("/v1/server", a.AddServer)
a.router.DELETE("/v1/server/:id", a.RemoveServer)
a.router.PUT("/v1/transfer/:id", a.TransferLeadership)
a.router.GET("/v1/snaphot", a.Snapshot)
a.router.POST("/v1/process", a.AddProcess)
@@ -255,7 +257,41 @@ func (a *api) RemoveServer(c echo.Context) error {
err := a.cluster.Leave(origin, id)
if err != nil {
a.logger.Debug().WithError(err).WithField("id", id).Log("Unable to leave cluster")
return Err(http.StatusInternalServerError, "", "unable to leave cluster%s", err.Error())
return Err(http.StatusInternalServerError, "", "unable to leave cluster: %s", err.Error())
}
return c.JSON(http.StatusOK, "OK")
}
// TransferLeadership transfers the leadership to another node
// @Summary Transfer leadership
// @Description Transfer leadership
// @Tags v1.0.0
// @ID cluster-1-transfer-leadership
// @Accept json
// @Produce json
// @Param X-Cluster-Origin header string false "Origin ID of request"
// @Success 200 {string} string
// @Failure 500 {object} Error
// @Failure 508 {object} Error
// @Router /v1/transfer/{id} [put]
func (a *api) TransferLeadership(c echo.Context) error {
id := util.PathParam(c, "id")
a.logger.Debug().WithFields(log.Fields{
"id": id,
}).Log("Transfer request")
origin := c.Request().Header.Get("X-Cluster-Origin")
if origin == a.id {
return Err(http.StatusLoopDetected, "", "breaking circuit")
}
err := a.cluster.TransferLeadership(origin, id)
if err != nil {
a.logger.Debug().WithError(err).WithField("id", id).Log("Unable to transfer leadership")
return Err(http.StatusInternalServerError, "", "unable to transfer leadership: %s", err.Error())
}
return c.JSON(http.StatusOK, "OK")

View File

@@ -163,6 +163,12 @@ func (c *APIClient) Leave(origin string, id string) error {
return err
}
func (c *APIClient) TransferLeadership(origin, id string) error {
_, err := c.call(http.MethodPut, "/v1/transfer/"+url.PathEscape(id), "application/json", nil, origin)
return err
}
func (c *APIClient) AddProcess(origin string, r AddProcessRequest) error {
data, err := json.Marshal(r)
if err != nil {

View File

@@ -56,6 +56,7 @@ type Cluster interface {
Join(origin, id, raftAddress, peerAddress string) error
Leave(origin, id string) error // gracefully remove a node from the cluster
TransferLeadership(origin, id string) error // transfer leadership to another node
Snapshot(origin string) (io.ReadCloser, error)
ListProcesses() []store.Process
@@ -696,6 +697,10 @@ func (c *cluster) IsClusterDegraded() (bool, error) {
}
func (c *cluster) Leave(origin, id string) error {
if ok, _ := c.IsDegraded(); ok {
return ErrDegraded
}
if len(id) == 0 {
id = c.id
}
@@ -761,7 +766,7 @@ func (c *cluster) Leave(origin, id string) error {
}
// Transfer the leadership to another server
err := c.leadershipTransfer()
err := c.leadershipTransfer("")
if err != nil {
c.logger.Warn().WithError(err).Log("Transfer leadership")
return err
@@ -830,6 +835,10 @@ func (c *cluster) Leave(origin, id string) error {
}
func (c *cluster) Join(origin, id, raftAddress, peerAddress string) error {
if ok, _ := c.IsDegraded(); ok {
return ErrDegraded
}
if !c.IsRaftLeader() {
c.logger.Debug().Log("Not leader, forwarding to leader")
return c.forwarder.Join(origin, id, raftAddress, peerAddress)
@@ -888,7 +897,24 @@ func (c *cluster) Join(origin, id, raftAddress, peerAddress string) error {
return nil
}
func (c *cluster) TransferLeadership(origin, id string) error {
if ok, _ := c.IsDegraded(); ok {
return ErrDegraded
}
if !c.IsRaftLeader() {
c.logger.Debug().Log("Not leader, forwarding to leader")
return c.forwarder.TransferLeadership(origin, id)
}
return c.leadershipTransfer(id)
}
func (c *cluster) Snapshot(origin string) (io.ReadCloser, error) {
if ok, _ := c.IsDegraded(); ok {
return nil, ErrDegraded
}
if !c.IsRaftLeader() {
c.logger.Debug().Log("Not leader, forwarding to leader")
return c.forwarder.Snapshot(origin)

View File

@@ -1078,6 +1078,50 @@ const docTemplateClusterAPI = `{
}
}
}
},
"/v1/transfer/{id}": {
"put": {
"description": "Transfer leadership",
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"tags": [
"v1.0.0"
],
"summary": "Transfer leadership",
"operationId": "cluster-1-transfer-leadership",
"parameters": [
{
"type": "string",
"description": "Origin ID of request",
"name": "X-Cluster-Origin",
"in": "header"
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "string"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/cluster.Error"
}
},
"508": {
"description": "Loop Detected",
"schema": {
"$ref": "#/definitions/cluster.Error"
}
}
}
}
}
},
"definitions": {

View File

@@ -1070,6 +1070,50 @@
}
}
}
},
"/v1/transfer/{id}": {
"put": {
"description": "Transfer leadership",
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"tags": [
"v1.0.0"
],
"summary": "Transfer leadership",
"operationId": "cluster-1-transfer-leadership",
"parameters": [
{
"type": "string",
"description": "Origin ID of request",
"name": "X-Cluster-Origin",
"in": "header"
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "string"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/cluster.Error"
}
},
"508": {
"description": "Loop Detected",
"schema": {
"$ref": "#/definitions/cluster.Error"
}
}
}
}
}
},
"definitions": {

View File

@@ -1552,4 +1552,33 @@ paths:
summary: Cluster DB snapshot
tags:
- v1.0.0
/v1/transfer/{id}:
put:
consumes:
- application/json
description: Transfer leadership
operationId: cluster-1-transfer-leadership
parameters:
- description: Origin ID of request
in: header
name: X-Cluster-Origin
type: string
produces:
- application/json
responses:
"200":
description: OK
schema:
type: string
"500":
description: Internal Server Error
schema:
$ref: '#/definitions/cluster.Error'
"508":
description: Loop Detected
schema:
$ref: '#/definitions/cluster.Error'
summary: Transfer leadership
tags:
- v1.0.0
swagger: "2.0"

View File

@@ -20,6 +20,7 @@ type Forwarder interface {
Join(origin, id, raftAddress, peerAddress string) error
Leave(origin, id string) error
TransferLeadership(origin, id string) error
Snapshot(origin string) (io.ReadCloser, error)
AddProcess(origin string, config *app.Config) error
@@ -138,6 +139,20 @@ func (f *forwarder) Leave(origin, id string) error {
return client.Leave(origin, id)
}
func (f *forwarder) TransferLeadership(origin, id string) error {
if origin == "" {
origin = f.id
}
f.logger.Debug().WithField("id", id).Log("Transferring leadership")
f.lock.RLock()
client := f.client
f.lock.RUnlock()
return client.TransferLeadership(origin, id)
}
func (f *forwarder) Snapshot(origin string) (io.ReadCloser, error) {
f.lock.RLock()
client := f.client

View File

@@ -187,10 +187,14 @@ func (c *cluster) monitorLeadership() {
// leadershipTransfer tries to transfer the leadership to another node e.g. in order
// to do a graceful shutdown.
func (c *cluster) leadershipTransfer() error {
func (c *cluster) leadershipTransfer(id string) error {
if id == c.id {
return nil
}
retryCount := 3
for i := 0; i < retryCount; i++ {
err := c.raft.LeadershipTransfer()
err := c.raft.LeadershipTransfer(id)
if err != nil {
c.logger.Error().WithError(err).WithFields(log.Fields{
"attempt": i,
@@ -254,7 +258,7 @@ RECONCILE:
// longer the leader. If leadershipTransfer() fails, we
// will try to acquire it again after
// 5 seconds.
if err := c.leadershipTransfer(); err != nil {
if err := c.leadershipTransfer(""); err != nil {
c.logger.Error().WithError(err).Log("Transfer leadership")
interval = time.After(5 * time.Second)
goto WAIT

View File

@@ -41,7 +41,7 @@ type Raft interface {
AddServer(id, address string) error
RemoveServer(id string) error
LeadershipTransfer() error
LeadershipTransfer(id string) error
Snapshot() (io.ReadCloser, error)
}
@@ -265,8 +265,27 @@ func (r *raft) RemoveServer(id string) error {
return nil
}
func (r *raft) LeadershipTransfer() error {
future := r.raft.LeadershipTransfer()
func (r *raft) LeadershipTransfer(id string) error {
var future hcraft.Future
if len(id) == 0 {
future = r.raft.LeadershipTransfer()
} else {
servers, err := r.Servers()
if err != nil {
return err
}
for _, server := range servers {
if server.ID != id {
continue
}
future = r.raft.LeadershipTransferToServer(hcraft.ServerID(id), hcraft.ServerAddress(server.Address))
break
}
}
if err := future.Error(); err != nil {
return fmt.Errorf("failed to transfer leadership: %w", err)
}

View File

@@ -1613,6 +1613,38 @@ const docTemplate = `{
}
}
},
"/api/v3/cluster/transfer/{id}": {
"put": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "Transfer the leadership to another node",
"produces": [
"application/json"
],
"tags": [
"v16.?.?"
],
"summary": "Transfer the leadership to another node",
"operationId": "cluster-3-transfer-leadership",
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "string"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/api.Error"
}
}
}
}
},
"/api/v3/config": {
"get": {
"security": [

View File

@@ -1605,6 +1605,38 @@
}
}
},
"/api/v3/cluster/transfer/{id}": {
"put": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "Transfer the leadership to another node",
"produces": [
"application/json"
],
"tags": [
"v16.?.?"
],
"summary": "Transfer the leadership to another node",
"operationId": "cluster-3-transfer-leadership",
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "string"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/api.Error"
}
}
}
}
},
"/api/v3/config": {
"get": {
"security": [

View File

@@ -3462,6 +3462,26 @@ paths:
summary: Retrieve snapshot of the cluster DB
tags:
- v16.?.?
/api/v3/cluster/transfer/{id}:
put:
description: Transfer the leadership to another node
operationId: cluster-3-transfer-leadership
produces:
- application/json
responses:
"200":
description: OK
schema:
type: string
"500":
description: Internal Server Error
schema:
$ref: '#/definitions/api.Error'
security:
- ApiKeyAuth: []
summary: Transfer the leadership to another node
tags:
- v16.?.?
/api/v3/config:
get:
description: Retrieve the currently active Restreamer configuration

View File

@@ -115,6 +115,24 @@ func (h *ClusterHandler) Healthy(c echo.Context) error {
return c.JSON(http.StatusOK, !degraded)
}
// Transfer the leadership to another node
// @Summary Transfer the leadership to another node
// @Description Transfer the leadership to another node
// @Tags v16.?.?
// @ID cluster-3-transfer-leadership
// @Produce json
// @Success 200 {string} string
// @Failure 500 {object} api.Error
// @Security ApiKeyAuth
// @Router /api/v3/cluster/transfer/{id} [put]
func (h *ClusterHandler) TransferLeadership(c echo.Context) error {
id := util.PathParam(c, "id")
h.cluster.TransferLeadership("", id)
return c.JSON(http.StatusOK, "OK")
}
// Leave the cluster gracefully
// @Summary Leave the cluster gracefully
// @Description Leave the cluster gracefully
@@ -564,12 +582,12 @@ func (h *ClusterHandler) AddProcess(c echo.Context) error {
}
if !h.iam.Enforce(ctxuser, process.Domain, "process:"+process.ID, "write") {
return api.Err(http.StatusForbidden, "", "API user %s is not allowed to write this process", ctxuser)
return api.Err(http.StatusForbidden, "", "API user %s is not allowed to write this process in domain %s", ctxuser, process.Domain)
}
if !superuser {
if !h.iam.Enforce(process.Owner, process.Domain, "process:"+process.ID, "write") {
return api.Err(http.StatusForbidden, "", "user %s is not allowed to write this process", process.Owner)
return api.Err(http.StatusForbidden, "", "user %s is not allowed to write this process in domain %s", process.Owner, process.Domain)
}
}
@@ -584,7 +602,7 @@ func (h *ClusterHandler) AddProcess(c echo.Context) error {
config, metadata := process.Marshal()
if err := h.cluster.AddProcess("", config); err != nil {
return api.Err(http.StatusBadRequest, "", "invalid process config: %s", err.Error())
return api.Err(http.StatusBadRequest, "", "adding process config: %s", err.Error())
}
for key, value := range metadata {

View File

@@ -716,6 +716,7 @@ func (s *server) setRoutesV3(v3 *echo.Group) {
v3.GET("/cluster/node/:id/version", s.v3handler.cluster.GetNodeVersion)
if !s.readOnly {
v3.PUT("/cluster/transfer/:id", s.v3handler.cluster.TransferLeadership)
v3.PUT("/cluster/leave", s.v3handler.cluster.Leave)
v3.POST("/cluster/process", s.v3handler.cluster.AddProcess)