diff --git a/cluster/api.go b/cluster/api.go index 809cfcf3..ae12f8aa 100644 --- a/cluster/api.go +++ b/cluster/api.go @@ -103,6 +103,8 @@ func NewAPI(config APIConfig) (API, error) { a.router.POST("/v1/server", a.AddServer) a.router.DELETE("/v1/server/:id", a.RemoveServer) + a.router.PUT("/v1/transfer/:id", a.TransferLeadership) + a.router.GET("/v1/snaphot", a.Snapshot) a.router.POST("/v1/process", a.AddProcess) @@ -255,7 +257,41 @@ func (a *api) RemoveServer(c echo.Context) error { err := a.cluster.Leave(origin, id) if err != nil { a.logger.Debug().WithError(err).WithField("id", id).Log("Unable to leave cluster") - return Err(http.StatusInternalServerError, "", "unable to leave cluster%s", err.Error()) + return Err(http.StatusInternalServerError, "", "unable to leave cluster: %s", err.Error()) + } + + return c.JSON(http.StatusOK, "OK") +} + +// TransferLeadership transfers the leadership to another node +// @Summary Transfer leadership +// @Description Transfer leadership +// @Tags v1.0.0 +// @ID cluster-1-transfer-leadership +// @Accept json +// @Produce json +// @Param X-Cluster-Origin header string false "Origin ID of request" +// @Success 200 {string} string +// @Failure 500 {object} Error +// @Failure 508 {object} Error +// @Router /v1/transfer/{id} [put] +func (a *api) TransferLeadership(c echo.Context) error { + id := util.PathParam(c, "id") + + a.logger.Debug().WithFields(log.Fields{ + "id": id, + }).Log("Transfer request") + + origin := c.Request().Header.Get("X-Cluster-Origin") + + if origin == a.id { + return Err(http.StatusLoopDetected, "", "breaking circuit") + } + + err := a.cluster.TransferLeadership(origin, id) + if err != nil { + a.logger.Debug().WithError(err).WithField("id", id).Log("Unable to transfer leadership") + return Err(http.StatusInternalServerError, "", "unable to transfer leadership: %s", err.Error()) } return c.JSON(http.StatusOK, "OK") diff --git a/cluster/client/client.go b/cluster/client/client.go index 4b058cbe..1d1c43a7 100644 
--- a/cluster/client/client.go +++ b/cluster/client/client.go @@ -163,6 +163,12 @@ func (c *APIClient) Leave(origin string, id string) error { return err } +func (c *APIClient) TransferLeadership(origin, id string) error { + _, err := c.call(http.MethodPut, "/v1/transfer/"+url.PathEscape(id), "application/json", nil, origin) + + return err +} + func (c *APIClient) AddProcess(origin string, r AddProcessRequest) error { data, err := json.Marshal(r) if err != nil { diff --git a/cluster/cluster.go b/cluster/cluster.go index 24fd7ffb..6cbdb3f9 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -55,7 +55,8 @@ type Cluster interface { GetBarrier(name string) bool Join(origin, id, raftAddress, peerAddress string) error - Leave(origin, id string) error // gracefully remove a node from the cluster + Leave(origin, id string) error // gracefully remove a node from the cluster + TransferLeadership(origin, id string) error // transfer leadership to another node Snapshot(origin string) (io.ReadCloser, error) ListProcesses() []store.Process @@ -696,6 +697,10 @@ func (c *cluster) IsClusterDegraded() (bool, error) { } func (c *cluster) Leave(origin, id string) error { + if ok, _ := c.IsDegraded(); ok { + return ErrDegraded + } + if len(id) == 0 { id = c.id } @@ -761,7 +766,7 @@ func (c *cluster) Leave(origin, id string) error { } // Transfer the leadership to another server - err := c.leadershipTransfer() + err := c.leadershipTransfer("") if err != nil { c.logger.Warn().WithError(err).Log("Transfer leadership") return err @@ -830,6 +835,10 @@ func (c *cluster) Leave(origin, id string) error { } func (c *cluster) Join(origin, id, raftAddress, peerAddress string) error { + if ok, _ := c.IsDegraded(); ok { + return ErrDegraded + } + if !c.IsRaftLeader() { c.logger.Debug().Log("Not leader, forwarding to leader") return c.forwarder.Join(origin, id, raftAddress, peerAddress) @@ -888,7 +897,24 @@ func (c *cluster) Join(origin, id, raftAddress, peerAddress string) error { return 
nil } +func (c *cluster) TransferLeadership(origin, id string) error { + if ok, _ := c.IsDegraded(); ok { + return ErrDegraded + } + + if !c.IsRaftLeader() { + c.logger.Debug().Log("Not leader, forwarding to leader") + return c.forwarder.TransferLeadership(origin, id) + } + + return c.leadershipTransfer(id) +} + func (c *cluster) Snapshot(origin string) (io.ReadCloser, error) { + if ok, _ := c.IsDegraded(); ok { + return nil, ErrDegraded + } + if !c.IsRaftLeader() { c.logger.Debug().Log("Not leader, forwarding to leader") return c.forwarder.Snapshot(origin) diff --git a/cluster/docs/ClusterAPI_docs.go b/cluster/docs/ClusterAPI_docs.go index 7abd844b..d9c38872 100644 --- a/cluster/docs/ClusterAPI_docs.go +++ b/cluster/docs/ClusterAPI_docs.go @@ -1078,6 +1078,50 @@ const docTemplateClusterAPI = `{ } } } + }, + "/v1/transfer/{id}": { + "put": { + "description": "Transfer leadership", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "v1.0.0" + ], + "summary": "Transfer leadership", + "operationId": "cluster-1-transfer-leadership", + "parameters": [ + { + "type": "string", + "description": "Origin ID of request", + "name": "X-Cluster-Origin", + "in": "header" + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "string" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/cluster.Error" + } + }, + "508": { + "description": "Loop Detected", + "schema": { + "$ref": "#/definitions/cluster.Error" + } + } + } + } } }, "definitions": { diff --git a/cluster/docs/ClusterAPI_swagger.json b/cluster/docs/ClusterAPI_swagger.json index 6ed4513c..5ff2aaca 100644 --- a/cluster/docs/ClusterAPI_swagger.json +++ b/cluster/docs/ClusterAPI_swagger.json @@ -1070,6 +1070,50 @@ } } } + }, + "/v1/transfer/{id}": { + "put": { + "description": "Transfer leadership", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + 
"v1.0.0" + ], + "summary": "Transfer leadership", + "operationId": "cluster-1-transfer-leadership", + "parameters": [ + { + "type": "string", + "description": "Origin ID of request", + "name": "X-Cluster-Origin", + "in": "header" + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "string" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/cluster.Error" + } + }, + "508": { + "description": "Loop Detected", + "schema": { + "$ref": "#/definitions/cluster.Error" + } + } + } + } } }, "definitions": { diff --git a/cluster/docs/ClusterAPI_swagger.yaml b/cluster/docs/ClusterAPI_swagger.yaml index f7a29101..3f597676 100644 --- a/cluster/docs/ClusterAPI_swagger.yaml +++ b/cluster/docs/ClusterAPI_swagger.yaml @@ -1552,4 +1552,33 @@ paths: summary: Cluster DB snapshot tags: - v1.0.0 + /v1/transfer/{id}: + put: + consumes: + - application/json + description: Transfer leadership + operationId: cluster-1-transfer-leadership + parameters: + - description: Origin ID of request + in: header + name: X-Cluster-Origin + type: string + produces: + - application/json + responses: + "200": + description: OK + schema: + type: string + "500": + description: Internal Server Error + schema: + $ref: '#/definitions/cluster.Error' + "508": + description: Loop Detected + schema: + $ref: '#/definitions/cluster.Error' + summary: Transfer leadership + tags: + - v1.0.0 swagger: "2.0" diff --git a/cluster/forwarder/forwarder.go b/cluster/forwarder/forwarder.go index 9d2a4389..a998bc5a 100644 --- a/cluster/forwarder/forwarder.go +++ b/cluster/forwarder/forwarder.go @@ -20,6 +20,7 @@ type Forwarder interface { Join(origin, id, raftAddress, peerAddress string) error Leave(origin, id string) error + TransferLeadership(origin, id string) error Snapshot(origin string) (io.ReadCloser, error) AddProcess(origin string, config *app.Config) error @@ -138,6 +139,20 @@ func (f *forwarder) Leave(origin, id string) error { return 
client.Leave(origin, id) } +func (f *forwarder) TransferLeadership(origin, id string) error { + if origin == "" { + origin = f.id + } + + f.logger.Debug().WithField("id", id).Log("Transferring leadership") + + f.lock.RLock() + client := f.client + f.lock.RUnlock() + + return client.TransferLeadership(origin, id) +} + func (f *forwarder) Snapshot(origin string) (io.ReadCloser, error) { f.lock.RLock() client := f.client diff --git a/cluster/leader.go b/cluster/leader.go index c60ff5cd..992aa472 100644 --- a/cluster/leader.go +++ b/cluster/leader.go @@ -187,10 +187,14 @@ func (c *cluster) monitorLeadership() { // leadershipTransfer tries to transfer the leadership to another node e.g. in order // to do a graceful shutdown. -func (c *cluster) leadershipTransfer() error { +func (c *cluster) leadershipTransfer(id string) error { + if id == c.id { + return nil + } + retryCount := 3 for i := 0; i < retryCount; i++ { - err := c.raft.LeadershipTransfer() + err := c.raft.LeadershipTransfer(id) if err != nil { c.logger.Error().WithError(err).WithFields(log.Fields{ "attempt": i, @@ -254,7 +258,7 @@ RECONCILE: // longer the leader. If leadershipTransfer() fails, we // will try to acquire it again after // 5 seconds. 
- if err := c.leadershipTransfer(); err != nil { + if err := c.leadershipTransfer(""); err != nil { c.logger.Error().WithError(err).Log("Transfer leadership") interval = time.After(5 * time.Second) goto WAIT diff --git a/cluster/raft/raft.go b/cluster/raft/raft.go index 4bfd155c..d970b46e 100644 --- a/cluster/raft/raft.go +++ b/cluster/raft/raft.go @@ -41,7 +41,7 @@ type Raft interface { AddServer(id, address string) error RemoveServer(id string) error - LeadershipTransfer() error + LeadershipTransfer(id string) error Snapshot() (io.ReadCloser, error) } @@ -265,8 +265,36 @@ func (r *raft) RemoveServer(id string) error { return nil } -func (r *raft) LeadershipTransfer() error { - future := r.raft.LeadershipTransfer() +// LeadershipTransfer transfers the raft leadership. An empty id leaves the +// choice of target to the raft library; otherwise the target is the server +// returned by r.Servers() whose ID equals id, and an error is returned if there is none. +func (r *raft) LeadershipTransfer(id string) error { + var future hcraft.Future + + if len(id) == 0 { + future = r.raft.LeadershipTransfer() + } else { + servers, err := r.Servers() + if err != nil { + return err + } + + for _, server := range servers { + if server.ID != id { + continue + } + + future = r.raft.LeadershipTransferToServer(hcraft.ServerID(id), hcraft.ServerAddress(server.Address)) + break + } + + // No server matched id: future is still nil here, and calling + // future.Error() on it below would panic. + if future == nil { + return fmt.Errorf("failed to transfer leadership: unknown server %q", id) + } + } + if err := future.Error(); err != nil { return fmt.Errorf("failed to transfer leadership: %w", err) } diff --git a/docs/docs.go b/docs/docs.go index 2ba1e22e..354a61ba 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -1613,6 +1613,38 @@ const docTemplate = `{ } } }, + "/api/v3/cluster/transfer/{id}": { + "put": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "Transfer the leadership to another node", + "produces": [ + "application/json" + ], + "tags": [ + "v16.?.?" 
+ ], + "summary": "Transfer the leadership to another node", + "operationId": "cluster-3-transfer-leadership", + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "string" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/api.Error" + } + } + } + } + }, "/api/v3/config": { "get": { "security": [ diff --git a/docs/swagger.json b/docs/swagger.json index 2e762b1b..9a12c841 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -1605,6 +1605,38 @@ } } }, + "/api/v3/cluster/transfer/{id}": { + "put": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "Transfer the leadership to another node", + "produces": [ + "application/json" + ], + "tags": [ + "v16.?.?" + ], + "summary": "Transfer the leadership to another node", + "operationId": "cluster-3-transfer-leadership", + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "string" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/api.Error" + } + } + } + } + }, "/api/v3/config": { "get": { "security": [ diff --git a/docs/swagger.yaml b/docs/swagger.yaml index 759d9435..c1e8f427 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -3462,6 +3462,26 @@ paths: summary: Retrieve snapshot of the cluster DB tags: - v16.?.? + /api/v3/cluster/transfer/{id}: + put: + description: Transfer the leadership to another node + operationId: cluster-3-transfer-leadership + produces: + - application/json + responses: + "200": + description: OK + schema: + type: string + "500": + description: Internal Server Error + schema: + $ref: '#/definitions/api.Error' + security: + - ApiKeyAuth: [] + summary: Transfer the leadership to another node + tags: + - v16.?.? 
/api/v3/config: get: description: Retrieve the currently active Restreamer configuration diff --git a/http/handler/api/cluster.go b/http/handler/api/cluster.go index 19e8e72f..1c8b68db 100644 --- a/http/handler/api/cluster.go +++ b/http/handler/api/cluster.go @@ -115,6 +115,27 @@ func (h *ClusterHandler) Healthy(c echo.Context) error { return c.JSON(http.StatusOK, !degraded) } +// Transfer the leadership to another node +// @Summary Transfer the leadership to another node +// @Description Transfer the leadership to another node +// @Tags v16.?.? +// @ID cluster-3-transfer-leadership +// @Produce json +// @Success 200 {string} string +// @Failure 500 {object} api.Error +// @Security ApiKeyAuth +// @Router /api/v3/cluster/transfer/{id} [put] +func (h *ClusterHandler) TransferLeadership(c echo.Context) error { + id := util.PathParam(c, "id") + + // Report a failed transfer to the client instead of always answering "OK". + if err := h.cluster.TransferLeadership("", id); err != nil { + return api.Err(http.StatusInternalServerError, "", "unable to transfer leadership: %s", err.Error()) + } + + return c.JSON(http.StatusOK, "OK") +} + // Leave the cluster gracefully // @Summary Leave the cluster gracefully // @Description Leave the cluster gracefully @@ -564,12 +585,12 @@ func (h *ClusterHandler) AddProcess(c echo.Context) error { } if !h.iam.Enforce(ctxuser, process.Domain, "process:"+process.ID, "write") { - return api.Err(http.StatusForbidden, "", "API user %s is not allowed to write this process", ctxuser) + return api.Err(http.StatusForbidden, "", "API user %s is not allowed to write this process in domain %s", ctxuser, process.Domain) } if !superuser { if !h.iam.Enforce(process.Owner, process.Domain, "process:"+process.ID, "write") { - return api.Err(http.StatusForbidden, "", "user %s is not allowed to write this process", process.Owner) + return api.Err(http.StatusForbidden, "", "user %s is not allowed to write this process in domain %s", process.Owner, process.Domain) } } @@ -584,7 +605,7 @@ func (h *ClusterHandler) AddProcess(c echo.Context) error { config, metadata := process.Marshal() if err := h.cluster.AddProcess("", config); err != nil { - return 
api.Err(http.StatusBadRequest, "", "invalid process config: %s", err.Error()) + return api.Err(http.StatusBadRequest, "", "adding process config: %s", err.Error()) } for key, value := range metadata { diff --git a/http/server.go b/http/server.go index 0d06bd64..b391e08f 100644 --- a/http/server.go +++ b/http/server.go @@ -716,6 +716,7 @@ func (s *server) setRoutesV3(v3 *echo.Group) { v3.GET("/cluster/node/:id/version", s.v3handler.cluster.GetNodeVersion) if !s.readOnly { + v3.PUT("/cluster/transfer/:id", s.v3handler.cluster.TransferLeadership) v3.PUT("/cluster/leave", s.v3handler.cluster.Leave) v3.POST("/cluster/process", s.v3handler.cluster.AddProcess)