diff --git a/CHANGELOG.md b/CHANGELOG.md index c3e48112..df862886 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ # Core -### Core v16.11.0 > v16.?.? +### Core v16.12.0 > v16.?.? - Add PUT /api/v3/fs endpoint for file operations - Add GET /api/v3/process/:id/report query parameters for retrieving a specific report history entry diff --git a/ffmpeg/ffmpeg.go b/ffmpeg/ffmpeg.go index 030d5b5a..6a86a43d 100644 --- a/ffmpeg/ffmpeg.go +++ b/ffmpeg/ffmpeg.go @@ -36,13 +36,15 @@ type ProcessConfig struct { LimitCPU float64 // Kill the process if the CPU usage in percent is above this value. LimitMemory uint64 // Kill the process if the memory consumption in bytes is above this value. LimitDuration time.Duration // Kill the process if the limits are exceeded for this duration. + LimitMode string // How to limit the process, "hard" or "soft" Scheduler string // A scheduler for starting the process, either a concrete date (RFC3339) or in crontab syntax Args []string // Arguments for the process Parser process.Parser // Parser for the process output Logger log.Logger // Logger OnArgs func([]string) []string // Callback before starting the process to retrieve new arguments - OnExit func(state string) // Callback called after the process stopped with exit state as argument + OnBeforeStart func() error // Callback which is called before the process will be started. If error is non-nil, the start will be refused. OnStart func() // Callback called after process has been started + OnExit func(state string) // Callback called after the process stopped with exit state as argument OnStateChange func(from, to string) // Callback called on state change } @@ -140,10 +142,12 @@ func (f *ffmpeg) New(config ProcessConfig) (process.Process, error) { LimitCPU: config.LimitCPU, LimitMemory: config.LimitMemory, LimitDuration: config.LimitDuration, + LimitMode: process.LimitModeHard, Scheduler: scheduler, Parser: config.Parser, Logger: config.Logger, OnArgs: config.OnArgs, + OnBeforeStart: config.OnBeforeStart, OnStart: config.OnStart, OnExit: config.OnExit, OnStateChange: func(from, to string) { diff --git a/process/limits.go b/process/limits.go index 14652ae9..cde0e8e7 100644 --- a/process/limits.go +++ b/process/limits.go @@ -3,6 +3,7 @@ package process import ( "context" "fmt" + "math" "sync" "time" @@ -11,10 +12,11 @@ import ( type Usage struct { CPU struct { - Current float64 // percent 0-100 - Average float64 // percent 0-100 - Max float64 // percent 0-100 - Limit float64 // percent 0-100 + NCPU float64 // number of logical processors + Current float64 // percent 0-100*ncpu + Average float64 // percent 0-100*ncpu + Max float64 // percent 0-100*ncpu + Limit float64 // percent 0-100*ncpu } Memory struct { Current uint64 // bytes @@ -26,11 +28,19 @@ type Usage struct { type LimitFunc func(cpu float64, memory uint64) +type LimitMode int + +const ( + LimitModeHard LimitMode = 0 // Killing the process if either CPU or memory is above the limit (for a certain time) + LimitModeSoft LimitMode = 1 // Throttling the CPU if activated, killing the process if memory is above the limit +) + type LimiterConfig struct { - CPU float64 // Max. CPU usage in percent + CPU float64 // Max. CPU usage in percent 0-100 in hard mode, 0-100*ncpu in softmode Memory uint64 // Max. memory usage in bytes - WaitFor time.Duration // Duration one of the limits has to be above the limit until OnLimit gets triggered + WaitFor time.Duration // Duration for one of the limits has to be above the limit until OnLimit gets triggered OnLimit LimitFunc // Function to be triggered if limits are exceeded + Mode LimitMode // How to limit CPU usage } type Limiter interface { @@ -41,9 +51,11 @@ type Limiter interface { Stop() // Current returns the current CPU and memory values + // Deprecated: use Usage() Current() (cpu float64, memory uint64) // Limits returns the defined CPU and memory limits. Values <= 0 means no limit + // Deprecated: use Usage() Limits() (cpu float64, memory uint64) // Usage returns the current state of the limiter, such as current, average, max, and @@ -56,6 +68,7 @@ type Limiter interface { } type limiter struct { + ncpu float64 proc psutil.Process lock sync.Mutex cancel context.CancelFunc @@ -77,7 +90,10 @@ type limiter struct { memoryLast uint64 memoryLimitSince time.Time - waitFor time.Duration + waitFor time.Duration + mode LimitMode + enableLimit bool + cancelLimit context.CancelFunc } // NewLimiter returns a new Limiter @@ -87,6 +103,17 @@ func NewLimiter(config LimiterConfig) Limiter { memory: config.Memory, waitFor: config.WaitFor, onLimit: config.OnLimit, + mode: config.Mode, + } + + if ncpu, err := psutil.CPUCounts(true); err != nil { + l.ncpu = 1 + } else { + l.ncpu = ncpu + } + + if l.mode == LimitModeSoft { + l.cpu /= l.ncpu } if l.onLimit == nil { @@ -194,31 +221,40 @@ func (l *limiter) collect(t time.Time) { isLimitExceeded := false - if l.cpu > 0 { - if l.cpuCurrent > l.cpu { - // Current value is higher than the limit - if l.cpuLast <= l.cpu { - // If the previous value is below the limit, then we reached the - // limit as of now - l.cpuLimitSince = time.Now() - } + if l.mode == LimitModeHard { + if l.cpu > 0 { + if l.cpuCurrent > l.cpu { + // Current value is higher than the limit + if l.cpuLast <= l.cpu { + // If the previous value is below the limit, then we reached the + // limit as of now + l.cpuLimitSince = time.Now() + } - if time.Since(l.cpuLimitSince) >= l.waitFor { - isLimitExceeded = true + if time.Since(l.cpuLimitSince) >= l.waitFor { + isLimitExceeded = true + } } } - } - if l.memory > 0 { - if l.memoryCurrent > l.memory { - // Current value is higher than the limit - if l.memoryLast <= l.memory { - // If the previous value is below the limit, then we reached the - // limit as of now - l.memoryLimitSince = time.Now() + if l.memory > 0 { + if l.memoryCurrent > l.memory { + // Current value is higher than the limit + if l.memoryLast <= l.memory { + // If the previous value is below the limit, then we reached the + // limit as of now + l.memoryLimitSince = time.Now() + } + + if time.Since(l.memoryLimitSince) >= l.waitFor { + isLimitExceeded = true + } } - - if time.Since(l.memoryLimitSince) >= l.waitFor { + } + } else if l.mode == LimitModeSoft && l.enableLimit { + if l.memory > 0 { + if l.memoryCurrent > l.memory { + // Current value is higher than the limit isLimitExceeded = true } } @@ -229,6 +265,73 @@ func (l *limiter) collect(t time.Time) { } } +func (l *limiter) Limit(enable bool) error { + if enable { + if l.enableLimit { + return nil + } + + if l.cancelLimit != nil { + l.cancelLimit() + } + + ctx, cancel := context.WithCancel(context.Background()) + l.cancelLimit = cancel + + l.enableLimit = true + + go l.limit(ctx, l.cpu/100, time.Second) + } else { + if !l.enableLimit { + return nil + } + + if l.cancelLimit == nil { + return nil + } + + l.cancelLimit() + l.cancelLimit = nil + } + + return nil +} + +// limit will limit the CPU usage of this process. The limit is the max. CPU usage +// normed to 0-1. The interval defines how long a time slot is that will be splitted +// into sleeping and working. +func (l *limiter) limit(ctx context.Context, limit float64, interval time.Duration) { + var workingrate float64 = -1 + + for { + select { + case <-ctx.Done(): + return + default: + l.lock.Lock() + pcpu := l.cpuCurrent / 100 + l.lock.Unlock() + + if workingrate < 0 { + workingrate = limit + } else { + workingrate = math.Min(workingrate/pcpu*limit, 1) + } + + worktime := float64(interval.Nanoseconds()) * workingrate + sleeptime := float64(interval.Nanoseconds()) - worktime + + l.proc.Resume() + time.Sleep(time.Duration(worktime) * time.Nanosecond) + + if sleeptime > 0 { + l.proc.Suspend() + time.Sleep(time.Duration(sleeptime) * time.Nanosecond) + } + } + } +} + func (l *limiter) Current() (cpu float64, memory uint64) { l.lock.Lock() defer l.lock.Unlock() @@ -245,10 +348,11 @@ func (l *limiter) Usage() Usage { usage := Usage{} - usage.CPU.Limit = l.cpu - usage.CPU.Current = l.cpuCurrent - usage.CPU.Average = l.cpuAvg - usage.CPU.Max = l.cpuMax + usage.CPU.NCPU = l.ncpu + usage.CPU.Limit = l.cpu * l.ncpu + usage.CPU.Current = l.cpuCurrent * l.ncpu + usage.CPU.Average = l.cpuAvg * l.ncpu + usage.CPU.Max = l.cpuMax * l.ncpu usage.Memory.Limit = l.memory usage.Memory.Current = l.memoryCurrent @@ -261,7 +365,3 @@ func (l *limiter) Usage() Usage { func (l *limiter) Limits() (cpu float64, memory uint64) { return l.cpu, l.memory } - -func (l *limiter) Limit(enable bool) error { - return nil -} diff --git a/process/limits_test.go b/process/limits_test.go index 01146129..2b9302e3 100644 --- a/process/limits_test.go +++ b/process/limits_test.go @@ -25,7 +25,9 @@ func (p *psproc) VirtualMemory() (uint64, error) { return 197, nil } -func (p *psproc) Stop() {} +func (p *psproc) Stop() {} +func (p *psproc) Suspend() error { return nil } +func (p *psproc) Resume() error { return nil } func TestCPULimit(t *testing.T) { lock := sync.Mutex{} diff --git a/process/parser.go b/process/parser.go index 914289b5..122b0e62 100644 --- a/process/parser.go +++ b/process/parser.go @@ -52,3 +52,29 @@ func (p *nullParser) Stop(string, Usage) {} func (p *nullParser) ResetStats() {} func (p *nullParser) ResetLog() {} func (p *nullParser) Log() []Line { return []Line{} } + +type bufferParser struct { + log []Line +} + +// NewBufferParser returns a dummy parser that is just storing +// the lines and returns progress. +func NewBufferParser() Parser { + return &bufferParser{} +} + +var _ Parser = &bufferParser{} + +func (p *bufferParser) Parse(line string) uint64 { + p.log = append(p.log, Line{ + Timestamp: time.Now(), + Data: line, + }) + return 1 +} +func (p *bufferParser) Stop(string, Usage) {} +func (p *bufferParser) ResetStats() {} +func (p *bufferParser) ResetLog() { + p.log = []Line{} +} +func (p *bufferParser) Log() []Line { return p.log } diff --git a/process/process.go b/process/process.go index 31df9387..0911ad0f 100644 --- a/process/process.go +++ b/process/process.go @@ -60,6 +60,7 @@ type Config struct { LimitCPU float64 // Kill the process if the CPU usage in percent is above this value. LimitMemory uint64 // Kill the process if the memory consumption in bytes is above this value. LimitDuration time.Duration // Kill the process if the limits are exceeded for this duration. + LimitMode LimitMode // Select limiting mode Scheduler Scheduler // A scheduler. Parser Parser // A parser for the output of the process. OnArgs func(args []string) []string // A callback which is called right before the process will start with the command args. @@ -268,6 +269,7 @@ func New(config Config) (Process, error) { CPU: config.LimitCPU, Memory: config.LimitMemory, WaitFor: config.LimitDuration, + Mode: config.LimitMode, OnLimit: func(cpu float64, memory uint64) { p.logger.WithFields(log.Fields{ "cpu": cpu, @@ -381,11 +383,9 @@ func (p *process) setState(state stateType) (stateType, error) { p.state.time = time.Now() - p.callbacks.lock.Lock() if p.callbacks.onStateChange != nil { p.callbacks.onStateChange(prevState.String(), p.state.state.String()) } - p.callbacks.lock.Unlock() return prevState, nil } @@ -432,10 +432,14 @@ func (p *process) Status() Status { Reconnect: time.Duration(-1), Duration: time.Since(stateTime), Time: stateTime, - CPU: usage.CPU, Memory: usage.Memory, } + s.CPU.Current = usage.CPU.Current + s.CPU.Average = usage.CPU.Average + s.CPU.Max = usage.CPU.Max + s.CPU.Limit = usage.CPU.Limit + s.CommandArgs = make([]string, len(p.args)) copy(s.CommandArgs, p.args) @@ -516,7 +520,6 @@ func (p *process) start() error { args := p.args - p.callbacks.lock.Lock() if p.callbacks.onArgs != nil { args = make([]string, len(p.args)) copy(args, p.args) @@ -533,10 +536,9 @@ func (p *process) start() error { p.reconnect(p.delay(stateFailed)) - return err + return nil } } - p.callbacks.lock.Unlock() // Start the stop timeout if enabled if p.timeout > time.Duration(0) { @@ -590,11 +592,9 @@ func (p *process) start() error { p.logger.Info().Log("Started") p.debuglogger.Debug().Log("Started") - p.callbacks.lock.Lock() if p.callbacks.onStart != nil { p.callbacks.onStart() } - p.callbacks.lock.Unlock() // Start the reader go p.reader() diff --git a/process/process_test.go b/process/process_test.go index b098b52d..a6593555 100644 --- a/process/process_test.go +++ b/process/process_test.go @@ -1,6 +1,7 @@ package process import ( + "fmt" "sync" "testing" "time" @@ -23,15 +24,19 @@ func TestProcess(t *testing.T) { p.Start() - require.Equal(t, "running", p.Status().State) - - time.Sleep(5 * time.Second) + require.Eventually(t, func() bool { + return p.Status().State == "running" + }, 10*time.Second, 500*time.Millisecond) require.Equal(t, "running", p.Status().State) + time.Sleep(2 * time.Second) + p.Stop(false) - time.Sleep(2 * time.Second) + require.Eventually(t, func() bool { + return p.Status().State == "killed" + }, 10*time.Second, 500*time.Millisecond) require.Equal(t, "killed", p.Status().State) } @@ -49,9 +54,11 @@ func TestReconnectProcess(t *testing.T) { p.Start() - time.Sleep(3 * time.Second) + require.Eventually(t, func() bool { + return p.Status().State == "finished" + }, 10*time.Second, 500*time.Millisecond) - require.Equal(t, "finished", p.Status().State) + require.Greater(t, p.Status().Reconnect, time.Duration(0)) p.Stop(false) @@ -70,9 +77,9 @@ func TestStaleProcess(t *testing.T) { p.Start() - time.Sleep(5 * time.Second) - - require.Equal(t, "killed", p.Status().State) + require.Eventually(t, func() bool { + return p.Status().State == "killed" + }, 10*time.Second, 500*time.Millisecond) p.Stop(false) @@ -85,15 +92,24 @@ func TestStaleReconnectProcess(t *testing.T) { Args: []string{ "10", }, - Reconnect: false, - StaleTimeout: 2 * time.Second, + Reconnect: true, + ReconnectDelay: 2 * time.Second, + StaleTimeout: 3 * time.Second, }) p.Start() - time.Sleep(10 * time.Second) + require.Eventually(t, func() bool { + return p.Status().State == "killed" + }, 10*time.Second, 500*time.Millisecond) - require.Equal(t, "killed", p.Status().State) + require.Eventually(t, func() bool { + return p.Status().State == "running" + }, 10*time.Second, 500*time.Millisecond) + + require.Eventually(t, func() bool { + return p.Status().State == "killed" + }, 10*time.Second, 500*time.Millisecond) p.Stop(false) @@ -113,9 +129,11 @@ func TestNonExistingProcess(t *testing.T) { p.Start() - time.Sleep(3 * time.Second) + require.Eventually(t, func() bool { + return p.Status().State == "failed" + }, 10*time.Second, 500*time.Millisecond) - require.Equal(t, "failed", p.Status().State) + require.Negative(t, p.Status().Reconnect) p.Stop(false) @@ -135,9 +153,11 @@ func TestNonExistingReconnectProcess(t *testing.T) { p.Start() - time.Sleep(5 * time.Second) + require.Eventually(t, func() bool { + return p.Status().State == "failed" + }, 10*time.Second, 500*time.Millisecond) - require.Equal(t, "failed", p.Status().State) + require.Greater(t, p.Status().Reconnect, time.Duration(0)) p.Stop(false) @@ -156,7 +176,9 @@ func TestProcessFailed(t *testing.T) { p.Start() - time.Sleep(5 * time.Second) + require.Eventually(t, func() bool { + return p.Status().State == "failed" + }, 10*time.Second, 500*time.Millisecond) p.Stop(false) @@ -173,16 +195,22 @@ func TestFFmpegWaitStop(t *testing.T) { Reconnect: false, StaleTimeout: 0, OnExit: func(state string) { - time.Sleep(2 * time.Second) + time.Sleep(3 * time.Second) }, }) err = p.Start() require.NoError(t, err) - time.Sleep(4 * time.Second) + require.Eventually(t, func() bool { + return p.Status().State == "running" + }, 10*time.Second, 500*time.Millisecond) - p.Stop(true) + start := time.Now() + err = p.Stop(true) + require.NoError(t, err) + + require.Greater(t, time.Since(start), 2*time.Second) require.Equal(t, "finished", p.Status().State) } @@ -201,13 +229,15 @@ func TestFFmpegKill(t *testing.T) { err = p.Start() require.NoError(t, err) - time.Sleep(5 * time.Second) + require.Eventually(t, func() bool { + return p.Status().State == "running" + }, 10*time.Second, 500*time.Millisecond) p.Stop(false) - time.Sleep(3 * time.Second) - - require.Equal(t, "finished", p.Status().State) + require.Eventually(t, func() bool { + return p.Status().State == "finished" + }, 10*time.Second, 500*time.Millisecond) } func TestProcessForceKill(t *testing.T) { @@ -224,17 +254,23 @@ func TestProcessForceKill(t *testing.T) { err = p.Start() require.NoError(t, err) - time.Sleep(3 * time.Second) + require.Eventually(t, func() bool { + return p.Status().State == "running" + }, 10*time.Second, 500*time.Millisecond) + + start := time.Now() p.Stop(false) - time.Sleep(1 * time.Second) + require.Eventually(t, func() bool { + return p.Status().State == "finishing" + }, 10*time.Second, 500*time.Millisecond) - require.Equal(t, "finishing", p.Status().State) + require.Eventually(t, func() bool { + return p.Status().State == "killed" + }, 10*time.Second, 500*time.Millisecond) - time.Sleep(5 * time.Second) - - require.Equal(t, "killed", p.Status().State) + require.Greater(t, time.Since(start), 5*time.Second) } func TestProcessDuration(t *testing.T) { @@ -252,16 +288,23 @@ func TestProcessDuration(t *testing.T) { require.Equal(t, "stop", status.Order) require.Equal(t, "finished", status.State) + start := time.Now() + err = p.Start() require.NoError(t, err) - time.Sleep(time.Second) + require.Eventually(t, func() bool { + return p.Status().State == "running" + }, 10*time.Second, 500*time.Millisecond) status = p.Status() require.Equal(t, "start", status.Order) - require.Equal(t, "running", status.State) - time.Sleep(5 * time.Second) + require.Eventually(t, func() bool { + return p.Status().State == "finished" + }, 10*time.Second, 500*time.Millisecond) + + require.InEpsilon(t, float64(3), time.Since(start).Seconds(), 1) status = p.Status() require.Equal(t, "start", status.Order) @@ -296,18 +339,21 @@ func TestProcessSchedulePointInTime(t *testing.T) { status = p.Status() require.Equal(t, "start", status.Order) require.Equal(t, "finished", status.State) - require.Greater(t, status.Reconnect, time.Duration(0)) + require.Positive(t, status.Reconnect) - time.Sleep(status.Reconnect + (2 * time.Second)) + require.Eventually(t, func() bool { + return p.Status().State == "running" + }, 10*time.Second, 500*time.Millisecond) - status = p.Status() - require.Equal(t, "running", status.State) + require.InEpsilon(t, time.Since(now).Seconds(), 5, 1) - time.Sleep(5 * time.Second) + require.Eventually(t, func() bool { + return p.Status().State == "finished" + }, 10*time.Second, 500*time.Millisecond) status = p.Status() require.Equal(t, "finished", status.State) - require.Less(t, status.Reconnect, time.Duration(0)) + require.Negative(t, status.Reconnect) } func TestProcessSchedulePointInTimeGone(t *testing.T) { @@ -357,17 +403,19 @@ func TestProcessScheduleCron(t *testing.T) { require.NoError(t, err) status = p.Status() + require.Positive(t, status.Reconnect) - time.Sleep(status.Reconnect + (2 * time.Second)) + require.Eventually(t, func() bool { + return p.Status().State == "running" + }, 60*time.Second, 500*time.Millisecond) - status = p.Status() - require.Equal(t, "running", status.State) - - time.Sleep(5 * time.Second) + require.Eventually(t, func() bool { + return p.Status().State == "finished" + }, 10*time.Second, 500*time.Millisecond) status = p.Status() require.Equal(t, "finished", status.State) - require.Greater(t, status.Reconnect, time.Duration(0)) + require.Positive(t, status.Reconnect) } func TestProcessDelayNoScheduler(t *testing.T) { @@ -511,6 +559,7 @@ func TestProcessDelaySchedulerReconnect(t *testing.T) { func TestProcessCallbacks(t *testing.T) { var args []string + onBeforeStart := false onStart := false onExit := "" onState := []string{} @@ -531,6 +580,14 @@ func TestProcessCallbacks(t *testing.T) { copy(args, a) return a }, + OnBeforeStart: func() error { + lock.Lock() + defer lock.Unlock() + + onBeforeStart = true + + return nil + }, OnStart: func() { lock.Lock() defer lock.Unlock() @@ -555,10 +612,17 @@ func TestProcessCallbacks(t *testing.T) { err = p.Start() require.NoError(t, err) - time.Sleep(5 * time.Second) + require.Eventually(t, func() bool { + return p.Status().State == "running" + }, 10*time.Second, 500*time.Millisecond) + + require.Eventually(t, func() bool { + return p.Status().State == "finished" + }, 10*time.Second, 500*time.Millisecond) lock.Lock() require.ElementsMatch(t, []string{"2"}, args) + require.True(t, onBeforeStart) require.True(t, onStart) require.Equal(t, stateFinished.String(), onExit) require.ElementsMatch(t, []string{ @@ -568,3 +632,37 @@ func TestProcessCallbacks(t *testing.T) { }, onState) lock.Unlock() } + +func TestProcessCallbacksOnBeforeStart(t *testing.T) { + parser := NewBufferParser() + p, err := New(Config{ + Binary: "sleep", + Args: []string{ + "2", + }, + Parser: parser, + Reconnect: true, + ReconnectDelay: 10 * time.Second, + OnBeforeStart: func() error { + return fmt.Errorf("no, not now") + }, + }) + require.NoError(t, err) + + err = p.Start() + require.NoError(t, err) + + require.Eventually(t, func() bool { + return p.Status().State == "failed" + }, 10*time.Second, 500*time.Millisecond) + + status := p.Status() + + require.Equal(t, "failed", status.State) + require.Equal(t, "start", status.Order) + require.Positive(t, status.Reconnect) + + lines := parser.Log() + require.Equal(t, 1, len(lines)) + require.Equal(t, "no, not now", lines[0].Data) +} diff --git a/psutil/process.go b/psutil/process.go index 962b3080..f0e63f5b 100644 --- a/psutil/process.go +++ b/psutil/process.go @@ -2,7 +2,6 @@ package psutil import ( "context" - "math" "sync" "time" @@ -19,6 +18,12 @@ type Process interface { // Stop will stop collecting CPU and memory data for this process. Stop() + + // Suspend will send SIGSTOP to the process + Suspend() error + + // Resume will send SIGCONT to the process + Resume() error } type process struct { @@ -70,10 +75,6 @@ func (p *process) tick(ctx context.Context, interval time.Duration) { ticker := time.NewTicker(interval) defer ticker.Stop() - if p.imposeLimit { - go p.limit(ctx, interval) - } - for { select { case <-ctx.Done(): @@ -85,66 +86,6 @@ func (p *process) tick(ctx context.Context, interval time.Duration) { p.statPrevious, p.statCurrent = p.statCurrent, stat p.statPreviousTime, p.statCurrentTime = p.statCurrentTime, t p.lock.Unlock() - /* - pct, _ := p.CPUPercent() - pcpu := (pct.System + pct.User + pct.Other) / 100 - - fmt.Printf("%d\t%0.2f%%\n", p.pid, pcpu*100*p.ncpu) - */ - } - } -} - -func (p *process) limit(ctx context.Context, interval time.Duration) { - var limit float64 = 50.0 / 100.0 / p.ncpu - var workingrate float64 = -1 - - counter := 0 - - for { - select { - case <-ctx.Done(): - return - default: - pct, _ := p.CPUPercent() - /* - pct.System *= p.ncpu - pct.Idle *= p.ncpu - pct.User *= p.ncpu - pct.Other *= p.ncpu - */ - - pcpu := (pct.System + pct.User + pct.Other) / 100 - - if workingrate < 0 { - workingrate = limit - } else { - workingrate = math.Min(workingrate/pcpu*limit, 1) - } - - worktime := float64(interval.Nanoseconds()) * workingrate - sleeptime := float64(interval.Nanoseconds()) - worktime - /* - if counter%20 == 0 { - fmt.Printf("\nPID\t%%CPU\twork quantum\tsleep quantum\tactive rate\n") - counter = 0 - } - - fmt.Printf("%d\t%0.2f%%\t%.2f us\t%.2f us\t%0.2f%%\n", p.pid, pcpu*100*p.ncpu, worktime/1000, sleeptime/1000, workingrate*100) - */ - if p.imposeLimit { - p.proc.Resume() - } - time.Sleep(time.Duration(worktime) * time.Nanosecond) - - if sleeptime > 0 { - if p.imposeLimit { - p.proc.Suspend() - } - time.Sleep(time.Duration(sleeptime) * time.Nanosecond) - } - - counter++ } } } @@ -165,6 +106,14 @@ func (p *process) Stop() { p.stopTicker() } +func (p *process) Suspend() error { + return p.proc.Suspend() +} + +func (p *process) Resume() error { + return p.proc.Resume() +} + func (p *process) cpuTimes() (*cpuTimesStat, error) { times, err := p.proc.Times() if err != nil { diff --git a/restream/restream.go b/restream/restream.go index cd49e437..b67bf479 100644 --- a/restream/restream.go +++ b/restream/restream.go @@ -368,11 +368,13 @@ func (r *restream) load() error { LimitCPU: t.config.LimitCPU, LimitMemory: t.config.LimitMemory, LimitDuration: time.Duration(t.config.LimitWaitFor) * time.Second, + LimitMode: "hard", Scheduler: t.config.Scheduler, Args: t.command, Parser: t.parser, Logger: t.logger, OnArgs: r.onArgs(t.config.Clone()), + OnBeforeStart: func() error { return nil }, }) if err != nil { return err @@ -525,11 +527,13 @@ func (r *restream) createTask(config *app.Config) (*task, error) { LimitCPU: t.config.LimitCPU, LimitMemory: t.config.LimitMemory, LimitDuration: time.Duration(t.config.LimitWaitFor) * time.Second, + LimitMode: "hard", Scheduler: t.config.Scheduler, Args: t.command, Parser: t.parser, Logger: t.logger, OnArgs: r.onArgs(t.config.Clone()), + OnBeforeStart: func() error { return nil }, }) if err != nil { return nil, err @@ -1249,11 +1253,13 @@ func (r *restream) reloadProcess(id string) error { LimitCPU: t.config.LimitCPU, LimitMemory: t.config.LimitMemory, LimitDuration: time.Duration(t.config.LimitWaitFor) * time.Second, + LimitMode: "hard", Scheduler: t.config.Scheduler, Args: t.command, Parser: t.parser, Logger: t.logger, OnArgs: r.onArgs(t.config.Clone()), + OnBeforeStart: func() error { return nil }, }) if err != nil { return err diff --git a/restream/restream_test.go b/restream/restream_test.go index c1134bc8..a99e3450 100644 --- a/restream/restream_test.go +++ b/restream/restream_test.go @@ -1237,6 +1237,6 @@ func TestProcessLimit(t *testing.T) { status := task.ffmpeg.Status() - require.Equal(t, float64(61), status.CPU.Limit) + require.Equal(t, float64(244), status.CPU.Limit) require.Equal(t, uint64(42), status.Memory.Limit) }