Make resources the only direct user of psutil

This commit is contained in:
Ingo Oppermann
2024-10-29 12:25:39 +01:00
parent fbf62bf7e5
commit 2ee7fa7e41
10 changed files with 279 additions and 182 deletions

View File

@@ -38,6 +38,7 @@ import (
"github.com/datarhei/core/v16/prometheus" "github.com/datarhei/core/v16/prometheus"
"github.com/datarhei/core/v16/resources" "github.com/datarhei/core/v16/resources"
"github.com/datarhei/core/v16/resources/psutil" "github.com/datarhei/core/v16/resources/psutil"
"github.com/datarhei/core/v16/resources/psutil/gpu/nvidia"
"github.com/datarhei/core/v16/restream" "github.com/datarhei/core/v16/restream"
restreamapp "github.com/datarhei/core/v16/restream/app" restreamapp "github.com/datarhei/core/v16/restream/app"
"github.com/datarhei/core/v16/restream/replace" "github.com/datarhei/core/v16/restream/replace"
@@ -368,7 +369,7 @@ func (a *api) start(ctx context.Context) error {
debug.SetMemoryLimit(math.MaxInt64) debug.SetMemoryLimit(math.MaxInt64)
} }
psutil, err := psutil.New("", nil) psutil, err := psutil.New("", nvidia.New(""))
if err != nil { if err != nil {
return fmt.Errorf("failed to initialize psutils: %w", err) return fmt.Errorf("failed to initialize psutils: %w", err)
} }
@@ -511,7 +512,7 @@ func (a *api) start(ctx context.Context) error {
ValidatorOutput: validatorOut, ValidatorOutput: validatorOut,
Portrange: portrange, Portrange: portrange,
Collector: a.sessions.Collector("ffmpeg"), Collector: a.sessions.Collector("ffmpeg"),
PSUtil: psutil, Resource: a.resources,
}) })
if err != nil { if err != nil {
return fmt.Errorf("unable to create ffmpeg: %w", err) return fmt.Errorf("unable to create ffmpeg: %w", err)

View File

@@ -12,7 +12,7 @@ import (
"github.com/datarhei/core/v16/log" "github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/net" "github.com/datarhei/core/v16/net"
"github.com/datarhei/core/v16/process" "github.com/datarhei/core/v16/process"
"github.com/datarhei/core/v16/resources/psutil" "github.com/datarhei/core/v16/resources"
"github.com/datarhei/core/v16/session" "github.com/datarhei/core/v16/session"
) )
@@ -64,7 +64,7 @@ type Config struct {
ValidatorOutput Validator ValidatorOutput Validator
Portrange net.Portranger Portrange net.Portranger
Collector session.Collector Collector session.Collector
PSUtil psutil.Util Resource resources.Resources
} }
type ffmpeg struct { type ffmpeg struct {
@@ -83,17 +83,17 @@ type ffmpeg struct {
states process.States states process.States
statesLock sync.RWMutex statesLock sync.RWMutex
psutil psutil.Util resources resources.Resources
} }
func New(config Config) (FFmpeg, error) { func New(config Config) (FFmpeg, error) {
f := &ffmpeg{} f := &ffmpeg{}
if config.PSUtil == nil { if config.Resource == nil {
return nil, fmt.Errorf("psutils required") return nil, fmt.Errorf("resources are required")
} }
f.psutil = config.PSUtil f.resources = config.Resource
binary, err := exec.LookPath(config.Binary) binary, err := exec.LookPath(config.Binary)
if err != nil { if err != nil {
@@ -194,7 +194,7 @@ func (f *ffmpeg) New(config ProcessConfig) (process.Process, error) {
config.OnStateChange(from, to) config.OnStateChange(from, to)
} }
}, },
PSUtil: f.psutil, Resources: f.resources,
}) })
return ffmpeg, err return ffmpeg, err

View File

@@ -17,6 +17,7 @@ import (
"github.com/datarhei/core/v16/http/validator" "github.com/datarhei/core/v16/http/validator"
"github.com/datarhei/core/v16/internal/testhelper" "github.com/datarhei/core/v16/internal/testhelper"
"github.com/datarhei/core/v16/io/fs" "github.com/datarhei/core/v16/io/fs"
"github.com/datarhei/core/v16/resources"
"github.com/datarhei/core/v16/resources/psutil" "github.com/datarhei/core/v16/resources/psutil"
"github.com/datarhei/core/v16/restream" "github.com/datarhei/core/v16/restream"
jsonstore "github.com/datarhei/core/v16/restream/store/json" jsonstore "github.com/datarhei/core/v16/restream/store/json"
@@ -50,11 +51,18 @@ func DummyRestreamer(pathPrefix string) (restream.Restreamer, error) {
return nil, err return nil, err
} }
resources, err := resources.New(resources.Config{
PSUtil: psutil,
})
if err != nil {
return nil, err
}
ffmpeg, err := ffmpeg.New(ffmpeg.Config{ ffmpeg, err := ffmpeg.New(ffmpeg.Config{
Binary: binary, Binary: binary,
MaxLogLines: 100, MaxLogLines: 100,
LogHistoryLength: 3, LogHistoryLength: 3,
PSUtil: psutil, Resource: resources,
}) })
if err != nil { if err != nil {
return nil, err return nil, err

View File

@@ -7,7 +7,7 @@ import (
"time" "time"
"github.com/datarhei/core/v16/log" "github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/resources/psutil" "github.com/datarhei/core/v16/resources"
) )
type Usage struct { type Usage struct {
@@ -85,13 +85,13 @@ type LimiterConfig struct {
WaitFor time.Duration // Duration for one of the limits has to be above the limit until OnLimit gets triggered. WaitFor time.Duration // Duration for one of the limits has to be above the limit until OnLimit gets triggered.
OnLimit LimitFunc // Function to be triggered if limits are exceeded. OnLimit LimitFunc // Function to be triggered if limits are exceeded.
Mode LimitMode // How to limit CPU usage. Mode LimitMode // How to limit CPU usage.
PSUtil psutil.Util NCPU float64 // Number of available CPU
Logger log.Logger Logger log.Logger
} }
type Limiter interface { type Limiter interface {
// Start starts the limiter with a psutil.Process. // Start starts the limiter with a psutil.Process.
Start(process psutil.Process) error Start(process resources.Process) error
// Stop stops the limiter. The limiter can be reused by calling Start() again // Stop stops the limiter. The limiter can be reused by calling Start() again
Stop() Stop()
@@ -226,11 +226,9 @@ func (x *metric[T]) IsExceeded(waitFor time.Duration, mode LimitMode) bool {
} }
type limiter struct { type limiter struct {
psutil psutil.Util
ncpu float64 ncpu float64
ncpuFactor float64 ncpuFactor float64
proc psutil.Process proc resources.Process
lock sync.RWMutex lock sync.RWMutex
cancel context.CancelFunc cancel context.CancelFunc
onLimit LimitFunc onLimit LimitFunc
@@ -259,13 +257,17 @@ type limiter struct {
// NewLimiter returns a new Limiter // NewLimiter returns a new Limiter
func NewLimiter(config LimiterConfig) (Limiter, error) { func NewLimiter(config LimiterConfig) (Limiter, error) {
l := &limiter{ l := &limiter{
ncpu: config.NCPU,
waitFor: config.WaitFor, waitFor: config.WaitFor,
onLimit: config.OnLimit, onLimit: config.OnLimit,
mode: config.Mode, mode: config.Mode,
psutil: config.PSUtil,
logger: config.Logger, logger: config.Logger,
} }
if l.ncpu <= 0 {
l.ncpu = 1
}
l.cpu.SetLimit(config.CPU / 100) l.cpu.SetLimit(config.CPU / 100)
l.memory.SetLimit(config.Memory) l.memory.SetLimit(config.Memory)
l.gpu.memory.SetLimit(config.GPUMemory) l.gpu.memory.SetLimit(config.GPUMemory)
@@ -277,16 +279,6 @@ func NewLimiter(config LimiterConfig) (Limiter, error) {
l.logger = log.New("") l.logger = log.New("")
} }
if l.psutil == nil {
return nil, fmt.Errorf("no psutil provided")
}
if ncpu, err := l.psutil.CPUCounts(); err != nil {
l.ncpu = 1
} else {
l.ncpu = ncpu
}
l.lastUsage.CPU.NCPU = l.ncpu l.lastUsage.CPU.NCPU = l.ncpu
l.lastUsage.CPU.Limit = l.cpu.Limit() * 100 * l.ncpu l.lastUsage.CPU.Limit = l.cpu.Limit() * 100 * l.ncpu
l.lastUsage.Memory.Limit = l.memory.Limit() l.lastUsage.Memory.Limit = l.memory.Limit()
@@ -333,7 +325,7 @@ func (l *limiter) reset() {
l.gpu.decoder.Reset() l.gpu.decoder.Reset()
} }
func (l *limiter) Start(process psutil.Process) error { func (l *limiter) Start(process resources.Process) error {
l.lock.Lock() l.lock.Lock()
defer l.lock.Unlock() defer l.lock.Unlock()
@@ -396,28 +388,24 @@ func (l *limiter) collect() {
return return
} }
mstat, merr := proc.Memory() pinfo, err := proc.Info()
cpustat, cerr := proc.CPU()
gstat, gerr := proc.GPU() //mstat, merr := proc.Memory()
//cpustat, cerr := proc.CPU()
//gstat, gerr := proc.GPU()
gindex := -1 gindex := -1
l.lock.Lock() l.lock.Lock()
defer l.lock.Unlock() defer l.lock.Unlock()
if merr == nil { if err == nil {
l.memory.Update(mstat) l.memory.Update(pinfo.Memory)
} l.cpu.Update((pinfo.CPU.System + pinfo.CPU.User + pinfo.CPU.Other) / 100)
l.gpu.memory.Update(pinfo.GPU.MemoryUsed)
if cerr == nil { l.gpu.usage.Update(pinfo.GPU.Usage / 100)
l.cpu.Update((cpustat.System + cpustat.User + cpustat.Other) / 100) l.gpu.encoder.Update(pinfo.GPU.Encoder / 100)
} l.gpu.decoder.Update(pinfo.GPU.Decoder / 100)
gindex = pinfo.GPU.Index
if gerr == nil {
l.gpu.memory.Update(gstat.MemoryUsed)
l.gpu.usage.Update(gstat.Usage / 100)
l.gpu.encoder.Update(gstat.Encoder / 100)
l.gpu.decoder.Update(gstat.Decoder / 100)
gindex = gstat.Index
} }
isLimitExceeded := false isLimitExceeded := false

View File

@@ -5,41 +5,39 @@ import (
"testing" "testing"
"time" "time"
"github.com/datarhei/core/v16/resources/psutil" "github.com/datarhei/core/v16/resources"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
type psproc struct{} type proc struct{}
func (p *psproc) CPU() (*psutil.CPUInfo, error) { func (p *proc) Info() (resources.ProcessInfo, error) {
return &psutil.CPUInfo{ info := resources.ProcessInfo{
System: 50, CPU: resources.ProcessInfoCPU{
User: 0, System: 50,
Idle: 0, User: 0,
Other: 0, Idle: 0,
}, nil Other: 0,
},
Memory: 197,
GPU: resources.ProcessInfoGPU{
Index: 0,
Name: "L4",
MemoryTotal: 128,
MemoryUsed: 91,
Usage: 3,
Encoder: 9,
Decoder: 5,
},
}
return info, nil
} }
func (p *psproc) Memory() (uint64, error) { func (p *proc) Cancel() {}
return 197, nil func (p *proc) Suspend() error { return nil }
} func (p *proc) Resume() error { return nil }
func (p *psproc) GPU() (*psutil.GPUInfo, error) {
return &psutil.GPUInfo{
Index: 0,
Name: "L4",
MemoryTotal: 128,
MemoryUsed: 91,
Usage: 3,
Encoder: 9,
Decoder: 5,
}, nil
}
func (p *psproc) Cancel() {}
func (p *psproc) Suspend() error { return nil }
func (p *psproc) Resume() error { return nil }
func TestCPULimit(t *testing.T) { func TestCPULimit(t *testing.T) {
lock := sync.Mutex{} lock := sync.Mutex{}
@@ -57,10 +55,9 @@ func TestCPULimit(t *testing.T) {
OnLimit: func(float64, uint64, float64, float64, float64, uint64) { OnLimit: func(float64, uint64, float64, float64, float64, uint64) {
wg.Done() wg.Done()
}, },
PSUtil: newPSUtil(),
}) })
l.Start(&psproc{}) l.Start(&proc{})
defer l.Stop() defer l.Stop()
wg.Wait() wg.Wait()
@@ -95,10 +92,9 @@ func TestCPULimitWaitFor(t *testing.T) {
OnLimit: func(float64, uint64, float64, float64, float64, uint64) { OnLimit: func(float64, uint64, float64, float64, float64, uint64) {
wg.Done() wg.Done()
}, },
PSUtil: newPSUtil(),
}) })
l.Start(&psproc{}) l.Start(&proc{})
defer l.Stop() defer l.Stop()
wg.Wait() wg.Wait()
@@ -132,10 +128,9 @@ func TestMemoryLimit(t *testing.T) {
OnLimit: func(float64, uint64, float64, float64, float64, uint64) { OnLimit: func(float64, uint64, float64, float64, float64, uint64) {
wg.Done() wg.Done()
}, },
PSUtil: newPSUtil(),
}) })
l.Start(&psproc{}) l.Start(&proc{})
defer l.Stop() defer l.Stop()
wg.Wait() wg.Wait()
@@ -170,10 +165,9 @@ func TestMemoryLimitWaitFor(t *testing.T) {
OnLimit: func(float64, uint64, float64, float64, float64, uint64) { OnLimit: func(float64, uint64, float64, float64, float64, uint64) {
wg.Done() wg.Done()
}, },
PSUtil: newPSUtil(),
}) })
l.Start(&psproc{}) l.Start(&proc{})
defer l.Stop() defer l.Stop()
wg.Wait() wg.Wait()
@@ -207,10 +201,9 @@ func TestGPUMemoryLimit(t *testing.T) {
OnLimit: func(float64, uint64, float64, float64, float64, uint64) { OnLimit: func(float64, uint64, float64, float64, float64, uint64) {
wg.Done() wg.Done()
}, },
PSUtil: newPSUtil(),
}) })
l.Start(&psproc{}) l.Start(&proc{})
defer l.Stop() defer l.Stop()
wg.Wait() wg.Wait()
@@ -245,10 +238,9 @@ func TestGPUMemoryLimitWaitFor(t *testing.T) {
OnLimit: func(float64, uint64, float64, float64, float64, uint64) { OnLimit: func(float64, uint64, float64, float64, float64, uint64) {
wg.Done() wg.Done()
}, },
PSUtil: newPSUtil(),
}) })
l.Start(&psproc{}) l.Start(&proc{})
defer l.Stop() defer l.Stop()
wg.Wait() wg.Wait()
@@ -283,10 +275,9 @@ func TestMemoryLimitSoftMode(t *testing.T) {
OnLimit: func(float64, uint64, float64, float64, float64, uint64) { OnLimit: func(float64, uint64, float64, float64, float64, uint64) {
wg.Done() wg.Done()
}, },
PSUtil: newPSUtil(),
}) })
l.Start(&psproc{}) l.Start(&proc{})
defer l.Stop() defer l.Stop()
l.Limit(false, true, false) l.Limit(false, true, false)
@@ -323,10 +314,9 @@ func TestGPUMemoryLimitSoftMode(t *testing.T) {
OnLimit: func(float64, uint64, float64, float64, float64, uint64) { OnLimit: func(float64, uint64, float64, float64, float64, uint64) {
wg.Done() wg.Done()
}, },
PSUtil: newPSUtil(),
}) })
l.Start(&psproc{}) l.Start(&proc{})
defer l.Stop() defer l.Stop()
l.Limit(false, false, true) l.Limit(false, false, true)

View File

@@ -18,7 +18,7 @@ import (
"unicode/utf8" "unicode/utf8"
"github.com/datarhei/core/v16/log" "github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/resources/psutil" "github.com/datarhei/core/v16/resources"
) )
// Process represents a process and ways to control it // Process represents a process and ways to control it
@@ -71,7 +71,7 @@ type Config struct {
OnStart func() // A callback which is called after the process started. OnStart func() // A callback which is called after the process started.
OnExit func(state string) // A callback which is called after the process exited with the exit state. OnExit func(state string) // A callback which is called after the process exited with the exit state.
OnStateChange func(from, to string) // A callback which is called after a state changed. OnStateChange func(from, to string) // A callback which is called after a state changed.
PSUtil psutil.Util Resources resources.Resources
Logger log.Logger Logger log.Logger
} }
@@ -245,7 +245,7 @@ type process struct {
} }
limits Limiter limits Limiter
scheduler Scheduler scheduler Scheduler
psutil psutil.Util resources resources.Resources
} }
var _ Process = &process{} var _ Process = &process{}
@@ -259,11 +259,11 @@ func New(config Config) (Process, error) {
parser: config.Parser, parser: config.Parser,
logger: config.Logger, logger: config.Logger,
scheduler: config.Scheduler, scheduler: config.Scheduler,
psutil: config.PSUtil, resources: config.Resources,
} }
if p.psutil == nil { if p.resources == nil {
return nil, fmt.Errorf("no psutils given") return nil, fmt.Errorf("resources are required")
} }
p.args = make([]string, len(config.Args)) p.args = make([]string, len(config.Args))
@@ -276,10 +276,6 @@ func New(config Config) (Process, error) {
return nil, fmt.Errorf("no valid binary given") return nil, fmt.Errorf("no valid binary given")
} }
if p.psutil == nil {
return nil, fmt.Errorf("no psutils provided")
}
if p.parser == nil { if p.parser == nil {
p.parser = NewNullParser() p.parser = NewNullParser()
} }
@@ -308,8 +304,11 @@ func New(config Config) (Process, error) {
p.callbacks.onExit = config.OnExit p.callbacks.onExit = config.OnExit
p.callbacks.onStateChange = config.OnStateChange p.callbacks.onStateChange = config.OnStateChange
ncpu := p.resources.Info().CPU.NCPU
limits, err := NewLimiter(LimiterConfig{ limits, err := NewLimiter(LimiterConfig{
CPU: config.LimitCPU, CPU: config.LimitCPU,
NCPU: ncpu,
Memory: config.LimitMemory, Memory: config.LimitMemory,
GPUUsage: config.LimitGPUUsage, GPUUsage: config.LimitGPUUsage,
GPUEncoder: config.LimitGPUEncoder, GPUEncoder: config.LimitGPUEncoder,
@@ -333,7 +332,6 @@ func New(config Config) (Process, error) {
}).Warn().Log("Killed because limits are exceeded") }).Warn().Log("Killed because limits are exceeded")
p.Kill(false, fmt.Sprintf("Killed because limits are exceeded (mode: %s, tolerance: %s): %.2f (%.2f) CPU, %d (%d) bytes memory, %.2f/%.2f/%.2f (%.2f) GPU usage, %d (%d) bytes GPU memory", config.LimitMode.String(), config.LimitDuration.String(), cpu, config.LimitCPU, memory, config.LimitMemory, gpuusage, gpuencoder, gpudecoder, config.LimitGPUUsage, gpumemory, config.LimitGPUMemory)) p.Kill(false, fmt.Sprintf("Killed because limits are exceeded (mode: %s, tolerance: %s): %.2f (%.2f) CPU, %d (%d) bytes memory, %.2f/%.2f/%.2f (%.2f) GPU usage, %d (%d) bytes GPU memory", config.LimitMode.String(), config.LimitDuration.String(), cpu, config.LimitCPU, memory, config.LimitMemory, gpuusage, gpuencoder, gpudecoder, config.LimitGPUUsage, gpumemory, config.LimitGPUMemory))
}, },
PSUtil: p.psutil,
}) })
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to initialize limiter") return nil, fmt.Errorf("failed to initialize limiter")
@@ -719,7 +717,7 @@ func (p *process) start() error {
p.pid = int32(p.cmd.Process.Pid) p.pid = int32(p.cmd.Process.Pid)
if proc, err := p.psutil.Process(p.pid); err == nil { if proc, err := p.resources.Process(p.pid); err == nil {
p.limits.Start(proc) p.limits.Start(proc)
} }

View File

@@ -10,13 +10,18 @@ import (
"github.com/datarhei/core/v16/internal/testhelper" "github.com/datarhei/core/v16/internal/testhelper"
"github.com/datarhei/core/v16/math/rand" "github.com/datarhei/core/v16/math/rand"
"github.com/datarhei/core/v16/resources"
"github.com/datarhei/core/v16/resources/psutil" "github.com/datarhei/core/v16/resources/psutil"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func newPSUtil() psutil.Util { func newResources() resources.Resources {
util, _ := psutil.New("", nil) util, _ := psutil.New("", nil)
return util res, _ := resources.New(resources.Config{
PSUtil: util,
})
return res
} }
func TestProcess(t *testing.T) { func TestProcess(t *testing.T) {
@@ -27,7 +32,7 @@ func TestProcess(t *testing.T) {
}, },
Reconnect: false, Reconnect: false,
StaleTimeout: 0, StaleTimeout: 0,
PSUtil: newPSUtil(), Resources: newResources(),
}) })
require.Equal(t, "finished", p.Status().State) require.Equal(t, "finished", p.Status().State)
@@ -66,7 +71,7 @@ func TestReconnectProcess(t *testing.T) {
OnExit: func(string) { OnExit: func(string) {
wg.Done() wg.Done()
}, },
PSUtil: newPSUtil(), Resources: newResources(),
}) })
p.Start() p.Start()
@@ -112,7 +117,7 @@ func TestStaleProcess(t *testing.T) {
}, },
Reconnect: false, Reconnect: false,
StaleTimeout: 2 * time.Second, StaleTimeout: 2 * time.Second,
PSUtil: newPSUtil(), Resources: newResources(),
}) })
p.Start() p.Start()
@@ -135,7 +140,7 @@ func TestStaleReconnectProcess(t *testing.T) {
Reconnect: true, Reconnect: true,
ReconnectDelay: 2 * time.Second, ReconnectDelay: 2 * time.Second,
StaleTimeout: 3 * time.Second, StaleTimeout: 3 * time.Second,
PSUtil: newPSUtil(), Resources: newResources(),
}) })
p.Start() p.Start()
@@ -166,7 +171,7 @@ func TestNonExistingProcess(t *testing.T) {
Reconnect: false, Reconnect: false,
ReconnectDelay: 5 * time.Second, ReconnectDelay: 5 * time.Second,
StaleTimeout: 0, StaleTimeout: 0,
PSUtil: newPSUtil(), Resources: newResources(),
}) })
p.Start() p.Start()
@@ -191,7 +196,7 @@ func TestNonExistingReconnectProcess(t *testing.T) {
Reconnect: true, Reconnect: true,
ReconnectDelay: 2 * time.Second, ReconnectDelay: 2 * time.Second,
StaleTimeout: 0, StaleTimeout: 0,
PSUtil: newPSUtil(), Resources: newResources(),
}) })
p.Start() p.Start()
@@ -215,7 +220,7 @@ func TestProcessFailed(t *testing.T) {
}, },
Reconnect: false, Reconnect: false,
StaleTimeout: 0, StaleTimeout: 0,
PSUtil: newPSUtil(), Resources: newResources(),
}) })
p.Start() p.Start()
@@ -241,7 +246,7 @@ func TestFFmpegWaitStop(t *testing.T) {
OnExit: func(state string) { OnExit: func(state string) {
time.Sleep(3 * time.Second) time.Sleep(3 * time.Second)
}, },
PSUtil: newPSUtil(), Resources: newResources(),
}) })
err = p.Start() err = p.Start()
@@ -269,7 +274,7 @@ func TestFFmpegKill(t *testing.T) {
Args: []string{}, Args: []string{},
Reconnect: false, Reconnect: false,
StaleTimeout: 0, StaleTimeout: 0,
PSUtil: newPSUtil(), Resources: newResources(),
}) })
err = p.Start() err = p.Start()
@@ -295,7 +300,7 @@ func TestProcessForceKill(t *testing.T) {
Args: []string{}, Args: []string{},
Reconnect: false, Reconnect: false,
StaleTimeout: 0, StaleTimeout: 0,
PSUtil: newPSUtil(), Resources: newResources(),
}) })
err = p.Start() err = p.Start()
@@ -325,10 +330,10 @@ func TestProcessDuration(t *testing.T) {
require.NoError(t, err, "Failed to build helper program") require.NoError(t, err, "Failed to build helper program")
p, err := New(Config{ p, err := New(Config{
Binary: binary, Binary: binary,
Args: []string{}, Args: []string{},
Timeout: 3 * time.Second, Timeout: 3 * time.Second,
PSUtil: newPSUtil(), Resources: newResources(),
}) })
require.NoError(t, err) require.NoError(t, err)
@@ -375,7 +380,7 @@ func TestProcessSchedulePointInTime(t *testing.T) {
}, },
Reconnect: false, Reconnect: false,
Scheduler: s, Scheduler: s,
PSUtil: newPSUtil(), Resources: newResources(),
}) })
status := p.Status() status := p.Status()
@@ -417,7 +422,7 @@ func TestProcessSchedulePointInTimeGone(t *testing.T) {
}, },
Reconnect: false, Reconnect: false,
Scheduler: s, Scheduler: s,
PSUtil: newPSUtil(), Resources: newResources(),
}) })
status := p.Status() status := p.Status()
@@ -443,7 +448,7 @@ func TestProcessScheduleCron(t *testing.T) {
}, },
Reconnect: false, Reconnect: false,
Scheduler: s, Scheduler: s,
PSUtil: newPSUtil(), Resources: newResources(),
}) })
status := p.Status() status := p.Status()
@@ -474,7 +479,7 @@ func TestProcessDelayNoScheduler(t *testing.T) {
Binary: "sleep", Binary: "sleep",
Reconnect: false, Reconnect: false,
ReconnectDelay: 5 * time.Second, ReconnectDelay: 5 * time.Second,
PSUtil: newPSUtil(), Resources: newResources(),
}) })
px := p.(*process) px := p.(*process)
@@ -491,7 +496,7 @@ func TestProcessDelayNoScheduler(t *testing.T) {
Binary: "sleep", Binary: "sleep",
Reconnect: true, Reconnect: true,
ReconnectDelay: 5 * time.Second, ReconnectDelay: 5 * time.Second,
PSUtil: newPSUtil(), Resources: newResources(),
}) })
px = p.(*process) px = p.(*process)
@@ -515,7 +520,7 @@ func TestProcessDelaySchedulerNoReconnect(t *testing.T) {
Reconnect: false, Reconnect: false,
ReconnectDelay: 1 * time.Second, ReconnectDelay: 1 * time.Second,
Scheduler: s, Scheduler: s,
PSUtil: newPSUtil(), Resources: newResources(),
}) })
px := p.(*process) px := p.(*process)
@@ -537,7 +542,7 @@ func TestProcessDelaySchedulerNoReconnect(t *testing.T) {
Reconnect: false, Reconnect: false,
ReconnectDelay: 1 * time.Second, ReconnectDelay: 1 * time.Second,
Scheduler: s, Scheduler: s,
PSUtil: newPSUtil(), Resources: newResources(),
}) })
px = p.(*process) px = p.(*process)
@@ -561,7 +566,7 @@ func TestProcessDelaySchedulerReconnect(t *testing.T) {
Reconnect: true, Reconnect: true,
ReconnectDelay: 1 * time.Second, ReconnectDelay: 1 * time.Second,
Scheduler: s, Scheduler: s,
PSUtil: newPSUtil(), Resources: newResources(),
}) })
px := p.(*process) px := p.(*process)
@@ -583,7 +588,7 @@ func TestProcessDelaySchedulerReconnect(t *testing.T) {
Reconnect: true, Reconnect: true,
ReconnectDelay: 1 * time.Second, ReconnectDelay: 1 * time.Second,
Scheduler: s, Scheduler: s,
PSUtil: newPSUtil(), Resources: newResources(),
}) })
px = p.(*process) px = p.(*process)
@@ -605,7 +610,7 @@ func TestProcessDelaySchedulerReconnect(t *testing.T) {
Reconnect: true, Reconnect: true,
ReconnectDelay: 10 * time.Second, ReconnectDelay: 10 * time.Second,
Scheduler: s, Scheduler: s,
PSUtil: newPSUtil(), Resources: newResources(),
}) })
px = p.(*process) px = p.(*process)
@@ -663,7 +668,7 @@ func TestProcessCallbacks(t *testing.T) {
onState = append(onState, from+"/"+to) onState = append(onState, from+"/"+to)
}, },
PSUtil: newPSUtil(), Resources: newResources(),
}) })
require.NoError(t, err) require.NoError(t, err)
@@ -706,7 +711,7 @@ func TestProcessCallbacksOnBeforeStart(t *testing.T) {
OnBeforeStart: func(a []string) ([]string, error) { OnBeforeStart: func(a []string) ([]string, error) {
return a, fmt.Errorf("no, not now") return a, fmt.Errorf("no, not now")
}, },
PSUtil: newPSUtil(), Resources: newResources(),
}) })
require.NoError(t, err) require.NoError(t, err)

View File

@@ -139,6 +139,8 @@ type Resources interface {
Disk(path string) (*DiskInfo, error) Disk(path string) (*DiskInfo, error)
Network() ([]NetworkInfo, error) Network() ([]NetworkInfo, error)
Process(pid int32) (Process, error)
} }
type Config struct { type Config struct {
@@ -622,3 +624,108 @@ func (r *resources) Network() ([]NetworkInfo, error) {
return info, nil return info, nil
} }
func (r *resources) Process(pid int32) (Process, error) {
proc, err := r.psutil.Process(pid)
if err != nil {
return nil, err
}
p := &process{
proc: proc,
}
return p, nil
}
type Process interface {
Info() (ProcessInfo, error)
// Cancel will stop collecting CPU and memory data for this process.
Cancel()
// Suspend will send SIGSTOP to the process.
Suspend() error
// Resume will send SIGCONT to the process.
Resume() error
}
type process struct {
proc psutil.Process
}
type ProcessInfoCPU struct {
System float64 // percent 0-100
User float64 // percent 0-100
Idle float64 // percent 0-100
Other float64 // percent 0-100
}
type ProcessInfoGPU struct {
Index int // Index of the GPU
Name string // Name of the GPU (not populated for a specific process)
MemoryTotal uint64 // bytes (not populated for a specific process)
MemoryUsed uint64 // bytes
Usage float64 // percent 0-100
Encoder float64 // percent 0-100
Decoder float64 // percent 0-100
}
type ProcessInfo struct {
CPU ProcessInfoCPU
Memory uint64
GPU ProcessInfoGPU
}
func (p *process) Info() (ProcessInfo, error) {
cpu, err := p.proc.CPU()
if err != nil {
return ProcessInfo{}, err
}
mem, err := p.proc.Memory()
if err != nil {
return ProcessInfo{}, err
}
gpu, err := p.proc.GPU()
if err != nil {
return ProcessInfo{}, err
}
pi := ProcessInfo{
CPU: ProcessInfoCPU{
System: cpu.System,
User: cpu.User,
Idle: cpu.Idle,
Other: cpu.Other,
},
Memory: mem,
GPU: ProcessInfoGPU{
Index: gpu.Index,
Name: gpu.Name,
MemoryTotal: gpu.MemoryTotal,
MemoryUsed: gpu.MemoryUsed,
Usage: gpu.Usage,
Encoder: gpu.Encoder,
Decoder: gpu.Decoder,
},
}
return pi, nil
}
func (p *process) Cancel() {
p.proc.Cancel()
}
func (p *process) Suspend() error {
return p.proc.Suspend()
}
func (p *process) Resume() error {
return p.proc.Resume()
}

View File

@@ -11,7 +11,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
type util struct { type mockUtil struct {
lock sync.Mutex lock sync.Mutex
cpu psutil.CPUInfo cpu psutil.CPUInfo
@@ -19,8 +19,8 @@ type util struct {
gpu []psutil.GPUInfo gpu []psutil.GPUInfo
} }
func newUtil(ngpu int) *util { func newMockUtil(ngpu int) *mockUtil {
u := &util{ u := &mockUtil{
cpu: psutil.CPUInfo{ cpu: psutil.CPUInfo{
System: 10, System: 10,
User: 50, User: 50,
@@ -49,14 +49,14 @@ func newUtil(ngpu int) *util {
return u return u
} }
func (u *util) Start() {} func (u *mockUtil) Start() {}
func (u *util) Cancel() {} func (u *mockUtil) Cancel() {}
func (u *util) CPUCounts() (float64, error) { func (u *mockUtil) CPUCounts() (float64, error) {
return 2, nil return 2, nil
} }
func (u *util) CPU() (*psutil.CPUInfo, error) { func (u *mockUtil) CPU() (*psutil.CPUInfo, error) {
u.lock.Lock() u.lock.Lock()
defer u.lock.Unlock() defer u.lock.Unlock()
@@ -65,11 +65,11 @@ func (u *util) CPU() (*psutil.CPUInfo, error) {
return &cpu, nil return &cpu, nil
} }
func (u *util) Disk(path string) (*psutil.DiskInfo, error) { func (u *mockUtil) Disk(path string) (*psutil.DiskInfo, error) {
return &psutil.DiskInfo{}, nil return &psutil.DiskInfo{}, nil
} }
func (u *util) Memory() (*psutil.MemoryInfo, error) { func (u *mockUtil) Memory() (*psutil.MemoryInfo, error) {
u.lock.Lock() u.lock.Lock()
defer u.lock.Unlock() defer u.lock.Unlock()
@@ -78,11 +78,11 @@ func (u *util) Memory() (*psutil.MemoryInfo, error) {
return &mem, nil return &mem, nil
} }
func (u *util) Network() ([]psutil.NetworkInfo, error) { func (u *mockUtil) Network() ([]psutil.NetworkInfo, error) {
return nil, nil return nil, nil
} }
func (u *util) GPU() ([]psutil.GPUInfo, error) { func (u *mockUtil) GPU() ([]psutil.GPUInfo, error) {
u.lock.Lock() u.lock.Lock()
defer u.lock.Unlock() defer u.lock.Unlock()
@@ -93,13 +93,13 @@ func (u *util) GPU() ([]psutil.GPUInfo, error) {
return gpu, nil return gpu, nil
} }
func (u *util) Process(pid int32) (psutil.Process, error) { func (u *mockUtil) Process(pid int32) (psutil.Process, error) {
return &process{}, nil return &mockProcess{}, nil
} }
type process struct{} type mockProcess struct{}
func (p *process) CPU() (*psutil.CPUInfo, error) { func (p *mockProcess) CPU() (*psutil.CPUInfo, error) {
s := &psutil.CPUInfo{ s := &psutil.CPUInfo{
System: 1, System: 1,
User: 2, User: 2,
@@ -110,8 +110,8 @@ func (p *process) CPU() (*psutil.CPUInfo, error) {
return s, nil return s, nil
} }
func (p *process) Memory() (uint64, error) { return 42, nil } func (p *mockProcess) Memory() (uint64, error) { return 42, nil }
func (p *process) GPU() (*psutil.GPUInfo, error) { func (p *mockProcess) GPU() (*psutil.GPUInfo, error) {
return &psutil.GPUInfo{ return &psutil.GPUInfo{
Index: 0, Index: 0,
Name: "L4", Name: "L4",
@@ -122,13 +122,13 @@ func (p *process) GPU() (*psutil.GPUInfo, error) {
Decoder: 7, Decoder: 7,
}, nil }, nil
} }
func (p *process) Cancel() {} func (p *mockProcess) Cancel() {}
func (p *process) Suspend() error { return nil } func (p *mockProcess) Suspend() error { return nil }
func (p *process) Resume() error { return nil } func (p *mockProcess) Resume() error { return nil }
func TestConfigNoLimits(t *testing.T) { func TestConfigNoLimits(t *testing.T) {
_, err := New(Config{ _, err := New(Config{
PSUtil: newUtil(0), PSUtil: newMockUtil(0),
}) })
require.NoError(t, err) require.NoError(t, err)
} }
@@ -137,7 +137,7 @@ func TestConfigWrongLimits(t *testing.T) {
_, err := New(Config{ _, err := New(Config{
MaxCPU: 102, MaxCPU: 102,
MaxMemory: 573, MaxMemory: 573,
PSUtil: newUtil(0), PSUtil: newMockUtil(0),
}) })
require.Error(t, err) require.Error(t, err)
@@ -146,7 +146,7 @@ func TestConfigWrongLimits(t *testing.T) {
MaxMemory: 0, MaxMemory: 0,
MaxGPU: 101, MaxGPU: 101,
MaxGPUMemory: 103, MaxGPUMemory: 103,
PSUtil: newUtil(0), PSUtil: newMockUtil(0),
}) })
require.NoError(t, err) require.NoError(t, err)
@@ -155,7 +155,7 @@ func TestConfigWrongLimits(t *testing.T) {
MaxMemory: 0, MaxMemory: 0,
MaxGPU: 101, MaxGPU: 101,
MaxGPUMemory: 103, MaxGPUMemory: 103,
PSUtil: newUtil(1), PSUtil: newMockUtil(1),
}) })
require.Error(t, err) require.Error(t, err)
} }
@@ -164,7 +164,7 @@ func TestMemoryLimit(t *testing.T) {
r, err := New(Config{ r, err := New(Config{
MaxCPU: 100, MaxCPU: 100,
MaxMemory: 150. / 200. * 100, MaxMemory: 150. / 200. * 100,
PSUtil: newUtil(0), PSUtil: newMockUtil(0),
Logger: nil, Logger: nil,
}) })
require.NoError(t, err) require.NoError(t, err)
@@ -209,7 +209,7 @@ func TestMemoryLimit(t *testing.T) {
} }
func TestMemoryUnlimit(t *testing.T) { func TestMemoryUnlimit(t *testing.T) {
util := newUtil(0) util := newMockUtil(0)
r, err := New(Config{ r, err := New(Config{
MaxCPU: 100, MaxCPU: 100,
@@ -296,7 +296,7 @@ func TestCPULimit(t *testing.T) {
r, err := New(Config{ r, err := New(Config{
MaxCPU: 50., MaxCPU: 50.,
MaxMemory: 100, MaxMemory: 100,
PSUtil: newUtil(0), PSUtil: newMockUtil(0),
Logger: nil, Logger: nil,
}) })
require.NoError(t, err) require.NoError(t, err)
@@ -341,7 +341,7 @@ func TestCPULimit(t *testing.T) {
} }
func TestCPUUnlimit(t *testing.T) { func TestCPUUnlimit(t *testing.T) {
util := newUtil(0) util := newMockUtil(0)
r, err := New(Config{ r, err := New(Config{
MaxCPU: 50., MaxCPU: 50.,
@@ -430,7 +430,7 @@ func TestGPULimitMemory(t *testing.T) {
MaxMemory: 100, MaxMemory: 100,
MaxGPU: 100, MaxGPU: 100,
MaxGPUMemory: 20, MaxGPUMemory: 20,
PSUtil: newUtil(2), PSUtil: newMockUtil(2),
Logger: nil, Logger: nil,
}) })
require.NoError(t, err) require.NoError(t, err)
@@ -475,7 +475,7 @@ func TestGPULimitMemory(t *testing.T) {
} }
func TestGPUUnlimitMemory(t *testing.T) { func TestGPUUnlimitMemory(t *testing.T) {
util := newUtil(2) util := newMockUtil(2)
r, err := New(Config{ r, err := New(Config{
MaxCPU: 100, MaxCPU: 100,
@@ -564,7 +564,7 @@ func TestGPULimitMemorySome(t *testing.T) {
MaxMemory: 100, MaxMemory: 100,
MaxGPU: 100, MaxGPU: 100,
MaxGPUMemory: 14. / 24. * 100., MaxGPUMemory: 14. / 24. * 100.,
PSUtil: newUtil(4), PSUtil: newMockUtil(4),
Logger: nil, Logger: nil,
}) })
require.NoError(t, err) require.NoError(t, err)
@@ -614,7 +614,7 @@ func TestGPULimitUsage(t *testing.T) {
MaxMemory: 100, MaxMemory: 100,
MaxGPU: 40, MaxGPU: 40,
MaxGPUMemory: 100, MaxGPUMemory: 100,
PSUtil: newUtil(3), PSUtil: newMockUtil(3),
Logger: nil, Logger: nil,
}) })
require.NoError(t, err) require.NoError(t, err)
@@ -662,7 +662,7 @@ func TestGPULimitUsage(t *testing.T) {
} }
func TestGPUUnlimitUsage(t *testing.T) { func TestGPUUnlimitUsage(t *testing.T) {
util := newUtil(3) util := newMockUtil(3)
r, err := New(Config{ r, err := New(Config{
MaxCPU: 100, MaxCPU: 100,
@@ -749,7 +749,7 @@ func TestGPUUnlimitUsage(t *testing.T) {
func TestRequestCPU(t *testing.T) { func TestRequestCPU(t *testing.T) {
r, err := New(Config{ r, err := New(Config{
MaxCPU: 70., MaxCPU: 70.,
PSUtil: newUtil(0), PSUtil: newMockUtil(0),
}) })
require.NoError(t, err) require.NoError(t, err)
@@ -766,7 +766,7 @@ func TestRequestCPU(t *testing.T) {
func TestRequestMemory(t *testing.T) { func TestRequestMemory(t *testing.T) {
r, err := New(Config{ r, err := New(Config{
MaxMemory: 170. / 200. * 100, MaxMemory: 170. / 200. * 100,
PSUtil: newUtil(0), PSUtil: newMockUtil(0),
}) })
require.NoError(t, err) require.NoError(t, err)
@@ -784,7 +784,7 @@ func TestRequestNoGPU(t *testing.T) {
r, err := New(Config{ r, err := New(Config{
MaxCPU: 100, MaxCPU: 100,
MaxMemory: 100, MaxMemory: 100,
PSUtil: newUtil(0), PSUtil: newMockUtil(0),
}) })
require.NoError(t, err) require.NoError(t, err)
@@ -796,7 +796,7 @@ func TestRequestInvalidGPURequest(t *testing.T) {
r, err := New(Config{ r, err := New(Config{
MaxCPU: 100, MaxCPU: 100,
MaxMemory: 100, MaxMemory: 100,
PSUtil: newUtil(1), PSUtil: newMockUtil(1),
}) })
require.NoError(t, err) require.NoError(t, err)
@@ -813,7 +813,7 @@ func TestRequestGPULimitsOneGPU(t *testing.T) {
MaxMemory: 100, MaxMemory: 100,
MaxGPU: 50, MaxGPU: 50,
MaxGPUMemory: 60, MaxGPUMemory: 60,
PSUtil: newUtil(1), PSUtil: newMockUtil(1),
}) })
require.NoError(t, err) require.NoError(t, err)
@@ -840,7 +840,7 @@ func TestRequestGPULimitsMoreGPU(t *testing.T) {
MaxMemory: 100, MaxMemory: 100,
MaxGPU: 60, MaxGPU: 60,
MaxGPUMemory: 60, MaxGPUMemory: 60,
PSUtil: newUtil(2), PSUtil: newMockUtil(2),
}) })
require.NoError(t, err) require.NoError(t, err)
@@ -856,7 +856,7 @@ func TestHasLimits(t *testing.T) {
r, err := New(Config{ r, err := New(Config{
MaxCPU: 70., MaxCPU: 70.,
MaxMemory: 170. / 200. * 100, MaxMemory: 170. / 200. * 100,
PSUtil: newUtil(0), PSUtil: newMockUtil(0),
Logger: nil, Logger: nil,
}) })
require.NoError(t, err) require.NoError(t, err)
@@ -866,7 +866,7 @@ func TestHasLimits(t *testing.T) {
r, err = New(Config{ r, err = New(Config{
MaxCPU: 100, MaxCPU: 100,
MaxMemory: 100, MaxMemory: 100,
PSUtil: newUtil(0), PSUtil: newMockUtil(0),
Logger: nil, Logger: nil,
}) })
require.NoError(t, err) require.NoError(t, err)
@@ -876,7 +876,7 @@ func TestHasLimits(t *testing.T) {
r, err = New(Config{ r, err = New(Config{
MaxCPU: 0, MaxCPU: 0,
MaxMemory: 0, MaxMemory: 0,
PSUtil: newUtil(0), PSUtil: newMockUtil(0),
Logger: nil, Logger: nil,
}) })
require.NoError(t, err) require.NoError(t, err)
@@ -887,7 +887,7 @@ func TestHasLimits(t *testing.T) {
MaxCPU: 0, MaxCPU: 0,
MaxMemory: 0, MaxMemory: 0,
MaxGPU: 10, MaxGPU: 10,
PSUtil: newUtil(1), PSUtil: newMockUtil(1),
Logger: nil, Logger: nil,
}) })
require.NoError(t, err) require.NoError(t, err)
@@ -898,7 +898,7 @@ func TestHasLimits(t *testing.T) {
MaxCPU: 0, MaxCPU: 0,
MaxMemory: 0, MaxMemory: 0,
MaxGPU: 10, MaxGPU: 10,
PSUtil: newUtil(0), PSUtil: newMockUtil(0),
Logger: nil, Logger: nil,
}) })
require.NoError(t, err) require.NoError(t, err)
@@ -912,7 +912,7 @@ func TestInfo(t *testing.T) {
MaxMemory: 90, MaxMemory: 90,
MaxGPU: 11, MaxGPU: 11,
MaxGPUMemory: 50, MaxGPUMemory: 50,
PSUtil: newUtil(2), PSUtil: newMockUtil(2),
}) })
require.NoError(t, err) require.NoError(t, err)

View File

@@ -38,13 +38,20 @@ func getDummyRestreamer(portrange net.Portranger, validatorIn, validatorOut ffmp
return nil, err return nil, err
} }
resources, err := resources.New(resources.Config{
PSUtil: psutil,
})
if err != nil {
return nil, err
}
ffmpeg, err := ffmpeg.New(ffmpeg.Config{ ffmpeg, err := ffmpeg.New(ffmpeg.Config{
Binary: binary, Binary: binary,
LogHistoryLength: 3, LogHistoryLength: 3,
Portrange: portrange, Portrange: portrange,
ValidatorInput: validatorIn, ValidatorInput: validatorIn,
ValidatorOutput: validatorOut, ValidatorOutput: validatorOut,
PSUtil: psutil, Resource: resources,
}) })
if err != nil { if err != nil {
return nil, err return nil, err
@@ -88,13 +95,6 @@ func getDummyRestreamer(portrange net.Portranger, validatorIn, validatorOut ffmp
return nil, err return nil, err
} }
resources, err := resources.New(resources.Config{
PSUtil: psutil,
})
if err != nil {
return nil, err
}
rs, err := New(Config{ rs, err := New(Config{
FFmpeg: ffmpeg, FFmpeg: ffmpeg,
Replace: replacer, Replace: replacer,