diff --git a/cluster/proxy/node.go b/cluster/proxy/node.go index 79935a58..85fe13b2 100644 --- a/cluster/proxy/node.go +++ b/cluster/proxy/node.go @@ -27,7 +27,6 @@ type Node interface { GetURL(path string) (string, error) GetFile(path string) (io.ReadCloser, error) - ProcessList() ([]Process, error) ProcessAdd(*app.Config) error ProcessStart(id string) error ProcessStop(id string) error @@ -39,9 +38,11 @@ type Node interface { type NodeReader interface { IPs() []string - Files() NodeFiles About() NodeAbout Version() NodeVersion + + Files() NodeFiles + ProcessList() ([]Process, error) } type NodeFiles struct { @@ -96,7 +97,8 @@ type node struct { peer client.RestClient peerLock sync.RWMutex - cancelPing context.CancelFunc + peerWg sync.WaitGroup + disconnect context.CancelFunc lastContact time.Time @@ -228,17 +230,20 @@ func (n *node) Connect() error { n.peer = peer ctx, cancel := context.WithCancel(context.Background()) - n.cancelPing = cancel + n.disconnect = cancel + + n.peerWg.Add(2) go func(ctx context.Context) { ticker := time.NewTicker(time.Second) defer ticker.Stop() + defer n.peerWg.Done() for { select { case <-ticker.C: // Ping - ok, latency := n.peer.Ping() + ok, latency := n.Ping() n.stateLock.Lock() if !ok { @@ -258,6 +263,7 @@ func (n *node) Connect() error { go func(ctx context.Context) { ticker := time.NewTicker(time.Second) defer ticker.Stop() + defer n.peerWg.Done() for { select { @@ -265,26 +271,22 @@ func (n *node) Connect() error { // Metrics metrics, err := n.peer.Metrics(clientapi.MetricsQuery{ Metrics: []clientapi.MetricsQueryMetric{ - { - Name: "cpu_ncpu", - }, - { - Name: "cpu_idle", - }, - { - Name: "mem_total", - }, - { - Name: "mem_free", - }, + {Name: "cpu_ncpu"}, + {Name: "cpu_idle"}, + {Name: "mem_total"}, + {Name: "mem_free"}, }, }) + if err != nil { n.stateLock.Lock() n.resources.cpu = 100 n.resources.ncpu = 1 n.resources.mem = 0 + n.resources.memTotal = 0 n.stateLock.Unlock() + + continue } cpu_ncpu := .0 @@ -325,16 +327,52 @@ func (n *node) Connect() error { return nil } -func (n *node) Disconnect() { - n.peerLock.Lock() - defer n.peerLock.Unlock() +func (n *node) Ping() (bool, time.Duration) { + n.peerLock.RLock() + defer n.peerLock.RUnlock() - if n.cancelPing != nil { - n.cancelPing() - n.cancelPing = nil + if n.peer == nil { + return false, 0 } + return n.peer.Ping() +} + +func (n *node) Metrics(query clientapi.MetricsQuery) (clientapi.MetricsResponse, error) { + n.peerLock.RLock() + defer n.peerLock.RUnlock() + + if n.peer == nil { + return clientapi.MetricsResponse{}, fmt.Errorf("not connected") + } + + return n.peer.Metrics(query) +} + +func (n *node) AboutPeer() (clientapi.About, error) { + n.peerLock.RLock() + defer n.peerLock.RUnlock() + + if n.peer == nil { + return clientapi.About{}, fmt.Errorf("not connected") + } + + return n.peer.About(), nil +} + +func (n *node) Disconnect() { + n.peerLock.Lock() + if n.disconnect != nil { + n.disconnect() + n.disconnect = nil + } + n.peerLock.Unlock() + + n.peerWg.Wait() + + n.peerLock.Lock() n.peer = nil + n.peerLock.Unlock() } func (n *node) StartFiles(updates chan<- NodeFiles) error { @@ -387,17 +425,11 @@ func (n *node) StopFiles() { } func (n *node) About() NodeAbout { - n.peerLock.RLock() - - if n.peer == nil { - n.peerLock.RUnlock() + about, err := n.AboutPeer() + if err != nil { return NodeAbout{} } - about := n.peer.About() - - n.peerLock.RUnlock() - createdAt, err := time.Parse(time.RFC3339, about.CreatedAt) if err != nil { createdAt = time.Now() @@ -428,15 +460,11 @@ func (n *node) About() NodeAbout { } func (n *node) Version() NodeVersion { - n.peerLock.RLock() - defer n.peerLock.RUnlock() - - if n.peer == nil { + about, err := n.AboutPeer() + if err != nil { return NodeVersion{} } - about := n.peer.About() - build, err := time.Parse(time.RFC3339, about.Version.Build) if err != nil { build = time.Time{} @@ -459,11 +487,13 @@ func (n *node) IPs() []string { } func (n *node) Files() NodeFiles { + id := n.About().ID + n.stateLock.RLock() defer n.stateLock.RUnlock() state := NodeFiles{ - ID: n.About().ID, + ID: id, LastUpdate: n.lastUpdate, } @@ -486,10 +516,6 @@ func (n *node) files() { defer wgList.Done() for file := range filesChan { - if len(file) == 0 { - return - } - filesList = append(filesList, file) } }() @@ -587,7 +613,7 @@ func (n *node) files() { wg.Wait() - filesChan <- "" + close(filesChan) wgList.Wait() @@ -658,6 +684,8 @@ func (n *node) GetFile(path string) (io.ReadCloser, error) { } func (n *node) ProcessList() ([]Process, error) { + id := n.About().ID + n.peerLock.RLock() defer n.peerLock.RUnlock() @@ -679,7 +707,7 @@ func (n *node) ProcessList() ([]Process, error) { for _, p := range list { process := Process{ - NodeID: n.About().ID, + NodeID: id, Order: p.State.Order, State: p.State.State, Mem: p.State.Memory, diff --git a/cluster/proxy/proxy.go b/cluster/proxy/proxy.go index 24ddcbde..5d879575 100644 --- a/cluster/proxy/proxy.go +++ b/cluster/proxy/proxy.go @@ -266,8 +266,6 @@ func (p *proxy) AddNode(id string, node Node) (string, error) { for _, ip := range ips { p.limiter.RemoveBlock(ip) } - - return id, nil } ips := node.IPs() diff --git a/http/handler/api/cluster.go b/http/handler/api/cluster.go index 5edb7c99..c919905a 100644 --- a/http/handler/api/cluster.go +++ b/http/handler/api/cluster.go @@ -183,6 +183,48 @@ func (h *ClusterHandler) GetNodeFiles(c echo.Context) error { return c.JSON(http.StatusOK, files) } +// ListNodeProcesses returns the list of processes running on a node of the cluster +// @Summary List of processes in the cluster on a node +// @Description List of processes in the cluster on a node +// @Tags v16.?.? +// @ID cluster-3-list-node-processes +// @Produce json +// @Param id path string true "Node ID" +// @Success 200 {array} api.ClusterProcess +// @Failure 404 {object} api.Error +// @Security ApiKeyAuth +// @Router /api/v3/cluster/node/:id/process [get] +func (h *ClusterHandler) ListNodeProcesses(c echo.Context) error { + id := util.PathParam(c, "id") + + peer, err := h.proxy.GetNode(id) + if err != nil { + return api.Err(http.StatusNotFound, "Node not found", "%s", err) + } + + procs, err := peer.ProcessList() + if err != nil { + return api.Err(http.StatusInternalServerError, "", "Node not connected: %s", err) + } + + processes := []api.ClusterProcess{} + + for _, p := range procs { + processes = append(processes, api.ClusterProcess{ + ProcessID: p.Config.ID, + NodeID: p.NodeID, + Reference: p.Config.Reference, + Order: p.Order, + State: p.State, + CPU: json.ToNumber(p.CPU), + Memory: p.Mem, + Runtime: int64(p.Runtime.Seconds()), + }) + } + + return c.JSON(http.StatusOK, processes) +} + // GetCluster returns the list of nodes in the cluster // @Summary List of nodes in the cluster // @Description List of nodes in the cluster @@ -221,36 +263,6 @@ func (h *ClusterHandler) About(c echo.Context) error { return c.JSON(http.StatusOK, about) } -// ListNodeProcesses returns the list of processes running on the nodes of the cluster -// @Summary List of processes in the cluster -// @Description List of processes in the cluster -// @Tags v16.?.? -// @ID cluster-3-list-node-processes -// @Produce json -// @Success 200 {array} api.ClusterProcess -// @Security ApiKeyAuth -// @Router /api/v3/cluster/node/process [get] -func (h *ClusterHandler) ListNodeProcesses(c echo.Context) error { - procs := h.proxy.ListProcesses() - - processes := []api.ClusterProcess{} - - for _, p := range procs { - processes = append(processes, api.ClusterProcess{ - ProcessID: p.Config.ID, - NodeID: p.NodeID, - Reference: p.Config.Reference, - Order: p.Order, - State: p.State, - CPU: json.ToNumber(p.CPU), - Memory: p.Mem, - Runtime: int64(p.Runtime.Seconds()), - }) - } - - return c.JSON(http.StatusOK, processes) -} - // ListStoreProcesses returns the list of processes stored in the DB of the cluster // @Summary List of processes in the cluster // @Description List of processes in the cluster @@ -260,7 +272,7 @@ func (h *ClusterHandler) ListNodeProcesses(c echo.Context) error { // @Success 200 {array} api.Process // @Security ApiKeyAuth // @Router /api/v3/cluster/process [get] -func (h *ClusterHandler) ListProcesses(c echo.Context) error { +func (h *ClusterHandler) ListStoreProcesses(c echo.Context) error { procs := h.cluster.ListProcesses() processes := []api.Process{} diff --git a/http/middleware/iam/iam.go b/http/middleware/iam/iam.go index 860a0a96..d3b93a01 100644 --- a/http/middleware/iam/iam.go +++ b/http/middleware/iam/iam.go @@ -125,7 +125,7 @@ func NewWithConfig(config Config) echo.MiddlewareFunc { resource := c.Request().URL.Path var domain string - if resource == "/ping" { + if resource == "/ping" || resource == "/profiling" { return next(c) } diff --git a/http/server.go b/http/server.go index 368caf1b..e340e8e4 100644 --- a/http/server.go +++ b/http/server.go @@ -656,13 +656,13 @@ func (s *server) setRoutesV3(v3 *echo.Group) { // v3 Cluster if s.v3handler.cluster != nil { v3.GET("/cluster", s.v3handler.cluster.About) - v3.GET("/cluster/process", s.v3handler.cluster.ListProcesses) + v3.GET("/cluster/process", s.v3handler.cluster.ListStoreProcesses) v3.GET("/cluster/iam/user", s.v3handler.cluster.ListIdentities) v3.GET("/cluster/node", s.v3handler.cluster.GetNodes) - v3.GET("/cluster/node/process", s.v3handler.cluster.ListNodeProcesses) v3.GET("/cluster/node/:id", s.v3handler.cluster.GetNode) v3.GET("/cluster/node/:id/files", s.v3handler.cluster.GetNodeFiles) + v3.GET("/cluster/node/:id/process", s.v3handler.cluster.ListNodeProcesses) v3.GET("/cluster/node/:id/version", s.v3handler.cluster.GetNodeVersion) if !s.readOnly {