diff --git a/config/config.go b/config/config.go index d285bef2..47dcf65c 100644 --- a/config/config.go +++ b/config/config.go @@ -276,7 +276,7 @@ func (d *Config) init() { d.vars.Register(value.NewDir(&d.Router.UIPath, "", d.fs), "router.ui_path", "CORE_ROUTER_UI_PATH", nil, "Path to a directory holding UI files mounted as /ui", false, false) // Resources - d.vars.Register(value.NewFloat(&d.Resources.MaxCPUUsage, 0), "resources.max_cpu_usage", "CORE_RESOURCES_MAX_CPU_USAGE", nil, "Maximum system CPU usage in percent, from 0 (no limit) to 100*ncpu", false, false) + d.vars.Register(value.NewFloat(&d.Resources.MaxCPUUsage, 0), "resources.max_cpu_usage", "CORE_RESOURCES_MAX_CPU_USAGE", nil, "Maximum system CPU usage in percent, from 0 (no limit) to 100", false, false) d.vars.Register(value.NewFloat(&d.Resources.MaxMemoryUsage, 0), "resources.max_memory_usage", "CORE_RESOURCES_MAX_MEMORY_USAGE", nil, "Maximum system usage in percent, from 0 (no limit) to 100", false, false) } @@ -460,12 +460,12 @@ func (d *Config) Validate(resetLogs bool) { // If resource limits are given, all values must be set if d.Resources.MaxCPUUsage > 0 || d.Resources.MaxMemoryUsage > 0 { - if d.Resources.MaxCPUUsage <= 0 { - d.vars.Log("error", "resources.max_cpu_usage", "must be greater than 0") + if d.Resources.MaxCPUUsage <= 0 || d.Resources.MaxCPUUsage > 100 { + d.vars.Log("error", "resources.max_cpu_usage", "must be greater than 0 and smaller or equal to 100") } if d.Resources.MaxMemoryUsage <= 0 { - d.vars.Log("error", "resources.max_memory_usage", "must be greater than 0") + d.vars.Log("error", "resources.max_memory_usage", "must be greater than 0 and smaller or equal to 100") } } } diff --git a/process/limits.go b/process/limits.go index b5ab8116..f208f55e 100644 --- a/process/limits.go +++ b/process/limits.go @@ -362,6 +362,7 @@ func (l *limiter) limit(ctx context.Context, limit float64, interval time.Durati l.logger.Debug().Log("CPU throttler disabled") }() + var workingrate float64 = -1 l.logger.Debug().Log("CPU throttler enabled") @@ -387,6 +388,7 @@ func (l *limiter) limit(ctx context.Context, limit float64, interval time.Durati sleeptime := float64(interval.Nanoseconds()) - worktime l.logger.Debug().WithFields(log.Fields{ + "pcpu": pcpu, "worktime": (time.Duration(worktime) * time.Nanosecond).String(), "sleeptime": (time.Duration(sleeptime) * time.Nanosecond).String(), }).Log("Throttler") diff --git a/psutil/process.go b/psutil/process.go index f0e63f5b..7580d3bf 100644 --- a/psutil/process.go +++ b/psutil/process.go @@ -40,17 +40,15 @@ type process struct { statCurrentTime time.Time statPrevious cpuTimesStat statPreviousTime time.Time - - imposeLimit bool + nTicks uint64 } -func (u *util) Process(pid int32, limit bool) (Process, error) { +func (u *util) Process(pid int32) (Process, error) { p := &process{ - pid: pid, - hasCgroup: u.hasCgroup, - cpuLimit: u.cpuLimit, - ncpu: u.ncpu, - imposeLimit: limit, + pid: pid, + hasCgroup: u.hasCgroup, + cpuLimit: u.cpuLimit, + ncpu: u.ncpu, } proc, err := psprocess.NewProcess(pid) @@ -68,7 +66,7 @@ func (u *util) Process(pid int32, limit bool) (Process, error) { } func NewProcess(pid int32, limit bool) (Process, error) { - return DefaultUtil.Process(pid, limit) + return DefaultUtil.Process(pid) } func (p *process) tick(ctx context.Context, interval time.Duration) { @@ -85,6 +83,7 @@ func (p *process) tick(ctx context.Context, interval time.Duration) { p.lock.Lock() p.statPrevious, p.statCurrent = p.statCurrent, stat p.statPreviousTime, p.statCurrentTime = p.statCurrentTime, t + p.nTicks++ p.lock.Unlock() } } @@ -137,6 +136,19 @@ func (p *process) cpuTimes() (*cpuTimesStat, error) { func (p *process) CPUPercent() (*CPUInfoStat, error) { var diff float64 + for { + p.lock.RLock() + nTicks := p.nTicks + p.lock.RUnlock() + + if nTicks < 2 { + time.Sleep(100 * time.Millisecond) + continue + } + + break + } + p.lock.RLock() defer p.lock.RUnlock() diff --git a/psutil/psutil.go b/psutil/psutil.go index f9b1441d..f6b95934 100644 --- a/psutil/psutil.go +++ b/psutil/psutil.go @@ -81,7 +81,7 @@ type Util interface { NetIOCounters(pernic bool) ([]net.IOCountersStat, error) // Process returns a process observer for a process with the given pid. - Process(pid int32, limit bool) (Process, error) + Process(pid int32) (Process, error) } type util struct { @@ -101,6 +101,7 @@ type util struct { statCurrentTime time.Time statPrevious cpuTimesStat statPreviousTime time.Time + nTicks uint64 } // New returns a new util, it will be started automatically @@ -246,6 +247,7 @@ func (u *util) tick(ctx context.Context, interval time.Duration) { u.lock.Lock() u.statPrevious, u.statCurrent = u.statCurrent, stat u.statPreviousTime, u.statCurrentTime = u.statCurrentTime, t + u.nTicks++ u.lock.Unlock() } } @@ -317,6 +319,19 @@ func (u *util) cpuTimes() (*cpuTimesStat, error) { func (u *util) CPUPercent() (*CPUInfoStat, error) { var total float64 + for { + u.lock.RLock() + nTicks := u.nTicks + u.lock.RUnlock() + + if nTicks < 2 { + time.Sleep(100 * time.Millisecond) + continue + } + + break + } + u.lock.RLock() defer u.lock.RUnlock() diff --git a/restream/resources/resources.go b/restream/resources/resources.go index b3b66364..8929c2f3 100644 --- a/restream/resources/resources.go +++ b/restream/resources/resources.go @@ -15,9 +15,6 @@ type resources struct { maxCPU float64 maxMemory uint64 - consumerCPU float64 - consumerMemory uint64 - limit chan bool isLimiting bool @@ -36,8 +33,7 @@ type Resources interface { Limit() <-chan bool - Add(cpu float64, memory uint64) bool - Remove(cpu float64, memory uint64) + Request(cpu float64, memory uint64) error } type Config struct { @@ -64,6 +60,7 @@ func New(config Config) (Resources, error) { r.ncpu = ncpu + r.maxCPU *= r.ncpu r.maxMemory = uint64(float64(vmstat.Total) * config.MaxMemory / 100) if r.logger == nil { @@ -71,6 +68,7 @@ func New(config Config) (Resources, error) { } r.logger = r.logger.WithFields(log.Fields{ + "ncpu": r.ncpu, "max_cpu": r.maxCPU, "max_memory": r.maxMemory, }) @@ -124,7 +122,7 @@ func (r *resources) observe(ctx context.Context, interval time.Duration) { case <-ticker.C: cpustat, err := psutil.CPUPercent() if err != nil { - r.logger.Warn().WithError(err).Log("Failed to determine CPU load") + r.logger.Warn().WithError(err).Log("Failed to determine system CPU usage") continue } @@ -132,6 +130,7 @@ func (r *resources) observe(ctx context.Context, interval time.Duration) { vmstat, err := psutil.VirtualMemory() if err != nil { + r.logger.Warn().WithError(err).Log("Failed to determine system memory usage") continue } @@ -179,75 +178,55 @@ func (r *resources) observe(ctx context.Context, interval time.Duration) { } } -func (r *resources) Add(cpu float64, memory uint64) bool { +func (r *resources) Request(cpu float64, memory uint64) error { r.lock.Lock() defer r.lock.Unlock() logger := r.logger.WithFields(log.Fields{ - "cpu": cpu, - "memory": memory, + "req_cpu": cpu, + "req_memory": memory, }) - logger.Debug().WithFields(log.Fields{ - "used_cpu": r.consumerCPU, - "used_memory": r.consumerMemory, - }).Log("Request for acquiring resources") + logger.Debug().Log("Request for acquiring resources") if r.isLimiting { logger.Debug().Log("Rejected, currently limiting") - return false + return fmt.Errorf("resources are currenlty actively limited") } if cpu <= 0 || memory == 0 { logger.Debug().Log("Rejected, invalid values") - return false + return fmt.Errorf("the cpu and/or memory values are invalid: cpu=%f, memory=%d", cpu, memory) } - if r.consumerCPU+cpu > r.maxCPU { - logger.Debug().Log("Rejected, CPU limit exceeded") - return false + cpustat, err := psutil.CPUPercent() + if err != nil { + r.logger.Warn().WithError(err).Log("Failed to determine system CPU usage") + return fmt.Errorf("the system CPU usage couldn't be determined") } - if r.consumerMemory+memory > r.maxMemory { - logger.Debug().Log("Rejected, memory limit exceeded") - return false + cpuload := (cpustat.User + cpustat.System + cpustat.Other) * r.ncpu + + vmstat, err := psutil.VirtualMemory() + if err != nil { + r.logger.Warn().WithError(err).Log("Failed to determine system memory usage") + return fmt.Errorf("the system memory usage couldn't be determined") } - r.consumerCPU += cpu - r.consumerMemory += memory + if cpuload+cpu > r.maxCPU { + logger.Debug().WithField("cur_cpu", cpuload).Log("Rejected, CPU limit exceeded") + return fmt.Errorf("the CPU limit would be exceeded: %f + %f > %f", cpuload, cpu, r.maxCPU) + } + + if vmstat.Used+memory > r.maxMemory { + logger.Debug().WithField("cur_memory", vmstat.Used).Log("Rejected, memory limit exceeded") + return fmt.Errorf("the memory limit would be exceeded: %d + %d > %d", vmstat.Used, memory, r.maxMemory) + } logger.Debug().WithFields(log.Fields{ - "used_cpu": r.consumerCPU, - "used_memory": r.consumerMemory, + "cur_cpu": cpuload, + "cur_memory": vmstat.Used, }).Log("Acquiring approved") - return true -} - -func (r *resources) Remove(cpu float64, memory uint64) { - r.lock.Lock() - defer r.lock.Unlock() - - logger := r.logger.WithFields(log.Fields{ - "cpu": cpu, - "memory": memory, - }) - - logger.Debug().WithFields(log.Fields{ - "used_cpu": r.consumerCPU, - "used_memory": r.consumerMemory, - }).Log("Request for releasing resources") - - r.consumerCPU -= cpu - r.consumerMemory -= memory - - if r.consumerCPU < 0 { - logger.Warn().WithField("used_cpu", r.consumerCPU).Log("Used CPU resources below 0") - r.consumerCPU = 0 - } - - logger.Debug().WithFields(log.Fields{ - "used_cpu": r.consumerCPU, - "used_memory": r.consumerMemory, - }).Log("Releasing approved") + return nil } diff --git a/restream/restream.go b/restream/restream.go index 2f76601d..cee934c9 100644 --- a/restream/restream.go +++ b/restream/restream.go @@ -442,23 +442,12 @@ func (r *restream) load() error { return nil } - if t.config.LimitCPU <= 0 || t.config.LimitMemory == 0 { - return fmt.Errorf("process needs to have CPU and memory limits defined") - } - - if !r.resources.Add(t.config.LimitCPU, t.config.LimitMemory) { - return fmt.Errorf("not enough resources available") + if err := r.resources.Request(t.config.LimitCPU, t.config.LimitMemory); err != nil { + return err } return nil }, - OnExit: func(string) { - if !r.enableSoftLimit { - return - } - - r.resources.Remove(t.config.LimitCPU, t.config.LimitMemory) - }, }) if err != nil { return err @@ -627,23 +616,12 @@ func (r *restream) createTask(config *app.Config) (*task, error) { return nil } - if t.config.LimitCPU <= 0 || t.config.LimitMemory == 0 { - return fmt.Errorf("process needs to have CPU and memory limits defined") - } - - if !r.resources.Add(t.config.LimitCPU, t.config.LimitMemory) { - return fmt.Errorf("not enough resources available") + if err := r.resources.Request(t.config.LimitCPU, t.config.LimitMemory); err != nil { + return err } return nil }, - OnExit: func(string) { - if !r.enableSoftLimit { - return - } - - r.resources.Remove(t.config.LimitCPU, t.config.LimitMemory) - }, }) if err != nil { return nil, err @@ -1379,23 +1357,12 @@ func (r *restream) reloadProcess(id string) error { return nil } - if t.config.LimitCPU <= 0 || t.config.LimitMemory == 0 { - return fmt.Errorf("process needs to have CPU and memory limits defined") - } - - if !r.resources.Add(t.config.LimitCPU, t.config.LimitMemory) { - return fmt.Errorf("not enough resources available") + if err := r.resources.Request(t.config.LimitCPU, t.config.LimitMemory); err != nil { + return err } return nil }, - OnExit: func(string) { - if !r.enableSoftLimit { - return - } - - r.resources.Remove(t.config.LimitCPU, t.config.LimitMemory) - }, }) if err != nil { return err