mirror of
https://github.com/datarhei/core.git
synced 2025-10-18 22:04:38 +08:00
WIP: release CPU throttling stepwise
This commit is contained in:
@@ -66,7 +66,7 @@ type Limiter interface {
|
||||
|
||||
// Limit enables or disables the throttling of the CPU or killing because of to much
|
||||
// memory consumption.
|
||||
Limit(enable bool) error
|
||||
Limit(limit int) error
|
||||
}
|
||||
|
||||
type limiter struct {
|
||||
@@ -80,6 +80,7 @@ type limiter struct {
|
||||
cpu float64
|
||||
cpuCurrent float64
|
||||
cpuMax float64
|
||||
cpuTop float64
|
||||
cpuAvg float64
|
||||
cpuAvgCounter uint64
|
||||
cpuLast float64
|
||||
@@ -88,6 +89,7 @@ type limiter struct {
|
||||
memory uint64
|
||||
memoryCurrent uint64
|
||||
memoryMax uint64
|
||||
memoryTop uint64
|
||||
memoryAvg float64
|
||||
memoryAvgCounter uint64
|
||||
memoryLast uint64
|
||||
@@ -152,12 +154,14 @@ func (l *limiter) reset() {
|
||||
l.cpuAvg = 0
|
||||
l.cpuAvgCounter = 0
|
||||
l.cpuMax = 0
|
||||
l.cpuTop = 0
|
||||
|
||||
l.memoryCurrent = 0
|
||||
l.memoryLast = 0
|
||||
l.memoryAvg = 0
|
||||
l.memoryAvgCounter = 0
|
||||
l.memoryMax = 0
|
||||
l.memoryTop = 0
|
||||
}
|
||||
|
||||
func (l *limiter) Start(process psutil.Process) error {
|
||||
@@ -232,6 +236,12 @@ func (l *limiter) collect(t time.Time) {
|
||||
l.memoryMax = l.memoryCurrent
|
||||
}
|
||||
|
||||
if l.memoryCurrent > l.memoryTop {
|
||||
l.memoryTop = l.memoryCurrent
|
||||
} else {
|
||||
l.memoryTop = uint64(float64(l.memoryTop) * 0.95)
|
||||
}
|
||||
|
||||
l.memoryAvgCounter++
|
||||
|
||||
l.memoryAvg = ((l.memoryAvg * float64(l.memoryAvgCounter-1)) + float64(l.memoryCurrent)) / float64(l.memoryAvgCounter)
|
||||
@@ -244,6 +254,12 @@ func (l *limiter) collect(t time.Time) {
|
||||
l.cpuMax = l.cpuCurrent
|
||||
}
|
||||
|
||||
if l.cpuCurrent > l.cpuTop {
|
||||
l.cpuTop = l.cpuCurrent
|
||||
} else {
|
||||
l.cpuTop = l.cpuTop * 0.95
|
||||
}
|
||||
|
||||
l.cpuAvgCounter++
|
||||
|
||||
l.cpuAvg = ((l.cpuAvg * float64(l.cpuAvgCounter-1)) + l.cpuCurrent) / float64(l.cpuAvgCounter)
|
||||
@@ -295,7 +311,9 @@ func (l *limiter) collect(t time.Time) {
|
||||
|
||||
l.logger.Debug().WithFields(log.Fields{
|
||||
"cur_cpu": l.cpuCurrent * l.ncpuFactor,
|
||||
"top_cpu": l.cpuTop * l.ncpuFactor,
|
||||
"cur_mem": l.memoryCurrent,
|
||||
"top_mem": l.memoryTop,
|
||||
"exceeded": isLimitExceeded,
|
||||
}).Log("Observation")
|
||||
|
||||
@@ -304,7 +322,7 @@ func (l *limiter) collect(t time.Time) {
|
||||
}
|
||||
}
|
||||
|
||||
func (l *limiter) Limit(enable bool) error {
|
||||
func (l *limiter) Limit(limit int) error {
|
||||
l.lock.Lock()
|
||||
defer l.lock.Unlock()
|
||||
|
||||
@@ -312,7 +330,7 @@ func (l *limiter) Limit(enable bool) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
if enable {
|
||||
if limit > 0 {
|
||||
if l.enableLimit {
|
||||
return nil
|
||||
}
|
||||
@@ -344,6 +362,7 @@ func (l *limiter) Limit(enable bool) error {
|
||||
l.cancelLimit = nil
|
||||
|
||||
l.logger.Debug().Log("Limiter disabled")
|
||||
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -384,6 +403,8 @@ func (l *limiter) limit(ctx context.Context, limit float64, interval time.Durati
|
||||
workingrate = math.Min(workingrate/pcpu*limit, 1)
|
||||
}
|
||||
|
||||
workingrate = limit
|
||||
|
||||
worktime := float64(interval.Nanoseconds()) * workingrate
|
||||
sleeptime := float64(interval.Nanoseconds()) - worktime
|
||||
|
@@ -46,7 +46,7 @@ type Process interface {
|
||||
// Limit enabled or disables CPU and memory limiting. CPU will be throttled
|
||||
// into the configured limit. If memory consumption is above the configured
|
||||
// limit, the process will be killed.
|
||||
Limit(enable bool) error
|
||||
Limit(limit int) error
|
||||
}
|
||||
|
||||
// Config is the configuration of a process
|
||||
@@ -459,7 +459,7 @@ func (p *process) IsRunning() bool {
|
||||
return p.isRunning()
|
||||
}
|
||||
|
||||
func (p *process) Limit(enable bool) error {
|
||||
func (p *process) Limit(limit int) error {
|
||||
if !p.isRunning() {
|
||||
return nil
|
||||
}
|
||||
@@ -468,9 +468,9 @@ func (p *process) Limit(enable bool) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
p.logger.Warn().WithField("limit", enable).Log("Limiter triggered")
|
||||
p.logger.Warn().WithField("limit", limit).Log("Limiter triggered")
|
||||
|
||||
return p.limits.Limit(enable)
|
||||
return p.limits.Limit(limit)
|
||||
}
|
||||
|
||||
// Start will start the process and sets the order to "start". If the
|
||||
|
@@ -15,7 +15,8 @@ type resources struct {
|
||||
maxCPU float64
|
||||
maxMemory uint64
|
||||
|
||||
limit chan bool
|
||||
limitCh chan int
|
||||
limitRate int
|
||||
isLimiting bool
|
||||
|
||||
cancelObserver context.CancelFunc
|
||||
@@ -31,7 +32,7 @@ type Resources interface {
|
||||
Start()
|
||||
Stop()
|
||||
|
||||
Limit() <-chan bool
|
||||
Limit() <-chan int
|
||||
|
||||
Request(cpu float64, memory uint64) error
|
||||
}
|
||||
@@ -82,7 +83,7 @@ func New(config Config) (Resources, error) {
|
||||
|
||||
func (r *resources) Start() {
|
||||
r.startOnce.Do(func() {
|
||||
r.limit = make(chan bool, 10)
|
||||
r.limitCh = make(chan int, 10)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
r.cancelObserver = cancel
|
||||
@@ -105,8 +106,8 @@ func (r *resources) Stop() {
|
||||
})
|
||||
}
|
||||
|
||||
func (r *resources) Limit() <-chan bool {
|
||||
return r.limit
|
||||
func (r *resources) Limit() <-chan int {
|
||||
return r.limitCh
|
||||
}
|
||||
|
||||
func (r *resources) observe(ctx context.Context, interval time.Duration) {
|
||||
@@ -139,37 +140,52 @@ func (r *resources) observe(ctx context.Context, interval time.Duration) {
|
||||
"cur_memory": vmstat.Used,
|
||||
}).Log("Observation")
|
||||
|
||||
limit := false
|
||||
doLimit := false
|
||||
|
||||
if !r.isLimiting {
|
||||
if cpuload > r.maxCPU {
|
||||
r.logger.Debug().WithField("cpu", cpuload).Log("CPU limit reached")
|
||||
limit = true
|
||||
doLimit = true
|
||||
}
|
||||
|
||||
if vmstat.Used > r.maxMemory {
|
||||
r.logger.Debug().WithField("memory", vmstat.Used).Log("Memory limit reached")
|
||||
limit = true
|
||||
doLimit = true
|
||||
}
|
||||
} else {
|
||||
limit = true
|
||||
if cpuload <= r.maxCPU*0.8 {
|
||||
r.logger.Debug().WithField("cpu", cpuload).Log("CPU limit released")
|
||||
limit = false
|
||||
}
|
||||
|
||||
if vmstat.Used <= uint64(float64(r.maxMemory)*0.8) {
|
||||
r.logger.Debug().WithField("memory", vmstat.Used).Log("Memory limit reached")
|
||||
limit = false
|
||||
doLimit = true
|
||||
if cpuload <= r.maxCPU && vmstat.Used <= r.maxMemory {
|
||||
doLimit = false
|
||||
}
|
||||
}
|
||||
|
||||
r.lock.Lock()
|
||||
if r.isLimiting != limit {
|
||||
r.logger.Debug().WithField("enabled", limit).Log("Limiting")
|
||||
r.isLimiting = limit
|
||||
if r.isLimiting != doLimit {
|
||||
if !r.isLimiting {
|
||||
r.limitRate = 100
|
||||
} else {
|
||||
if r.limitRate > 0 {
|
||||
r.limitRate -= 10
|
||||
doLimit = true
|
||||
|
||||
if r.limitRate == 0 {
|
||||
r.logger.Debug().WithFields(log.Fields{
|
||||
"cpu": cpuload,
|
||||
"memory": vmstat.Used,
|
||||
}).Log("CPU and memory limit released")
|
||||
doLimit = false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
r.logger.Debug().WithFields(log.Fields{
|
||||
"enabled": doLimit,
|
||||
"rate": r.limitRate,
|
||||
}).Log("Limiting")
|
||||
|
||||
r.isLimiting = doLimit
|
||||
select {
|
||||
case r.limit <- limit:
|
||||
case r.limitCh <- r.limitRate:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user