mirror of
https://github.com/datarhei/core.git
synced 2025-10-04 15:42:57 +08:00
Add locks on the cluster DB
This commit is contained in:
@@ -552,13 +552,81 @@ func (a *api) RemoveIdentity(c echo.Context) error {
|
||||
// @ID cluster-1-core-api-address
|
||||
// @Produce json
|
||||
// @Success 200 {string} string
|
||||
// @Success 500 {array} Error
|
||||
// @Success 500 {object} Error
|
||||
// @Router /v1/core [get]
|
||||
func (a *api) CoreAPIAddress(c echo.Context) error {
|
||||
address, _ := a.cluster.CoreAPIAddress("")
|
||||
return c.JSON(http.StatusOK, address)
|
||||
}
|
||||
|
||||
// Lock tries to acquire a named lock
|
||||
// @Summary Acquire a named lock
|
||||
// @Description Acquire a named lock
|
||||
// @Tags v1.0.0
|
||||
// @ID cluster-1-lock
|
||||
// @Produce json
|
||||
// @Param data body client.LockRequest true "Lock request"
|
||||
// @Param X-Cluster-Origin header string false "Origin ID of request"
|
||||
// @Success 200 {string} string
|
||||
// @Success 500 {object} Error
|
||||
// @Failure 508 {object} Error
|
||||
// @Router /v1/lock [post]
|
||||
func (a *api) Lock(c echo.Context) error {
|
||||
r := client.LockRequest{}
|
||||
|
||||
if err := util.ShouldBindJSON(c, &r); err != nil {
|
||||
return Err(http.StatusBadRequest, "Invalid JSON", "%s", err)
|
||||
}
|
||||
|
||||
origin := c.Request().Header.Get("X-Cluster-Origin")
|
||||
|
||||
if origin == a.id {
|
||||
return Err(http.StatusLoopDetected, "", "breaking circuit")
|
||||
}
|
||||
|
||||
a.logger.Debug().WithField("name", r.Name).Log("Acquire lock")
|
||||
|
||||
err := a.cluster.CreateLock(origin, r.Name, r.ValidUntil)
|
||||
if err != nil {
|
||||
a.logger.Debug().WithError(err).WithField("name", r.Name).Log("Unable to acquire lock")
|
||||
return Err(http.StatusInternalServerError, "unable to acquire lock", "%s", err.Error())
|
||||
}
|
||||
|
||||
return c.JSON(http.StatusOK, "OK")
|
||||
}
|
||||
|
||||
// Unlock removes a named lock
|
||||
// @Summary Remove a lock
|
||||
// @Description Remove a lock
|
||||
// @Tags v1.0.0
|
||||
// @ID cluster-1-unlock
|
||||
// @Produce json
|
||||
// @Param name path string true "Lock name"
|
||||
// @Param X-Cluster-Origin header string false "Origin ID of request"
|
||||
// @Success 200 {string} string
|
||||
// @Failure 404 {object} Error
|
||||
// @Failure 508 {object} Error
|
||||
// @Router /v1/lock/{name} [delete]
|
||||
func (a *api) Unlock(c echo.Context) error {
|
||||
name := util.PathParam(c, "name")
|
||||
|
||||
origin := c.Request().Header.Get("X-Cluster-Origin")
|
||||
|
||||
if origin == a.id {
|
||||
return Err(http.StatusLoopDetected, "", "breaking circuit")
|
||||
}
|
||||
|
||||
a.logger.Debug().WithField("name", name).Log("Remove lock request")
|
||||
|
||||
err := a.cluster.DeleteLock(origin, name)
|
||||
if err != nil {
|
||||
a.logger.Debug().WithError(err).WithField("name", name).Log("Unable to remove lock")
|
||||
return Err(http.StatusInternalServerError, "unable to remove lock", "%s", err.Error())
|
||||
}
|
||||
|
||||
return c.JSON(http.StatusOK, "OK")
|
||||
}
|
||||
|
||||
// Error represents an error response of the API
|
||||
type Error struct {
|
||||
Code int `json:"code" jsonschema:"required" format:"int"`
|
||||
|
@@ -6,6 +6,7 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
httpapi "github.com/datarhei/core/v16/http/api"
|
||||
@@ -43,6 +44,11 @@ type SetPoliciesRequest struct {
|
||||
Policies []iamaccess.Policy `json:"policies"`
|
||||
}
|
||||
|
||||
type LockRequest struct {
|
||||
Name string `json:"name"`
|
||||
ValidUntil time.Time `json:"valid_until"`
|
||||
}
|
||||
|
||||
type APIClient struct {
|
||||
Address string
|
||||
Client *http.Client
|
||||
@@ -90,7 +96,7 @@ func (c *APIClient) Join(origin string, r JoinRequest) error {
|
||||
}
|
||||
|
||||
func (c *APIClient) Leave(origin string, id string) error {
|
||||
_, err := c.call(http.MethodDelete, "/v1/server/"+id, "application/json", nil, origin)
|
||||
_, err := c.call(http.MethodDelete, "/v1/server/"+url.PathEscape(id), "application/json", nil, origin)
|
||||
|
||||
return err
|
||||
}
|
||||
@@ -107,7 +113,7 @@ func (c *APIClient) AddProcess(origin string, r AddProcessRequest) error {
|
||||
}
|
||||
|
||||
func (c *APIClient) RemoveProcess(origin string, id app.ProcessID) error {
|
||||
_, err := c.call(http.MethodDelete, "/v1/process/"+id.ID+"?domain="+id.Domain, "application/json", nil, origin)
|
||||
_, err := c.call(http.MethodDelete, "/v1/process/"+url.PathEscape(id.ID)+"?domain="+url.QueryEscape(id.Domain), "application/json", nil, origin)
|
||||
|
||||
return err
|
||||
}
|
||||
@@ -118,7 +124,7 @@ func (c *APIClient) UpdateProcess(origin string, id app.ProcessID, r UpdateProce
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = c.call(http.MethodPut, "/v1/process/"+id.ID+"?domain="+id.Domain, "application/json", bytes.NewReader(data), origin)
|
||||
_, err = c.call(http.MethodPut, "/v1/process/"+url.PathEscape(id.ID)+"?domain="+url.QueryEscape(id.Domain), "application/json", bytes.NewReader(data), origin)
|
||||
|
||||
return err
|
||||
}
|
||||
@@ -129,7 +135,7 @@ func (c *APIClient) SetProcessMetadata(origin string, id app.ProcessID, key stri
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = c.call(http.MethodPut, "/v1/process/"+id.ID+"/metadata/"+key+"?domain="+id.Domain, "application/json", bytes.NewReader(data), origin)
|
||||
_, err = c.call(http.MethodPut, "/v1/process/"+url.PathEscape(id.ID)+"/metadata/"+url.PathEscape(key)+"?domain="+url.QueryEscape(id.Domain), "application/json", bytes.NewReader(data), origin)
|
||||
|
||||
return err
|
||||
}
|
||||
@@ -151,7 +157,7 @@ func (c *APIClient) UpdateIdentity(origin, name string, r UpdateIdentityRequest)
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = c.call(http.MethodPut, "/v1/iam/user/"+name, "application/json", bytes.NewReader(data), origin)
|
||||
_, err = c.call(http.MethodPut, "/v1/iam/user/"+url.PathEscape(name), "application/json", bytes.NewReader(data), origin)
|
||||
|
||||
return err
|
||||
}
|
||||
@@ -162,13 +168,30 @@ func (c *APIClient) SetPolicies(origin, name string, r SetPoliciesRequest) error
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = c.call(http.MethodPut, "/v1/iam/user/"+name+"/policies", "application/json", bytes.NewReader(data), origin)
|
||||
_, err = c.call(http.MethodPut, "/v1/iam/user/"+url.PathEscape(name)+"/policies", "application/json", bytes.NewReader(data), origin)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *APIClient) RemoveIdentity(origin string, name string) error {
|
||||
_, err := c.call(http.MethodDelete, "/v1/iam/user/"+name, "application/json", nil, origin)
|
||||
_, err := c.call(http.MethodDelete, "/v1/iam/user/"+url.PathEscape(name), "application/json", nil, origin)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *APIClient) Lock(origin string, r LockRequest) error {
|
||||
data, err := json.Marshal(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = c.call(http.MethodPost, "/v1/lock", "application/json", bytes.NewReader(data), origin)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *APIClient) Unlock(origin string, name string) error {
|
||||
_, err := c.call(http.MethodDelete, "/v1/lock/"+url.PathEscape(name), "application/json", nil, origin)
|
||||
|
||||
return err
|
||||
}
|
||||
|
@@ -65,6 +65,10 @@ type Cluster interface {
|
||||
SetPolicies(origin, name string, policies []iamaccess.Policy) error
|
||||
RemoveIdentity(origin string, name string) error
|
||||
|
||||
CreateLock(origin string, name string, validUntil time.Time) error
|
||||
DeleteLock(origin string, name string) error
|
||||
ListLocks() map[string]time.Time
|
||||
|
||||
ProxyReader() proxy.ProxyReader
|
||||
}
|
||||
|
||||
@@ -1016,6 +1020,49 @@ func (c *cluster) RemoveIdentity(origin string, name string) error {
|
||||
return c.applyCommand(cmd)
|
||||
}
|
||||
|
||||
func (c *cluster) CreateLock(origin string, name string, validUntil time.Time) error {
|
||||
if ok, _ := c.IsDegraded(); ok {
|
||||
return ErrDegraded
|
||||
}
|
||||
|
||||
if !c.IsRaftLeader() {
|
||||
return c.forwarder.CreateLock(origin, name, validUntil)
|
||||
}
|
||||
|
||||
cmd := &store.Command{
|
||||
Operation: store.OpCreateLock,
|
||||
Data: &store.CommandCreateLock{
|
||||
Name: name,
|
||||
ValidUntil: validUntil,
|
||||
},
|
||||
}
|
||||
|
||||
return c.applyCommand(cmd)
|
||||
}
|
||||
|
||||
func (c *cluster) DeleteLock(origin string, name string) error {
|
||||
if ok, _ := c.IsDegraded(); ok {
|
||||
return ErrDegraded
|
||||
}
|
||||
|
||||
if !c.IsRaftLeader() {
|
||||
return c.forwarder.DeleteLock(origin, name)
|
||||
}
|
||||
|
||||
cmd := &store.Command{
|
||||
Operation: store.OpDeleteLock,
|
||||
Data: &store.CommandDeleteLock{
|
||||
Name: name,
|
||||
},
|
||||
}
|
||||
|
||||
return c.applyCommand(cmd)
|
||||
}
|
||||
|
||||
func (c *cluster) ListLocks() map[string]time.Time {
|
||||
return c.store.ListLocks()
|
||||
}
|
||||
|
||||
func (c *cluster) applyCommand(cmd *store.Command) error {
|
||||
b, err := json.Marshal(cmd)
|
||||
if err != nil {
|
||||
|
@@ -45,14 +45,11 @@ const docTemplateClusterAPI = `{
|
||||
"500": {
|
||||
"description": "Internal Server Error",
|
||||
"schema": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"$ref": "#/definitions/cluster.Error"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"/v1/iam/user": {
|
||||
"post": {
|
||||
@@ -270,6 +267,104 @@ const docTemplateClusterAPI = `{
|
||||
}
|
||||
}
|
||||
},
|
||||
"/v1/lock": {
|
||||
"post": {
|
||||
"description": "Acquire a named lock",
|
||||
"produces": [
|
||||
"application/json"
|
||||
],
|
||||
"tags": [
|
||||
"v1.0.0"
|
||||
],
|
||||
"summary": "Acquire a named lock",
|
||||
"operationId": "cluster-1-lock",
|
||||
"parameters": [
|
||||
{
|
||||
"description": "Lock request",
|
||||
"name": "data",
|
||||
"in": "body",
|
||||
"required": true,
|
||||
"schema": {
|
||||
"$ref": "#/definitions/client.LockRequest"
|
||||
}
|
||||
},
|
||||
{
|
||||
"type": "string",
|
||||
"description": "Origin ID of request",
|
||||
"name": "X-Cluster-Origin",
|
||||
"in": "header"
|
||||
}
|
||||
],
|
||||
"responses": {
|
||||
"200": {
|
||||
"description": "OK",
|
||||
"schema": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"500": {
|
||||
"description": "Internal Server Error",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/cluster.Error"
|
||||
}
|
||||
},
|
||||
"508": {
|
||||
"description": "Loop Detected",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/cluster.Error"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"/v1/lock/{name}": {
|
||||
"delete": {
|
||||
"description": "Remove a lock",
|
||||
"produces": [
|
||||
"application/json"
|
||||
],
|
||||
"tags": [
|
||||
"v1.0.0"
|
||||
],
|
||||
"summary": "Remove a lock",
|
||||
"operationId": "cluster-1-unlock",
|
||||
"parameters": [
|
||||
{
|
||||
"type": "string",
|
||||
"description": "Lock name",
|
||||
"name": "name",
|
||||
"in": "path",
|
||||
"required": true
|
||||
},
|
||||
{
|
||||
"type": "string",
|
||||
"description": "Origin ID of request",
|
||||
"name": "X-Cluster-Origin",
|
||||
"in": "header"
|
||||
}
|
||||
],
|
||||
"responses": {
|
||||
"200": {
|
||||
"description": "OK",
|
||||
"schema": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"404": {
|
||||
"description": "Not Found",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/cluster.Error"
|
||||
}
|
||||
},
|
||||
"508": {
|
||||
"description": "Loop Detected",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/cluster.Error"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"/v1/process": {
|
||||
"post": {
|
||||
"description": "Add a process to the cluster DB",
|
||||
@@ -810,6 +905,17 @@ const docTemplateClusterAPI = `{
|
||||
}
|
||||
}
|
||||
},
|
||||
"client.LockRequest": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"name": {
|
||||
"type": "string"
|
||||
},
|
||||
"valid_until": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
},
|
||||
"client.SetPoliciesRequest": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
|
@@ -37,14 +37,11 @@
|
||||
"500": {
|
||||
"description": "Internal Server Error",
|
||||
"schema": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"$ref": "#/definitions/cluster.Error"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"/v1/iam/user": {
|
||||
"post": {
|
||||
@@ -262,6 +259,104 @@
|
||||
}
|
||||
}
|
||||
},
|
||||
"/v1/lock": {
|
||||
"post": {
|
||||
"description": "Acquire a named lock",
|
||||
"produces": [
|
||||
"application/json"
|
||||
],
|
||||
"tags": [
|
||||
"v1.0.0"
|
||||
],
|
||||
"summary": "Acquire a named lock",
|
||||
"operationId": "cluster-1-lock",
|
||||
"parameters": [
|
||||
{
|
||||
"description": "Lock request",
|
||||
"name": "data",
|
||||
"in": "body",
|
||||
"required": true,
|
||||
"schema": {
|
||||
"$ref": "#/definitions/client.LockRequest"
|
||||
}
|
||||
},
|
||||
{
|
||||
"type": "string",
|
||||
"description": "Origin ID of request",
|
||||
"name": "X-Cluster-Origin",
|
||||
"in": "header"
|
||||
}
|
||||
],
|
||||
"responses": {
|
||||
"200": {
|
||||
"description": "OK",
|
||||
"schema": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"500": {
|
||||
"description": "Internal Server Error",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/cluster.Error"
|
||||
}
|
||||
},
|
||||
"508": {
|
||||
"description": "Loop Detected",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/cluster.Error"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"/v1/lock/{name}": {
|
||||
"delete": {
|
||||
"description": "Remove a lock",
|
||||
"produces": [
|
||||
"application/json"
|
||||
],
|
||||
"tags": [
|
||||
"v1.0.0"
|
||||
],
|
||||
"summary": "Remove a lock",
|
||||
"operationId": "cluster-1-unlock",
|
||||
"parameters": [
|
||||
{
|
||||
"type": "string",
|
||||
"description": "Lock name",
|
||||
"name": "name",
|
||||
"in": "path",
|
||||
"required": true
|
||||
},
|
||||
{
|
||||
"type": "string",
|
||||
"description": "Origin ID of request",
|
||||
"name": "X-Cluster-Origin",
|
||||
"in": "header"
|
||||
}
|
||||
],
|
||||
"responses": {
|
||||
"200": {
|
||||
"description": "OK",
|
||||
"schema": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"404": {
|
||||
"description": "Not Found",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/cluster.Error"
|
||||
}
|
||||
},
|
||||
"508": {
|
||||
"description": "Loop Detected",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/cluster.Error"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"/v1/process": {
|
||||
"post": {
|
||||
"description": "Add a process to the cluster DB",
|
||||
@@ -802,6 +897,17 @@
|
||||
}
|
||||
}
|
||||
},
|
||||
"client.LockRequest": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"name": {
|
||||
"type": "string"
|
||||
},
|
||||
"valid_until": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
},
|
||||
"client.SetPoliciesRequest": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
|
@@ -111,6 +111,13 @@ definitions:
|
||||
raft_address:
|
||||
type: string
|
||||
type: object
|
||||
client.LockRequest:
|
||||
properties:
|
||||
name:
|
||||
type: string
|
||||
valid_until:
|
||||
type: string
|
||||
type: object
|
||||
client.SetPoliciesRequest:
|
||||
properties:
|
||||
policies:
|
||||
@@ -227,9 +234,7 @@ paths:
|
||||
"500":
|
||||
description: Internal Server Error
|
||||
schema:
|
||||
items:
|
||||
$ref: '#/definitions/cluster.Error'
|
||||
type: array
|
||||
summary: Core API address and login
|
||||
tags:
|
||||
- v1.0.0
|
||||
@@ -378,6 +383,71 @@ paths:
|
||||
summary: Set identity policies
|
||||
tags:
|
||||
- v1.0.0
|
||||
/v1/lock:
|
||||
post:
|
||||
description: Acquire a named lock
|
||||
operationId: cluster-1-lock
|
||||
parameters:
|
||||
- description: Lock request
|
||||
in: body
|
||||
name: data
|
||||
required: true
|
||||
schema:
|
||||
$ref: '#/definitions/client.LockRequest'
|
||||
- description: Origin ID of request
|
||||
in: header
|
||||
name: X-Cluster-Origin
|
||||
type: string
|
||||
produces:
|
||||
- application/json
|
||||
responses:
|
||||
"200":
|
||||
description: OK
|
||||
schema:
|
||||
type: string
|
||||
"500":
|
||||
description: Internal Server Error
|
||||
schema:
|
||||
$ref: '#/definitions/cluster.Error'
|
||||
"508":
|
||||
description: Loop Detected
|
||||
schema:
|
||||
$ref: '#/definitions/cluster.Error'
|
||||
summary: Acquire a named lock
|
||||
tags:
|
||||
- v1.0.0
|
||||
/v1/lock/{name}:
|
||||
delete:
|
||||
description: Remove a lock
|
||||
operationId: cluster-1-unlock
|
||||
parameters:
|
||||
- description: Lock name
|
||||
in: path
|
||||
name: name
|
||||
required: true
|
||||
type: string
|
||||
- description: Origin ID of request
|
||||
in: header
|
||||
name: X-Cluster-Origin
|
||||
type: string
|
||||
produces:
|
||||
- application/json
|
||||
responses:
|
||||
"200":
|
||||
description: OK
|
||||
schema:
|
||||
type: string
|
||||
"404":
|
||||
description: Not Found
|
||||
schema:
|
||||
$ref: '#/definitions/cluster.Error'
|
||||
"508":
|
||||
description: Loop Detected
|
||||
schema:
|
||||
$ref: '#/definitions/cluster.Error'
|
||||
summary: Remove a lock
|
||||
tags:
|
||||
- v1.0.0
|
||||
/v1/process:
|
||||
post:
|
||||
consumes:
|
||||
|
@@ -31,6 +31,9 @@ type Forwarder interface {
|
||||
UpdateIdentity(origin, name string, identity iamidentity.User) error
|
||||
SetPolicies(origin, name string, policies []iamaccess.Policy) error
|
||||
RemoveIdentity(origin string, name string) error
|
||||
|
||||
CreateLock(origin string, name string, validUntil time.Time) error
|
||||
DeleteLock(origin string, name string) error
|
||||
}
|
||||
|
||||
type forwarder struct {
|
||||
@@ -258,3 +261,32 @@ func (f *forwarder) RemoveIdentity(origin string, name string) error {
|
||||
|
||||
return client.RemoveIdentity(origin, name)
|
||||
}
|
||||
|
||||
func (f *forwarder) CreateLock(origin string, name string, validUntil time.Time) error {
|
||||
if origin == "" {
|
||||
origin = f.id
|
||||
}
|
||||
|
||||
r := apiclient.LockRequest{
|
||||
Name: name,
|
||||
ValidUntil: validUntil,
|
||||
}
|
||||
|
||||
f.lock.RLock()
|
||||
client := f.client
|
||||
f.lock.RUnlock()
|
||||
|
||||
return client.Lock(origin, r)
|
||||
}
|
||||
|
||||
func (f *forwarder) DeleteLock(origin string, name string) error {
|
||||
if origin == "" {
|
||||
origin = f.id
|
||||
}
|
||||
|
||||
f.lock.RLock()
|
||||
client := f.client
|
||||
f.lock.RUnlock()
|
||||
|
||||
return client.Unlock(origin, name)
|
||||
}
|
||||
|
@@ -28,6 +28,8 @@ type Store interface {
|
||||
GetUser(name string) Users
|
||||
ListPolicies() Policies
|
||||
ListUserPolicies(name string) Policies
|
||||
|
||||
ListLocks() map[string]time.Time
|
||||
}
|
||||
|
||||
type Process struct {
|
||||
@@ -59,6 +61,8 @@ const (
|
||||
OpRemoveIdentity Operation = "removeIdentity"
|
||||
OpSetPolicies Operation = "setPolicies"
|
||||
OpSetProcessNodeMap Operation = "setProcessNodeMap"
|
||||
OpCreateLock Operation = "createLock"
|
||||
OpDeleteLock Operation = "deleteLock"
|
||||
)
|
||||
|
||||
type Command struct {
|
||||
@@ -107,6 +111,15 @@ type CommandSetProcessNodeMap struct {
|
||||
Map map[string]string
|
||||
}
|
||||
|
||||
type CommandCreateLock struct {
|
||||
Name string
|
||||
ValidUntil time.Time
|
||||
}
|
||||
|
||||
type CommandDeleteLock struct {
|
||||
Name string
|
||||
}
|
||||
|
||||
type storeData struct {
|
||||
Version uint64
|
||||
Process map[string]Process
|
||||
@@ -121,6 +134,8 @@ type storeData struct {
|
||||
UpdatedAt time.Time
|
||||
Policies map[string][]access.Policy
|
||||
}
|
||||
|
||||
Locks map[string]time.Time
|
||||
}
|
||||
|
||||
func (s *storeData) init() {
|
||||
@@ -133,6 +148,7 @@ func (s *storeData) init() {
|
||||
s.Users.Users = map[string]identity.User{}
|
||||
s.Policies.UpdatedAt = now
|
||||
s.Policies.Policies = map[string][]access.Policy{}
|
||||
s.Locks = map[string]time.Time{}
|
||||
}
|
||||
|
||||
// store implements a raft.FSM
|
||||
@@ -314,6 +330,30 @@ func (s *store) applyCommand(c Command) error {
|
||||
}
|
||||
|
||||
err = s.setProcessNodeMap(cmd)
|
||||
case OpCreateLock:
|
||||
b, err = json.Marshal(c.Data)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
cmd := CommandCreateLock{}
|
||||
err = json.Unmarshal(b, &cmd)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
err = s.createLock(cmd)
|
||||
case OpDeleteLock:
|
||||
b, err = json.Marshal(c.Data)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
cmd := CommandDeleteLock{}
|
||||
err = json.Unmarshal(b, &cmd)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
err = s.deleteLock(cmd)
|
||||
default:
|
||||
s.logger.Warn().WithField("operation", c.Operation).Log("Unknown operation")
|
||||
err = fmt.Errorf("unknown operation: %s", c.Operation)
|
||||
@@ -538,6 +578,36 @@ func (s *store) setProcessNodeMap(cmd CommandSetProcessNodeMap) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *store) createLock(cmd CommandCreateLock) error {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
|
||||
validUntil, ok := s.data.Locks[cmd.Name]
|
||||
|
||||
if ok {
|
||||
if time.Now().Before(validUntil) {
|
||||
return fmt.Errorf("the lock with the ID '%s' already exists", cmd.Name)
|
||||
}
|
||||
}
|
||||
|
||||
s.data.Locks[cmd.Name] = cmd.ValidUntil
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *store) deleteLock(cmd CommandDeleteLock) error {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
|
||||
if _, ok := s.data.Locks[cmd.Name]; !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
delete(s.data.Locks, cmd.Name)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *store) OnApply(fn func(op Operation)) {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
@@ -701,6 +771,19 @@ func (s *store) GetProcessNodeMap() map[string]string {
|
||||
return m
|
||||
}
|
||||
|
||||
func (s *store) ListLocks() map[string]time.Time {
|
||||
s.lock.RLock()
|
||||
defer s.lock.RUnlock()
|
||||
|
||||
m := map[string]time.Time{}
|
||||
|
||||
for key, value := range s.data.Locks {
|
||||
m[key] = value
|
||||
}
|
||||
|
||||
return m
|
||||
}
|
||||
|
||||
type fsmSnapshot struct {
|
||||
data []byte
|
||||
}
|
||||
|
@@ -3,6 +3,7 @@ package store
|
||||
import (
|
||||
"encoding/json"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/datarhei/core/v16/iam/access"
|
||||
"github.com/datarhei/core/v16/iam/identity"
|
||||
@@ -33,6 +34,7 @@ func TestCreateStore(t *testing.T) {
|
||||
require.NotNil(t, s.data.ProcessNodeMap)
|
||||
require.NotNil(t, s.data.Users.Users)
|
||||
require.NotNil(t, s.data.Policies.Policies)
|
||||
require.NotNil(t, s.data.Locks)
|
||||
}
|
||||
|
||||
func TestAddProcessCommand(t *testing.T) {
|
||||
@@ -853,6 +855,101 @@ func TestSetProcessNodeMap(t *testing.T) {
|
||||
require.Equal(t, m2, m)
|
||||
}
|
||||
|
||||
func TestCreateLockCommand(t *testing.T) {
|
||||
s, err := createStore()
|
||||
require.NoError(t, err)
|
||||
|
||||
err = s.applyCommand(Command{
|
||||
Operation: OpCreateLock,
|
||||
Data: CommandCreateLock{
|
||||
Name: "foobar",
|
||||
ValidUntil: time.Now().Add(3 * time.Second),
|
||||
},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
_, ok := s.data.Locks["foobar"]
|
||||
require.True(t, ok)
|
||||
}
|
||||
|
||||
func TestCreateLock(t *testing.T) {
|
||||
s, err := createStore()
|
||||
require.NoError(t, err)
|
||||
|
||||
cmd := CommandCreateLock{
|
||||
Name: "foobar",
|
||||
ValidUntil: time.Now().Add(3 * time.Second),
|
||||
}
|
||||
|
||||
err = s.createLock(cmd)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = s.createLock(cmd)
|
||||
require.Error(t, err)
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
err = s.createLock(cmd)
|
||||
return err == nil
|
||||
}, 5*time.Second, time.Second)
|
||||
}
|
||||
|
||||
func TestDeleteLockCommand(t *testing.T) {
|
||||
s, err := createStore()
|
||||
require.NoError(t, err)
|
||||
|
||||
err = s.applyCommand(Command{
|
||||
Operation: OpCreateLock,
|
||||
Data: CommandCreateLock{
|
||||
Name: "foobar",
|
||||
ValidUntil: time.Now().Add(10 * time.Second),
|
||||
},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
_, ok := s.data.Locks["foobar"]
|
||||
require.True(t, ok)
|
||||
|
||||
err = s.applyCommand(Command{
|
||||
Operation: OpDeleteLock,
|
||||
Data: CommandDeleteLock{
|
||||
Name: "foobar",
|
||||
},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
_, ok = s.data.Locks["foobar"]
|
||||
require.False(t, ok)
|
||||
}
|
||||
|
||||
func TestDeleteLock(t *testing.T) {
|
||||
s, err := createStore()
|
||||
require.NoError(t, err)
|
||||
|
||||
err = s.deleteLock(CommandDeleteLock{
|
||||
Name: "foobar",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
cmd := CommandCreateLock{
|
||||
Name: "foobar",
|
||||
ValidUntil: time.Now().Add(10 * time.Second),
|
||||
}
|
||||
|
||||
err = s.createLock(cmd)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = s.createLock(cmd)
|
||||
require.Error(t, err)
|
||||
|
||||
err = s.deleteLock(CommandDeleteLock{
|
||||
Name: "foobar",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = s.createLock(cmd)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestApplyCommand(t *testing.T) {
|
||||
s, err := createStore()
|
||||
require.NoError(t, err)
|
||||
|
40
docs/docs.go
40
docs/docs.go
@@ -154,6 +154,35 @@ const docTemplate = `{
|
||||
}
|
||||
}
|
||||
},
|
||||
"/api/v3/cluster/db/locks": {
|
||||
"get": {
|
||||
"security": [
|
||||
{
|
||||
"ApiKeyAuth": []
|
||||
}
|
||||
],
|
||||
"description": "List of locks in the cluster DB",
|
||||
"produces": [
|
||||
"application/json"
|
||||
],
|
||||
"tags": [
|
||||
"v16.?.?"
|
||||
],
|
||||
"summary": "List locks in the cluster DB",
|
||||
"operationId": "cluster-3-db-list-locks",
|
||||
"responses": {
|
||||
"200": {
|
||||
"description": "OK",
|
||||
"schema": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"$ref": "#/definitions/api.ClusterLock"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"/api/v3/cluster/db/policies": {
|
||||
"get": {
|
||||
"security": [
|
||||
@@ -4074,6 +4103,17 @@ const docTemplate = `{
|
||||
}
|
||||
}
|
||||
},
|
||||
"api.ClusterLock": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"name": {
|
||||
"type": "string"
|
||||
},
|
||||
"valid_until": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
},
|
||||
"api.ClusterNode": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
|
@@ -146,6 +146,35 @@
|
||||
}
|
||||
}
|
||||
},
|
||||
"/api/v3/cluster/db/locks": {
|
||||
"get": {
|
||||
"security": [
|
||||
{
|
||||
"ApiKeyAuth": []
|
||||
}
|
||||
],
|
||||
"description": "List of locks in the cluster DB",
|
||||
"produces": [
|
||||
"application/json"
|
||||
],
|
||||
"tags": [
|
||||
"v16.?.?"
|
||||
],
|
||||
"summary": "List locks in the cluster DB",
|
||||
"operationId": "cluster-3-db-list-locks",
|
||||
"responses": {
|
||||
"200": {
|
||||
"description": "OK",
|
||||
"schema": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"$ref": "#/definitions/api.ClusterLock"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"/api/v3/cluster/db/policies": {
|
||||
"get": {
|
||||
"security": [
|
||||
@@ -4066,6 +4095,17 @@
|
||||
}
|
||||
}
|
||||
},
|
||||
"api.ClusterLock": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"name": {
|
||||
"type": "string"
|
||||
},
|
||||
"valid_until": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
},
|
||||
"api.ClusterNode": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
|
@@ -88,6 +88,13 @@ definitions:
|
||||
version:
|
||||
type: string
|
||||
type: object
|
||||
api.ClusterLock:
|
||||
properties:
|
||||
name:
|
||||
type: string
|
||||
valid_until:
|
||||
type: string
|
||||
type: object
|
||||
api.ClusterNode:
|
||||
properties:
|
||||
address:
|
||||
@@ -2460,6 +2467,24 @@ paths:
|
||||
summary: List of nodes in the cluster
|
||||
tags:
|
||||
- v16.?.?
|
||||
/api/v3/cluster/db/locks:
|
||||
get:
|
||||
description: List of locks in the cluster DB
|
||||
operationId: cluster-3-db-list-locks
|
||||
produces:
|
||||
- application/json
|
||||
responses:
|
||||
"200":
|
||||
description: OK
|
||||
schema:
|
||||
items:
|
||||
$ref: '#/definitions/api.ClusterLock'
|
||||
type: array
|
||||
security:
|
||||
- ApiKeyAuth: []
|
||||
summary: List locks in the cluster DB
|
||||
tags:
|
||||
- v16.?.?
|
||||
/api/v3/cluster/db/policies:
|
||||
get:
|
||||
description: List of policies in the cluster
|
||||
|
@@ -87,3 +87,8 @@ type ClusterProcess struct {
|
||||
Memory uint64 `json:"memory_bytes"` // bytes
|
||||
Runtime int64 `json:"runtime_seconds"` // seconds
|
||||
}
|
||||
|
||||
type ClusterLock struct {
|
||||
Name string `json:"name"`
|
||||
ValidUntil time.Time `json:"valid_until"`
|
||||
}
|
||||
|
@@ -1191,3 +1191,27 @@ func (h *ClusterHandler) RemoveIdentity(c echo.Context) error {
|
||||
|
||||
return c.JSON(http.StatusOK, "OK")
|
||||
}
|
||||
|
||||
// ListStoreLocks returns the list of currently stored locks
|
||||
// @Summary List locks in the cluster DB
|
||||
// @Description List of locks in the cluster DB
|
||||
// @Tags v16.?.?
|
||||
// @ID cluster-3-db-list-locks
|
||||
// @Produce json
|
||||
// @Success 200 {array} api.ClusterLock
|
||||
// @Security ApiKeyAuth
|
||||
// @Router /api/v3/cluster/db/locks [get]
|
||||
func (h *ClusterHandler) ListStoreLocks(c echo.Context) error {
|
||||
clusterlocks := h.cluster.ListLocks()
|
||||
|
||||
locks := []api.ClusterLock{}
|
||||
|
||||
for name, validUntil := range clusterlocks {
|
||||
locks = append(locks, api.ClusterLock{
|
||||
Name: name,
|
||||
ValidUntil: validUntil,
|
||||
})
|
||||
}
|
||||
|
||||
return c.JSON(http.StatusOK, locks)
|
||||
}
|
||||
|
@@ -679,6 +679,7 @@ func (s *server) setRoutesV3(v3 *echo.Group) {
|
||||
v3.GET("/cluster/db/user", s.v3handler.cluster.ListStoreIdentities)
|
||||
v3.GET("/cluster/db/user/:name", s.v3handler.cluster.ListStoreIdentity)
|
||||
v3.GET("/cluster/db/policies", s.v3handler.cluster.ListStorePolicies)
|
||||
v3.GET("/cluster/db/locks", s.v3handler.cluster.ListStoreLocks)
|
||||
|
||||
v3.GET("/cluster/iam/user", s.v3handler.cluster.ListIdentities)
|
||||
v3.GET("/cluster/iam/user/:name", s.v3handler.cluster.ListIdentity)
|
||||
|
Reference in New Issue
Block a user