diff --git a/app/api/api.go b/app/api/api.go index 9e64965c..d48b77e3 100644 --- a/app/api/api.go +++ b/app/api/api.go @@ -664,6 +664,8 @@ func (a *api) start() error { Replace: a.replacer, FFmpeg: a.ffmpeg, MaxProcesses: cfg.FFmpeg.MaxProcesses, + MaxCPU: cfg.Resources.MaxCPUUsage, + MaxMemory: cfg.Resources.MaxMemoryUsage, Logger: a.log.logger.core.WithComponent("Process"), }) diff --git a/config/config.go b/config/config.go index c6f3c114..d285bef2 100644 --- a/config/config.go +++ b/config/config.go @@ -457,6 +457,17 @@ func (d *Config) Validate(resetLogs bool) { d.vars.Log("error", "metrics.interval", "must be smaller than the range") } } + + // If resource limits are given, all values must be set + if d.Resources.MaxCPUUsage > 0 || d.Resources.MaxMemoryUsage > 0 { + if d.Resources.MaxCPUUsage <= 0 { + d.vars.Log("error", "resources.max_cpu_usage", "must be greater than 0") + } + + if d.Resources.MaxMemoryUsage <= 0 { + d.vars.Log("error", "resources.max_memory_usage", "must be greater than 0") + } + } } // Merge merges the values of the known environment variables into the configuration diff --git a/ffmpeg/ffmpeg.go b/ffmpeg/ffmpeg.go index 6a86a43d..3b1e9710 100644 --- a/ffmpeg/ffmpeg.go +++ b/ffmpeg/ffmpeg.go @@ -132,6 +132,11 @@ func (f *ffmpeg) New(config ProcessConfig) (process.Process, error) { } } + limitMode := process.LimitModeHard + if config.LimitMode == "soft" { + limitMode = process.LimitModeSoft + } + ffmpeg, err := process.New(process.Config{ Binary: f.binary, Args: config.Args, @@ -142,7 +147,7 @@ func (f *ffmpeg) New(config ProcessConfig) (process.Process, error) { LimitCPU: config.LimitCPU, LimitMemory: config.LimitMemory, LimitDuration: config.LimitDuration, - LimitMode: process.LimitModeHard, + LimitMode: limitMode, Scheduler: scheduler, Parser: config.Parser, Logger: config.Logger, diff --git a/process/limits.go b/process/limits.go index cde0e8e7..b5ab8116 100644 --- a/process/limits.go +++ b/process/limits.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/datarhei/core/v16/log" "github.com/datarhei/core/v16/psutil" ) @@ -41,6 +42,7 @@ type LimiterConfig struct { WaitFor time.Duration // Duration for one of the limits has to be above the limit until OnLimit gets triggered OnLimit LimitFunc // Function to be triggered if limits are exceeded Mode LimitMode // How to limit CPU usage + Logger log.Logger } type Limiter interface { @@ -68,11 +70,12 @@ type Limiter interface { } type limiter struct { - ncpu float64 - proc psutil.Process - lock sync.Mutex - cancel context.CancelFunc - onLimit LimitFunc + ncpu float64 + ncpuFactor float64 + proc psutil.Process + lock sync.Mutex + cancel context.CancelFunc + onLimit LimitFunc cpu float64 cpuCurrent float64 @@ -94,6 +97,8 @@ type limiter struct { mode LimitMode enableLimit bool cancelLimit context.CancelFunc + + logger log.Logger } // NewLimiter returns a new Limiter @@ -104,6 +109,11 @@ func NewLimiter(config LimiterConfig) Limiter { waitFor: config.WaitFor, onLimit: config.OnLimit, mode: config.Mode, + logger: config.Logger, + } + + if l.logger == nil { + l.logger = log.New("") } if ncpu, err := psutil.CPUCounts(true); err != nil { @@ -112,18 +122,31 @@ func NewLimiter(config LimiterConfig) Limiter { l.ncpu = ncpu } + l.ncpuFactor = 1 + + mode := "hard" if l.mode == LimitModeSoft { + mode = "soft" l.cpu /= l.ncpu + l.ncpuFactor = l.ncpu } if l.onLimit == nil { l.onLimit = func(float64, uint64) {} } + l.logger = l.logger.WithFields(log.Fields{ + "cpu": l.cpu * l.ncpuFactor, + "memory": l.memory, + "mode": mode, + }) + return l } func (l *limiter) reset() { + l.enableLimit = false + l.cpuCurrent = 0 l.cpuLast = 0 l.cpuAvg = 0 @@ -152,7 +175,7 @@ func (l *limiter) Start(process psutil.Process) error { ctx, cancel := context.WithCancel(context.Background()) l.cancel = cancel - go l.ticker(ctx, 500*time.Millisecond) + go l.ticker(ctx, 1000*time.Millisecond) return nil } @@ -167,6 +190,13 @@ func (l *limiter) Stop() { l.cancel() + if l.cancelLimit != nil { + l.cancelLimit() + l.cancelLimit = nil + } + + l.enableLimit = false + l.proc.Stop() l.proc = nil @@ -232,6 +262,7 @@ func (l *limiter) collect(t time.Time) { } if time.Since(l.cpuLimitSince) >= l.waitFor { + l.logger.Warn().Log("CPU limit exceeded") isLimitExceeded = true } } @@ -247,6 +278,7 @@ func (l *limiter) collect(t time.Time) { } if time.Since(l.memoryLimitSince) >= l.waitFor { + l.logger.Warn().Log("Memory limit exceeded") isLimitExceeded = true } } @@ -255,17 +287,31 @@ func (l *limiter) collect(t time.Time) { if l.memory > 0 { if l.memoryCurrent > l.memory { // Current value is higher than the limit + l.logger.Warn().Log("Memory limit exceeded") isLimitExceeded = true } } } + l.logger.Debug().WithFields(log.Fields{ + "cur_cpu": l.cpuCurrent * l.ncpuFactor, + "cur_mem": l.memoryCurrent, + "exceeded": isLimitExceeded, + }).Log("Observation") + if isLimitExceeded { - go l.onLimit(l.cpuCurrent, l.memoryCurrent) + go l.onLimit(l.cpuCurrent*l.ncpuFactor, l.memoryCurrent) } } func (l *limiter) Limit(enable bool) error { + l.lock.Lock() + defer l.lock.Unlock() + + if l.mode == LimitModeHard { + return nil + } + if enable { if l.enableLimit { return nil @@ -280,6 +326,8 @@ func (l *limiter) Limit(enable bool) error { l.enableLimit = true + l.logger.Debug().Log("Limiter enabled") + go l.limit(ctx, l.cpu/100, time.Second) } else { if !l.enableLimit { @@ -290,8 +338,12 @@ func (l *limiter) Limit(enable bool) error { return nil } + l.enableLimit = false + l.cancelLimit() l.cancelLimit = nil + + l.logger.Debug().Log("Limiter disabled") } return nil @@ -301,33 +353,60 @@ func (l *limiter) Limit(enable bool) error { // normed to 0-1. The interval defines how long a time slot is that will be splitted // into sleeping and working. func (l *limiter) limit(ctx context.Context, limit float64, interval time.Duration) { + defer func() { + l.lock.Lock() + if l.proc != nil { + l.proc.Resume() + } + l.lock.Unlock() + + l.logger.Debug().Log("CPU throttler disabled") + }() var workingrate float64 = -1 + l.logger.Debug().Log("CPU throttler enabled") + for { select { case <-ctx.Done(): return default: + } + + l.lock.Lock() + pcpu := l.cpuCurrent / 100 + l.lock.Unlock() + + if workingrate < 0 { + workingrate = limit + } else { + workingrate = math.Min(workingrate/pcpu*limit, 1) + } + + worktime := float64(interval.Nanoseconds()) * workingrate + sleeptime := float64(interval.Nanoseconds()) - worktime + + l.logger.Debug().WithFields(log.Fields{ + "worktime": (time.Duration(worktime) * time.Nanosecond).String(), + "sleeptime": (time.Duration(sleeptime) * time.Nanosecond).String(), + }).Log("Throttler") + + l.lock.Lock() + if l.proc != nil { + l.proc.Resume() + } + l.lock.Unlock() + + time.Sleep(time.Duration(worktime) * time.Nanosecond) + + if sleeptime > 0 { l.lock.Lock() - pcpu := l.cpuCurrent / 100 + if l.proc != nil { + l.proc.Suspend() + } l.lock.Unlock() - if workingrate < 0 { - workingrate = limit - } else { - workingrate = math.Min(workingrate/pcpu*limit, 1) - } - - worktime := float64(interval.Nanoseconds()) * workingrate - sleeptime := float64(interval.Nanoseconds()) - worktime - - l.proc.Resume() - time.Sleep(time.Duration(worktime) * time.Nanosecond) - - if sleeptime > 0 { - l.proc.Suspend() - time.Sleep(time.Duration(sleeptime) * time.Nanosecond) - } + time.Sleep(time.Duration(sleeptime) * time.Nanosecond) } } } diff --git a/process/process.go b/process/process.go index 847e340f..5510ff4d 100644 --- a/process/process.go +++ b/process/process.go @@ -271,7 +271,12 @@ func New(config Config) (Process, error) { Memory: config.LimitMemory, WaitFor: config.LimitDuration, Mode: config.LimitMode, + Logger: p.logger.WithComponent("ProcessLimiter"), OnLimit: func(cpu float64, memory uint64) { + if !p.isRunning() { + return + } + p.logger.WithFields(log.Fields{ "cpu": cpu, "memory": memory, @@ -455,6 +460,16 @@ func (p *process) IsRunning() bool { } func (p *process) Limit(enable bool) error { + if !p.isRunning() { + return nil + } + + if p.limits == nil { + return nil + } + + p.logger.Warn().WithField("limit", enable).Log("Limiter triggered") + return p.limits.Limit(enable) } diff --git a/psutil/psutil.go b/psutil/psutil.go index 9b08ffee..f9b1441d 100644 --- a/psutil/psutil.go +++ b/psutil/psutil.go @@ -138,7 +138,7 @@ func (u *util) Start() { ctx, cancel := context.WithCancel(context.Background()) u.stopTicker = cancel - go u.tick(ctx, 100*time.Millisecond) + go u.tick(ctx, 1000*time.Millisecond) }) } @@ -247,9 +247,6 @@ func (u *util) tick(ctx context.Context, interval time.Duration) { u.statPrevious, u.statCurrent = u.statCurrent, stat u.statPreviousTime, u.statCurrentTime = u.statCurrentTime, t u.lock.Unlock() - - //p, _ := u.CPUPercent() - //fmt.Printf("%+v\n", p) } } } diff --git a/restream/resources/resources.go b/restream/resources/resources.go index 1ccf526a..b3b66364 100644 --- a/restream/resources/resources.go +++ b/restream/resources/resources.go @@ -2,9 +2,11 @@ package resources import ( "context" + "fmt" "sync" "time" + "github.com/datarhei/core/v16/log" "github.com/datarhei/core/v16/psutil" ) @@ -24,6 +26,8 @@ type resources struct { lock sync.Mutex startOnce sync.Once stopOnce sync.Once + + logger log.Logger } type Resources interface { @@ -36,24 +40,42 @@ type Resources interface { Remove(cpu float64, memory uint64) } -func New(maxCPU, maxMemory float64) (Resources, error) { +type Config struct { + MaxCPU float64 + MaxMemory float64 + Logger log.Logger +} + +func New(config Config) (Resources, error) { r := &resources{ - maxCPU: maxCPU, + maxCPU: config.MaxCPU, + logger: config.Logger, } vmstat, err := psutil.VirtualMemory() if err != nil { - return nil, err + return nil, fmt.Errorf("unable to determine available memory: %w", err) } ncpu, err := psutil.CPUCounts(true) if err != nil { - ncpu = 1 + return nil, fmt.Errorf("unable to determine number of logical CPUs: %w", err) } r.ncpu = ncpu - r.maxMemory = uint64(float64(vmstat.Total) * maxMemory / 100) + r.maxMemory = uint64(float64(vmstat.Total) * config.MaxMemory / 100) + + if r.logger == nil { + r.logger = log.New("") + } + + r.logger = r.logger.WithFields(log.Fields{ + "max_cpu": r.maxCPU, + "max_memory": r.maxMemory, + }) + + r.logger.Debug().Log("Created") r.stopOnce.Do(func() {}) @@ -70,6 +92,8 @@ func (r *resources) Start() { go r.observe(ctx, time.Second) r.stopOnce = sync.Once{} + + r.logger.Debug().Log("Started") }) } @@ -78,6 +102,8 @@ func (r *resources) Stop() { r.cancelObserver() r.startOnce = sync.Once{} + + r.logger.Debug().Log("Stopped") }) } @@ -89,35 +115,59 @@ func (r *resources) observe(ctx context.Context, interval time.Duration) { ticker := time.NewTicker(interval) defer ticker.Stop() + r.logger.Debug().Log("Observer started") + for { select { case <-ctx.Done(): return case <-ticker.C: - limit := false - cpustat, err := psutil.CPUPercent() if err != nil { + r.logger.Warn().WithError(err).Log("Failed to determine CPU load") continue } - cpuload := cpustat.User + cpustat.System + cpustat.Other - - if cpuload > r.maxCPU { - limit = true - } + cpuload := (cpustat.User + cpustat.System + cpustat.Other) * r.ncpu vmstat, err := psutil.VirtualMemory() if err != nil { continue } - if vmstat.Used > r.maxMemory { + r.logger.Debug().WithFields(log.Fields{ + "cur_cpu": cpuload, + "cur_memory": vmstat.Used, + }).Log("Observation") + + limit := false + + if !r.isLimiting { + if cpuload > r.maxCPU { + r.logger.Debug().WithField("cpu", cpuload).Log("CPU limit reached") + limit = true + } + + if vmstat.Used > r.maxMemory { + r.logger.Debug().WithField("memory", vmstat.Used).Log("Memory limit reached") + limit = true + } + } else { limit = true + if cpuload <= r.maxCPU*0.8 { + r.logger.Debug().WithField("cpu", cpuload).Log("CPU limit released") + limit = false + } + + if vmstat.Used <= uint64(float64(r.maxMemory)*0.8) { + r.logger.Debug().WithField("memory", vmstat.Used).Log("Memory limit reached") + limit = false + } } r.lock.Lock() if r.isLimiting != limit { + r.logger.Debug().WithField("enabled", limit).Log("Limiting") r.isLimiting = limit select { case r.limit <- limit: @@ -133,21 +183,44 @@ func (r *resources) Add(cpu float64, memory uint64) bool { r.lock.Lock() defer r.lock.Unlock() + logger := r.logger.WithFields(log.Fields{ + "cpu": cpu, + "memory": memory, + }) + + logger.Debug().WithFields(log.Fields{ + "used_cpu": r.consumerCPU, + "used_memory": r.consumerMemory, + }).Log("Request for acquiring resources") + if r.isLimiting { + logger.Debug().Log("Rejected, currently limiting") + return false + } + + if cpu <= 0 || memory == 0 { + logger.Debug().Log("Rejected, invalid values") return false } if r.consumerCPU+cpu > r.maxCPU { + logger.Debug().Log("Rejected, CPU limit exceeded") return false } if r.consumerMemory+memory > r.maxMemory { + logger.Debug().Log("Rejected, memory limit exceeded") return false } r.consumerCPU += cpu r.consumerMemory += memory + logger.Debug().WithFields(log.Fields{ + "used_cpu": r.consumerCPU, + "used_memory": r.consumerMemory, + }).Log("Acquiring approved") + return true } @@ -155,6 +228,26 @@ func (r *resources) Remove(cpu float64, memory uint64) { r.lock.Lock() defer r.lock.Unlock() + logger := r.logger.WithFields(log.Fields{ + "cpu": cpu, + "memory": memory, + }) + + logger.Debug().WithFields(log.Fields{ + "used_cpu": r.consumerCPU, + "used_memory": r.consumerMemory, + }).Log("Request for releasing resources") + r.consumerCPU -= cpu r.consumerMemory -= memory + + if r.consumerCPU < 0 { + logger.Warn().WithField("used_cpu", r.consumerCPU).Log("Used CPU resources below 0") + r.consumerCPU = 0 + } + + logger.Debug().WithFields(log.Fields{ + "used_cpu": r.consumerCPU, + "used_memory": r.consumerMemory, + }).Log("Releasing approved") } diff --git a/restream/restream.go b/restream/restream.go index e5c51d32..2f76601d 100644 --- a/restream/restream.go +++ b/restream/restream.go @@ -167,12 +167,26 @@ func New(config Config) (Restreamer, error) { r.maxProc = config.MaxProcesses if config.MaxCPU > 0 || config.MaxMemory > 0 { - r.resources, _ = resources.New(config.MaxCPU, config.MaxMemory) + if config.MaxCPU <= 0 || config.MaxMemory <= 0 { + return nil, fmt.Errorf("both MaxCPU and MaxMemory have to be set") + } + + resources, err := resources.New(resources.Config{ + MaxCPU: config.MaxCPU, + MaxMemory: config.MaxMemory, + Logger: r.logger.WithComponent("Resources"), + }) + if err != nil { + return nil, fmt.Errorf("failed to initialize resource manager: %w", err) + } + r.resources = resources r.enableSoftLimit = true + + r.logger.Debug().Log("Enabling resource manager") } if err := r.load(); err != nil { - return nil, fmt.Errorf("failed to load data from DB (%w)", err) + return nil, fmt.Errorf("failed to load data from DB: %w", err) } r.save() @@ -291,10 +305,6 @@ func (r *restream) resourceObserver(ctx context.Context, rsc resources.Resources case <-ctx.Done(): return case limit := <-rsc.Limit(): - if limit { - r.logger.Warn().WithField("limit", limit).Log("limiter triggered") - } - r.lock.Lock() for id, t := range r.tasks { if !t.valid { @@ -363,6 +373,8 @@ func (r *restream) load() error { // replaced, we can resolve references and validate the // inputs and outputs. for _, t := range tasks { + t := t + // Just warn if the ffmpeg version constraint doesn't match the available ffmpeg version if c, err := semver.NewConstraint(t.config.FFVersion); err == nil { if v, err := semver.NewVersion(skills.FFmpeg.Version); err == nil { @@ -430,6 +442,10 @@ func (r *restream) load() error { return nil } + if t.config.LimitCPU <= 0 || t.config.LimitMemory == 0 { + return fmt.Errorf("process needs to have CPU and memory limits defined") + } + if !r.resources.Add(t.config.LimitCPU, t.config.LimitMemory) { return fmt.Errorf("not enough resources available") } @@ -611,6 +627,10 @@ func (r *restream) createTask(config *app.Config) (*task, error) { return nil } + if t.config.LimitCPU <= 0 || t.config.LimitMemory == 0 { + return fmt.Errorf("process needs to have CPU and memory limits defined") + } + if !r.resources.Add(t.config.LimitCPU, t.config.LimitMemory) { return fmt.Errorf("not enough resources available") } @@ -1359,6 +1379,10 @@ func (r *restream) reloadProcess(id string) error { return nil } + if t.config.LimitCPU <= 0 || t.config.LimitMemory == 0 { + return fmt.Errorf("process needs to have CPU and memory limits defined") + } + if !r.resources.Add(t.config.LimitCPU, t.config.LimitMemory) { return fmt.Errorf("not enough resources available") }