diff --git a/cluster/docs/ClusterAPI_docs.go b/cluster/docs/ClusterAPI_docs.go index ba957dfd..420267fe 100644 --- a/cluster/docs/ClusterAPI_docs.go +++ b/cluster/docs/ClusterAPI_docs.go @@ -1428,6 +1428,9 @@ const docTemplateClusterAPI = `{ "debug": { "type": "object", "properties": { + "agent_address": { + "type": "string" + }, "auto_max_procs": { "type": "boolean" }, @@ -1863,6 +1866,9 @@ const docTemplateClusterAPI = `{ "key_file": { "type": "string" }, + "secret": { + "type": "string" + }, "staging": { "type": "boolean" } diff --git a/cluster/docs/ClusterAPI_swagger.json b/cluster/docs/ClusterAPI_swagger.json index d59e4202..4c356bd1 100644 --- a/cluster/docs/ClusterAPI_swagger.json +++ b/cluster/docs/ClusterAPI_swagger.json @@ -1420,6 +1420,9 @@ "debug": { "type": "object", "properties": { + "agent_address": { + "type": "string" + }, "auto_max_procs": { "type": "boolean" }, @@ -1855,6 +1858,9 @@ "key_file": { "type": "string" }, + "secret": { + "type": "string" + }, "staging": { "type": "boolean" } diff --git a/cluster/docs/ClusterAPI_swagger.yaml b/cluster/docs/ClusterAPI_swagger.yaml index 726e5d88..d9078dfa 100644 --- a/cluster/docs/ClusterAPI_swagger.yaml +++ b/cluster/docs/ClusterAPI_swagger.yaml @@ -262,6 +262,8 @@ definitions: type: object debug: properties: + agent_address: + type: string auto_max_procs: type: boolean force_gc: @@ -556,6 +558,8 @@ definitions: type: boolean key_file: type: string + secret: + type: string staging: type: boolean type: object diff --git a/cluster/process.go b/cluster/process.go index 5881c4e0..c2805e73 100644 --- a/cluster/process.go +++ b/cluster/process.go @@ -78,11 +78,11 @@ func (c *cluster) SetProcessCommand(origin string, id app.ProcessID, command str return ErrDegraded } - if !c.IsRaftLeader() { - return c.forwarder.SetProcessCommand(origin, id, command) - } - if command == "start" || command == "stop" { + if !c.IsRaftLeader() { + return c.forwarder.SetProcessCommand(origin, id, command) + } + cmd := &store.Command{ Operation: store.OpSetProcessOrder, Data: &store.CommandSetProcessOrder{ @@ -94,21 +94,9 @@ func (c *cluster) SetProcessCommand(origin string, id app.ProcessID, command str return c.applyCommand(cmd) } - procs := c.proxy.ListProxyProcesses() - nodeid := "" - - for _, p := range procs { - if p.Config.ProcessID() != id { - continue - } - - nodeid = p.NodeID - - break - } - - if len(nodeid) == 0 { - return fmt.Errorf("the process '%s' is not registered with any node", id.String()) + nodeid, err := c.proxy.FindNodeFromProcess(id) + if err != nil { + return fmt.Errorf("the process '%s' is not registered with any node: %w", id.String(), err) } return c.proxy.CommandProcess(nodeid, id, command) diff --git a/cluster/proxy/node.go b/cluster/proxy/node.go index 38b3920d..d0c2862d 100644 --- a/cluster/proxy/node.go +++ b/cluster/proxy/node.go @@ -40,6 +40,7 @@ type Node interface { ReloadProcess(id app.ProcessID) error DeleteProcess(id app.ProcessID) error UpdateProcess(id app.ProcessID, config *app.Config, metadata map[string]interface{}) error + ProbeProcess(id app.ProcessID) (clientapi.Probe, error) NodeReader } @@ -1106,3 +1107,21 @@ func (n *node) UpdateProcess(id app.ProcessID, config *app.Config, metadata map[ return n.peer.ProcessUpdate(client.NewProcessID(id.ID, id.Domain), cfg) } + +func (n *node) ProbeProcess(id app.ProcessID) (clientapi.Probe, error) { + n.peerLock.RLock() + defer n.peerLock.RUnlock() + + if n.peer == nil { + probe := clientapi.Probe{ + Log: []string{fmt.Sprintf("the node %s where the process %s resides, is not connected", n.id, id.String())}, + } + return probe, fmt.Errorf("not connected") + } + + probe, err := n.peer.ProcessProbe(client.NewProcessID(id.ID, id.Domain)) + + probe.Log = append([]string{fmt.Sprintf("probed on node: %s", n.id)}, probe.Log...) + + return probe, err +} diff --git a/cluster/proxy/proxy.go b/cluster/proxy/proxy.go index fbd14efe..a90de881 100644 --- a/cluster/proxy/proxy.go +++ b/cluster/proxy/proxy.go @@ -35,9 +35,12 @@ type ProxyReader interface { ListNodes() []NodeReader GetNode(id string) (NodeReader, error) + FindNodeFromProcess(id app.ProcessID) (string, error) + Resources() map[string]NodeResources ListProcesses(ProcessListOptions) []clientapi.Process ListProxyProcesses() []Process + ProbeProcess(nodeid string, id app.ProcessID) (clientapi.Probe, error) GetURL(prefix, path string) (*url.URL, error) GetFile(prefix, path string, offset int64) (io.ReadCloser, error) @@ -68,6 +71,14 @@ func (p *proxyReader) GetNode(id string) (NodeReader, error) { return p.proxy.GetNode(id) } +func (p *proxyReader) FindNodeFromProcess(id app.ProcessID) (string, error) { + if p.proxy == nil { + return "", fmt.Errorf("no proxy provided") + } + + return p.proxy.FindNodeFromProcess(id) +} + func (p *proxyReader) Resources() map[string]NodeResources { if p.proxy == nil { return nil @@ -92,6 +103,16 @@ func (p *proxyReader) ListProxyProcesses() []Process { return p.proxy.ListProxyProcesses() } +func (p *proxyReader) ProbeProcess(nodeid string, id app.ProcessID) (clientapi.Probe, error) { + if p.proxy == nil { + return clientapi.Probe{ + Log: []string{fmt.Sprintf("no proxy for node %s provided", nodeid)}, + }, fmt.Errorf("no proxy provided") + } + + return p.proxy.ProbeProcess(nodeid, id) +} + func (p *proxyReader) GetURL(prefix, path string) (*url.URL, error) { if p.proxy == nil { return nil, fmt.Errorf("no proxy provided") @@ -511,6 +532,27 @@ func (p *proxy) ListProxyProcesses() []Process { return processList } +func (p *proxy) FindNodeFromProcess(id app.ProcessID) (string, error) { + procs := p.ListProxyProcesses() + nodeid := "" + + for _, p := range procs { + if p.Config.ProcessID() != id { + continue + } + + nodeid = p.NodeID + + break + } + + if len(nodeid) == 0 { + return "", fmt.Errorf("the process '%s' is not registered with any node", id.String()) + } + + return nodeid, nil +} + func (p *proxy) ListProcesses(options ProcessListOptions) []clientapi.Process { processChan := make(chan clientapi.Process, 64) processList := []clientapi.Process{} @@ -570,11 +612,6 @@ func (p *proxy) AddProcess(nodeid string, config *app.Config, metadata map[strin return err } - err = node.StartProcess(config.ProcessID()) - if err != nil { - return err - } - return nil } @@ -633,3 +670,18 @@ func (p *proxy) CommandProcess(nodeid string, id app.ProcessID, command string) return err } + +func (p *proxy) ProbeProcess(nodeid string, id app.ProcessID) (clientapi.Probe, error) { + p.lock.RLock() + defer p.lock.RUnlock() + + node, ok := p.nodes[nodeid] + if !ok { + probe := clientapi.Probe{ + Log: []string{fmt.Sprintf("the node %s where the process %s should reside on, doesn't exist", nodeid, id.String())}, + } + return probe, fmt.Errorf("node not found") + } + + return node.ProbeProcess(id) +} diff --git a/docs/docs.go b/docs/docs.go index 3682e386..401bdbda 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -1514,6 +1514,53 @@ const docTemplate = `{ } } }, + "/api/v3/cluster/process/{id}/probe": { + "get": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "Probe an existing process to get a detailed stream information on the inputs.", + "produces": [ + "application/json" + ], + "tags": [ + "v16.?.?" + ], + "summary": "Probe a process in the cluster", + "operationId": "cluster-3-process-probe", + "parameters": [ + { + "type": "string", + "description": "Process ID", + "name": "id", + "in": "path", + "required": true + }, + { + "type": "string", + "description": "Domain to act on", + "name": "domain", + "in": "query" + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/api.Probe" + } + }, + "403": { + "description": "Forbidden", + "schema": { + "$ref": "#/definitions/api.Error" + } + } + } + } + }, "/api/v3/cluster/snapshot": { "get": { "security": [ @@ -4617,6 +4664,9 @@ const docTemplate = `{ "debug": { "type": "object", "properties": { + "agent_address": { + "type": "string" + }, "auto_max_procs": { "type": "boolean" }, @@ -5052,6 +5102,9 @@ const docTemplate = `{ "key_file": { "type": "string" }, + "secret": { + "type": "string" + }, "staging": { "type": "boolean" } @@ -6844,6 +6897,9 @@ const docTemplate = `{ "debug": { "type": "object", "properties": { + "agent_address": { + "type": "string" + }, "auto_max_procs": { "type": "boolean" }, @@ -7279,6 +7335,9 @@ const docTemplate = `{ "key_file": { "type": "string" }, + "secret": { + "type": "string" + }, "staging": { "type": "boolean" } diff --git a/docs/swagger.json b/docs/swagger.json index 1c81f68f..69f48ec3 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -1506,6 +1506,53 @@ } } }, + "/api/v3/cluster/process/{id}/probe": { + "get": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "Probe an existing process to get a detailed stream information on the inputs.", + "produces": [ + "application/json" + ], + "tags": [ + "v16.?.?" + ], + "summary": "Probe a process in the cluster", + "operationId": "cluster-3-process-probe", + "parameters": [ + { + "type": "string", + "description": "Process ID", + "name": "id", + "in": "path", + "required": true + }, + { + "type": "string", + "description": "Domain to act on", + "name": "domain", + "in": "query" + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/api.Probe" + } + }, + "403": { + "description": "Forbidden", + "schema": { + "$ref": "#/definitions/api.Error" + } + } + } + } + }, "/api/v3/cluster/snapshot": { "get": { "security": [ @@ -4609,6 +4656,9 @@ "debug": { "type": "object", "properties": { + "agent_address": { + "type": "string" + }, "auto_max_procs": { "type": "boolean" }, @@ -5044,6 +5094,9 @@ "key_file": { "type": "string" }, + "secret": { + "type": "string" + }, "staging": { "type": "boolean" } @@ -6836,6 +6889,9 @@ "debug": { "type": "object", "properties": { + "agent_address": { + "type": "string" + }, "auto_max_procs": { "type": "boolean" }, @@ -7271,6 +7327,9 @@ "key_file": { "type": "string" }, + "secret": { + "type": "string" + }, "staging": { "type": "boolean" } diff --git a/docs/swagger.yaml b/docs/swagger.yaml index b6cdb571..b0fd9e69 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -301,6 +301,8 @@ definitions: type: object debug: properties: + agent_address: + type: string auto_max_procs: type: boolean force_gc: @@ -595,6 +597,8 @@ definitions: type: boolean key_file: type: string + secret: + type: string staging: type: boolean type: object @@ -1873,6 +1877,8 @@ definitions: type: object debug: properties: + agent_address: + type: string auto_max_procs: type: boolean force_gc: @@ -2167,6 +2173,8 @@ definitions: type: boolean key_file: type: string + secret: + type: string staging: type: boolean type: object @@ -3381,6 +3389,37 @@ paths: summary: Add JSON metadata with a process under the given key tags: - v16.?.? + /api/v3/cluster/process/{id}/probe: + get: + description: Probe an existing process to get a detailed stream information + on the inputs. + operationId: cluster-3-process-probe + parameters: + - description: Process ID + in: path + name: id + required: true + type: string + - description: Domain to act on + in: query + name: domain + type: string + produces: + - application/json + responses: + "200": + description: OK + schema: + $ref: '#/definitions/api.Probe' + "403": + description: Forbidden + schema: + $ref: '#/definitions/api.Error' + security: + - ApiKeyAuth: [] + summary: Probe a process in the cluster + tags: + - v16.?.? /api/v3/cluster/snapshot: get: description: Retrieve snapshot of the cluster DB diff --git a/http/handler/api/cluster.go b/http/handler/api/cluster.go index a500ade6..bea1520a 100644 --- a/http/handler/api/cluster.go +++ b/http/handler/api/cluster.go @@ -771,6 +771,44 @@ func (h *ClusterHandler) SetProcessMetadata(c echo.Context) error { return c.JSON(http.StatusOK, data) } +// Probe probes a process in the cluster +// @Summary Probe a process in the cluster +// @Description Probe an existing process to get a detailed stream information on the inputs. +// @Tags v16.?.? +// @ID cluster-3-process-probe +// @Produce json +// @Param id path string true "Process ID" +// @Param domain query string false "Domain to act on" +// @Success 200 {object} api.Probe +// @Failure 403 {object} api.Error +// @Security ApiKeyAuth +// @Router /api/v3/cluster/process/{id}/probe [get] +func (h *ClusterHandler) ProbeProcess(c echo.Context) error { + id := util.PathParam(c, "id") + ctxuser := util.DefaultContext(c, "user", "") + domain := util.DefaultQuery(c, "domain", "") + + if !h.iam.Enforce(ctxuser, domain, "process:"+id, "write") { + return api.Err(http.StatusForbidden, "") + } + + pid := app.ProcessID{ + ID: id, + Domain: domain, + } + + nodeid, err := h.proxy.FindNodeFromProcess(pid) + if err != nil { + return c.JSON(http.StatusOK, api.Probe{ + Log: []string{fmt.Sprintf("the process can't be found: %s", err.Error())}, + }) + } + + probe, _ := h.proxy.ProbeProcess(nodeid, pid) + + return c.JSON(http.StatusOK, probe) +} + // Delete deletes the process with the given ID from the cluster // @Summary Delete a process by its ID // @Description Delete a process by its ID diff --git a/http/server.go b/http/server.go index 6ec64353..55a12906 100644 --- a/http/server.go +++ b/http/server.go @@ -707,6 +707,7 @@ func (s *server) setRoutesV3(v3 *echo.Group) { v3.GET("/cluster/process", s.v3handler.cluster.ListAllNodesProcesses) v3.GET("/cluster/process/:id", s.v3handler.cluster.GetAllNodesProcess) + v3.GET("/cluster/process/:id/probe", s.v3handler.cluster.ProbeProcess) v3.GET("/cluster/node", s.v3handler.cluster.GetNodes) v3.GET("/cluster/node/:id", s.v3handler.cluster.GetNode)