From fc03bf73a24d2216841a7e06bdfc3fa5e8e65e5f Mon Sep 17 00:00:00 2001
From: Ingo Oppermann
Date: Tue, 6 Jun 2023 21:28:08 +0200
Subject: [PATCH] Make resource manager a main module and expose more details

---
 app/api/api.go                                 | 39 +++++++---
 cluster/proxy/node.go                          | 35 ++++++---
 monitor/cpu.go                                 | 43 ++++++-----
 monitor/mem.go                                 | 33 ++++++---
 .../resources => resources}/resources.go       | 72 ++++++++++---------
 .../resources => resources}/resources_test.go  | 34 +++++++--
 restream/restream.go                           | 69 +++++++++---------
 7 files changed, 202 insertions(+), 123 deletions(-)
 rename {restream/resources => resources}/resources.go (85%)
 rename {restream/resources => resources}/resources_test.go (84%)

diff --git a/app/api/api.go b/app/api/api.go
index 3e3948d9..7613c33f 100644
--- a/app/api/api.go
+++ b/app/api/api.go
@@ -36,6 +36,7 @@ import (
 	"github.com/datarhei/core/v16/net"
 	"github.com/datarhei/core/v16/prometheus"
 	"github.com/datarhei/core/v16/psutil"
+	"github.com/datarhei/core/v16/resources"
 	"github.com/datarhei/core/v16/restream"
 	restreamapp "github.com/datarhei/core/v16/restream/app"
 	"github.com/datarhei/core/v16/restream/replace"
@@ -92,6 +93,7 @@ type api struct {
 	sidecarserver *gohttp.Server
 	update        update.Checker
 	replacer      replace.Replacer
+	resources     resources.Resources
 	cluster       cluster.Cluster
 	iam           iam.IAM
 
@@ -297,6 +299,16 @@ func (a *api) start() error {
 	a.lock.Lock()
 	defer a.lock.Unlock()
 
+	if a.errorChan == nil {
+		a.errorChan = make(chan error, 1)
+	}
+
+	if a.state == "running" {
+		return fmt.Errorf("already running")
+	}
+
+	a.state = "starting"
+
 	cfg := a.config.store.GetActive()
 
 	if cfg.Debug.AutoMaxProcs {
@@ -311,15 +323,18 @@ func (a *api) start() error {
 		a.undoMaxprocs = undoMaxprocs
 	}
 
-	if a.errorChan == nil {
-		a.errorChan = make(chan error, 1)
+	resources, err := resources.New(resources.Config{
+		MaxCPU:    cfg.Resources.MaxCPUUsage,
+		MaxMemory: cfg.Resources.MaxMemoryUsage,
+		Logger:    a.log.logger.core.WithComponent("Resources"),
+	})
+	if err != nil {
+		return fmt.Errorf("failed to initialize resource manager: %w", err)
 	}
 
-	if a.state == "running" {
-		return fmt.Errorf("already running")
-	}
+	resources.Start()
 
-	a.state = "starting"
+	a.resources = resources
 
 	if cfg.Sessions.Enable {
 		sessionConfig := session.Config{
@@ -1005,8 +1020,7 @@ func (a *api) start() error {
 		Rewrite:      rw,
 		FFmpeg:       a.ffmpeg,
 		MaxProcesses: cfg.FFmpeg.MaxProcesses,
-		MaxCPU:       cfg.Resources.MaxCPUUsage,
-		MaxMemory:    cfg.Resources.MaxMemoryUsage,
+		Resources:    a.resources,
 		IAM:          a.iam,
 		Logger:       a.log.logger.core.WithComponent("Process"),
 	})
@@ -1027,8 +1041,8 @@ func (a *api) start() error {
 	}
 
 	metrics.Register(monitor.NewUptimeCollector())
-	metrics.Register(monitor.NewCPUCollector(cfg.Resources.MaxCPUUsage))
-	metrics.Register(monitor.NewMemCollector(cfg.Resources.MaxMemoryUsage))
+	metrics.Register(monitor.NewCPUCollector(a.resources))
+	metrics.Register(monitor.NewMemCollector(a.resources))
 	metrics.Register(monitor.NewNetCollector())
 	metrics.Register(monitor.NewDiskCollector(a.diskfs.Metadata("base")))
 	metrics.Register(monitor.NewFilesystemCollector("diskfs", a.diskfs))
@@ -1800,6 +1814,11 @@ func (a *api) stop() {
 		a.sidecarserver = nil
 	}
 
+	// Stop resource observer
+	if a.resources != nil {
+		a.resources.Stop()
+	}
+
 	// Stop the GC ticker
 	if a.gcTickerStop != nil {
 		a.gcTickerStop()
diff --git a/cluster/proxy/node.go b/cluster/proxy/node.go
index 1626f7e9..8ff5a94b 100644
--- a/cluster/proxy/node.go
+++ b/cluster/proxy/node.go
@@ -104,11 +104,12 @@ type node struct {
 	lastContact time.Time
 
 	resources struct {
-		ncpu     float64
-		cpu      float64
-		cpuLimit float64
-		mem      uint64
-		memLimit uint64
+		throttling bool
+		ncpu       float64
+		cpu        float64
+		cpuLimit   float64
+		mem        uint64
+		memLimit   uint64
 	}
 
 	state nodeState
@@ -271,14 +272,17 @@ func (n *node) Connect() error {
 				{Name: "cpu_ncpu"},
 				{Name: "cpu_idle"},
 				{Name: "cpu_limit"},
+				{Name: "cpu_throttling"},
 				{Name: "mem_total"},
 				{Name: "mem_free"},
 				{Name: "mem_limit"},
+				{Name: "mem_throttling"},
 			},
 		})
 
 		if err != nil {
			n.stateLock.Lock()
+			n.resources.throttling = true
 			n.resources.cpu = 100
 			n.resources.ncpu = 1
 			n.resources.cpuLimit = 0
@@ -296,6 +300,7 @@ func (n *node) Connect() error {
 		mem_total := uint64(0)
 		mem_free := uint64(0)
 		mem_limit := uint64(0)
+		throttling := .0
 
 		for _, x := range metrics.Metrics {
 			if x.Name == "cpu_idle" {
 				cpu_idle = x.Values[0].Value
 			} else if x.Name == "cpu_ncpu" {
 				cpu_ncpu = x.Values[0].Value
 			} else if x.Name == "cpu_limit" {
 				cpu_limit = x.Values[0].Value
+			} else if x.Name == "cpu_throttling" {
+				throttling += x.Values[0].Value
 			} else if x.Name == "mem_total" {
 				mem_total = uint64(x.Values[0].Value)
 			} else if x.Name == "mem_free" {
 				mem_free = uint64(x.Values[0].Value)
 			} else if x.Name == "mem_limit" {
 				mem_limit = uint64(x.Values[0].Value)
+			} else if x.Name == "mem_throttling" {
+				throttling += x.Values[0].Value
 			}
 		}
 
 		n.stateLock.Lock()
+		if throttling > 0 {
+			n.resources.throttling = true
+		} else {
+			n.resources.throttling = false
+		}
 		n.resources.ncpu = cpu_ncpu
 		n.resources.cpu = (100 - cpu_idle) * cpu_ncpu
 		n.resources.cpuLimit = cpu_limit * cpu_ncpu
@@ -463,11 +477,12 @@ func (n *node) About() NodeAbout {
 		LastContact: n.lastContact,
 		Latency:     time.Duration(n.latency * float64(time.Second)),
 		Resources: NodeResources{
-			NCPU:     n.resources.ncpu,
-			CPU:      n.resources.cpu,
-			CPULimit: n.resources.cpuLimit,
-			Mem:      n.resources.mem,
-			MemLimit: n.resources.memLimit,
+			IsThrottling: n.resources.throttling,
+			NCPU:         n.resources.ncpu,
+			CPU:          n.resources.cpu,
+			CPULimit:     n.resources.cpuLimit,
+			Mem:          n.resources.mem,
+			MemLimit:     n.resources.memLimit,
 		},
 	}
 
diff --git a/monitor/cpu.go b/monitor/cpu.go
index 1e89a874..83869653 100644
--- a/monitor/cpu.go
+++ b/monitor/cpu.go
@@ -3,28 +3,26 @@ package monitor
 import (
 	"github.com/datarhei/core/v16/monitor/metric"
 	"github.com/datarhei/core/v16/psutil"
+	"github.com/datarhei/core/v16/resources"
 )
 
 type cpuCollector struct {
-	ncpuDescr   *metric.Description
-	systemDescr *metric.Description
-	userDescr   *metric.Description
-	idleDescr   *metric.Description
-	otherDescr  *metric.Description
-	limitDescr  *metric.Description
+	ncpuDescr     *metric.Description
+	systemDescr   *metric.Description
+	userDescr     *metric.Description
+	idleDescr     *metric.Description
+	otherDescr    *metric.Description
+	limitDescr    *metric.Description
+	throttleDescr *metric.Description
 
-	ncpu  float64
-	limit float64
+	ncpu      float64
+	resources resources.Resources
 }
 
-func NewCPUCollector(limit float64) metric.Collector {
+func NewCPUCollector(rsc resources.Resources) metric.Collector {
 	c := &cpuCollector{
-		ncpu:  1,
-		limit: limit,
-	}
-
-	if limit <= 0 || limit > 100 {
-		c.limit = 100
+		ncpu:      1,
+		resources: rsc,
 	}
 
 	c.ncpuDescr = metric.NewDesc("cpu_ncpu", "Number of logical CPUs in the system", nil)
@@ -33,6 +31,7 @@ func NewCPUCollector(limit float64) metric.Collector {
 	c.idleDescr = metric.NewDesc("cpu_idle", "Percentage of idle CPU", nil)
 	c.otherDescr = metric.NewDesc("cpu_other", "Percentage of CPU used for other subsystems", nil)
 	c.limitDescr = metric.NewDesc("cpu_limit", "Percentage of CPU to be consumed", nil)
+	c.throttleDescr = metric.NewDesc("cpu_throttling", "Whether the CPU is currently throttled", nil)
 
 	if ncpu, err := psutil.CPUCounts(true); err == nil {
 		c.ncpu = ncpu
@@ -55,6 +54,7 @@ func (c *cpuCollector) Describe() []*metric.Description {
 		c.idleDescr,
 		c.otherDescr,
 		c.limitDescr,
+		c.throttleDescr,
 	}
 }
 
@@ -62,7 +62,18 @@ func (c *cpuCollector) Collect() metric.Metrics {
 	metrics := metric.NewMetrics()
 
 	metrics.Add(metric.NewValue(c.ncpuDescr, c.ncpu))
-	metrics.Add(metric.NewValue(c.limitDescr, c.limit))
+
+	limit, _ := c.resources.Limits()
+
+	metrics.Add(metric.NewValue(c.limitDescr, limit))
+
+	cpu, _ := c.resources.ShouldLimit()
+	throttling := .0
+	if cpu {
+		throttling = 1
+	}
+
+	metrics.Add(metric.NewValue(c.throttleDescr, throttling))
 
 	stat, err := psutil.CPUPercent()
 	if err != nil {
diff --git a/monitor/mem.go b/monitor/mem.go
index 1493130c..10a66f7f 100644
--- a/monitor/mem.go
+++ b/monitor/mem.go
@@ -3,28 +3,27 @@ package monitor
 import (
 	"github.com/datarhei/core/v16/monitor/metric"
 	"github.com/datarhei/core/v16/psutil"
+	"github.com/datarhei/core/v16/resources"
 )
 
 type memCollector struct {
-	totalDescr *metric.Description
-	freeDescr  *metric.Description
-	limitDescr *metric.Description
+	totalDescr    *metric.Description
+	freeDescr     *metric.Description
+	limitDescr    *metric.Description
+	throttleDescr *metric.Description
 
-	limit float64
+	resources resources.Resources
 }
 
-func NewMemCollector(limit float64) metric.Collector {
+func NewMemCollector(rsc resources.Resources) metric.Collector {
 	c := &memCollector{
-		limit: limit / 100,
-	}
-
-	if limit <= 0 || limit > 1 {
-		c.limit = 1
+		resources: rsc,
 	}
 
 	c.totalDescr = metric.NewDesc("mem_total", "Total available memory in bytes", nil)
 	c.freeDescr = metric.NewDesc("mem_free", "Free memory in bytes", nil)
 	c.limitDescr = metric.NewDesc("mem_limit", "Memory limit in bytes", nil)
+	c.throttleDescr = metric.NewDesc("mem_throttling", "Whether the memory is currently throttled", nil)
 
 	return c
 }
@@ -38,12 +37,25 @@ func (c *memCollector) Describe() []*metric.Description {
 		c.totalDescr,
 		c.freeDescr,
 		c.limitDescr,
+		c.throttleDescr,
 	}
 }
 
 func (c *memCollector) Collect() metric.Metrics {
 	metrics := metric.NewMetrics()
 
+	_, limit := c.resources.Limits()
+
+	metrics.Add(metric.NewValue(c.limitDescr, float64(limit)))
+
+	_, memory := c.resources.ShouldLimit()
+	throttling := .0
+	if memory {
+		throttling = 1
+	}
+
+	metrics.Add(metric.NewValue(c.throttleDescr, throttling))
+
 	stat, err := psutil.VirtualMemory()
 	if err != nil {
 		return metrics
@@ -51,7 +63,6 @@ func (c *memCollector) Collect() metric.Metrics {
 
 	metrics.Add(metric.NewValue(c.totalDescr, float64(stat.Total)))
 	metrics.Add(metric.NewValue(c.freeDescr, float64(stat.Available)))
-	metrics.Add(metric.NewValue(c.limitDescr, float64(stat.Total)*c.limit))
 
 	return metrics
 }
diff --git a/restream/resources/resources.go b/resources/resources.go
similarity index 85%
rename from restream/resources/resources.go
rename to resources/resources.go
index 85489f34..28ad14be 100644
--- a/restream/resources/resources.go
+++ b/resources/resources.go
@@ -14,18 +14,16 @@ type resources struct {
 	psutil psutil.Util
 
 	ncpu      float64
-	maxCPU    float64
-	maxMemory uint64
+	maxCPU    float64 // percent 0-100*ncpu
+	maxMemory uint64  // bytes
 
-	limitCPUCh    chan bool
-	isCPULimiting bool
-
-	limitMemoryCh    chan bool
+	isUnlimited      bool
+	isCPULimiting    bool
 	isMemoryLimiting bool
 
 	cancelObserver context.CancelFunc
 
-	lock      sync.Mutex
+	lock      sync.RWMutex
 	startOnce sync.Once
 	stopOnce  sync.Once
@@ -36,8 +34,14 @@ type Resources interface {
 	Start()
 	Stop()
 
-	LimitCPU() <-chan bool
-	LimitMemory() <-chan bool
+	// HasLimits returns whether any limits have been set
+	HasLimits() bool
+
+	// Limits returns the CPU (percent 0-100) and memory (bytes) limits
+	Limits() (float64, uint64)
+
+	// ShouldLimit returns whether CPU and/or memory is currently limited
+	ShouldLimit() (bool, bool)
 
 	Request(cpu float64, memory uint64) error
 }
@@ -50,8 +54,12 @@ type Config struct {
 }
 
 func New(config Config) (Resources, error) {
-	if config.MaxCPU <= 0 || config.MaxMemory <= 0 {
-		return nil, fmt.Errorf("both MaxCPU and MaxMemory have to be set")
+	if config.MaxCPU <= 0 {
+		config.MaxCPU = 100
+	}
+
+	if config.MaxMemory <= 0 {
+		config.MaxMemory = 100
 	}
 
 	if config.MaxCPU > 100 || config.MaxMemory > 100 {
@@ -64,6 +72,10 @@ func New(config Config) (Resources, error) {
 		logger: config.Logger,
 	}
 
+	if config.MaxCPU == 100 && config.MaxMemory == 100 {
+		r.isUnlimited = true
+	}
+
 	if r.logger == nil {
 		r.logger = log.New("")
 	}
@@ -87,9 +99,6 @@ func New(config Config) (Resources, error) {
 	r.maxCPU *= r.ncpu
 	r.maxMemory = uint64(float64(vmstat.Total) * config.MaxMemory / 100)
 
-	r.limitCPUCh = make(chan bool, 10)
-	r.limitMemoryCh = make(chan bool, 10)
-
 	r.logger = r.logger.WithFields(log.Fields{
 		"ncpu":    r.ncpu,
 		"max_cpu": r.maxCPU,
@@ -126,14 +135,6 @@ func (r *resources) Stop() {
 	})
 }
 
-func (r *resources) LimitCPU() <-chan bool {
-	return r.limitCPUCh
-}
-
-func (r *resources) LimitMemory() <-chan bool {
-	return r.limitMemoryCh
-}
-
 func (r *resources) observe(ctx context.Context, interval time.Duration) {
 	ticker := time.NewTicker(interval)
 	defer ticker.Stop()
@@ -201,10 +202,6 @@ func (r *resources) observe(ctx context.Context, interval time.Duration) {
 			}).Log("Limiting CPU")
 
 			r.isCPULimiting = doCPULimit
-
-			select {
-			case r.limitCPUCh <- doCPULimit:
-			default:
-			}
 		}
 
 		if r.isMemoryLimiting != doMemoryLimit {
@@ -213,19 +210,30 @@ func (r *resources) observe(ctx context.Context, interval time.Duration) {
 			}).Log("Limiting memory")
 
 			r.isMemoryLimiting = doMemoryLimit
-
-			select {
-			case r.limitMemoryCh <- doMemoryLimit:
-			default:
-			}
 		}
 
 		r.lock.Unlock()
 		}
 	}
 }
 
+func (r *resources) HasLimits() bool {
+	return !r.isUnlimited
+}
+
+func (r *resources) Limits() (float64, uint64) {
+	return r.maxCPU / r.ncpu, r.maxMemory
+}
+
+func (r *resources) ShouldLimit() (bool, bool) {
+	r.lock.RLock()
+	defer r.lock.RUnlock()
+
+	return r.isCPULimiting, r.isMemoryLimiting
+}
+
 func (r *resources) Request(cpu float64, memory uint64) error {
-	r.lock.Lock()
-	defer r.lock.Unlock()
+	r.lock.RLock()
+	defer r.lock.RUnlock()
 
 	logger := r.logger.WithFields(log.Fields{
 		"req_cpu": cpu,
diff --git a/restream/resources/resources_test.go b/resources/resources_test.go
similarity index 84%
rename from restream/resources/resources_test.go
rename to resources/resources_test.go
index c5c0883a..91132c4d 100644
--- a/restream/resources/resources_test.go
+++ b/resources/resources_test.go
@@ -6,10 +6,10 @@ import (
 	"time"
 
 	"github.com/datarhei/core/v16/psutil"
-	"github.com/stretchr/testify/require"
 
 	"github.com/shirou/gopsutil/v3/disk"
 	"github.com/shirou/gopsutil/v3/net"
+	"github.com/stretchr/testify/require"
 )
 
 type util struct{}
@@ -72,9 +72,19 @@ func TestMemoryLimit(t *testing.T) {
 	timer := time.NewTimer(10 * time.Second)
 	defer timer.Stop()
 
-	select {
-	case limit = <-r.LimitMemory():
-	case <-timer.C:
+	ticker := time.NewTicker(time.Second)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			_, limit = r.ShouldLimit()
+			if limit {
+				return
+			}
+		case <-timer.C:
+			return
+		}
 	}
 }()
 
@@ -109,9 +119,19 @@ func TestCPULimit(t *testing.T) {
 	timer := time.NewTimer(10 * time.Second)
 	defer timer.Stop()
 
-	select {
-	case limit = <-r.LimitCPU():
-	case <-timer.C:
+	ticker := time.NewTicker(time.Second)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			limit, _ = r.ShouldLimit()
+			if limit {
+				return
+			}
+		case <-timer.C:
+			return
+		}
 	}
 }()
 
diff --git a/restream/restream.go b/restream/restream.go
index 085eb965..9d70204e 100644
--- a/restream/restream.go
+++ b/restream/restream.go
@@ -23,10 +23,10 @@ import (
 	"github.com/datarhei/core/v16/net"
 	"github.com/datarhei/core/v16/net/url"
 	"github.com/datarhei/core/v16/process"
+	"github.com/datarhei/core/v16/resources"
 	"github.com/datarhei/core/v16/restream/app"
 	rfs "github.com/datarhei/core/v16/restream/fs"
 	"github.com/datarhei/core/v16/restream/replace"
-	"github.com/datarhei/core/v16/restream/resources"
 	"github.com/datarhei/core/v16/restream/rewrite"
 	"github.com/datarhei/core/v16/restream/store"
 	jsonstore "github.com/datarhei/core/v16/restream/store/json"
@@ -76,8 +76,7 @@ type Config struct {
 	Rewrite      rewrite.Rewriter
 	FFmpeg       ffmpeg.FFmpeg
 	MaxProcesses int64
-	MaxCPU       float64 // percent 0-100
-	MaxMemory    float64 // percent 0-100
+	Resources    resources.Resources
 	Logger       log.Logger
 	IAM          iam.IAM
 }
@@ -195,22 +194,13 @@ func New(config Config) (Restreamer, error) {
 	}
 
 	r.maxProc = config.MaxProcesses
-
-	if config.MaxCPU > 0 || config.MaxMemory > 0 {
-		resources, err := resources.New(resources.Config{
-			MaxCPU:    config.MaxCPU,
-			MaxMemory: config.MaxMemory,
-			Logger:    r.logger.WithComponent("Resources"),
-		})
-		if err != nil {
-			return nil, fmt.Errorf("failed to initialize resource manager: %w", err)
-		}
 
-		r.resources = resources
-		r.enableSoftLimit = true
-
-		r.logger.Debug().Log("Enabling resource manager")
+	r.resources = config.Resources
+	if r.resources == nil {
+		return nil, fmt.Errorf("a resource manager must be provided")
 	}
 
+	r.enableSoftLimit = r.resources.HasLimits()
+
 	if err := r.load(); err != nil {
 		return nil, fmt.Errorf("failed to load data from DB: %w", err)
 	}
@@ -231,7 +221,7 @@ func (r *restream) Start() {
 		r.cancelObserver = cancel
 
 		if r.enableSoftLimit {
-			go r.resourceObserver(ctx, r.resources)
+			go r.resourceObserver(ctx, r.resources, time.Second)
 		}
 
 		for id, t := range r.tasks {
@@ -321,9 +311,9 @@ func (r *restream) filesystemObserver(ctx context.Context, fs fs.Filesystem, int
 	}
 }
 
-func (r *restream) resourceObserver(ctx context.Context, rsc resources.Resources) {
-	rsc.Start()
-	defer rsc.Stop()
+func (r *restream) resourceObserver(ctx context.Context, rsc resources.Resources, interval time.Duration) {
+	ticker := time.NewTicker(interval)
+	defer ticker.Stop()
 
 	limitCPU, limitMemory := false, false
 
@@ -331,34 +321,39 @@ func (r *restream) resourceObserver(ctx context.Context, rsc resources.Resources
 		select {
 		case <-ctx.Done():
 			return
-		case limitCPU = <-rsc.LimitCPU():
-			r.lock.Lock()
-			for id, t := range r.tasks {
-				if !t.valid {
-					continue
-				}
+		case <-ticker.C:
+			cpu, memory := rsc.ShouldLimit()
 
-				r.logger.Debug().WithFields(log.Fields{
-					"limit_cpu": limitCPU,
-					"id":        id,
-				}).Log("Limiting process CPU consumption")
-				t.ffmpeg.Limit(limitCPU, limitMemory)
+			hasChanges := false
+
+			if cpu != limitCPU {
+				limitCPU = cpu
+				hasChanges = true
 			}
-			r.lock.Unlock()
-		case limitMemory = <-rsc.LimitMemory():
-			r.lock.Lock()
+
+			if memory != limitMemory {
+				limitMemory = memory
+				hasChanges = true
+			}
+
+			if !hasChanges {
+				break
+			}
+
+			r.lock.RLock()
 			for id, t := range r.tasks {
 				if !t.valid {
 					continue
 				}
 
 				r.logger.Debug().WithFields(log.Fields{
+					"limit_cpu":    limitCPU,
 					"limit_memory": limitMemory,
 					"id":           id,
-				}).Log("Limiting process memory consumption")
+				}).Log("Limiting process CPU and memory consumption")
 				t.ffmpeg.Limit(limitCPU, limitMemory)
 			}
-			r.lock.Unlock()
+			r.lock.RUnlock()
 		}
 	}
 }
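
---

Usage sketch (not part of the patch): how a caller is expected to wire up the
relocated resources package after this change. The types and methods mirror
the diff above (resources.New, Config, Start/Stop, Limits, ShouldLimit); the
90/80 limit values and the standalone main function are made-up for
illustration only.

	package main

	import (
		"fmt"

		"github.com/datarhei/core/v16/resources"
	)

	func main() {
		// Hypothetical example limits; values <= 0 fall back to 100,
		// and 100/100 marks the manager as effectively unlimited.
		rsc, err := resources.New(resources.Config{
			MaxCPU:    90, // percent of all cores, 0-100
			MaxMemory: 80, // percent of total memory, 0-100
		})
		if err != nil {
			panic(err)
		}

		// Start the observer; stop it on shutdown, as api.stop() does.
		rsc.Start()
		defer rsc.Stop()

		// Limits reports the configured ceilings: CPU in percent, memory in bytes.
		cpuLimit, memLimit := rsc.Limits()
		fmt.Println(cpuLimit, memLimit)

		// ShouldLimit replaces the removed LimitCPU()/LimitMemory() channels:
		// consumers now poll it, as restream.resourceObserver does each tick.
		limitCPU, limitMemory := rsc.ShouldLimit()
		fmt.Println(limitCPU, limitMemory)
	}

The design change this illustrates: instead of each consumer receiving limit
transitions over buffered channels, the manager exposes its current state and
consumers poll on their own interval, which is what lets api.go own a single
shared instance and hand it to restream and the monitor collectors.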