Allow to send command to process on the cluster

This commit is contained in:
Ingo Oppermann
2023-06-22 21:43:51 +02:00
parent f965d106d3
commit 944d487730
16 changed files with 660 additions and 29 deletions

View File

@@ -112,6 +112,7 @@ func NewAPI(config APIConfig) (API, error) {
a.router.POST("/v1/process", a.AddProcess) a.router.POST("/v1/process", a.AddProcess)
a.router.DELETE("/v1/process/:id", a.RemoveProcess) a.router.DELETE("/v1/process/:id", a.RemoveProcess)
a.router.PUT("/v1/process/:id", a.UpdateProcess) a.router.PUT("/v1/process/:id", a.UpdateProcess)
a.router.PUT("/v1/process/:id/command", a.SetProcessCommand)
a.router.PUT("/v1/process/:id/metadata/:key", a.SetProcessMetadata) a.router.PUT("/v1/process/:id/metadata/:key", a.SetProcessMetadata)
a.router.POST("/v1/iam/user", a.AddIdentity) a.router.POST("/v1/iam/user", a.AddIdentity)
@@ -357,6 +358,46 @@ func (a *api) UpdateProcess(c echo.Context) error {
return c.JSON(http.StatusOK, "OK") return c.JSON(http.StatusOK, "OK")
} }
// SetProcessCommand sets the order for a process
// @Summary Set the order for a process
// @Description Set the order for a process.
// @Tags v1.0.0
// @ID cluster-3-set-process-order
// @Produce json
// @Param id path string true "Process ID"
// @Param domain query string false "Domain to act on"
// @Param data body client.SetProcessCommandRequest true "Process order"
// @Success 200 {string} string
// @Failure 500 {object} Error
// @Failure 508 {object} Error
// @Router /v1/process/{id}/command [put]
func (a *api) SetProcessCommand(c echo.Context) error {
id := util.PathParam(c, "id")
domain := util.DefaultQuery(c, "domain", "")
r := client.SetProcessCommandRequest{}
if err := util.ShouldBindJSON(c, &r); err != nil {
return Err(http.StatusBadRequest, "", "invalid JSON: %s", err.Error())
}
origin := c.Request().Header.Get("X-Cluster-Origin")
if origin == a.id {
return Err(http.StatusLoopDetected, "", "breaking circuit")
}
pid := app.ProcessID{ID: id, Domain: domain}
err := a.cluster.SetProcessCommand(origin, pid, r.Command)
if err != nil {
a.logger.Debug().WithError(err).WithField("id", pid).Log("Unable to set order")
return Err(http.StatusInternalServerError, "", "unable to set order: %s", err.Error())
}
return c.JSON(http.StatusOK, "OK")
}
// SetProcessMetadata stores metadata with a process // SetProcessMetadata stores metadata with a process
// @Summary Add JSON metadata with a process under the given key // @Summary Add JSON metadata with a process under the given key
// @Description Add arbitrary JSON metadata under the given key. If the key exists, all already stored metadata with this key will be overwritten. If the key doesn't exist, it will be created. // @Description Add arbitrary JSON metadata under the given key. If the key exists, all already stored metadata with this key will be overwritten. If the key doesn't exist, it will be created.

View File

@@ -29,6 +29,10 @@ type UpdateProcessRequest struct {
Config app.Config `json:"config"` Config app.Config `json:"config"`
} }
type SetProcessCommandRequest struct {
Command string `json:"order"`
}
type SetProcessMetadataRequest struct { type SetProcessMetadataRequest struct {
Metadata interface{} `json:"metadata"` Metadata interface{} `json:"metadata"`
} }
@@ -150,6 +154,17 @@ func (c *APIClient) UpdateProcess(origin string, id app.ProcessID, r UpdateProce
return err return err
} }
func (c *APIClient) SetProcessCommand(origin string, id app.ProcessID, r SetProcessCommandRequest) error {
data, err := json.Marshal(r)
if err != nil {
return err
}
_, err = c.call(http.MethodPut, "/v1/process/"+url.PathEscape(id.ID)+"/command?domain="+url.QueryEscape(id.Domain), "application/json", bytes.NewReader(data), origin)
return err
}
func (c *APIClient) SetProcessMetadata(origin string, id app.ProcessID, key string, r SetProcessMetadataRequest) error { func (c *APIClient) SetProcessMetadata(origin string, id app.ProcessID, key string, r SetProcessMetadataRequest) error {
data, err := json.Marshal(r) data, err := json.Marshal(r)
if err != nil { if err != nil {

View File

@@ -54,6 +54,7 @@ type Cluster interface {
AddProcess(origin string, config *app.Config) error AddProcess(origin string, config *app.Config) error
RemoveProcess(origin string, id app.ProcessID) error RemoveProcess(origin string, id app.ProcessID) error
UpdateProcess(origin string, id app.ProcessID, config *app.Config) error UpdateProcess(origin string, id app.ProcessID, config *app.Config) error
SetProcessCommand(origin string, id app.ProcessID, order string) error
SetProcessMetadata(origin string, id app.ProcessID, key string, data interface{}) error SetProcessMetadata(origin string, id app.ProcessID, key string, data interface{}) error
IAM(superuser iamidentity.User, jwtRealm, jwtSecret string) (iam.IAM, error) IAM(superuser iamidentity.User, jwtRealm, jwtSecret string) (iam.IAM, error)
@@ -876,6 +877,47 @@ func (c *cluster) UpdateProcess(origin string, id app.ProcessID, config *app.Con
return c.applyCommand(cmd) return c.applyCommand(cmd)
} }
func (c *cluster) SetProcessCommand(origin string, id app.ProcessID, command string) error {
if ok, _ := c.IsDegraded(); ok {
return ErrDegraded
}
if !c.IsRaftLeader() {
return c.forwarder.SetProcessCommand(origin, id, command)
}
if command == "start" || command == "stop" {
cmd := &store.Command{
Operation: store.OpSetProcessOrder,
Data: &store.CommandSetProcessOrder{
ID: id,
Order: command,
},
}
return c.applyCommand(cmd)
}
procs := c.proxy.ListProxyProcesses()
nodeid := ""
for _, p := range procs {
if p.Config.ProcessID() != id {
continue
}
nodeid = p.NodeID
break
}
if len(nodeid) == 0 {
return fmt.Errorf("the process '%s' is not registered with any node", id.String())
}
return c.proxy.CommandProcess(nodeid, id, command)
}
func (c *cluster) SetProcessMetadata(origin string, id app.ProcessID, key string, data interface{}) error { func (c *cluster) SetProcessMetadata(origin string, id app.ProcessID, key string, data interface{}) error {
if ok, _ := c.IsDegraded(); ok { if ok, _ := c.IsDegraded(); ok {
return ErrDegraded return ErrDegraded

View File

@@ -661,6 +661,63 @@ const docTemplateClusterAPI = `{
} }
} }
}, },
"/v1/process/{id}/command": {
"put": {
"description": "Set the order for a process.",
"produces": [
"application/json"
],
"tags": [
"v1.0.0"
],
"summary": "Set the order for a process",
"operationId": "cluster-3-set-process-order",
"parameters": [
{
"type": "string",
"description": "Process ID",
"name": "id",
"in": "path",
"required": true
},
{
"type": "string",
"description": "Domain to act on",
"name": "domain",
"in": "query"
},
{
"description": "Process order",
"name": "data",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/client.SetProcessCommandRequest"
}
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "string"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/cluster.Error"
}
},
"508": {
"description": "Loop Detected",
"schema": {
"$ref": "#/definitions/cluster.Error"
}
}
}
}
},
"/v1/process/{id}/metadata/{key}": { "/v1/process/{id}/metadata/{key}": {
"put": { "put": {
"description": "Add arbitrary JSON metadata under the given key. If the key exists, all already stored metadata with this key will be overwritten. If the key doesn't exist, it will be created.", "description": "Add arbitrary JSON metadata under the given key. If the key exists, all already stored metadata with this key will be overwritten. If the key doesn't exist, it will be created.",
@@ -1063,6 +1120,14 @@ const docTemplateClusterAPI = `{
} }
} }
}, },
"client.SetProcessCommandRequest": {
"type": "object",
"properties": {
"order": {
"type": "string"
}
}
},
"client.SetProcessMetadataRequest": { "client.SetProcessMetadataRequest": {
"type": "object", "type": "object",
"properties": { "properties": {

View File

@@ -653,6 +653,63 @@
} }
} }
}, },
"/v1/process/{id}/command": {
"put": {
"description": "Set the order for a process.",
"produces": [
"application/json"
],
"tags": [
"v1.0.0"
],
"summary": "Set the order for a process",
"operationId": "cluster-3-set-process-order",
"parameters": [
{
"type": "string",
"description": "Process ID",
"name": "id",
"in": "path",
"required": true
},
{
"type": "string",
"description": "Domain to act on",
"name": "domain",
"in": "query"
},
{
"description": "Process order",
"name": "data",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/client.SetProcessCommandRequest"
}
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "string"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/cluster.Error"
}
},
"508": {
"description": "Loop Detected",
"schema": {
"$ref": "#/definitions/cluster.Error"
}
}
}
}
},
"/v1/process/{id}/metadata/{key}": { "/v1/process/{id}/metadata/{key}": {
"put": { "put": {
"description": "Add arbitrary JSON metadata under the given key. If the key exists, all already stored metadata with this key will be overwritten. If the key doesn't exist, it will be created.", "description": "Add arbitrary JSON metadata under the given key. If the key exists, all already stored metadata with this key will be overwritten. If the key doesn't exist, it will be created.",
@@ -1055,6 +1112,14 @@
} }
} }
}, },
"client.SetProcessCommandRequest": {
"type": "object",
"properties": {
"order": {
"type": "string"
}
}
},
"client.SetProcessMetadataRequest": { "client.SetProcessMetadataRequest": {
"type": "object", "type": "object",
"properties": { "properties": {

View File

@@ -132,6 +132,11 @@ definitions:
$ref: '#/definitions/access.Policy' $ref: '#/definitions/access.Policy'
type: array type: array
type: object type: object
client.SetProcessCommandRequest:
properties:
order:
type: string
type: object
client.SetProcessMetadataRequest: client.SetProcessMetadataRequest:
properties: properties:
metadata: {} metadata: {}
@@ -1096,6 +1101,44 @@ paths:
summary: Replace an existing process summary: Replace an existing process
tags: tags:
- v1.0.0 - v1.0.0
/v1/process/{id}/command:
put:
description: Set the order for a process.
operationId: cluster-3-set-process-order
parameters:
- description: Process ID
in: path
name: id
required: true
type: string
- description: Domain to act on
in: query
name: domain
type: string
- description: Process order
in: body
name: data
required: true
schema:
$ref: '#/definitions/client.SetProcessCommandRequest'
produces:
- application/json
responses:
"200":
description: OK
schema:
type: string
"500":
description: Internal Server Error
schema:
$ref: '#/definitions/cluster.Error'
"508":
description: Loop Detected
schema:
$ref: '#/definitions/cluster.Error'
summary: Set the order for a process
tags:
- v1.0.0
/v1/process/{id}/metadata/{key}: /v1/process/{id}/metadata/{key}:
put: put:
description: Add arbitrary JSON metadata under the given key. If the key exists, description: Add arbitrary JSON metadata under the given key. If the key exists,

View File

@@ -24,8 +24,9 @@ type Forwarder interface {
AddProcess(origin string, config *app.Config) error AddProcess(origin string, config *app.Config) error
UpdateProcess(origin string, id app.ProcessID, config *app.Config) error UpdateProcess(origin string, id app.ProcessID, config *app.Config) error
SetProcessMetadata(origin string, id app.ProcessID, key string, data interface{}) error
RemoveProcess(origin string, id app.ProcessID) error RemoveProcess(origin string, id app.ProcessID) error
SetProcessCommand(origin string, id app.ProcessID, command string) error
SetProcessMetadata(origin string, id app.ProcessID, key string, data interface{}) error
AddIdentity(origin string, identity iamidentity.User) error AddIdentity(origin string, identity iamidentity.User) error
UpdateIdentity(origin, name string, identity iamidentity.User) error UpdateIdentity(origin, name string, identity iamidentity.User) error
@@ -177,6 +178,22 @@ func (f *forwarder) UpdateProcess(origin string, id app.ProcessID, config *app.C
return client.UpdateProcess(origin, id, r) return client.UpdateProcess(origin, id, r)
} }
func (f *forwarder) SetProcessCommand(origin string, id app.ProcessID, command string) error {
if origin == "" {
origin = f.id
}
r := apiclient.SetProcessCommandRequest{
Command: command,
}
f.lock.RLock()
client := f.client
f.lock.RUnlock()
return client.SetProcessCommand(origin, id, r)
}
func (f *forwarder) SetProcessMetadata(origin string, id app.ProcessID, key string, data interface{}) error { func (f *forwarder) SetProcessMetadata(origin string, id app.ProcessID, key string, data interface{}) error {
if origin == "" { if origin == "" {
origin = f.id origin = f.id

View File

@@ -621,6 +621,9 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc
// we want to be running on the nodes. // we want to be running on the nodes.
wantMap := map[string]store.Process{} wantMap := map[string]store.Process{}
for _, process := range want { for _, process := range want {
if process.Order != "start" {
continue
}
pid := process.Config.ProcessID().String() pid := process.Config.ProcessID().String()
wantMap[pid] = process wantMap[pid] = process
} }
@@ -634,7 +637,8 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc
for _, haveP := range have { for _, haveP := range have {
pid := haveP.Config.ProcessID().String() pid := haveP.Config.ProcessID().String()
if wantP, ok := wantMap[pid]; !ok { wantP, ok := wantMap[pid]
if !ok {
// The process is not on the wantMap. Delete it and adjust the resources. // The process is not on the wantMap. Delete it and adjust the resources.
opStack = append(opStack, processOpDelete{ opStack = append(opStack, processOpDelete{
nodeid: haveP.NodeID, nodeid: haveP.NodeID,
@@ -664,11 +668,12 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc
delete(wantMap, pid) delete(wantMap, pid)
reality[pid] = haveP.NodeID reality[pid] = haveP.NodeID
if haveP.Order != "start" { if haveP.Order != wantP.Order { // wantP.Order is always "start" because we selected only those above
opStack = append(opStack, processOpStart{ opStack = append(opStack, processOpStart{
nodeid: haveP.NodeID, nodeid: haveP.NodeID,
processid: haveP.Config.ProcessID(), processid: haveP.Config.ProcessID(),
}) })
} }
haveAfterRemove = append(haveAfterRemove, haveP) haveAfterRemove = append(haveAfterRemove, haveP)

View File

@@ -35,6 +35,8 @@ type Node interface {
AddProcess(config *app.Config, metadata map[string]interface{}) error AddProcess(config *app.Config, metadata map[string]interface{}) error
StartProcess(id app.ProcessID) error StartProcess(id app.ProcessID) error
StopProcess(id app.ProcessID) error StopProcess(id app.ProcessID) error
RestartProcess(id app.ProcessID) error
ReloadProcess(id app.ProcessID) error
DeleteProcess(id app.ProcessID) error DeleteProcess(id app.ProcessID) error
UpdateProcess(id app.ProcessID, config *app.Config, metadata map[string]interface{}) error UpdateProcess(id app.ProcessID, config *app.Config, metadata map[string]interface{}) error
@@ -1076,6 +1078,28 @@ func (n *node) StopProcess(id app.ProcessID) error {
return n.peer.ProcessCommand(client.NewProcessID(id.ID, id.Domain), "stop") return n.peer.ProcessCommand(client.NewProcessID(id.ID, id.Domain), "stop")
} }
func (n *node) RestartProcess(id app.ProcessID) error {
n.peerLock.RLock()
defer n.peerLock.RUnlock()
if n.peer == nil {
return fmt.Errorf("not connected")
}
return n.peer.ProcessCommand(client.NewProcessID(id.ID, id.Domain), "restart")
}
func (n *node) ReloadProcess(id app.ProcessID) error {
n.peerLock.RLock()
defer n.peerLock.RUnlock()
if n.peer == nil {
return fmt.Errorf("not connected")
}
return n.peer.ProcessCommand(client.NewProcessID(id.ID, id.Domain), "reload")
}
func (n *node) DeleteProcess(id app.ProcessID) error { func (n *node) DeleteProcess(id app.ProcessID) error {
n.peerLock.RLock() n.peerLock.RLock()
defer n.peerLock.RUnlock() defer n.peerLock.RUnlock()

View File

@@ -30,6 +30,7 @@ type Proxy interface {
DeleteProcess(nodeid string, id app.ProcessID) error DeleteProcess(nodeid string, id app.ProcessID) error
StartProcess(nodeid string, id app.ProcessID) error StartProcess(nodeid string, id app.ProcessID) error
UpdateProcess(nodeid string, id app.ProcessID, config *app.Config, metadata map[string]interface{}) error UpdateProcess(nodeid string, id app.ProcessID, config *app.Config, metadata map[string]interface{}) error
CommandProcess(nodeid string, id app.ProcessID, command string) error
} }
type ProxyReader interface { type ProxyReader interface {
@@ -652,3 +653,30 @@ func (p *proxy) UpdateProcess(nodeid string, id app.ProcessID, config *app.Confi
return node.UpdateProcess(id, config, metadata) return node.UpdateProcess(id, config, metadata)
} }
func (p *proxy) CommandProcess(nodeid string, id app.ProcessID, command string) error {
p.lock.RLock()
defer p.lock.RUnlock()
node, ok := p.nodes[nodeid]
if !ok {
return fmt.Errorf("node not found")
}
var err error = nil
switch command {
case "start":
err = node.StartProcess(id)
case "stop":
err = node.StopProcess(id)
case "restart":
err = node.RestartProcess(id)
case "reload":
err = node.ReloadProcess(id)
default:
err = fmt.Errorf("unknown command: %s", command)
}
return err
}

View File

@@ -41,6 +41,7 @@ type Process struct {
CreatedAt time.Time CreatedAt time.Time
UpdatedAt time.Time UpdatedAt time.Time
Config *app.Config Config *app.Config
Order string
Metadata map[string]interface{} Metadata map[string]interface{}
} }
@@ -65,6 +66,7 @@ const (
OpAddProcess Operation = "addProcess" OpAddProcess Operation = "addProcess"
OpRemoveProcess Operation = "removeProcess" OpRemoveProcess Operation = "removeProcess"
OpUpdateProcess Operation = "updateProcess" OpUpdateProcess Operation = "updateProcess"
OpSetProcessOrder Operation = "setProcessOrder"
OpSetProcessMetadata Operation = "setProcessMetadata" OpSetProcessMetadata Operation = "setProcessMetadata"
OpAddIdentity Operation = "addIdentity" OpAddIdentity Operation = "addIdentity"
OpUpdateIdentity Operation = "updateIdentity" OpUpdateIdentity Operation = "updateIdentity"
@@ -96,6 +98,11 @@ type CommandRemoveProcess struct {
ID app.ProcessID ID app.ProcessID
} }
type CommandSetProcessOrder struct {
ID app.ProcessID
Order string
}
type CommandSetProcessMetadata struct { type CommandSetProcessMetadata struct {
ID app.ProcessID ID app.ProcessID
Key string Key string
@@ -244,7 +251,7 @@ func (s *store) Apply(entry *raft.Log) interface{} {
return nil return nil
} }
func convertCommand[T any](cmd T, data any) error { func decodeCommand[T any](cmd T, data any) error {
b, err := json.Marshal(data) b, err := json.Marshal(data)
if err != nil { if err != nil {
return err return err
@@ -261,7 +268,7 @@ func (s *store) applyCommand(c Command) error {
switch c.Operation { switch c.Operation {
case OpAddProcess: case OpAddProcess:
cmd := CommandAddProcess{} cmd := CommandAddProcess{}
err = convertCommand(&cmd, c.Data) err = decodeCommand(&cmd, c.Data)
if err != nil { if err != nil {
break break
} }
@@ -269,7 +276,7 @@ func (s *store) applyCommand(c Command) error {
err = s.addProcess(cmd) err = s.addProcess(cmd)
case OpRemoveProcess: case OpRemoveProcess:
cmd := CommandRemoveProcess{} cmd := CommandRemoveProcess{}
err = convertCommand(&cmd, c.Data) err = decodeCommand(&cmd, c.Data)
if err != nil { if err != nil {
break break
} }
@@ -277,15 +284,23 @@ func (s *store) applyCommand(c Command) error {
err = s.removeProcess(cmd) err = s.removeProcess(cmd)
case OpUpdateProcess: case OpUpdateProcess:
cmd := CommandUpdateProcess{} cmd := CommandUpdateProcess{}
err = convertCommand(&cmd, c.Data) err = decodeCommand(&cmd, c.Data)
if err != nil { if err != nil {
break break
} }
err = s.updateProcess(cmd) err = s.updateProcess(cmd)
case OpSetProcessOrder:
cmd := CommandSetProcessOrder{}
err = decodeCommand(&cmd, c.Data)
if err != nil {
break
}
err = s.setProcessOrder(cmd)
case OpSetProcessMetadata: case OpSetProcessMetadata:
cmd := CommandSetProcessMetadata{} cmd := CommandSetProcessMetadata{}
err = convertCommand(&cmd, c.Data) err = decodeCommand(&cmd, c.Data)
if err != nil { if err != nil {
break break
} }
@@ -293,7 +308,7 @@ func (s *store) applyCommand(c Command) error {
err = s.setProcessMetadata(cmd) err = s.setProcessMetadata(cmd)
case OpAddIdentity: case OpAddIdentity:
cmd := CommandAddIdentity{} cmd := CommandAddIdentity{}
err = convertCommand(&cmd, c.Data) err = decodeCommand(&cmd, c.Data)
if err != nil { if err != nil {
break break
} }
@@ -301,7 +316,7 @@ func (s *store) applyCommand(c Command) error {
err = s.addIdentity(cmd) err = s.addIdentity(cmd)
case OpUpdateIdentity: case OpUpdateIdentity:
cmd := CommandUpdateIdentity{} cmd := CommandUpdateIdentity{}
err = convertCommand(&cmd, c.Data) err = decodeCommand(&cmd, c.Data)
if err != nil { if err != nil {
break break
} }
@@ -309,7 +324,7 @@ func (s *store) applyCommand(c Command) error {
err = s.updateIdentity(cmd) err = s.updateIdentity(cmd)
case OpRemoveIdentity: case OpRemoveIdentity:
cmd := CommandRemoveIdentity{} cmd := CommandRemoveIdentity{}
err = convertCommand(&cmd, c.Data) err = decodeCommand(&cmd, c.Data)
if err != nil { if err != nil {
break break
} }
@@ -317,7 +332,7 @@ func (s *store) applyCommand(c Command) error {
err = s.removeIdentity(cmd) err = s.removeIdentity(cmd)
case OpSetPolicies: case OpSetPolicies:
cmd := CommandSetPolicies{} cmd := CommandSetPolicies{}
err = convertCommand(&cmd, c.Data) err = decodeCommand(&cmd, c.Data)
if err != nil { if err != nil {
break break
} }
@@ -325,7 +340,7 @@ func (s *store) applyCommand(c Command) error {
err = s.setPolicies(cmd) err = s.setPolicies(cmd)
case OpSetProcessNodeMap: case OpSetProcessNodeMap:
cmd := CommandSetProcessNodeMap{} cmd := CommandSetProcessNodeMap{}
err = convertCommand(&cmd, c.Data) err = decodeCommand(&cmd, c.Data)
if err != nil { if err != nil {
break break
} }
@@ -333,7 +348,7 @@ func (s *store) applyCommand(c Command) error {
err = s.setProcessNodeMap(cmd) err = s.setProcessNodeMap(cmd)
case OpCreateLock: case OpCreateLock:
cmd := CommandCreateLock{} cmd := CommandCreateLock{}
err = convertCommand(&cmd, c.Data) err = decodeCommand(&cmd, c.Data)
if err != nil { if err != nil {
break break
} }
@@ -341,7 +356,7 @@ func (s *store) applyCommand(c Command) error {
err = s.createLock(cmd) err = s.createLock(cmd)
case OpDeleteLock: case OpDeleteLock:
cmd := CommandDeleteLock{} cmd := CommandDeleteLock{}
err = convertCommand(&cmd, c.Data) err = decodeCommand(&cmd, c.Data)
if err != nil { if err != nil {
break break
} }
@@ -349,7 +364,7 @@ func (s *store) applyCommand(c Command) error {
err = s.deleteLock(cmd) err = s.deleteLock(cmd)
case OpClearLocks: case OpClearLocks:
cmd := CommandClearLocks{} cmd := CommandClearLocks{}
err = convertCommand(&cmd, c.Data) err = decodeCommand(&cmd, c.Data)
if err != nil { if err != nil {
break break
} }
@@ -357,7 +372,7 @@ func (s *store) applyCommand(c Command) error {
err = s.clearLocks(cmd) err = s.clearLocks(cmd)
case OpSetKV: case OpSetKV:
cmd := CommandSetKV{} cmd := CommandSetKV{}
err = convertCommand(&cmd, c.Data) err = decodeCommand(&cmd, c.Data)
if err != nil { if err != nil {
break break
} }
@@ -365,7 +380,7 @@ func (s *store) applyCommand(c Command) error {
err = s.setKV(cmd) err = s.setKV(cmd)
case OpUnsetKV: case OpUnsetKV:
cmd := CommandUnsetKV{} cmd := CommandUnsetKV{}
err = convertCommand(&cmd, c.Data) err = decodeCommand(&cmd, c.Data)
if err != nil { if err != nil {
break break
} }
@@ -394,11 +409,18 @@ func (s *store) addProcess(cmd CommandAddProcess) error {
return fmt.Errorf("the process with the ID '%s' already exists", id) return fmt.Errorf("the process with the ID '%s' already exists", id)
} }
order := "stop"
if cmd.Config.Autostart {
order = "start"
cmd.Config.Autostart = false
}
now := time.Now() now := time.Now()
s.data.Process[id] = Process{ s.data.Process[id] = Process{
CreatedAt: now, CreatedAt: now,
UpdatedAt: now, UpdatedAt: now,
Config: cmd.Config, Config: cmd.Config,
Order: order,
Metadata: map[string]interface{}{}, Metadata: map[string]interface{}{},
} }
@@ -442,10 +464,10 @@ func (s *store) updateProcess(cmd CommandUpdateProcess) error {
} }
if srcid == dstid { if srcid == dstid {
s.data.Process[srcid] = Process{ p.UpdatedAt = time.Now()
UpdatedAt: time.Now(), p.Config = cmd.Config
Config: cmd.Config,
} s.data.Process[srcid] = p
return nil return nil
} }
@@ -455,12 +477,34 @@ func (s *store) updateProcess(cmd CommandUpdateProcess) error {
return fmt.Errorf("the process with the ID '%s' already exists", dstid) return fmt.Errorf("the process with the ID '%s' already exists", dstid)
} }
now := time.Now()
p.CreatedAt = now
p.UpdatedAt = now
p.Config = cmd.Config
delete(s.data.Process, srcid) delete(s.data.Process, srcid)
s.data.Process[dstid] = Process{ s.data.Process[dstid] = p
UpdatedAt: time.Now(),
Config: cmd.Config, return nil
}
func (s *store) setProcessOrder(cmd CommandSetProcessOrder) error {
s.lock.Lock()
defer s.lock.Unlock()
id := cmd.ID.String()
p, ok := s.data.Process[id]
if !ok {
return fmt.Errorf("the process with the ID '%s' doesn't exists", cmd.ID)
} }
p.Order = cmd.Order
p.UpdatedAt = time.Now()
s.data.Process[id] = p
return nil return nil
} }
@@ -736,6 +780,7 @@ func (s *store) ListProcesses() []Process {
CreatedAt: p.CreatedAt, CreatedAt: p.CreatedAt,
UpdatedAt: p.UpdatedAt, UpdatedAt: p.UpdatedAt,
Config: p.Config.Clone(), Config: p.Config.Clone(),
Order: p.Order,
Metadata: p.Metadata, Metadata: p.Metadata,
}) })
} }

View File

@@ -1273,6 +1273,77 @@ const docTemplate = `{
} }
} }
}, },
"/api/v3/cluster/process/{id}/command": {
"put": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "Issue a command to a process: start, stop, reload, restart",
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"tags": [
"v16.?.?"
],
"summary": "Issue a command to a process in the cluster",
"operationId": "cluster-3-set-process-command",
"parameters": [
{
"type": "string",
"description": "Process ID",
"name": "id",
"in": "path",
"required": true
},
{
"type": "string",
"description": "Domain to act on",
"name": "domain",
"in": "query"
},
{
"description": "Process command",
"name": "command",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/api.Command"
}
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "string"
}
},
"400": {
"description": "Bad Request",
"schema": {
"$ref": "#/definitions/api.Error"
}
},
"403": {
"description": "Forbidden",
"schema": {
"$ref": "#/definitions/api.Error"
}
},
"404": {
"description": "Not Found",
"schema": {
"$ref": "#/definitions/api.Error"
}
}
}
}
},
"/api/v3/cluster/process/{id}/metadata/{key}": { "/api/v3/cluster/process/{id}/metadata/{key}": {
"put": { "put": {
"security": [ "security": [

View File

@@ -1265,6 +1265,77 @@
} }
} }
}, },
"/api/v3/cluster/process/{id}/command": {
"put": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "Issue a command to a process: start, stop, reload, restart",
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"tags": [
"v16.?.?"
],
"summary": "Issue a command to a process in the cluster",
"operationId": "cluster-3-set-process-command",
"parameters": [
{
"type": "string",
"description": "Process ID",
"name": "id",
"in": "path",
"required": true
},
{
"type": "string",
"description": "Domain to act on",
"name": "domain",
"in": "query"
},
{
"description": "Process command",
"name": "command",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/api.Command"
}
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "string"
}
},
"400": {
"description": "Bad Request",
"schema": {
"$ref": "#/definitions/api.Error"
}
},
"403": {
"description": "Forbidden",
"schema": {
"$ref": "#/definitions/api.Error"
}
},
"404": {
"description": "Not Found",
"schema": {
"$ref": "#/definitions/api.Error"
}
}
}
}
},
"/api/v3/cluster/process/{id}/metadata/{key}": { "/api/v3/cluster/process/{id}/metadata/{key}": {
"put": { "put": {
"security": [ "security": [

View File

@@ -3200,6 +3200,52 @@ paths:
summary: Replace an existing process summary: Replace an existing process
tags: tags:
- v16.?.? - v16.?.?
/api/v3/cluster/process/{id}/command:
put:
consumes:
- application/json
description: 'Issue a command to a process: start, stop, reload, restart'
operationId: cluster-3-set-process-command
parameters:
- description: Process ID
in: path
name: id
required: true
type: string
- description: Domain to act on
in: query
name: domain
type: string
- description: Process command
in: body
name: command
required: true
schema:
$ref: '#/definitions/api.Command'
produces:
- application/json
responses:
"200":
description: OK
schema:
type: string
"400":
description: Bad Request
schema:
$ref: '#/definitions/api.Error'
"403":
description: Forbidden
schema:
$ref: '#/definitions/api.Error'
"404":
description: Not Found
schema:
$ref: '#/definitions/api.Error'
security:
- ApiKeyAuth: []
summary: Issue a command to a process in the cluster
tags:
- v16.?.?
/api/v3/cluster/process/{id}/metadata/{key}: /api/v3/cluster/process/{id}/metadata/{key}:
put: put:
description: Add arbitrary JSON metadata under the given key. If the key exists, description: Add arbitrary JSON metadata under the given key. If the key exists,

View File

@@ -595,6 +595,58 @@ func (h *ClusterHandler) UpdateProcess(c echo.Context) error {
return c.JSON(http.StatusOK, process) return c.JSON(http.StatusOK, process)
} }
// Command issues a command to a process in the cluster
// @Summary Issue a command to a process in the cluster
// @Description Issue a command to a process: start, stop, reload, restart
// @Tags v16.?.?
// @ID cluster-3-set-process-command
// @Accept json
// @Produce json
// @Param id path string true "Process ID"
// @Param domain query string false "Domain to act on"
// @Param command body api.Command true "Process command"
// @Success 200 {string} string
// @Failure 400 {object} api.Error
// @Failure 403 {object} api.Error
// @Failure 404 {object} api.Error
// @Security ApiKeyAuth
// @Router /api/v3/cluster/process/{id}/command [put]
func (h *ClusterHandler) SetProcessCommand(c echo.Context) error {
id := util.PathParam(c, "id")
ctxuser := util.DefaultContext(c, "user", "")
domain := util.DefaultQuery(c, "domain", "")
if !h.iam.Enforce(ctxuser, domain, "process:"+id, "write") {
return api.Err(http.StatusForbidden, "Forbidden")
}
var command api.Command
if err := util.ShouldBindJSON(c, &command); err != nil {
return api.Err(http.StatusBadRequest, "Invalid JSON", "%s", err)
}
pid := app.ProcessID{
ID: id,
Domain: domain,
}
switch command.Command {
case "start":
case "stop":
case "restart":
case "reload":
default:
return api.Err(http.StatusBadRequest, "", "unknown command provided. known commands are: start, stop, reload, restart")
}
if err := h.cluster.SetProcessCommand("", pid, command.Command); err != nil {
return api.Err(http.StatusNotFound, "", "command failed: %s", err)
}
return c.JSON(http.StatusOK, "OK")
}
// SetProcessMetadata stores metadata with a process // SetProcessMetadata stores metadata with a process
// @Summary Add JSON metadata with a process under the given key // @Summary Add JSON metadata with a process under the given key
// @Description Add arbitrary JSON metadata under the given key. If the key exists, all already stored metadata with this key will be overwritten. If the key doesn't exist, it will be created. // @Description Add arbitrary JSON metadata under the given key. If the key exists, all already stored metadata with this key will be overwritten. If the key doesn't exist, it will be created.
@@ -618,17 +670,17 @@ func (h *ClusterHandler) SetProcessMetadata(c echo.Context) error {
domain := util.DefaultQuery(c, "domain", "") domain := util.DefaultQuery(c, "domain", "")
if !h.iam.Enforce(ctxuser, domain, "process:"+id, "write") { if !h.iam.Enforce(ctxuser, domain, "process:"+id, "write") {
return api.Err(http.StatusForbidden, "Forbidden") return api.Err(http.StatusForbidden, "")
} }
if len(key) == 0 { if len(key) == 0 {
return api.Err(http.StatusBadRequest, "Invalid key", "The key must not be of length 0") return api.Err(http.StatusBadRequest, "", "invalid key: the key must not be of length 0")
} }
var data api.Metadata var data api.Metadata
if err := util.ShouldBindJSONValidation(c, &data, false); err != nil { if err := util.ShouldBindJSONValidation(c, &data, false); err != nil {
return api.Err(http.StatusBadRequest, "Invalid JSON", "%s", err) return api.Err(http.StatusBadRequest, "", "invalid JSON: %s", err.Error())
} }
pid := app.ProcessID{ pid := app.ProcessID{
@@ -637,7 +689,7 @@ func (h *ClusterHandler) SetProcessMetadata(c echo.Context) error {
} }
if err := h.cluster.SetProcessMetadata("", pid, key, data); err != nil { if err := h.cluster.SetProcessMetadata("", pid, key, data); err != nil {
return api.Err(http.StatusNotFound, "Unknown process ID", "%s", err) return api.Err(http.StatusNotFound, "", "setting metadata failed: %s", err.Error())
} }
return c.JSON(http.StatusOK, data) return c.JSON(http.StatusOK, data)

View File

@@ -700,6 +700,7 @@ func (s *server) setRoutesV3(v3 *echo.Group) {
v3.POST("/cluster/process", s.v3handler.cluster.AddProcess) v3.POST("/cluster/process", s.v3handler.cluster.AddProcess)
v3.PUT("/cluster/process/:id", s.v3handler.cluster.UpdateProcess) v3.PUT("/cluster/process/:id", s.v3handler.cluster.UpdateProcess)
v3.DELETE("/cluster/process/:id", s.v3handler.cluster.DeleteProcess) v3.DELETE("/cluster/process/:id", s.v3handler.cluster.DeleteProcess)
v3.PUT("/cluster/process/:id/command", s.v3handler.cluster.SetProcessCommand)
v3.PUT("/cluster/process/:id/metadata/:key", s.v3handler.cluster.SetProcessMetadata) v3.PUT("/cluster/process/:id/metadata/:key", s.v3handler.cluster.SetProcessMetadata)
v3.PUT("/cluster/iam/reload", s.v3handler.cluster.ReloadIAM) v3.PUT("/cluster/iam/reload", s.v3handler.cluster.ReloadIAM)