diff --git a/app/api/api.go b/app/api/api.go index 28211601..3e3948d9 100644 --- a/app/api/api.go +++ b/app/api/api.go @@ -1027,8 +1027,8 @@ func (a *api) start() error { } metrics.Register(monitor.NewUptimeCollector()) - metrics.Register(monitor.NewCPUCollector()) - metrics.Register(monitor.NewMemCollector()) + metrics.Register(monitor.NewCPUCollector(cfg.Resources.MaxCPUUsage)) + metrics.Register(monitor.NewMemCollector(cfg.Resources.MaxMemoryUsage)) metrics.Register(monitor.NewNetCollector()) metrics.Register(monitor.NewDiskCollector(a.diskfs.Metadata("base"))) metrics.Register(monitor.NewFilesystemCollector("diskfs", a.diskfs)) diff --git a/cluster/api.go b/cluster/api.go index e3cf588e..8654a443 100644 --- a/cluster/api.go +++ b/cluster/api.go @@ -91,11 +91,9 @@ func NewAPI(config APIConfig) (API, error) { })) a.router.Logger.SetOutput(httplog.NewWrapper(a.logger)) - swagHandler := echoSwagger.EchoWrapHandler(echoSwagger.InstanceName("ClusterAPI")) - // Swagger API documentation router group doc := a.router.Group("/v1/swagger/*") - doc.GET("", swagHandler) + doc.GET("", echoSwagger.EchoWrapHandler(echoSwagger.InstanceName("ClusterAPI"))) a.router.POST("/v1/server", a.AddServer) a.router.DELETE("/v1/server/:id", a.RemoveServer) diff --git a/cluster/cluster.go b/cluster/cluster.go index b89deee9..3f4bcc2c 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -79,6 +79,9 @@ type ClusterConfig struct { Address string // Listen address for the raft protocol Peers []Peer // Address of a member of a cluster to join + SyncInterval time.Duration // Interval between aligning the process in the cluster DB with the processes on the nodes + NodeRecoverTimeout time.Duration // Timeout for a node to recover before rebalancing the processes + CoreAPIAddress string // Address of the core API CoreAPIUsername string // Username for the core API CoreAPIPassword string // Password for the core API diff --git a/cluster/proxy/node.go b/cluster/proxy/node.go index 80efb616..1626f7e9 100644 --- a/cluster/proxy/node.go +++ b/cluster/proxy/node.go @@ -106,8 +106,9 @@ type node struct { resources struct { ncpu float64 cpu float64 + cpuLimit float64 mem uint64 - memTotal uint64 + memLimit uint64 } state nodeState @@ -269,8 +270,10 @@ func (n *node) Connect() error { Metrics: []clientapi.MetricsQueryMetric{ {Name: "cpu_ncpu"}, {Name: "cpu_idle"}, + {Name: "cpu_limit"}, {Name: "mem_total"}, {Name: "mem_free"}, + {Name: "mem_limit"}, }, }) @@ -278,8 +281,10 @@ func (n *node) Connect() error { n.stateLock.Lock() n.resources.cpu = 100 n.resources.ncpu = 1 + n.resources.cpuLimit = 0 n.resources.mem = 0 - n.resources.memTotal = 0 + n.resources.memLimit = 0 + n.state = stateDisconnected n.stateLock.Unlock() continue @@ -287,30 +292,37 @@ func (n *node) Connect() error { cpu_ncpu := .0 cpu_idle := .0 + cpu_limit := .0 mem_total := uint64(0) mem_free := uint64(0) + mem_limit := uint64(0) for _, x := range metrics.Metrics { if x.Name == "cpu_idle" { cpu_idle = x.Values[0].Value } else if x.Name == "cpu_ncpu" { cpu_ncpu = x.Values[0].Value + } else if x.Name == "cpu_limit" { + cpu_limit = x.Values[0].Value } else if x.Name == "mem_total" { mem_total = uint64(x.Values[0].Value) } else if x.Name == "mem_free" { mem_free = uint64(x.Values[0].Value) + } else if x.Name == "mem_limit" { + mem_limit = uint64(x.Values[0].Value) } } n.stateLock.Lock() n.resources.ncpu = cpu_ncpu n.resources.cpu = (100 - cpu_idle) * cpu_ncpu + n.resources.cpuLimit = cpu_limit * cpu_ncpu if mem_total != 0 { n.resources.mem = mem_total - mem_free - n.resources.memTotal = mem_total + n.resources.memLimit = mem_limit } else { n.resources.mem = 0 - n.resources.memTotal = 0 + n.resources.memLimit = 0 } n.lastContact = time.Now() n.stateLock.Unlock() @@ -423,7 +435,9 @@ func (n *node) StopFiles() { func (n *node) About() NodeAbout { about, err := n.AboutPeer() if err != nil { - return NodeAbout{} + return NodeAbout{ + State: stateDisconnected.String(), + } } createdAt, err := time.Parse(time.RFC3339, about.CreatedAt) @@ -434,11 +448,16 @@ func (n *node) About() NodeAbout { n.stateLock.RLock() defer n.stateLock.RUnlock() - state := NodeAbout{ + state := n.state + if time.Since(n.lastContact) > 3*time.Second { + state = stateDisconnected + } + + nodeAbout := NodeAbout{ ID: about.ID, Name: about.Name, Address: n.address, - State: n.state.String(), + State: state.String(), CreatedAt: createdAt, Uptime: time.Since(createdAt), LastContact: n.lastContact, @@ -446,13 +465,13 @@ func (n *node) About() NodeAbout { Resources: NodeResources{ NCPU: n.resources.ncpu, CPU: n.resources.cpu, - CPULimit: 90 * n.resources.ncpu, + CPULimit: n.resources.cpuLimit, Mem: n.resources.mem, - MemLimit: uint64(float64(n.resources.memTotal) * 0.9), + MemLimit: n.resources.memLimit, }, } - return state + return nodeAbout } func (n *node) Version() NodeVersion { diff --git a/config/data.go b/config/data.go index 00bfaf1e..0dff3ff5 100644 --- a/config/data.go +++ b/config/data.go @@ -169,15 +169,15 @@ type Data struct { UIPath string `json:"ui_path"` } `json:"router"` Resources struct { - MaxCPUUsage float64 `json:"max_cpu_usage"` - MaxMemoryUsage float64 `json:"max_memory_usage"` + MaxCPUUsage float64 `json:"max_cpu_usage"` // percent 0-100 + MaxMemoryUsage float64 `json:"max_memory_usage"` // percent 0-100 } `json:"resources"` Cluster struct { Enable bool `json:"enable"` Bootstrap bool `json:"bootstrap"` Recover bool `json:"recover"` Debug bool `json:"debug"` - Address string `json:"address"` + Address string `json:"address"` // ip:port Peers []string `json:"peers"` } `json:"cluster"` } diff --git a/monitor/cpu.go b/monitor/cpu.go index 60b70ba9..1e89a874 100644 --- a/monitor/cpu.go +++ b/monitor/cpu.go @@ -11,13 +11,20 @@ type cpuCollector struct { userDescr *metric.Description idleDescr *metric.Description otherDescr *metric.Description + limitDescr *metric.Description - ncpu float64 + ncpu float64 + limit float64 } -func NewCPUCollector() metric.Collector { +func NewCPUCollector(limit float64) metric.Collector { c := &cpuCollector{ - ncpu: 1, + ncpu: 1, + limit: limit, + } + + if limit <= 0 || limit > 100 { + c.limit = 100 } c.ncpuDescr = metric.NewDesc("cpu_ncpu", "Number of logical CPUs in the system", nil) @@ -25,6 +32,7 @@ func NewCPUCollector() metric.Collector { c.userDescr = metric.NewDesc("cpu_user", "Percentage of CPU used for the user", nil) c.idleDescr = metric.NewDesc("cpu_idle", "Percentage of idle CPU", nil) c.otherDescr = metric.NewDesc("cpu_other", "Percentage of CPU used for other subsystems", nil) + c.limitDescr = metric.NewDesc("cpu_limit", "Percentage of CPU to be consumed", nil) if ncpu, err := psutil.CPUCounts(true); err == nil { c.ncpu = ncpu @@ -46,6 +54,7 @@ func (c *cpuCollector) Describe() []*metric.Description { c.userDescr, c.idleDescr, c.otherDescr, + c.limitDescr, } } @@ -53,6 +62,7 @@ func (c *cpuCollector) Collect() metric.Metrics { metrics := metric.NewMetrics() metrics.Add(metric.NewValue(c.ncpuDescr, c.ncpu)) + metrics.Add(metric.NewValue(c.limitDescr, c.limit)) stat, err := psutil.CPUPercent() if err != nil { diff --git a/monitor/mem.go b/monitor/mem.go index 04fb8465..1493130c 100644 --- a/monitor/mem.go +++ b/monitor/mem.go @@ -8,13 +8,23 @@ import ( type memCollector struct { totalDescr *metric.Description freeDescr *metric.Description + limitDescr *metric.Description + + limit float64 } -func NewMemCollector() metric.Collector { - c := &memCollector{} +func NewMemCollector(limit float64) metric.Collector { + c := &memCollector{ + limit: limit / 100, + } + + if limit <= 0 || limit > 1 { + c.limit = 1 + } c.totalDescr = metric.NewDesc("mem_total", "Total available memory in bytes", nil) c.freeDescr = metric.NewDesc("mem_free", "Free memory in bytes", nil) + c.limitDescr = metric.NewDesc("mem_limit", "Memory limit in bytes", nil) return c } @@ -27,6 +37,7 @@ func (c *memCollector) Describe() []*metric.Description { return []*metric.Description{ c.totalDescr, c.freeDescr, + c.limitDescr, } } @@ -40,6 +51,7 @@ func (c *memCollector) Collect() metric.Metrics { metrics.Add(metric.NewValue(c.totalDescr, float64(stat.Total))) metrics.Add(metric.NewValue(c.freeDescr, float64(stat.Available))) + metrics.Add(metric.NewValue(c.limitDescr, float64(stat.Total)*c.limit)) return metrics } diff --git a/prometheus/cpu.go b/prometheus/cpu.go index b2f37f19..896b0102 100644 --- a/prometheus/cpu.go +++ b/prometheus/cpu.go @@ -14,6 +14,7 @@ type cpuCollector struct { cpuUserTimeDesc *prometheus.Desc cpuIdleTimeDesc *prometheus.Desc cpuOtherTimeDesc *prometheus.Desc + cpuLimitDesc *prometheus.Desc } func NewCPUCollector(core string, c metric.Reader) prometheus.Collector { @@ -36,6 +37,10 @@ func NewCPUCollector(core string, c metric.Reader) prometheus.Collector { "cpu_other_time_percent", "CPU other time in percent", []string{"core"}, nil), + cpuLimitDesc: prometheus.NewDesc( + "cpu_limit_percent", + "Configured CPU limit in percent", + []string{"core"}, nil), } } @@ -44,6 +49,7 @@ func (c *cpuCollector) Describe(ch chan<- *prometheus.Desc) { ch <- c.cpuUserTimeDesc ch <- c.cpuIdleTimeDesc ch <- c.cpuOtherTimeDesc + ch <- c.cpuLimitDesc } func (c *cpuCollector) Collect(ch chan<- prometheus.Metric) { @@ -52,10 +58,12 @@ func (c *cpuCollector) Collect(ch chan<- prometheus.Metric) { metric.NewPattern("cpu_user"), metric.NewPattern("cpu_idle"), metric.NewPattern("cpu_other"), + metric.NewPattern("cpu_limit"), }) ch <- prometheus.MustNewConstMetric(c.cpuSystemTimeDesc, prometheus.GaugeValue, metrics.Value("cpu_system").Val(), c.core) ch <- prometheus.MustNewConstMetric(c.cpuUserTimeDesc, prometheus.GaugeValue, metrics.Value("cpu_user").Val(), c.core) ch <- prometheus.MustNewConstMetric(c.cpuIdleTimeDesc, prometheus.GaugeValue, metrics.Value("cpu_idle").Val(), c.core) ch <- prometheus.MustNewConstMetric(c.cpuOtherTimeDesc, prometheus.GaugeValue, metrics.Value("cpu_other").Val(), c.core) + ch <- prometheus.MustNewConstMetric(c.cpuLimitDesc, prometheus.GaugeValue, metrics.Value("cpu_limit").Val(), c.core) } diff --git a/prometheus/mem.go b/prometheus/mem.go index 736722c7..0f75c9f4 100644 --- a/prometheus/mem.go +++ b/prometheus/mem.go @@ -10,15 +10,16 @@ type memCollector struct { core string collector metric.Reader - memLimitDesc *prometheus.Desc + memTotalDesc *prometheus.Desc memFreeDesc *prometheus.Desc + memLimitDesc *prometheus.Desc } func NewMemCollector(core string, c metric.Reader) prometheus.Collector { return &memCollector{ core: core, collector: c, - memLimitDesc: prometheus.NewDesc( + memTotalDesc: prometheus.NewDesc( "mem_total_bytes", "Total available memory in bytes", []string{"core"}, nil), @@ -26,25 +27,27 @@ func NewMemCollector(core string, c metric.Reader) prometheus.Collector { "mem_free_bytes", "Free memory in bytes", []string{"core"}, nil), + memLimitDesc: prometheus.NewDesc( + "mem_limit_bytes", + "Configured memory limit in bytes", + []string{"core"}, nil), } } func (c *memCollector) Describe(ch chan<- *prometheus.Desc) { ch <- c.memLimitDesc ch <- c.memFreeDesc + ch <- c.memLimitDesc } func (c *memCollector) Collect(ch chan<- prometheus.Metric) { metrics := c.collector.Collect([]metric.Pattern{ metric.NewPattern("mem_total"), metric.NewPattern("mem_free"), + metric.NewPattern("mem_limit"), }) - for _, m := range metrics.Values("mem_total") { - ch <- prometheus.MustNewConstMetric(c.memLimitDesc, prometheus.GaugeValue, m.Val(), c.core) - } - - for _, m := range metrics.Values("mem_free") { - ch <- prometheus.MustNewConstMetric(c.memFreeDesc, prometheus.GaugeValue, m.Val(), c.core) - } + ch <- prometheus.MustNewConstMetric(c.memTotalDesc, prometheus.GaugeValue, metrics.Value("mem_total").Val(), c.core) + ch <- prometheus.MustNewConstMetric(c.memFreeDesc, prometheus.GaugeValue, metrics.Value("mem_free").Val(), c.core) + ch <- prometheus.MustNewConstMetric(c.memLimitDesc, prometheus.GaugeValue, metrics.Value("mem_limit").Val(), c.core) }