Make resource manager a main module and expose more details

Ingo Oppermann
2023-06-06 21:28:08 +02:00
parent 3ac7ead20d
commit fc03bf73a2
7 changed files with 202 additions and 123 deletions

View File

@@ -36,6 +36,7 @@ import (
"github.com/datarhei/core/v16/net"
"github.com/datarhei/core/v16/prometheus"
"github.com/datarhei/core/v16/psutil"
"github.com/datarhei/core/v16/resources"
"github.com/datarhei/core/v16/restream"
restreamapp "github.com/datarhei/core/v16/restream/app"
"github.com/datarhei/core/v16/restream/replace"
@@ -92,6 +93,7 @@ type api struct {
sidecarserver *gohttp.Server
update update.Checker
replacer replace.Replacer
resources resources.Resources
cluster cluster.Cluster
iam iam.IAM
@@ -297,6 +299,16 @@ func (a *api) start() error {
a.lock.Lock()
defer a.lock.Unlock()
if a.errorChan == nil {
a.errorChan = make(chan error, 1)
}
if a.state == "running" {
return fmt.Errorf("already running")
}
a.state = "starting"
cfg := a.config.store.GetActive()
if cfg.Debug.AutoMaxProcs {
@@ -311,15 +323,18 @@ func (a *api) start() error {
a.undoMaxprocs = undoMaxprocs
}
-if a.errorChan == nil {
-a.errorChan = make(chan error, 1)
+resources, err := resources.New(resources.Config{
+MaxCPU: cfg.Resources.MaxCPUUsage,
+MaxMemory: cfg.Resources.MaxMemoryUsage,
+Logger: a.log.logger.core.WithComponent("Resources"),
+})
+if err != nil {
+return fmt.Errorf("failed to initialize resource manager: %w", err)
}
-if a.state == "running" {
-return fmt.Errorf("already running")
-}
+resources.Start()
-a.state = "starting"
+a.resources = resources
if cfg.Sessions.Enable {
sessionConfig := session.Config{
@@ -1005,8 +1020,7 @@ func (a *api) start() error {
Rewrite: rw,
FFmpeg: a.ffmpeg,
MaxProcesses: cfg.FFmpeg.MaxProcesses,
-MaxCPU: cfg.Resources.MaxCPUUsage,
-MaxMemory: cfg.Resources.MaxMemoryUsage,
+Resources: a.resources,
IAM: a.iam,
Logger: a.log.logger.core.WithComponent("Process"),
})
@@ -1027,8 +1041,8 @@ func (a *api) start() error {
}
metrics.Register(monitor.NewUptimeCollector())
-metrics.Register(monitor.NewCPUCollector(cfg.Resources.MaxCPUUsage))
-metrics.Register(monitor.NewMemCollector(cfg.Resources.MaxMemoryUsage))
+metrics.Register(monitor.NewCPUCollector(a.resources))
+metrics.Register(monitor.NewMemCollector(a.resources))
metrics.Register(monitor.NewNetCollector())
metrics.Register(monitor.NewDiskCollector(a.diskfs.Metadata("base")))
metrics.Register(monitor.NewFilesystemCollector("diskfs", a.diskfs))
@@ -1800,6 +1814,11 @@ func (a *api) stop() {
a.sidecarserver = nil
}
// Stop resource observer
if a.resources != nil {
a.resources.Stop()
}
// Stop the GC ticker
if a.gcTickerStop != nil {
a.gcTickerStop()
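
The commit moves resource management out of restream and into the core api: api.start() now constructs a single manager, hands it to the restreamer and the monitor collectors, and api.stop() shuts it down. A minimal lifecycle sketch, assuming only the resources.New signature and Config fields visible in this diff (the limit values are illustrative):

package main

import (
	"fmt"

	"github.com/datarhei/core/v16/resources"
)

func main() {
	// One manager per core instance; consumers receive the interface.
	rsc, err := resources.New(resources.Config{
		MaxCPU:    90, // percent of all cores, 0-100
		MaxMemory: 80, // percent of total memory, 0-100
	})
	if err != nil {
		fmt.Printf("failed to initialize resource manager: %v\n", err)
		return
	}

	rsc.Start()      // begins the observer loop, as api.start() does
	defer rsc.Stop() // mirrored by api.stop()

	cpu, memory := rsc.Limits()
	fmt.Printf("limits: cpu=%.0f%% memory=%d bytes\n", cpu, memory)
}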

View File

@@ -104,6 +104,7 @@ type node struct {
lastContact time.Time
resources struct {
throttling bool
ncpu float64
cpu float64
cpuLimit float64
@@ -271,14 +272,17 @@ func (n *node) Connect() error {
{Name: "cpu_ncpu"},
{Name: "cpu_idle"},
{Name: "cpu_limit"},
{Name: "cpu_throttling"},
{Name: "mem_total"},
{Name: "mem_free"},
{Name: "mem_limit"},
{Name: "mem_throttling"},
},
})
if err != nil {
n.stateLock.Lock()
n.resources.throttling = true
n.resources.cpu = 100
n.resources.ncpu = 1
n.resources.cpuLimit = 0
@@ -296,6 +300,7 @@ func (n *node) Connect() error {
mem_total := uint64(0)
mem_free := uint64(0)
mem_limit := uint64(0)
throttling := .0
for _, x := range metrics.Metrics {
if x.Name == "cpu_idle" {
@@ -304,16 +309,25 @@ func (n *node) Connect() error {
cpu_ncpu = x.Values[0].Value
} else if x.Name == "cpu_limit" {
cpu_limit = x.Values[0].Value
} else if x.Name == "cpu_throttling" {
throttling += x.Values[0].Value
} else if x.Name == "mem_total" {
mem_total = uint64(x.Values[0].Value)
} else if x.Name == "mem_free" {
mem_free = uint64(x.Values[0].Value)
} else if x.Name == "mem_limit" {
mem_limit = uint64(x.Values[0].Value)
} else if x.Name == "mem_throttling" {
throttling += x.Values[0].Value
}
}
n.stateLock.Lock()
if throttling > 0 {
n.resources.throttling = true
} else {
n.resources.throttling = false
}
n.resources.ncpu = cpu_ncpu
n.resources.cpu = (100 - cpu_idle) * cpu_ncpu
n.resources.cpuLimit = cpu_limit * cpu_ncpu
@@ -463,6 +477,7 @@ func (n *node) About() NodeAbout {
LastContact: n.lastContact,
Latency: time.Duration(n.latency * float64(time.Second)),
Resources: NodeResources{
IsThrottling: n.resources.throttling,
NCPU: n.resources.ncpu,
CPU: n.resources.cpu,
CPULimit: n.resources.cpuLimit,
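
On the cluster side, a node is marked as throttling whenever either of the two new gauges is non-zero. A hypothetical helper showing the same aggregation (the real code inlines it in Connect(); the name isThrottling is illustrative):

package cluster

// cpu_throttling and mem_throttling are 0/1 gauges, so a non-zero sum
// means the remote core is limiting at least one resource.
func isThrottling(values map[string]float64) bool {
	return values["cpu_throttling"]+values["mem_throttling"] > 0
}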

View File

@@ -3,6 +3,7 @@ package monitor
import (
"github.com/datarhei/core/v16/monitor/metric"
"github.com/datarhei/core/v16/psutil"
"github.com/datarhei/core/v16/resources"
)
type cpuCollector struct {
@@ -12,19 +13,16 @@ type cpuCollector struct {
idleDescr *metric.Description
otherDescr *metric.Description
limitDescr *metric.Description
+throttleDescr *metric.Description
ncpu float64
-limit float64
+resources resources.Resources
}
-func NewCPUCollector(limit float64) metric.Collector {
+func NewCPUCollector(rsc resources.Resources) metric.Collector {
c := &cpuCollector{
ncpu: 1,
-limit: limit,
-}
-if limit <= 0 || limit > 100 {
-c.limit = 100
+resources: rsc,
}
c.ncpuDescr = metric.NewDesc("cpu_ncpu", "Number of logical CPUs in the system", nil)
@@ -33,6 +31,7 @@ func NewCPUCollector(limit float64) metric.Collector {
c.idleDescr = metric.NewDesc("cpu_idle", "Percentage of idle CPU", nil)
c.otherDescr = metric.NewDesc("cpu_other", "Percentage of CPU used for other subsystems", nil)
c.limitDescr = metric.NewDesc("cpu_limit", "Percentage of CPU to be consumed", nil)
c.throttleDescr = metric.NewDesc("cpu_throttling", "Whether the CPU is currently throttled", nil)
if ncpu, err := psutil.CPUCounts(true); err == nil {
c.ncpu = ncpu
@@ -55,6 +54,7 @@ func (c *cpuCollector) Describe() []*metric.Description {
c.idleDescr,
c.otherDescr,
c.limitDescr,
c.throttleDescr,
}
}
@@ -62,7 +62,18 @@ func (c *cpuCollector) Collect() metric.Metrics {
metrics := metric.NewMetrics()
metrics.Add(metric.NewValue(c.ncpuDescr, c.ncpu))
-metrics.Add(metric.NewValue(c.limitDescr, c.limit))
+limit, _ := c.resources.Limits()
+metrics.Add(metric.NewValue(c.limitDescr, limit))
+cpu, _ := c.resources.ShouldLimit()
+throttling := .0
+if cpu {
+throttling = 1
+}
+metrics.Add(metric.NewValue(c.throttleDescr, throttling))
stat, err := psutil.CPUPercent()
if err != nil {

View File

@@ -3,28 +3,27 @@ package monitor
import (
"github.com/datarhei/core/v16/monitor/metric"
"github.com/datarhei/core/v16/psutil"
"github.com/datarhei/core/v16/resources"
)
type memCollector struct {
totalDescr *metric.Description
freeDescr *metric.Description
limitDescr *metric.Description
+throttleDescr *metric.Description
-limit float64
+resources resources.Resources
}
-func NewMemCollector(limit float64) metric.Collector {
+func NewMemCollector(rsc resources.Resources) metric.Collector {
c := &memCollector{
-limit: limit / 100,
-}
-if limit <= 0 || limit > 1 {
-c.limit = 1
+resources: rsc,
}
c.totalDescr = metric.NewDesc("mem_total", "Total available memory in bytes", nil)
c.freeDescr = metric.NewDesc("mem_free", "Free memory in bytes", nil)
c.limitDescr = metric.NewDesc("mem_limit", "Memory limit in bytes", nil)
c.throttleDescr = metric.NewDesc("mem_throttling", "Whether the memory is currently throttled", nil)
return c
}
@@ -38,12 +37,25 @@ func (c *memCollector) Describe() []*metric.Description {
c.totalDescr,
c.freeDescr,
c.limitDescr,
c.throttleDescr,
}
}
func (c *memCollector) Collect() metric.Metrics {
metrics := metric.NewMetrics()
+_, limit := c.resources.Limits()
+metrics.Add(metric.NewValue(c.limitDescr, float64(limit)))
+_, memory := c.resources.ShouldLimit()
+throttling := .0
+if memory {
+throttling = 1
+}
+metrics.Add(metric.NewValue(c.throttleDescr, throttling))
stat, err := psutil.VirtualMemory()
if err != nil {
return metrics
@@ -51,7 +63,6 @@ func (c *memCollector) Collect() metric.Metrics {
metrics.Add(metric.NewValue(c.totalDescr, float64(stat.Total)))
metrics.Add(metric.NewValue(c.freeDescr, float64(stat.Available)))
-metrics.Add(metric.NewValue(c.limitDescr, float64(stat.Total)*c.limit))
return metrics
}
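
Both collectors now follow the same pattern: the configured limit comes from Resources.Limits() and the new *_throttling gauge is a boolean from Resources.ShouldLimit() (first return value for CPU, second for memory) encoded as 0 or 1. A sketch of that encoding, assuming only the interface from this commit (boolToGauge is an illustrative name):

package monitor

// boolToGauge encodes a throttle flag the way both collectors report it:
// 1 while the resource manager is limiting, 0 otherwise.
func boolToGauge(limiting bool) float64 {
	if limiting {
		return 1
	}
	return 0
}

// Usage inside Collect(), schematically:
//   cpu, memory := c.resources.ShouldLimit()
//   metrics.Add(metric.NewValue(c.throttleDescr, boolToGauge(cpu)))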

View File

@@ -14,18 +14,16 @@ type resources struct {
psutil psutil.Util
ncpu float64
-maxCPU float64
-maxMemory uint64
+maxCPU float64 // percent 0-100*ncpu
+maxMemory uint64 // bytes
-limitCPUCh chan bool
+isUnlimited bool
isCPULimiting bool
-limitMemoryCh chan bool
isMemoryLimiting bool
cancelObserver context.CancelFunc
-lock sync.Mutex
+lock sync.RWMutex
startOnce sync.Once
stopOnce sync.Once
@@ -36,8 +34,14 @@ type Resources interface {
Start()
Stop()
-LimitCPU() <-chan bool
-LimitMemory() <-chan bool
+// HasLimits returns whether any limits have been set
+HasLimits() bool
+// Limits returns the CPU (percent 0-100) and memory (bytes) limits
+Limits() (float64, uint64)
+// ShouldLimit returns whether cpu and/or memory is currently limited
+ShouldLimit() (bool, bool)
Request(cpu float64, memory uint64) error
}
@@ -50,8 +54,12 @@ type Config struct {
}
func New(config Config) (Resources, error) {
-if config.MaxCPU <= 0 || config.MaxMemory <= 0 {
-return nil, fmt.Errorf("both MaxCPU and MaxMemory have to be set")
+if config.MaxCPU <= 0 {
+config.MaxCPU = 100
}
+if config.MaxMemory <= 0 {
+config.MaxMemory = 100
+}
if config.MaxCPU > 100 || config.MaxMemory > 100 {
@@ -64,6 +72,10 @@ func New(config Config) (Resources, error) {
logger: config.Logger,
}
if config.MaxCPU == 100 && config.MaxMemory == 100 {
r.isUnlimited = true
}
if r.logger == nil {
r.logger = log.New("")
}
@@ -87,9 +99,6 @@ func New(config Config) (Resources, error) {
r.maxCPU *= r.ncpu
r.maxMemory = uint64(float64(vmstat.Total) * config.MaxMemory / 100)
-r.limitCPUCh = make(chan bool, 10)
-r.limitMemoryCh = make(chan bool, 10)
r.logger = r.logger.WithFields(log.Fields{
"ncpu": r.ncpu,
"max_cpu": r.maxCPU,
@@ -126,14 +135,6 @@ func (r *resources) Stop() {
})
}
-func (r *resources) LimitCPU() <-chan bool {
-return r.limitCPUCh
-}
-func (r *resources) LimitMemory() <-chan bool {
-return r.limitMemoryCh
-}
func (r *resources) observe(ctx context.Context, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
@@ -201,10 +202,6 @@ func (r *resources) observe(ctx context.Context, interval time.Duration) {
}).Log("Limiting CPU")
r.isCPULimiting = doCPULimit
-select {
-case r.limitCPUCh <- doCPULimit:
-default:
-}
}
if r.isMemoryLimiting != doMemoryLimit {
@@ -213,19 +210,30 @@ func (r *resources) observe(ctx context.Context, interval time.Duration) {
}).Log("Limiting memory")
r.isMemoryLimiting = doMemoryLimit
-select {
-case r.limitMemoryCh <- doMemoryLimit:
-default:
-}
}
r.lock.Unlock()
}
}
}
func (r *resources) HasLimits() bool {
return !r.isUnlimited
}
func (r *resources) Limits() (float64, uint64) {
return r.maxCPU / r.ncpu, r.maxMemory
}
func (r *resources) ShouldLimit() (bool, bool) {
r.lock.RLock()
defer r.lock.RUnlock()
return r.isCPULimiting, r.isMemoryLimiting
}
func (r *resources) Request(cpu float64, memory uint64) error {
-r.lock.Lock()
-defer r.lock.Unlock()
+r.lock.RLock()
+defer r.lock.RUnlock()
logger := r.logger.WithFields(log.Fields{
"req_cpu": cpu,

View File

@@ -6,10 +6,10 @@ import (
"time"
"github.com/datarhei/core/v16/psutil"
"github.com/stretchr/testify/require"
"github.com/shirou/gopsutil/v3/disk"
"github.com/shirou/gopsutil/v3/net"
"github.com/stretchr/testify/require"
)
type util struct{}
@@ -72,9 +72,19 @@ func TestMemoryLimit(t *testing.T) {
timer := time.NewTimer(10 * time.Second)
defer timer.Stop()
+ticker := time.NewTicker(time.Second)
+defer ticker.Stop()
for {
select {
-case limit = <-r.LimitMemory():
+case <-ticker.C:
+_, limit = r.ShouldLimit()
+if limit {
+return
+}
case <-timer.C:
return
}
}
}()
@@ -109,9 +119,19 @@ func TestCPULimit(t *testing.T) {
timer := time.NewTimer(10 * time.Second)
defer timer.Stop()
+ticker := time.NewTicker(time.Second)
+defer ticker.Stop()
for {
select {
-case limit = <-r.LimitCPU():
+case <-ticker.C:
+limit, _ = r.ShouldLimit()
+if limit {
+return
+}
case <-timer.C:
return
}
}
}()
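
The tests switch from receiving on the limit channels to polling ShouldLimit() once per second until the flag flips or a deadline passes. That idiom, extracted into a hypothetical helper (waitForLimit and pick are illustrative names):

package resources_test

import (
	"time"

	"github.com/datarhei/core/v16/resources"
)

// waitForLimit polls until pick reports true or the deadline expires.
func waitForLimit(rsc resources.Resources, pick func(cpu, memory bool) bool, deadline time.Duration) bool {
	timer := time.NewTimer(deadline)
	defer timer.Stop()

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if pick(rsc.ShouldLimit()) {
				return true
			}
		case <-timer.C:
			return false
		}
	}
}

// e.g. waitForLimit(r, func(_, memory bool) bool { return memory }, 10*time.Second)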

View File

@@ -23,10 +23,10 @@ import (
"github.com/datarhei/core/v16/net"
"github.com/datarhei/core/v16/net/url"
"github.com/datarhei/core/v16/process"
"github.com/datarhei/core/v16/resources"
"github.com/datarhei/core/v16/restream/app"
rfs "github.com/datarhei/core/v16/restream/fs"
"github.com/datarhei/core/v16/restream/replace"
"github.com/datarhei/core/v16/restream/resources"
"github.com/datarhei/core/v16/restream/rewrite"
"github.com/datarhei/core/v16/restream/store"
jsonstore "github.com/datarhei/core/v16/restream/store/json"
@@ -76,8 +76,7 @@ type Config struct {
Rewrite rewrite.Rewriter
FFmpeg ffmpeg.FFmpeg
MaxProcesses int64
-MaxCPU float64 // percent 0-100
-MaxMemory float64 // percent 0-100
+Resources resources.Resources
Logger log.Logger
IAM iam.IAM
}
@@ -195,21 +194,12 @@ func New(config Config) (Restreamer, error) {
}
r.maxProc = config.MaxProcesses
-if config.MaxCPU > 0 || config.MaxMemory > 0 {
-resources, err := resources.New(resources.Config{
-MaxCPU: config.MaxCPU,
-MaxMemory: config.MaxMemory,
-Logger: r.logger.WithComponent("Resources"),
-})
-if err != nil {
-return nil, fmt.Errorf("failed to initialize resource manager: %w", err)
+r.resources = config.Resources
+if r.resources == nil {
+return nil, fmt.Errorf("a resource manager must be provided")
}
-r.resources = resources
-r.enableSoftLimit = true
-r.logger.Debug().Log("Enabling resource manager")
-}
+r.enableSoftLimit = r.resources.HasLimits()
if err := r.load(); err != nil {
return nil, fmt.Errorf("failed to load data from DB: %w", err)
@@ -231,7 +221,7 @@ func (r *restream) Start() {
r.cancelObserver = cancel
if r.enableSoftLimit {
-go r.resourceObserver(ctx, r.resources)
+go r.resourceObserver(ctx, r.resources, time.Second)
}
for id, t := range r.tasks {
@@ -321,9 +311,9 @@ func (r *restream) filesystemObserver(ctx context.Context, fs fs.Filesystem, int
}
}
-func (r *restream) resourceObserver(ctx context.Context, rsc resources.Resources) {
-rsc.Start()
-defer rsc.Stop()
+func (r *restream) resourceObserver(ctx context.Context, rsc resources.Resources, interval time.Duration) {
+ticker := time.NewTicker(interval)
+defer ticker.Stop()
limitCPU, limitMemory := false, false
@@ -331,8 +321,26 @@ func (r *restream) resourceObserver(ctx context.Context, rsc resources.Resources
select {
case <-ctx.Done():
return
-case limitCPU = <-rsc.LimitCPU():
-r.lock.Lock()
+case <-ticker.C:
+cpu, memory := rsc.ShouldLimit()
+hasChanges := false
+if cpu != limitCPU {
+limitCPU = cpu
+hasChanges = true
+}
+if memory != limitMemory {
+limitMemory = memory
+hasChanges = true
+}
+if !hasChanges {
+break
+}
+r.lock.RLock()
for id, t := range r.tasks {
if !t.valid {
continue
@@ -340,25 +348,12 @@ func (r *restream) resourceObserver(ctx context.Context, rsc resources.Resources
r.logger.Debug().WithFields(log.Fields{
"limit_cpu": limitCPU,
"id": id,
}).Log("Limiting process CPU consumption")
t.ffmpeg.Limit(limitCPU, limitMemory)
}
r.lock.Unlock()
case limitMemory = <-rsc.LimitMemory():
r.lock.Lock()
for id, t := range r.tasks {
if !t.valid {
continue
}
r.logger.Debug().WithFields(log.Fields{
"limit_memory": limitMemory,
"id": id,
}).Log("Limiting process memory consumption")
}).Log("Limiting process CPU and memory consumption")
t.ffmpeg.Limit(limitCPU, limitMemory)
}
r.lock.Unlock()
r.lock.RUnlock()
}
}
}
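
Distilled, the restream observer is now edge-triggered polling: sample ShouldLimit() on a ticker and touch the tasks only when the state changes. A self-contained sketch, with apply standing in for the per-task t.ffmpeg.Limit() loop (hypothetical callback):

package observer

import (
	"context"
	"time"

	"github.com/datarhei/core/v16/resources"
)

func observe(ctx context.Context, rsc resources.Resources, apply func(cpu, memory bool)) {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	limitCPU, limitMemory := false, false

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			cpu, memory := rsc.ShouldLimit()
			if cpu == limitCPU && memory == limitMemory {
				continue // edge-triggered: act only on changes
			}
			limitCPU, limitMemory = cpu, memory
			apply(limitCPU, limitMemory)
		}
	}
}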