Add resource manager

This commit is contained in:
Ingo Oppermann
2023-04-27 17:02:20 +02:00
parent b723f4e5fd
commit 7527f7833b
8 changed files with 274 additions and 48 deletions

View File

@@ -664,6 +664,8 @@ func (a *api) start() error {
Replace: a.replacer, Replace: a.replacer,
FFmpeg: a.ffmpeg, FFmpeg: a.ffmpeg,
MaxProcesses: cfg.FFmpeg.MaxProcesses, MaxProcesses: cfg.FFmpeg.MaxProcesses,
MaxCPU: cfg.Resources.MaxCPUUsage,
MaxMemory: cfg.Resources.MaxMemoryUsage,
Logger: a.log.logger.core.WithComponent("Process"), Logger: a.log.logger.core.WithComponent("Process"),
}) })

View File

@@ -457,6 +457,17 @@ func (d *Config) Validate(resetLogs bool) {
d.vars.Log("error", "metrics.interval", "must be smaller than the range") d.vars.Log("error", "metrics.interval", "must be smaller than the range")
} }
} }
// If resource limits are given, all values must be set
if d.Resources.MaxCPUUsage > 0 || d.Resources.MaxMemoryUsage > 0 {
if d.Resources.MaxCPUUsage <= 0 {
d.vars.Log("error", "resources.max_cpu_usage", "must be greater than 0")
}
if d.Resources.MaxMemoryUsage <= 0 {
d.vars.Log("error", "resources.max_memory_usage", "must be greater than 0")
}
}
} }
// Merge merges the values of the known environment variables into the configuration // Merge merges the values of the known environment variables into the configuration

View File

@@ -132,6 +132,11 @@ func (f *ffmpeg) New(config ProcessConfig) (process.Process, error) {
} }
} }
limitMode := process.LimitModeHard
if config.LimitMode == "soft" {
limitMode = process.LimitModeSoft
}
ffmpeg, err := process.New(process.Config{ ffmpeg, err := process.New(process.Config{
Binary: f.binary, Binary: f.binary,
Args: config.Args, Args: config.Args,
@@ -142,7 +147,7 @@ func (f *ffmpeg) New(config ProcessConfig) (process.Process, error) {
LimitCPU: config.LimitCPU, LimitCPU: config.LimitCPU,
LimitMemory: config.LimitMemory, LimitMemory: config.LimitMemory,
LimitDuration: config.LimitDuration, LimitDuration: config.LimitDuration,
LimitMode: process.LimitModeHard, LimitMode: limitMode,
Scheduler: scheduler, Scheduler: scheduler,
Parser: config.Parser, Parser: config.Parser,
Logger: config.Logger, Logger: config.Logger,

View File

@@ -7,6 +7,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/psutil" "github.com/datarhei/core/v16/psutil"
) )
@@ -41,6 +42,7 @@ type LimiterConfig struct {
WaitFor time.Duration // Duration for one of the limits has to be above the limit until OnLimit gets triggered WaitFor time.Duration // Duration for one of the limits has to be above the limit until OnLimit gets triggered
OnLimit LimitFunc // Function to be triggered if limits are exceeded OnLimit LimitFunc // Function to be triggered if limits are exceeded
Mode LimitMode // How to limit CPU usage Mode LimitMode // How to limit CPU usage
Logger log.Logger
} }
type Limiter interface { type Limiter interface {
@@ -68,11 +70,12 @@ type Limiter interface {
} }
type limiter struct { type limiter struct {
ncpu float64 ncpu float64
proc psutil.Process ncpuFactor float64
lock sync.Mutex proc psutil.Process
cancel context.CancelFunc lock sync.Mutex
onLimit LimitFunc cancel context.CancelFunc
onLimit LimitFunc
cpu float64 cpu float64
cpuCurrent float64 cpuCurrent float64
@@ -94,6 +97,8 @@ type limiter struct {
mode LimitMode mode LimitMode
enableLimit bool enableLimit bool
cancelLimit context.CancelFunc cancelLimit context.CancelFunc
logger log.Logger
} }
// NewLimiter returns a new Limiter // NewLimiter returns a new Limiter
@@ -104,6 +109,11 @@ func NewLimiter(config LimiterConfig) Limiter {
waitFor: config.WaitFor, waitFor: config.WaitFor,
onLimit: config.OnLimit, onLimit: config.OnLimit,
mode: config.Mode, mode: config.Mode,
logger: config.Logger,
}
if l.logger == nil {
l.logger = log.New("")
} }
if ncpu, err := psutil.CPUCounts(true); err != nil { if ncpu, err := psutil.CPUCounts(true); err != nil {
@@ -112,18 +122,31 @@ func NewLimiter(config LimiterConfig) Limiter {
l.ncpu = ncpu l.ncpu = ncpu
} }
l.ncpuFactor = 1
mode := "hard"
if l.mode == LimitModeSoft { if l.mode == LimitModeSoft {
mode = "soft"
l.cpu /= l.ncpu l.cpu /= l.ncpu
l.ncpuFactor = l.ncpu
} }
if l.onLimit == nil { if l.onLimit == nil {
l.onLimit = func(float64, uint64) {} l.onLimit = func(float64, uint64) {}
} }
l.logger = l.logger.WithFields(log.Fields{
"cpu": l.cpu * l.ncpuFactor,
"memory": l.memory,
"mode": mode,
})
return l return l
} }
func (l *limiter) reset() { func (l *limiter) reset() {
l.enableLimit = false
l.cpuCurrent = 0 l.cpuCurrent = 0
l.cpuLast = 0 l.cpuLast = 0
l.cpuAvg = 0 l.cpuAvg = 0
@@ -152,7 +175,7 @@ func (l *limiter) Start(process psutil.Process) error {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
l.cancel = cancel l.cancel = cancel
go l.ticker(ctx, 500*time.Millisecond) go l.ticker(ctx, 1000*time.Millisecond)
return nil return nil
} }
@@ -167,6 +190,13 @@ func (l *limiter) Stop() {
l.cancel() l.cancel()
if l.cancelLimit != nil {
l.cancelLimit()
l.cancelLimit = nil
}
l.enableLimit = false
l.proc.Stop() l.proc.Stop()
l.proc = nil l.proc = nil
@@ -232,6 +262,7 @@ func (l *limiter) collect(t time.Time) {
} }
if time.Since(l.cpuLimitSince) >= l.waitFor { if time.Since(l.cpuLimitSince) >= l.waitFor {
l.logger.Warn().Log("CPU limit exceeded")
isLimitExceeded = true isLimitExceeded = true
} }
} }
@@ -247,6 +278,7 @@ func (l *limiter) collect(t time.Time) {
} }
if time.Since(l.memoryLimitSince) >= l.waitFor { if time.Since(l.memoryLimitSince) >= l.waitFor {
l.logger.Warn().Log("Memory limit exceeded")
isLimitExceeded = true isLimitExceeded = true
} }
} }
@@ -255,17 +287,31 @@ func (l *limiter) collect(t time.Time) {
if l.memory > 0 { if l.memory > 0 {
if l.memoryCurrent > l.memory { if l.memoryCurrent > l.memory {
// Current value is higher than the limit // Current value is higher than the limit
l.logger.Warn().Log("Memory limit exceeded")
isLimitExceeded = true isLimitExceeded = true
} }
} }
} }
l.logger.Debug().WithFields(log.Fields{
"cur_cpu": l.cpuCurrent * l.ncpuFactor,
"cur_mem": l.memoryCurrent,
"exceeded": isLimitExceeded,
}).Log("Observation")
if isLimitExceeded { if isLimitExceeded {
go l.onLimit(l.cpuCurrent, l.memoryCurrent) go l.onLimit(l.cpuCurrent*l.ncpuFactor, l.memoryCurrent)
} }
} }
func (l *limiter) Limit(enable bool) error { func (l *limiter) Limit(enable bool) error {
l.lock.Lock()
defer l.lock.Unlock()
if l.mode == LimitModeHard {
return nil
}
if enable { if enable {
if l.enableLimit { if l.enableLimit {
return nil return nil
@@ -280,6 +326,8 @@ func (l *limiter) Limit(enable bool) error {
l.enableLimit = true l.enableLimit = true
l.logger.Debug().Log("Limiter enabled")
go l.limit(ctx, l.cpu/100, time.Second) go l.limit(ctx, l.cpu/100, time.Second)
} else { } else {
if !l.enableLimit { if !l.enableLimit {
@@ -290,8 +338,12 @@ func (l *limiter) Limit(enable bool) error {
return nil return nil
} }
l.enableLimit = false
l.cancelLimit() l.cancelLimit()
l.cancelLimit = nil l.cancelLimit = nil
l.logger.Debug().Log("Limiter disabled")
} }
return nil return nil
@@ -301,33 +353,60 @@ func (l *limiter) Limit(enable bool) error {
// normed to 0-1. The interval defines how long a time slot is that will be splitted // normed to 0-1. The interval defines how long a time slot is that will be splitted
// into sleeping and working. // into sleeping and working.
func (l *limiter) limit(ctx context.Context, limit float64, interval time.Duration) { func (l *limiter) limit(ctx context.Context, limit float64, interval time.Duration) {
defer func() {
l.lock.Lock()
if l.proc != nil {
l.proc.Resume()
}
l.lock.Unlock()
l.logger.Debug().Log("CPU throttler disabled")
}()
var workingrate float64 = -1 var workingrate float64 = -1
l.logger.Debug().Log("CPU throttler enabled")
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
default: default:
}
l.lock.Lock()
pcpu := l.cpuCurrent / 100
l.lock.Unlock()
if workingrate < 0 {
workingrate = limit
} else {
workingrate = math.Min(workingrate/pcpu*limit, 1)
}
worktime := float64(interval.Nanoseconds()) * workingrate
sleeptime := float64(interval.Nanoseconds()) - worktime
l.logger.Debug().WithFields(log.Fields{
"worktime": (time.Duration(worktime) * time.Nanosecond).String(),
"sleeptime": (time.Duration(sleeptime) * time.Nanosecond).String(),
}).Log("Throttler")
l.lock.Lock()
if l.proc != nil {
l.proc.Resume()
}
l.lock.Unlock()
time.Sleep(time.Duration(worktime) * time.Nanosecond)
if sleeptime > 0 {
l.lock.Lock() l.lock.Lock()
pcpu := l.cpuCurrent / 100 if l.proc != nil {
l.proc.Suspend()
}
l.lock.Unlock() l.lock.Unlock()
if workingrate < 0 { time.Sleep(time.Duration(sleeptime) * time.Nanosecond)
workingrate = limit
} else {
workingrate = math.Min(workingrate/pcpu*limit, 1)
}
worktime := float64(interval.Nanoseconds()) * workingrate
sleeptime := float64(interval.Nanoseconds()) - worktime
l.proc.Resume()
time.Sleep(time.Duration(worktime) * time.Nanosecond)
if sleeptime > 0 {
l.proc.Suspend()
time.Sleep(time.Duration(sleeptime) * time.Nanosecond)
}
} }
} }
} }

View File

@@ -271,7 +271,12 @@ func New(config Config) (Process, error) {
Memory: config.LimitMemory, Memory: config.LimitMemory,
WaitFor: config.LimitDuration, WaitFor: config.LimitDuration,
Mode: config.LimitMode, Mode: config.LimitMode,
Logger: p.logger.WithComponent("ProcessLimiter"),
OnLimit: func(cpu float64, memory uint64) { OnLimit: func(cpu float64, memory uint64) {
if !p.isRunning() {
return
}
p.logger.WithFields(log.Fields{ p.logger.WithFields(log.Fields{
"cpu": cpu, "cpu": cpu,
"memory": memory, "memory": memory,
@@ -455,6 +460,16 @@ func (p *process) IsRunning() bool {
} }
func (p *process) Limit(enable bool) error { func (p *process) Limit(enable bool) error {
if !p.isRunning() {
return nil
}
if p.limits == nil {
return nil
}
p.logger.Warn().WithField("limit", enable).Log("Limiter triggered")
return p.limits.Limit(enable) return p.limits.Limit(enable)
} }

View File

@@ -138,7 +138,7 @@ func (u *util) Start() {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
u.stopTicker = cancel u.stopTicker = cancel
go u.tick(ctx, 100*time.Millisecond) go u.tick(ctx, 1000*time.Millisecond)
}) })
} }
@@ -247,9 +247,6 @@ func (u *util) tick(ctx context.Context, interval time.Duration) {
u.statPrevious, u.statCurrent = u.statCurrent, stat u.statPrevious, u.statCurrent = u.statCurrent, stat
u.statPreviousTime, u.statCurrentTime = u.statCurrentTime, t u.statPreviousTime, u.statCurrentTime = u.statCurrentTime, t
u.lock.Unlock() u.lock.Unlock()
//p, _ := u.CPUPercent()
//fmt.Printf("%+v\n", p)
} }
} }
} }

View File

@@ -2,9 +2,11 @@ package resources
import ( import (
"context" "context"
"fmt"
"sync" "sync"
"time" "time"
"github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/psutil" "github.com/datarhei/core/v16/psutil"
) )
@@ -24,6 +26,8 @@ type resources struct {
lock sync.Mutex lock sync.Mutex
startOnce sync.Once startOnce sync.Once
stopOnce sync.Once stopOnce sync.Once
logger log.Logger
} }
type Resources interface { type Resources interface {
@@ -36,24 +40,42 @@ type Resources interface {
Remove(cpu float64, memory uint64) Remove(cpu float64, memory uint64)
} }
func New(maxCPU, maxMemory float64) (Resources, error) { type Config struct {
MaxCPU float64
MaxMemory float64
Logger log.Logger
}
func New(config Config) (Resources, error) {
r := &resources{ r := &resources{
maxCPU: maxCPU, maxCPU: config.MaxCPU,
logger: config.Logger,
} }
vmstat, err := psutil.VirtualMemory() vmstat, err := psutil.VirtualMemory()
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("unable to determine available memory: %w", err)
} }
ncpu, err := psutil.CPUCounts(true) ncpu, err := psutil.CPUCounts(true)
if err != nil { if err != nil {
ncpu = 1 return nil, fmt.Errorf("unable to determine number of logical CPUs: %w", err)
} }
r.ncpu = ncpu r.ncpu = ncpu
r.maxMemory = uint64(float64(vmstat.Total) * maxMemory / 100) r.maxMemory = uint64(float64(vmstat.Total) * config.MaxMemory / 100)
if r.logger == nil {
r.logger = log.New("")
}
r.logger = r.logger.WithFields(log.Fields{
"max_cpu": r.maxCPU,
"max_memory": r.maxMemory,
})
r.logger.Debug().Log("Created")
r.stopOnce.Do(func() {}) r.stopOnce.Do(func() {})
@@ -70,6 +92,8 @@ func (r *resources) Start() {
go r.observe(ctx, time.Second) go r.observe(ctx, time.Second)
r.stopOnce = sync.Once{} r.stopOnce = sync.Once{}
r.logger.Debug().Log("Started")
}) })
} }
@@ -78,6 +102,8 @@ func (r *resources) Stop() {
r.cancelObserver() r.cancelObserver()
r.startOnce = sync.Once{} r.startOnce = sync.Once{}
r.logger.Debug().Log("Stopped")
}) })
} }
@@ -89,35 +115,59 @@ func (r *resources) observe(ctx context.Context, interval time.Duration) {
ticker := time.NewTicker(interval) ticker := time.NewTicker(interval)
defer ticker.Stop() defer ticker.Stop()
r.logger.Debug().Log("Observer started")
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case <-ticker.C: case <-ticker.C:
limit := false
cpustat, err := psutil.CPUPercent() cpustat, err := psutil.CPUPercent()
if err != nil { if err != nil {
r.logger.Warn().WithError(err).Log("Failed to determine CPU load")
continue continue
} }
cpuload := cpustat.User + cpustat.System + cpustat.Other cpuload := (cpustat.User + cpustat.System + cpustat.Other) * r.ncpu
if cpuload > r.maxCPU {
limit = true
}
vmstat, err := psutil.VirtualMemory() vmstat, err := psutil.VirtualMemory()
if err != nil { if err != nil {
continue continue
} }
if vmstat.Used > r.maxMemory { r.logger.Debug().WithFields(log.Fields{
"cur_cpu": cpuload,
"cur_memory": vmstat.Used,
}).Log("Observation")
limit := false
if !r.isLimiting {
if cpuload > r.maxCPU {
r.logger.Debug().WithField("cpu", cpuload).Log("CPU limit reached")
limit = true
}
if vmstat.Used > r.maxMemory {
r.logger.Debug().WithField("memory", vmstat.Used).Log("Memory limit reached")
limit = true
}
} else {
limit = true limit = true
if cpuload <= r.maxCPU*0.8 {
r.logger.Debug().WithField("cpu", cpuload).Log("CPU limit released")
limit = false
}
if vmstat.Used <= uint64(float64(r.maxMemory)*0.8) {
r.logger.Debug().WithField("memory", vmstat.Used).Log("Memory limit reached")
limit = false
}
} }
r.lock.Lock() r.lock.Lock()
if r.isLimiting != limit { if r.isLimiting != limit {
r.logger.Debug().WithField("enabled", limit).Log("Limiting")
r.isLimiting = limit r.isLimiting = limit
select { select {
case r.limit <- limit: case r.limit <- limit:
@@ -133,21 +183,44 @@ func (r *resources) Add(cpu float64, memory uint64) bool {
r.lock.Lock() r.lock.Lock()
defer r.lock.Unlock() defer r.lock.Unlock()
logger := r.logger.WithFields(log.Fields{
"cpu": cpu,
"memory": memory,
})
logger.Debug().WithFields(log.Fields{
"used_cpu": r.consumerCPU,
"used_memory": r.consumerMemory,
}).Log("Request for acquiring resources")
if r.isLimiting { if r.isLimiting {
logger.Debug().Log("Rejected, currently limiting")
return false
}
if cpu <= 0 || memory == 0 {
logger.Debug().Log("Rejected, invalid values")
return false return false
} }
if r.consumerCPU+cpu > r.maxCPU { if r.consumerCPU+cpu > r.maxCPU {
logger.Debug().Log("Rejected, CPU limit exceeded")
return false return false
} }
if r.consumerMemory+memory > r.maxMemory { if r.consumerMemory+memory > r.maxMemory {
logger.Debug().Log("Rejected, memory limit exceeded")
return false return false
} }
r.consumerCPU += cpu r.consumerCPU += cpu
r.consumerMemory += memory r.consumerMemory += memory
logger.Debug().WithFields(log.Fields{
"used_cpu": r.consumerCPU,
"used_memory": r.consumerMemory,
}).Log("Acquiring approved")
return true return true
} }
@@ -155,6 +228,26 @@ func (r *resources) Remove(cpu float64, memory uint64) {
r.lock.Lock() r.lock.Lock()
defer r.lock.Unlock() defer r.lock.Unlock()
logger := r.logger.WithFields(log.Fields{
"cpu": cpu,
"memory": memory,
})
logger.Debug().WithFields(log.Fields{
"used_cpu": r.consumerCPU,
"used_memory": r.consumerMemory,
}).Log("Request for releasing resources")
r.consumerCPU -= cpu r.consumerCPU -= cpu
r.consumerMemory -= memory r.consumerMemory -= memory
if r.consumerCPU < 0 {
logger.Warn().WithField("used_cpu", r.consumerCPU).Log("Used CPU resources below 0")
r.consumerCPU = 0
}
logger.Debug().WithFields(log.Fields{
"used_cpu": r.consumerCPU,
"used_memory": r.consumerMemory,
}).Log("Releasing approved")
} }

View File

@@ -167,12 +167,26 @@ func New(config Config) (Restreamer, error) {
r.maxProc = config.MaxProcesses r.maxProc = config.MaxProcesses
if config.MaxCPU > 0 || config.MaxMemory > 0 { if config.MaxCPU > 0 || config.MaxMemory > 0 {
r.resources, _ = resources.New(config.MaxCPU, config.MaxMemory) if config.MaxCPU <= 0 || config.MaxMemory <= 0 {
return nil, fmt.Errorf("both MaxCPU and MaxMemory have to be set")
}
resources, err := resources.New(resources.Config{
MaxCPU: config.MaxCPU,
MaxMemory: config.MaxMemory,
Logger: r.logger.WithComponent("Resources"),
})
if err != nil {
return nil, fmt.Errorf("failed to initialize resource manager: %w", err)
}
r.resources = resources
r.enableSoftLimit = true r.enableSoftLimit = true
r.logger.Debug().Log("Enabling resource manager")
} }
if err := r.load(); err != nil { if err := r.load(); err != nil {
return nil, fmt.Errorf("failed to load data from DB (%w)", err) return nil, fmt.Errorf("failed to load data from DB: %w", err)
} }
r.save() r.save()
@@ -291,10 +305,6 @@ func (r *restream) resourceObserver(ctx context.Context, rsc resources.Resources
case <-ctx.Done(): case <-ctx.Done():
return return
case limit := <-rsc.Limit(): case limit := <-rsc.Limit():
if limit {
r.logger.Warn().WithField("limit", limit).Log("limiter triggered")
}
r.lock.Lock() r.lock.Lock()
for id, t := range r.tasks { for id, t := range r.tasks {
if !t.valid { if !t.valid {
@@ -363,6 +373,8 @@ func (r *restream) load() error {
// replaced, we can resolve references and validate the // replaced, we can resolve references and validate the
// inputs and outputs. // inputs and outputs.
for _, t := range tasks { for _, t := range tasks {
t := t
// Just warn if the ffmpeg version constraint doesn't match the available ffmpeg version // Just warn if the ffmpeg version constraint doesn't match the available ffmpeg version
if c, err := semver.NewConstraint(t.config.FFVersion); err == nil { if c, err := semver.NewConstraint(t.config.FFVersion); err == nil {
if v, err := semver.NewVersion(skills.FFmpeg.Version); err == nil { if v, err := semver.NewVersion(skills.FFmpeg.Version); err == nil {
@@ -430,6 +442,10 @@ func (r *restream) load() error {
return nil return nil
} }
if t.config.LimitCPU <= 0 || t.config.LimitMemory == 0 {
return fmt.Errorf("process needs to have CPU and memory limits defined")
}
if !r.resources.Add(t.config.LimitCPU, t.config.LimitMemory) { if !r.resources.Add(t.config.LimitCPU, t.config.LimitMemory) {
return fmt.Errorf("not enough resources available") return fmt.Errorf("not enough resources available")
} }
@@ -611,6 +627,10 @@ func (r *restream) createTask(config *app.Config) (*task, error) {
return nil return nil
} }
if t.config.LimitCPU <= 0 || t.config.LimitMemory == 0 {
return fmt.Errorf("process needs to have CPU and memory limits defined")
}
if !r.resources.Add(t.config.LimitCPU, t.config.LimitMemory) { if !r.resources.Add(t.config.LimitCPU, t.config.LimitMemory) {
return fmt.Errorf("not enough resources available") return fmt.Errorf("not enough resources available")
} }
@@ -1359,6 +1379,10 @@ func (r *restream) reloadProcess(id string) error {
return nil return nil
} }
if t.config.LimitCPU <= 0 || t.config.LimitMemory == 0 {
return fmt.Errorf("process needs to have CPU and memory limits defined")
}
if !r.resources.Add(t.config.LimitCPU, t.config.LimitMemory) { if !r.resources.Add(t.config.LimitCPU, t.config.LimitMemory) {
return fmt.Errorf("not enough resources available") return fmt.Errorf("not enough resources available")
} }