diff --git a/process/limits.go b/process/limiter.go similarity index 94% rename from process/limits.go rename to process/limiter.go index f208f55e..e2f6b2a6 100644 --- a/process/limits.go +++ b/process/limiter.go @@ -66,7 +66,7 @@ type Limiter interface { // Limit enables or disables the throttling of the CPU or killing because of to much // memory consumption. - Limit(enable bool) error + Limit(limit int) error } type limiter struct { @@ -80,6 +80,7 @@ type limiter struct { cpu float64 cpuCurrent float64 cpuMax float64 + cpuTop float64 cpuAvg float64 cpuAvgCounter uint64 cpuLast float64 @@ -88,6 +89,7 @@ type limiter struct { memory uint64 memoryCurrent uint64 memoryMax uint64 + memoryTop uint64 memoryAvg float64 memoryAvgCounter uint64 memoryLast uint64 @@ -152,12 +154,14 @@ func (l *limiter) reset() { l.cpuAvg = 0 l.cpuAvgCounter = 0 l.cpuMax = 0 + l.cpuTop = 0 l.memoryCurrent = 0 l.memoryLast = 0 l.memoryAvg = 0 l.memoryAvgCounter = 0 l.memoryMax = 0 + l.memoryTop = 0 } func (l *limiter) Start(process psutil.Process) error { @@ -232,6 +236,12 @@ func (l *limiter) collect(t time.Time) { l.memoryMax = l.memoryCurrent } + if l.memoryCurrent > l.memoryTop { + l.memoryTop = l.memoryCurrent + } else { + l.memoryTop = uint64(float64(l.memoryTop) * 0.95) + } + l.memoryAvgCounter++ l.memoryAvg = ((l.memoryAvg * float64(l.memoryAvgCounter-1)) + float64(l.memoryCurrent)) / float64(l.memoryAvgCounter) @@ -244,6 +254,12 @@ func (l *limiter) collect(t time.Time) { l.cpuMax = l.cpuCurrent } + if l.cpuCurrent > l.cpuTop { + l.cpuTop = l.cpuCurrent + } else { + l.cpuTop = l.cpuTop * 0.95 + } + l.cpuAvgCounter++ l.cpuAvg = ((l.cpuAvg * float64(l.cpuAvgCounter-1)) + l.cpuCurrent) / float64(l.cpuAvgCounter) @@ -295,7 +311,9 @@ func (l *limiter) collect(t time.Time) { l.logger.Debug().WithFields(log.Fields{ "cur_cpu": l.cpuCurrent * l.ncpuFactor, + "top_cpu": l.cpuTop * l.ncpuFactor, "cur_mem": l.memoryCurrent, + "top_mem": l.memoryTop, "exceeded": isLimitExceeded, }).Log("Observation") @@ -304,7 +322,7 @@ func (l *limiter) collect(t time.Time) { } } -func (l *limiter) Limit(enable bool) error { +func (l *limiter) Limit(limit int) error { l.lock.Lock() defer l.lock.Unlock() @@ -312,7 +330,7 @@ func (l *limiter) Limit(enable bool) error { return nil } - if enable { + if limit > 0 { if l.enableLimit { return nil } @@ -344,6 +362,7 @@ func (l *limiter) Limit(enable bool) error { l.cancelLimit = nil l.logger.Debug().Log("Limiter disabled") + } return nil @@ -384,6 +403,8 @@ func (l *limiter) limit(ctx context.Context, limit float64, interval time.Durati workingrate = math.Min(workingrate/pcpu*limit, 1) } + workingrate = limit + worktime := float64(interval.Nanoseconds()) * workingrate sleeptime := float64(interval.Nanoseconds()) - worktime diff --git a/process/limits_test.go b/process/limiter_test.go similarity index 100% rename from process/limits_test.go rename to process/limiter_test.go diff --git a/process/process.go b/process/process.go index 5510ff4d..0031b90c 100644 --- a/process/process.go +++ b/process/process.go @@ -46,7 +46,7 @@ type Process interface { // Limit enabled or disables CPU and memory limiting. CPU will be throttled // into the configured limit. If memory consumption is above the configured // limit, the process will be killed. - Limit(enable bool) error + Limit(limit int) error } // Config is the configuration of a process @@ -459,7 +459,7 @@ func (p *process) IsRunning() bool { return p.isRunning() } -func (p *process) Limit(enable bool) error { +func (p *process) Limit(limit int) error { if !p.isRunning() { return nil } @@ -468,9 +468,9 @@ func (p *process) Limit(enable bool) error { return nil } - p.logger.Warn().WithField("limit", enable).Log("Limiter triggered") + p.logger.Warn().WithField("limit", limit).Log("Limiter triggered") - return p.limits.Limit(enable) + return p.limits.Limit(limit) } // Start will start the process and sets the order to "start". If the diff --git a/restream/resources/resources.go b/restream/resources/resources.go index 8929c2f3..86a6625a 100644 --- a/restream/resources/resources.go +++ b/restream/resources/resources.go @@ -15,7 +15,8 @@ type resources struct { maxCPU float64 maxMemory uint64 - limit chan bool + limitCh chan int + limitRate int isLimiting bool cancelObserver context.CancelFunc @@ -31,7 +32,7 @@ type Resources interface { Start() Stop() - Limit() <-chan bool + Limit() <-chan int Request(cpu float64, memory uint64) error } @@ -82,7 +83,7 @@ func New(config Config) (Resources, error) { func (r *resources) Start() { r.startOnce.Do(func() { - r.limit = make(chan bool, 10) + r.limitCh = make(chan int, 10) ctx, cancel := context.WithCancel(context.Background()) r.cancelObserver = cancel @@ -105,8 +106,8 @@ func (r *resources) Stop() { }) } -func (r *resources) Limit() <-chan bool { - return r.limit +func (r *resources) Limit() <-chan int { + return r.limitCh } func (r *resources) observe(ctx context.Context, interval time.Duration) { @@ -139,37 +140,52 @@ func (r *resources) observe(ctx context.Context, interval time.Duration) { "cur_memory": vmstat.Used, }).Log("Observation") - limit := false + doLimit := false if !r.isLimiting { if cpuload > r.maxCPU { r.logger.Debug().WithField("cpu", cpuload).Log("CPU limit reached") - limit = true + doLimit = true } if vmstat.Used > r.maxMemory { r.logger.Debug().WithField("memory", vmstat.Used).Log("Memory limit reached") - limit = true + doLimit = true } } else { - limit = true - if cpuload <= r.maxCPU*0.8 { - r.logger.Debug().WithField("cpu", cpuload).Log("CPU limit released") - limit = false - } - - if vmstat.Used <= uint64(float64(r.maxMemory)*0.8) { - r.logger.Debug().WithField("memory", vmstat.Used).Log("Memory limit reached") - limit = false + doLimit = true + if cpuload <= r.maxCPU && vmstat.Used <= r.maxMemory { + doLimit = false } } r.lock.Lock() - if r.isLimiting != limit { - r.logger.Debug().WithField("enabled", limit).Log("Limiting") - r.isLimiting = limit + if r.isLimiting != doLimit { + if !r.isLimiting { + r.limitRate = 100 + } else { + if r.limitRate > 0 { + r.limitRate -= 10 + doLimit = true + + if r.limitRate == 0 { + r.logger.Debug().WithFields(log.Fields{ + "cpu": cpuload, + "memory": vmstat.Used, + }).Log("CPU and memory limit released") + doLimit = false + } + } + } + + r.logger.Debug().WithFields(log.Fields{ + "enabled": doLimit, + "rate": r.limitRate, + }).Log("Limiting") + + r.isLimiting = doLimit select { - case r.limit <- limit: + case r.limitCh <- r.limitRate: default: } }