Add global limits in config, fix not using process limits

This commit is contained in:
Ingo Oppermann
2023-04-25 21:49:55 +02:00
parent 74f582e4b4
commit 9b17ab2b29
8 changed files with 144 additions and 36 deletions

View File

@@ -105,6 +105,7 @@ func (d *Config) Clone() *Config {
data.Sessions = d.Sessions
data.Service = d.Service
data.Router = d.Router
data.Resources = d.Resources
data.Log.Topics = copy.Slice(d.Log.Topics)
@@ -273,6 +274,10 @@ func (d *Config) init() {
d.vars.Register(value.NewStringList(&d.Router.BlockedPrefixes, []string{"/api"}, ","), "router.blocked_prefixes", "CORE_ROUTER_BLOCKED_PREFIXES", nil, "List of path prefixes that can't be routed", false, false)
d.vars.Register(value.NewStringMapString(&d.Router.Routes, nil), "router.routes", "CORE_ROUTER_ROUTES", nil, "List of route mappings", false, false)
d.vars.Register(value.NewDir(&d.Router.UIPath, "", d.fs), "router.ui_path", "CORE_ROUTER_UI_PATH", nil, "Path to a directory holding UI files mounted as /ui", false, false)
// Resources
d.vars.Register(value.NewFloat(&d.Resources.MaxCPUUsage, 0), "resources.max_cpu_usage", "CORE_RESOURCES_MAX_CPU_USAGE", nil, "Maximum system CPU usage in percent, from 0 (no limit) to 100*ncpu", false, false)
d.vars.Register(value.NewFloat(&d.Resources.MaxMemoryUsage, 0), "resources.max_memory_usage", "CORE_RESOURCES_MAX_MEMORY_USAGE", nil, "Maximum system usage in percent, from 0 (no limit) to 100", false, false)
}
// Validate validates the current state of the Config for completeness and sanity. Errors are

View File

@@ -139,7 +139,7 @@ type Data struct {
} `json:"playout"`
Debug struct {
Profiling bool `json:"profiling"`
ForceGC int `json:"force_gc" format:"int"`
ForceGC int `json:"force_gc" format:"int"` // deprecated, use MemoryLimit instead
MemoryLimit int64 `json:"memory_limit_mbytes" format:"int64"`
AutoMaxProcs bool `json:"auto_max_procs"`
} `json:"debug"`
@@ -168,6 +168,10 @@ type Data struct {
Routes map[string]string `json:"routes"`
UIPath string `json:"ui_path"`
} `json:"router"`
Resources struct {
MaxCPUUsage float64 `json:"max_cpu_usage"`
MaxMemoryUsage float64 `json:"max_memory_usage"`
} `json:"resources"`
}
func UpgradeV2ToV3(d *v2.Data, fs fs.Filesystem) (*Data, error) {

View File

@@ -279,3 +279,34 @@ func (u *Uint64) Validate() error {
func (u *Uint64) IsEmpty() bool {
return uint64(*u) == 0
}
// float64
type Float64 float64
func NewFloat(p *float64, val float64) *Float64 {
*p = val
return (*Float64)(p)
}
func (u *Float64) Set(val string) error {
v, err := strconv.ParseFloat(val, 64)
if err != nil {
return err
}
*u = Float64(v)
return nil
}
func (u *Float64) String() string {
return strconv.FormatFloat(float64(*u), 'f', -1, 64)
}
func (u *Float64) Validate() error {
return nil
}
func (u *Float64) IsEmpty() bool {
return float64(*u) == 0
}

View File

@@ -145,3 +145,23 @@ func TestUint64Value(t *testing.T) {
require.Equal(t, uint64(77), x)
}
func TestFloat64Value(t *testing.T) {
var x float64
val := NewFloat(&x, 11.1)
require.Equal(t, "11.1", val.String())
require.Equal(t, nil, val.Validate())
require.Equal(t, false, val.IsEmpty())
x = 42.5
require.Equal(t, "42.5", val.String())
require.Equal(t, nil, val.Validate())
require.Equal(t, false, val.IsEmpty())
val.Set("77.7")
require.Equal(t, float64(77.7), x)
}

View File

@@ -29,18 +29,21 @@ type FFmpeg interface {
}
type ProcessConfig struct {
Reconnect bool
ReconnectDelay time.Duration
StaleTimeout time.Duration
Timeout time.Duration
Scheduler string
Args []string
Parser process.Parser
Logger log.Logger
OnArgs func([]string) []string
OnExit func(state string)
OnStart func()
OnStateChange func(from, to string)
Reconnect bool // Whether to reconnect
ReconnectDelay time.Duration // Duration until next reconnect
StaleTimeout time.Duration // Duration to wait until killing the process if there is no progress in the process
Timeout time.Duration // Duration to wait until killing the process
LimitCPU float64 // Kill the process if the CPU usage in percent is above this value.
LimitMemory uint64 // Kill the process if the memory consumption in bytes is above this value.
LimitDuration time.Duration // Kill the process if the limits are exceeded for this duration.
Scheduler string // A scheduler for starting the process, either a concrete date (RFC3339) or in crontab syntax
Args []string // Arguments for the process
Parser process.Parser // Parser for the process output
Logger log.Logger // Logger
OnArgs func([]string) []string // Callback before starting the process to retrieve new arguments
OnExit func(state string) // Callback called after the process stopped with exit state as argument
OnStart func() // Callback called after process has been started
OnStateChange func(from, to string) // Callback called on state change
}
// Config is the configuration for ffmpeg that is part of the configuration
@@ -134,6 +137,9 @@ func (f *ffmpeg) New(config ProcessConfig) (process.Process, error) {
ReconnectDelay: config.ReconnectDelay,
StaleTimeout: config.StaleTimeout,
Timeout: config.Timeout,
LimitCPU: config.LimitCPU,
LimitMemory: config.LimitMemory,
LimitDuration: config.LimitDuration,
Scheduler: scheduler,
Parser: config.Parser,
Logger: config.Logger,

View File

@@ -49,6 +49,10 @@ type Limiter interface {
// Usage returns the current state of the limiter, such as current, average, max, and
// limit values for CPU and memory.
Usage() Usage
// Limit enables or disables the throttling of the CPU or killing because of to much
// memory consumption.
Limit(enable bool) error
}
type limiter struct {
@@ -257,3 +261,7 @@ func (l *limiter) Usage() Usage {
func (l *limiter) Limits() (cpu float64, memory uint64) {
return l.cpu, l.memory
}
func (l *limiter) Limit(enable bool) error {
return nil
}

View File

@@ -42,25 +42,31 @@ type Process interface {
// IsRunning returns whether the process is currently
// running or not.
IsRunning() bool
// Limit enabled or disables CPU and memory limiting. CPU will be throttled
// into the configured limit. If memory consumption is above the configured
// limit, the process will be killed.
Limit(enable bool) error
}
// Config is the configuration of a process
type Config struct {
Binary string // Path to the ffmpeg binary
Args []string // List of arguments for the binary
Reconnect bool // Whether to restart the process if it exited
ReconnectDelay time.Duration // Duration to wait before restarting the process
StaleTimeout time.Duration // Kill the process after this duration if it doesn't produce any output
Timeout time.Duration // Kill the process after this duration
LimitCPU float64 // Kill the process if the CPU usage in percent is above this value
LimitMemory uint64 // Kill the process if the memory consumption in bytes is above this value
LimitDuration time.Duration // Kill the process if the limits are exceeded for this duration
Scheduler Scheduler // A scheduler
Parser Parser // A parser for the output of the process
OnArgs func(args []string) []string // A callback which is called right before the process will start with the command args
OnStart func() // A callback which is called after the process started
OnExit func(state string) // A callback which is called after the process exited with the exit state
OnStateChange func(from, to string) // A callback which is called after a state changed
Binary string // Path to the ffmpeg binary.
Args []string // List of arguments for the binary.
Reconnect bool // Whether to restart the process if it exited.
ReconnectDelay time.Duration // Duration to wait before restarting the process.
StaleTimeout time.Duration // Kill the process after this duration if it doesn't produce any output.
Timeout time.Duration // Kill the process after this duration.
LimitCPU float64 // Kill the process if the CPU usage in percent is above this value.
LimitMemory uint64 // Kill the process if the memory consumption in bytes is above this value.
LimitDuration time.Duration // Kill the process if the limits are exceeded for this duration.
Scheduler Scheduler // A scheduler.
Parser Parser // A parser for the output of the process.
OnArgs func(args []string) []string // A callback which is called right before the process will start with the command args.
OnBeforeStart func() error // A callback which is called before the process will be started. If error is non-nil, the start will be refused.
OnStart func() // A callback which is called after the process started.
OnExit func(state string) // A callback which is called after the process exited with the exit state.
OnStateChange func(from, to string) // A callback which is called after a state changed.
Logger log.Logger
}
@@ -74,16 +80,16 @@ type Status struct {
Time time.Time // Time is the time of the last change of the state
CommandArgs []string // Currently running command arguments
CPU struct {
Current float64
Average float64
Max float64
Limit float64
Current float64 // Currently consumed CPU in percent
Average float64 // Average consumed CPU in percent
Max float64 // Max. consumed CPU in percent
Limit float64 // Usage limit in percent
} // Used CPU in percent
Memory struct {
Current uint64
Average float64
Max uint64
Limit uint64
Current uint64 // Currently consumed memory in bytes
Average float64 // Average consumed memory in bytes
Max uint64 // Max. consumed memory in bytes
Limit uint64 // Usage limit in bytes
} // Used memory in bytes
}
@@ -196,6 +202,7 @@ type process struct {
debuglogger log.Logger
callbacks struct {
onArgs func(args []string) []string
onBeforeStart func() error
onStart func()
onExit func(state string)
onStateChange func(from, to string)
@@ -252,6 +259,7 @@ func New(config Config) (Process, error) {
p.stale.timeout = config.StaleTimeout
p.callbacks.onArgs = config.OnArgs
p.callbacks.onBeforeStart = config.OnBeforeStart
p.callbacks.onStart = config.OnStart
p.callbacks.onExit = config.OnExit
p.callbacks.onStateChange = config.OnStateChange
@@ -445,6 +453,10 @@ func (p *process) IsRunning() bool {
return p.isRunning()
}
func (p *process) Limit(enable bool) error {
return p.limits.Limit(enable)
}
// Start will start the process and sets the order to "start". If the
// process has alread the "start" order, nothing will be done. Returns
// an error if start failed.
@@ -511,6 +523,19 @@ func (p *process) start() error {
args = p.callbacks.onArgs(args)
}
if p.callbacks.onBeforeStart != nil {
if err := p.callbacks.onBeforeStart(); err != nil {
p.setState(stateFailed)
p.parser.Parse(err.Error())
p.logger.WithError(err).Error().Log("Starting failed")
p.reconnect(p.delay(stateFailed))
return err
}
}
p.callbacks.lock.Unlock()
// Start the stop timeout if enabled

View File

@@ -365,6 +365,9 @@ func (r *restream) load() error {
ReconnectDelay: time.Duration(t.config.ReconnectDelay) * time.Second,
StaleTimeout: time.Duration(t.config.StaleTimeout) * time.Second,
Timeout: time.Duration(t.config.Timeout) * time.Second,
LimitCPU: t.config.LimitCPU,
LimitMemory: t.config.LimitMemory,
LimitDuration: time.Duration(t.config.LimitWaitFor) * time.Second,
Scheduler: t.config.Scheduler,
Args: t.command,
Parser: t.parser,
@@ -519,6 +522,9 @@ func (r *restream) createTask(config *app.Config) (*task, error) {
ReconnectDelay: time.Duration(t.config.ReconnectDelay) * time.Second,
StaleTimeout: time.Duration(t.config.StaleTimeout) * time.Second,
Timeout: time.Duration(t.config.Timeout) * time.Second,
LimitCPU: t.config.LimitCPU,
LimitMemory: t.config.LimitMemory,
LimitDuration: time.Duration(t.config.LimitWaitFor) * time.Second,
Scheduler: t.config.Scheduler,
Args: t.command,
Parser: t.parser,
@@ -1240,6 +1246,9 @@ func (r *restream) reloadProcess(id string) error {
ReconnectDelay: time.Duration(t.config.ReconnectDelay) * time.Second,
StaleTimeout: time.Duration(t.config.StaleTimeout) * time.Second,
Timeout: time.Duration(t.config.Timeout) * time.Second,
LimitCPU: t.config.LimitCPU,
LimitMemory: t.config.LimitMemory,
LimitDuration: time.Duration(t.config.LimitWaitFor) * time.Second,
Scheduler: t.config.Scheduler,
Args: t.command,
Parser: t.parser,