Allow hard and soft limiting a process

A hard limit will kill the process as soon as either CPU or memory
consumption are above a defined limit for a certain amount of time.

A soft limit will throttle the CPU usage if above a defined limit and
kill the process if memory consumption is above a defined limit. The
soft limit can be enabled/disabled on demand.

The default is hard limit.
This commit is contained in:
Ingo Oppermann
2023-04-26 16:01:50 +02:00
parent d73d915e89
commit d59158de03
10 changed files with 344 additions and 159 deletions

View File

@@ -1,6 +1,6 @@
# Core # Core
### Core v16.11.0 > v16.?.? ### Core v16.12.0 > v16.?.?
- Add PUT /api/v3/fs endpoint for file operations - Add PUT /api/v3/fs endpoint for file operations
- Add GET /api/v3/process/:id/report query parameters for retrieving a specific report history entry - Add GET /api/v3/process/:id/report query parameters for retrieving a specific report history entry

View File

@@ -36,13 +36,15 @@ type ProcessConfig struct {
LimitCPU float64 // Kill the process if the CPU usage in percent is above this value. LimitCPU float64 // Kill the process if the CPU usage in percent is above this value.
LimitMemory uint64 // Kill the process if the memory consumption in bytes is above this value. LimitMemory uint64 // Kill the process if the memory consumption in bytes is above this value.
LimitDuration time.Duration // Kill the process if the limits are exceeded for this duration. LimitDuration time.Duration // Kill the process if the limits are exceeded for this duration.
LimitMode string // How to limit the process, "hard" or "soft"
Scheduler string // A scheduler for starting the process, either a concrete date (RFC3339) or in crontab syntax Scheduler string // A scheduler for starting the process, either a concrete date (RFC3339) or in crontab syntax
Args []string // Arguments for the process Args []string // Arguments for the process
Parser process.Parser // Parser for the process output Parser process.Parser // Parser for the process output
Logger log.Logger // Logger Logger log.Logger // Logger
OnArgs func([]string) []string // Callback before starting the process to retrieve new arguments OnArgs func([]string) []string // Callback before starting the process to retrieve new arguments
OnExit func(state string) // Callback called after the process stopped with exit state as argument OnBeforeStart func() error // Callback which is called before the process will be started. If error is non-nil, the start will be refused.
OnStart func() // Callback called after process has been started OnStart func() // Callback called after process has been started
OnExit func(state string) // Callback called after the process stopped with exit state as argument
OnStateChange func(from, to string) // Callback called on state change OnStateChange func(from, to string) // Callback called on state change
} }
@@ -140,10 +142,12 @@ func (f *ffmpeg) New(config ProcessConfig) (process.Process, error) {
LimitCPU: config.LimitCPU, LimitCPU: config.LimitCPU,
LimitMemory: config.LimitMemory, LimitMemory: config.LimitMemory,
LimitDuration: config.LimitDuration, LimitDuration: config.LimitDuration,
LimitMode: process.LimitModeHard,
Scheduler: scheduler, Scheduler: scheduler,
Parser: config.Parser, Parser: config.Parser,
Logger: config.Logger, Logger: config.Logger,
OnArgs: config.OnArgs, OnArgs: config.OnArgs,
OnBeforeStart: config.OnBeforeStart,
OnStart: config.OnStart, OnStart: config.OnStart,
OnExit: config.OnExit, OnExit: config.OnExit,
OnStateChange: func(from, to string) { OnStateChange: func(from, to string) {

View File

@@ -3,6 +3,7 @@ package process
import ( import (
"context" "context"
"fmt" "fmt"
"math"
"sync" "sync"
"time" "time"
@@ -11,10 +12,11 @@ import (
type Usage struct { type Usage struct {
CPU struct { CPU struct {
Current float64 // percent 0-100 NCPU float64 // number of logical processors
Average float64 // percent 0-100 Current float64 // percent 0-100*ncpu
Max float64 // percent 0-100 Average float64 // percent 0-100*ncpu
Limit float64 // percent 0-100 Max float64 // percent 0-100*ncpu
Limit float64 // percent 0-100*ncpu
} }
Memory struct { Memory struct {
Current uint64 // bytes Current uint64 // bytes
@@ -26,11 +28,19 @@ type Usage struct {
type LimitFunc func(cpu float64, memory uint64) type LimitFunc func(cpu float64, memory uint64)
type LimitMode int
const (
LimitModeHard LimitMode = 0 // Killing the process if either CPU or memory is above the limit (for a certain time)
LimitModeSoft LimitMode = 1 // Throttling the CPU if activated, killing the process if memory is above the limit
)
type LimiterConfig struct { type LimiterConfig struct {
CPU float64 // Max. CPU usage in percent CPU float64 // Max. CPU usage in percent 0-100 in hard mode, 0-100*ncpu in softmode
Memory uint64 // Max. memory usage in bytes Memory uint64 // Max. memory usage in bytes
WaitFor time.Duration // Duration one of the limits has to be above the limit until OnLimit gets triggered WaitFor time.Duration // Duration for one of the limits has to be above the limit until OnLimit gets triggered
OnLimit LimitFunc // Function to be triggered if limits are exceeded OnLimit LimitFunc // Function to be triggered if limits are exceeded
Mode LimitMode // How to limit CPU usage
} }
type Limiter interface { type Limiter interface {
@@ -41,9 +51,11 @@ type Limiter interface {
Stop() Stop()
// Current returns the current CPU and memory values // Current returns the current CPU and memory values
// Deprecated: use Usage()
Current() (cpu float64, memory uint64) Current() (cpu float64, memory uint64)
// Limits returns the defined CPU and memory limits. Values <= 0 means no limit // Limits returns the defined CPU and memory limits. Values <= 0 means no limit
// Deprecated: use Usage()
Limits() (cpu float64, memory uint64) Limits() (cpu float64, memory uint64)
// Usage returns the current state of the limiter, such as current, average, max, and // Usage returns the current state of the limiter, such as current, average, max, and
@@ -56,6 +68,7 @@ type Limiter interface {
} }
type limiter struct { type limiter struct {
ncpu float64
proc psutil.Process proc psutil.Process
lock sync.Mutex lock sync.Mutex
cancel context.CancelFunc cancel context.CancelFunc
@@ -77,7 +90,10 @@ type limiter struct {
memoryLast uint64 memoryLast uint64
memoryLimitSince time.Time memoryLimitSince time.Time
waitFor time.Duration waitFor time.Duration
mode LimitMode
enableLimit bool
cancelLimit context.CancelFunc
} }
// NewLimiter returns a new Limiter // NewLimiter returns a new Limiter
@@ -87,6 +103,17 @@ func NewLimiter(config LimiterConfig) Limiter {
memory: config.Memory, memory: config.Memory,
waitFor: config.WaitFor, waitFor: config.WaitFor,
onLimit: config.OnLimit, onLimit: config.OnLimit,
mode: config.Mode,
}
if ncpu, err := psutil.CPUCounts(true); err != nil {
l.ncpu = 1
} else {
l.ncpu = ncpu
}
if l.mode == LimitModeSoft {
l.cpu /= l.ncpu
} }
if l.onLimit == nil { if l.onLimit == nil {
@@ -194,31 +221,40 @@ func (l *limiter) collect(t time.Time) {
isLimitExceeded := false isLimitExceeded := false
if l.cpu > 0 { if l.mode == LimitModeHard {
if l.cpuCurrent > l.cpu { if l.cpu > 0 {
// Current value is higher than the limit if l.cpuCurrent > l.cpu {
if l.cpuLast <= l.cpu { // Current value is higher than the limit
// If the previous value is below the limit, then we reached the if l.cpuLast <= l.cpu {
// limit as of now // If the previous value is below the limit, then we reached the
l.cpuLimitSince = time.Now() // limit as of now
} l.cpuLimitSince = time.Now()
}
if time.Since(l.cpuLimitSince) >= l.waitFor { if time.Since(l.cpuLimitSince) >= l.waitFor {
isLimitExceeded = true isLimitExceeded = true
}
} }
} }
}
if l.memory > 0 { if l.memory > 0 {
if l.memoryCurrent > l.memory { if l.memoryCurrent > l.memory {
// Current value is higher than the limit // Current value is higher than the limit
if l.memoryLast <= l.memory { if l.memoryLast <= l.memory {
// If the previous value is below the limit, then we reached the // If the previous value is below the limit, then we reached the
// limit as of now // limit as of now
l.memoryLimitSince = time.Now() l.memoryLimitSince = time.Now()
}
if time.Since(l.memoryLimitSince) >= l.waitFor {
isLimitExceeded = true
}
} }
}
if time.Since(l.memoryLimitSince) >= l.waitFor { } else if l.mode == LimitModeSoft && l.enableLimit {
if l.memory > 0 {
if l.memoryCurrent > l.memory {
// Current value is higher than the limit
isLimitExceeded = true isLimitExceeded = true
} }
} }
@@ -229,6 +265,73 @@ func (l *limiter) collect(t time.Time) {
} }
} }
func (l *limiter) Limit(enable bool) error {
if enable {
if l.enableLimit {
return nil
}
if l.cancelLimit != nil {
l.cancelLimit()
}
ctx, cancel := context.WithCancel(context.Background())
l.cancelLimit = cancel
l.enableLimit = true
go l.limit(ctx, l.cpu/100, time.Second)
} else {
if !l.enableLimit {
return nil
}
if l.cancelLimit == nil {
return nil
}
l.cancelLimit()
l.cancelLimit = nil
}
return nil
}
// limit will limit the CPU usage of this process. The limit is the max. CPU usage
// normed to 0-1. The interval defines how long a time slot is that will be splitted
// into sleeping and working.
func (l *limiter) limit(ctx context.Context, limit float64, interval time.Duration) {
var workingrate float64 = -1
for {
select {
case <-ctx.Done():
return
default:
l.lock.Lock()
pcpu := l.cpuCurrent / 100
l.lock.Unlock()
if workingrate < 0 {
workingrate = limit
} else {
workingrate = math.Min(workingrate/pcpu*limit, 1)
}
worktime := float64(interval.Nanoseconds()) * workingrate
sleeptime := float64(interval.Nanoseconds()) - worktime
l.proc.Resume()
time.Sleep(time.Duration(worktime) * time.Nanosecond)
if sleeptime > 0 {
l.proc.Suspend()
time.Sleep(time.Duration(sleeptime) * time.Nanosecond)
}
}
}
}
func (l *limiter) Current() (cpu float64, memory uint64) { func (l *limiter) Current() (cpu float64, memory uint64) {
l.lock.Lock() l.lock.Lock()
defer l.lock.Unlock() defer l.lock.Unlock()
@@ -245,10 +348,11 @@ func (l *limiter) Usage() Usage {
usage := Usage{} usage := Usage{}
usage.CPU.Limit = l.cpu usage.CPU.NCPU = l.ncpu
usage.CPU.Current = l.cpuCurrent usage.CPU.Limit = l.cpu * l.ncpu
usage.CPU.Average = l.cpuAvg usage.CPU.Current = l.cpuCurrent * l.ncpu
usage.CPU.Max = l.cpuMax usage.CPU.Average = l.cpuAvg * l.ncpu
usage.CPU.Max = l.cpuMax * l.ncpu
usage.Memory.Limit = l.memory usage.Memory.Limit = l.memory
usage.Memory.Current = l.memoryCurrent usage.Memory.Current = l.memoryCurrent
@@ -261,7 +365,3 @@ func (l *limiter) Usage() Usage {
func (l *limiter) Limits() (cpu float64, memory uint64) { func (l *limiter) Limits() (cpu float64, memory uint64) {
return l.cpu, l.memory return l.cpu, l.memory
} }
func (l *limiter) Limit(enable bool) error {
return nil
}

View File

@@ -25,7 +25,9 @@ func (p *psproc) VirtualMemory() (uint64, error) {
return 197, nil return 197, nil
} }
func (p *psproc) Stop() {} func (p *psproc) Stop() {}
func (p *psproc) Suspend() error { return nil }
func (p *psproc) Resume() error { return nil }
func TestCPULimit(t *testing.T) { func TestCPULimit(t *testing.T) {
lock := sync.Mutex{} lock := sync.Mutex{}

View File

@@ -52,3 +52,29 @@ func (p *nullParser) Stop(string, Usage) {}
func (p *nullParser) ResetStats() {} func (p *nullParser) ResetStats() {}
func (p *nullParser) ResetLog() {} func (p *nullParser) ResetLog() {}
func (p *nullParser) Log() []Line { return []Line{} } func (p *nullParser) Log() []Line { return []Line{} }
type bufferParser struct {
log []Line
}
// NewBufferParser returns a dummy parser that is just storing
// the lines and returns progress.
func NewBufferParser() Parser {
return &bufferParser{}
}
var _ Parser = &bufferParser{}
func (p *bufferParser) Parse(line string) uint64 {
p.log = append(p.log, Line{
Timestamp: time.Now(),
Data: line,
})
return 1
}
func (p *bufferParser) Stop(string, Usage) {}
func (p *bufferParser) ResetStats() {}
func (p *bufferParser) ResetLog() {
p.log = []Line{}
}
func (p *bufferParser) Log() []Line { return p.log }

View File

@@ -60,6 +60,7 @@ type Config struct {
LimitCPU float64 // Kill the process if the CPU usage in percent is above this value. LimitCPU float64 // Kill the process if the CPU usage in percent is above this value.
LimitMemory uint64 // Kill the process if the memory consumption in bytes is above this value. LimitMemory uint64 // Kill the process if the memory consumption in bytes is above this value.
LimitDuration time.Duration // Kill the process if the limits are exceeded for this duration. LimitDuration time.Duration // Kill the process if the limits are exceeded for this duration.
LimitMode LimitMode // Select limiting mode
Scheduler Scheduler // A scheduler. Scheduler Scheduler // A scheduler.
Parser Parser // A parser for the output of the process. Parser Parser // A parser for the output of the process.
OnArgs func(args []string) []string // A callback which is called right before the process will start with the command args. OnArgs func(args []string) []string // A callback which is called right before the process will start with the command args.
@@ -268,6 +269,7 @@ func New(config Config) (Process, error) {
CPU: config.LimitCPU, CPU: config.LimitCPU,
Memory: config.LimitMemory, Memory: config.LimitMemory,
WaitFor: config.LimitDuration, WaitFor: config.LimitDuration,
Mode: config.LimitMode,
OnLimit: func(cpu float64, memory uint64) { OnLimit: func(cpu float64, memory uint64) {
p.logger.WithFields(log.Fields{ p.logger.WithFields(log.Fields{
"cpu": cpu, "cpu": cpu,
@@ -381,11 +383,9 @@ func (p *process) setState(state stateType) (stateType, error) {
p.state.time = time.Now() p.state.time = time.Now()
p.callbacks.lock.Lock()
if p.callbacks.onStateChange != nil { if p.callbacks.onStateChange != nil {
p.callbacks.onStateChange(prevState.String(), p.state.state.String()) p.callbacks.onStateChange(prevState.String(), p.state.state.String())
} }
p.callbacks.lock.Unlock()
return prevState, nil return prevState, nil
} }
@@ -432,10 +432,14 @@ func (p *process) Status() Status {
Reconnect: time.Duration(-1), Reconnect: time.Duration(-1),
Duration: time.Since(stateTime), Duration: time.Since(stateTime),
Time: stateTime, Time: stateTime,
CPU: usage.CPU,
Memory: usage.Memory, Memory: usage.Memory,
} }
s.CPU.Current = usage.CPU.Current
s.CPU.Average = usage.CPU.Average
s.CPU.Max = usage.CPU.Max
s.CPU.Limit = usage.CPU.Limit
s.CommandArgs = make([]string, len(p.args)) s.CommandArgs = make([]string, len(p.args))
copy(s.CommandArgs, p.args) copy(s.CommandArgs, p.args)
@@ -516,7 +520,6 @@ func (p *process) start() error {
args := p.args args := p.args
p.callbacks.lock.Lock()
if p.callbacks.onArgs != nil { if p.callbacks.onArgs != nil {
args = make([]string, len(p.args)) args = make([]string, len(p.args))
copy(args, p.args) copy(args, p.args)
@@ -533,10 +536,9 @@ func (p *process) start() error {
p.reconnect(p.delay(stateFailed)) p.reconnect(p.delay(stateFailed))
return err return nil
} }
} }
p.callbacks.lock.Unlock()
// Start the stop timeout if enabled // Start the stop timeout if enabled
if p.timeout > time.Duration(0) { if p.timeout > time.Duration(0) {
@@ -590,11 +592,9 @@ func (p *process) start() error {
p.logger.Info().Log("Started") p.logger.Info().Log("Started")
p.debuglogger.Debug().Log("Started") p.debuglogger.Debug().Log("Started")
p.callbacks.lock.Lock()
if p.callbacks.onStart != nil { if p.callbacks.onStart != nil {
p.callbacks.onStart() p.callbacks.onStart()
} }
p.callbacks.lock.Unlock()
// Start the reader // Start the reader
go p.reader() go p.reader()

View File

@@ -1,6 +1,7 @@
package process package process
import ( import (
"fmt"
"sync" "sync"
"testing" "testing"
"time" "time"
@@ -23,15 +24,19 @@ func TestProcess(t *testing.T) {
p.Start() p.Start()
require.Equal(t, "running", p.Status().State) require.Eventually(t, func() bool {
return p.Status().State == "running"
time.Sleep(5 * time.Second) }, 10*time.Second, 500*time.Millisecond)
require.Equal(t, "running", p.Status().State) require.Equal(t, "running", p.Status().State)
time.Sleep(2 * time.Second)
p.Stop(false) p.Stop(false)
time.Sleep(2 * time.Second) require.Eventually(t, func() bool {
return p.Status().State == "killed"
}, 10*time.Second, 500*time.Millisecond)
require.Equal(t, "killed", p.Status().State) require.Equal(t, "killed", p.Status().State)
} }
@@ -49,9 +54,11 @@ func TestReconnectProcess(t *testing.T) {
p.Start() p.Start()
time.Sleep(3 * time.Second) require.Eventually(t, func() bool {
return p.Status().State == "finished"
}, 10*time.Second, 500*time.Millisecond)
require.Equal(t, "finished", p.Status().State) require.Greater(t, p.Status().Reconnect, time.Duration(0))
p.Stop(false) p.Stop(false)
@@ -70,9 +77,9 @@ func TestStaleProcess(t *testing.T) {
p.Start() p.Start()
time.Sleep(5 * time.Second) require.Eventually(t, func() bool {
return p.Status().State == "killed"
require.Equal(t, "killed", p.Status().State) }, 10*time.Second, 500*time.Millisecond)
p.Stop(false) p.Stop(false)
@@ -85,15 +92,24 @@ func TestStaleReconnectProcess(t *testing.T) {
Args: []string{ Args: []string{
"10", "10",
}, },
Reconnect: false, Reconnect: true,
StaleTimeout: 2 * time.Second, ReconnectDelay: 2 * time.Second,
StaleTimeout: 3 * time.Second,
}) })
p.Start() p.Start()
time.Sleep(10 * time.Second) require.Eventually(t, func() bool {
return p.Status().State == "killed"
}, 10*time.Second, 500*time.Millisecond)
require.Equal(t, "killed", p.Status().State) require.Eventually(t, func() bool {
return p.Status().State == "running"
}, 10*time.Second, 500*time.Millisecond)
require.Eventually(t, func() bool {
return p.Status().State == "killed"
}, 10*time.Second, 500*time.Millisecond)
p.Stop(false) p.Stop(false)
@@ -113,9 +129,11 @@ func TestNonExistingProcess(t *testing.T) {
p.Start() p.Start()
time.Sleep(3 * time.Second) require.Eventually(t, func() bool {
return p.Status().State == "failed"
}, 10*time.Second, 500*time.Millisecond)
require.Equal(t, "failed", p.Status().State) require.Negative(t, p.Status().Reconnect)
p.Stop(false) p.Stop(false)
@@ -135,9 +153,11 @@ func TestNonExistingReconnectProcess(t *testing.T) {
p.Start() p.Start()
time.Sleep(5 * time.Second) require.Eventually(t, func() bool {
return p.Status().State == "failed"
}, 10*time.Second, 500*time.Millisecond)
require.Equal(t, "failed", p.Status().State) require.Greater(t, p.Status().Reconnect, time.Duration(0))
p.Stop(false) p.Stop(false)
@@ -156,7 +176,9 @@ func TestProcessFailed(t *testing.T) {
p.Start() p.Start()
time.Sleep(5 * time.Second) require.Eventually(t, func() bool {
return p.Status().State == "failed"
}, 10*time.Second, 500*time.Millisecond)
p.Stop(false) p.Stop(false)
@@ -173,16 +195,22 @@ func TestFFmpegWaitStop(t *testing.T) {
Reconnect: false, Reconnect: false,
StaleTimeout: 0, StaleTimeout: 0,
OnExit: func(state string) { OnExit: func(state string) {
time.Sleep(2 * time.Second) time.Sleep(3 * time.Second)
}, },
}) })
err = p.Start() err = p.Start()
require.NoError(t, err) require.NoError(t, err)
time.Sleep(4 * time.Second) require.Eventually(t, func() bool {
return p.Status().State == "running"
}, 10*time.Second, 500*time.Millisecond)
p.Stop(true) start := time.Now()
err = p.Stop(true)
require.NoError(t, err)
require.Greater(t, time.Since(start), 2*time.Second)
require.Equal(t, "finished", p.Status().State) require.Equal(t, "finished", p.Status().State)
} }
@@ -201,13 +229,15 @@ func TestFFmpegKill(t *testing.T) {
err = p.Start() err = p.Start()
require.NoError(t, err) require.NoError(t, err)
time.Sleep(5 * time.Second) require.Eventually(t, func() bool {
return p.Status().State == "running"
}, 10*time.Second, 500*time.Millisecond)
p.Stop(false) p.Stop(false)
time.Sleep(3 * time.Second) require.Eventually(t, func() bool {
return p.Status().State == "finished"
require.Equal(t, "finished", p.Status().State) }, 10*time.Second, 500*time.Millisecond)
} }
func TestProcessForceKill(t *testing.T) { func TestProcessForceKill(t *testing.T) {
@@ -224,17 +254,23 @@ func TestProcessForceKill(t *testing.T) {
err = p.Start() err = p.Start()
require.NoError(t, err) require.NoError(t, err)
time.Sleep(3 * time.Second) require.Eventually(t, func() bool {
return p.Status().State == "running"
}, 10*time.Second, 500*time.Millisecond)
start := time.Now()
p.Stop(false) p.Stop(false)
time.Sleep(1 * time.Second) require.Eventually(t, func() bool {
return p.Status().State == "finishing"
}, 10*time.Second, 500*time.Millisecond)
require.Equal(t, "finishing", p.Status().State) require.Eventually(t, func() bool {
return p.Status().State == "killed"
}, 10*time.Second, 500*time.Millisecond)
time.Sleep(5 * time.Second) require.Greater(t, time.Since(start), 5*time.Second)
require.Equal(t, "killed", p.Status().State)
} }
func TestProcessDuration(t *testing.T) { func TestProcessDuration(t *testing.T) {
@@ -252,16 +288,23 @@ func TestProcessDuration(t *testing.T) {
require.Equal(t, "stop", status.Order) require.Equal(t, "stop", status.Order)
require.Equal(t, "finished", status.State) require.Equal(t, "finished", status.State)
start := time.Now()
err = p.Start() err = p.Start()
require.NoError(t, err) require.NoError(t, err)
time.Sleep(time.Second) require.Eventually(t, func() bool {
return p.Status().State == "running"
}, 10*time.Second, 500*time.Millisecond)
status = p.Status() status = p.Status()
require.Equal(t, "start", status.Order) require.Equal(t, "start", status.Order)
require.Equal(t, "running", status.State)
time.Sleep(5 * time.Second) require.Eventually(t, func() bool {
return p.Status().State == "finished"
}, 10*time.Second, 500*time.Millisecond)
require.InEpsilon(t, float64(3), time.Since(start).Seconds(), 1)
status = p.Status() status = p.Status()
require.Equal(t, "start", status.Order) require.Equal(t, "start", status.Order)
@@ -296,18 +339,21 @@ func TestProcessSchedulePointInTime(t *testing.T) {
status = p.Status() status = p.Status()
require.Equal(t, "start", status.Order) require.Equal(t, "start", status.Order)
require.Equal(t, "finished", status.State) require.Equal(t, "finished", status.State)
require.Greater(t, status.Reconnect, time.Duration(0)) require.Positive(t, status.Reconnect)
time.Sleep(status.Reconnect + (2 * time.Second)) require.Eventually(t, func() bool {
return p.Status().State == "running"
}, 10*time.Second, 500*time.Millisecond)
status = p.Status() require.InEpsilon(t, time.Since(now).Seconds(), 5, 1)
require.Equal(t, "running", status.State)
time.Sleep(5 * time.Second) require.Eventually(t, func() bool {
return p.Status().State == "finished"
}, 10*time.Second, 500*time.Millisecond)
status = p.Status() status = p.Status()
require.Equal(t, "finished", status.State) require.Equal(t, "finished", status.State)
require.Less(t, status.Reconnect, time.Duration(0)) require.Negative(t, status.Reconnect)
} }
func TestProcessSchedulePointInTimeGone(t *testing.T) { func TestProcessSchedulePointInTimeGone(t *testing.T) {
@@ -357,17 +403,19 @@ func TestProcessScheduleCron(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
status = p.Status() status = p.Status()
require.Positive(t, status.Reconnect)
time.Sleep(status.Reconnect + (2 * time.Second)) require.Eventually(t, func() bool {
return p.Status().State == "running"
}, 60*time.Second, 500*time.Millisecond)
status = p.Status() require.Eventually(t, func() bool {
require.Equal(t, "running", status.State) return p.Status().State == "finished"
}, 10*time.Second, 500*time.Millisecond)
time.Sleep(5 * time.Second)
status = p.Status() status = p.Status()
require.Equal(t, "finished", status.State) require.Equal(t, "finished", status.State)
require.Greater(t, status.Reconnect, time.Duration(0)) require.Positive(t, status.Reconnect)
} }
func TestProcessDelayNoScheduler(t *testing.T) { func TestProcessDelayNoScheduler(t *testing.T) {
@@ -511,6 +559,7 @@ func TestProcessDelaySchedulerReconnect(t *testing.T) {
func TestProcessCallbacks(t *testing.T) { func TestProcessCallbacks(t *testing.T) {
var args []string var args []string
onBeforeStart := false
onStart := false onStart := false
onExit := "" onExit := ""
onState := []string{} onState := []string{}
@@ -531,6 +580,14 @@ func TestProcessCallbacks(t *testing.T) {
copy(args, a) copy(args, a)
return a return a
}, },
OnBeforeStart: func() error {
lock.Lock()
defer lock.Unlock()
onBeforeStart = true
return nil
},
OnStart: func() { OnStart: func() {
lock.Lock() lock.Lock()
defer lock.Unlock() defer lock.Unlock()
@@ -555,10 +612,17 @@ func TestProcessCallbacks(t *testing.T) {
err = p.Start() err = p.Start()
require.NoError(t, err) require.NoError(t, err)
time.Sleep(5 * time.Second) require.Eventually(t, func() bool {
return p.Status().State == "running"
}, 10*time.Second, 500*time.Millisecond)
require.Eventually(t, func() bool {
return p.Status().State == "finished"
}, 10*time.Second, 500*time.Millisecond)
lock.Lock() lock.Lock()
require.ElementsMatch(t, []string{"2"}, args) require.ElementsMatch(t, []string{"2"}, args)
require.True(t, onBeforeStart)
require.True(t, onStart) require.True(t, onStart)
require.Equal(t, stateFinished.String(), onExit) require.Equal(t, stateFinished.String(), onExit)
require.ElementsMatch(t, []string{ require.ElementsMatch(t, []string{
@@ -568,3 +632,37 @@ func TestProcessCallbacks(t *testing.T) {
}, onState) }, onState)
lock.Unlock() lock.Unlock()
} }
func TestProcessCallbacksOnBeforeStart(t *testing.T) {
parser := NewBufferParser()
p, err := New(Config{
Binary: "sleep",
Args: []string{
"2",
},
Parser: parser,
Reconnect: true,
ReconnectDelay: 10 * time.Second,
OnBeforeStart: func() error {
return fmt.Errorf("no, not now")
},
})
require.NoError(t, err)
err = p.Start()
require.NoError(t, err)
require.Eventually(t, func() bool {
return p.Status().State == "failed"
}, 10*time.Second, 500*time.Millisecond)
status := p.Status()
require.Equal(t, "failed", status.State)
require.Equal(t, "start", status.Order)
require.Positive(t, status.Reconnect)
lines := parser.Log()
require.Equal(t, 1, len(lines))
require.Equal(t, "no, not now", lines[0].Data)
}

View File

@@ -2,7 +2,6 @@ package psutil
import ( import (
"context" "context"
"math"
"sync" "sync"
"time" "time"
@@ -19,6 +18,12 @@ type Process interface {
// Stop will stop collecting CPU and memory data for this process. // Stop will stop collecting CPU and memory data for this process.
Stop() Stop()
// Suspend will send SIGSTOP to the process
Suspend() error
// Resume will send SIGCONT to the process
Resume() error
} }
type process struct { type process struct {
@@ -70,10 +75,6 @@ func (p *process) tick(ctx context.Context, interval time.Duration) {
ticker := time.NewTicker(interval) ticker := time.NewTicker(interval)
defer ticker.Stop() defer ticker.Stop()
if p.imposeLimit {
go p.limit(ctx, interval)
}
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@@ -85,66 +86,6 @@ func (p *process) tick(ctx context.Context, interval time.Duration) {
p.statPrevious, p.statCurrent = p.statCurrent, stat p.statPrevious, p.statCurrent = p.statCurrent, stat
p.statPreviousTime, p.statCurrentTime = p.statCurrentTime, t p.statPreviousTime, p.statCurrentTime = p.statCurrentTime, t
p.lock.Unlock() p.lock.Unlock()
/*
pct, _ := p.CPUPercent()
pcpu := (pct.System + pct.User + pct.Other) / 100
fmt.Printf("%d\t%0.2f%%\n", p.pid, pcpu*100*p.ncpu)
*/
}
}
}
func (p *process) limit(ctx context.Context, interval time.Duration) {
var limit float64 = 50.0 / 100.0 / p.ncpu
var workingrate float64 = -1
counter := 0
for {
select {
case <-ctx.Done():
return
default:
pct, _ := p.CPUPercent()
/*
pct.System *= p.ncpu
pct.Idle *= p.ncpu
pct.User *= p.ncpu
pct.Other *= p.ncpu
*/
pcpu := (pct.System + pct.User + pct.Other) / 100
if workingrate < 0 {
workingrate = limit
} else {
workingrate = math.Min(workingrate/pcpu*limit, 1)
}
worktime := float64(interval.Nanoseconds()) * workingrate
sleeptime := float64(interval.Nanoseconds()) - worktime
/*
if counter%20 == 0 {
fmt.Printf("\nPID\t%%CPU\twork quantum\tsleep quantum\tactive rate\n")
counter = 0
}
fmt.Printf("%d\t%0.2f%%\t%.2f us\t%.2f us\t%0.2f%%\n", p.pid, pcpu*100*p.ncpu, worktime/1000, sleeptime/1000, workingrate*100)
*/
if p.imposeLimit {
p.proc.Resume()
}
time.Sleep(time.Duration(worktime) * time.Nanosecond)
if sleeptime > 0 {
if p.imposeLimit {
p.proc.Suspend()
}
time.Sleep(time.Duration(sleeptime) * time.Nanosecond)
}
counter++
} }
} }
} }
@@ -165,6 +106,14 @@ func (p *process) Stop() {
p.stopTicker() p.stopTicker()
} }
func (p *process) Suspend() error {
return p.proc.Suspend()
}
func (p *process) Resume() error {
return p.proc.Resume()
}
func (p *process) cpuTimes() (*cpuTimesStat, error) { func (p *process) cpuTimes() (*cpuTimesStat, error) {
times, err := p.proc.Times() times, err := p.proc.Times()
if err != nil { if err != nil {

View File

@@ -368,11 +368,13 @@ func (r *restream) load() error {
LimitCPU: t.config.LimitCPU, LimitCPU: t.config.LimitCPU,
LimitMemory: t.config.LimitMemory, LimitMemory: t.config.LimitMemory,
LimitDuration: time.Duration(t.config.LimitWaitFor) * time.Second, LimitDuration: time.Duration(t.config.LimitWaitFor) * time.Second,
LimitMode: "hard",
Scheduler: t.config.Scheduler, Scheduler: t.config.Scheduler,
Args: t.command, Args: t.command,
Parser: t.parser, Parser: t.parser,
Logger: t.logger, Logger: t.logger,
OnArgs: r.onArgs(t.config.Clone()), OnArgs: r.onArgs(t.config.Clone()),
OnBeforeStart: func() error { return nil },
}) })
if err != nil { if err != nil {
return err return err
@@ -525,11 +527,13 @@ func (r *restream) createTask(config *app.Config) (*task, error) {
LimitCPU: t.config.LimitCPU, LimitCPU: t.config.LimitCPU,
LimitMemory: t.config.LimitMemory, LimitMemory: t.config.LimitMemory,
LimitDuration: time.Duration(t.config.LimitWaitFor) * time.Second, LimitDuration: time.Duration(t.config.LimitWaitFor) * time.Second,
LimitMode: "hard",
Scheduler: t.config.Scheduler, Scheduler: t.config.Scheduler,
Args: t.command, Args: t.command,
Parser: t.parser, Parser: t.parser,
Logger: t.logger, Logger: t.logger,
OnArgs: r.onArgs(t.config.Clone()), OnArgs: r.onArgs(t.config.Clone()),
OnBeforeStart: func() error { return nil },
}) })
if err != nil { if err != nil {
return nil, err return nil, err
@@ -1249,11 +1253,13 @@ func (r *restream) reloadProcess(id string) error {
LimitCPU: t.config.LimitCPU, LimitCPU: t.config.LimitCPU,
LimitMemory: t.config.LimitMemory, LimitMemory: t.config.LimitMemory,
LimitDuration: time.Duration(t.config.LimitWaitFor) * time.Second, LimitDuration: time.Duration(t.config.LimitWaitFor) * time.Second,
LimitMode: "hard",
Scheduler: t.config.Scheduler, Scheduler: t.config.Scheduler,
Args: t.command, Args: t.command,
Parser: t.parser, Parser: t.parser,
Logger: t.logger, Logger: t.logger,
OnArgs: r.onArgs(t.config.Clone()), OnArgs: r.onArgs(t.config.Clone()),
OnBeforeStart: func() error { return nil },
}) })
if err != nil { if err != nil {
return err return err

View File

@@ -1237,6 +1237,6 @@ func TestProcessLimit(t *testing.T) {
status := task.ffmpeg.Status() status := task.ffmpeg.Status()
require.Equal(t, float64(61), status.CPU.Limit) require.Equal(t, float64(244), status.CPU.Limit)
require.Equal(t, uint64(42), status.Memory.Limit) require.Equal(t, uint64(42), status.Memory.Limit)
} }