diff --git a/app/api/api.go b/app/api/api.go index da0496fe..6665d619 100644 --- a/app/api/api.go +++ b/app/api/api.go @@ -38,6 +38,7 @@ import ( "github.com/datarhei/core/v16/prometheus" "github.com/datarhei/core/v16/resources" "github.com/datarhei/core/v16/resources/psutil" + "github.com/datarhei/core/v16/resources/psutil/gpu/nvidia" "github.com/datarhei/core/v16/restream" restreamapp "github.com/datarhei/core/v16/restream/app" "github.com/datarhei/core/v16/restream/replace" @@ -368,7 +369,7 @@ func (a *api) start(ctx context.Context) error { debug.SetMemoryLimit(math.MaxInt64) } - psutil, err := psutil.New("", nil) + psutil, err := psutil.New("", nvidia.New("")) if err != nil { return fmt.Errorf("failed to initialize psutils: %w", err) } @@ -511,7 +512,7 @@ func (a *api) start(ctx context.Context) error { ValidatorOutput: validatorOut, Portrange: portrange, Collector: a.sessions.Collector("ffmpeg"), - PSUtil: psutil, + Resource: a.resources, }) if err != nil { return fmt.Errorf("unable to create ffmpeg: %w", err) diff --git a/ffmpeg/ffmpeg.go b/ffmpeg/ffmpeg.go index 5e64fd1e..b3c4b445 100644 --- a/ffmpeg/ffmpeg.go +++ b/ffmpeg/ffmpeg.go @@ -12,7 +12,7 @@ import ( "github.com/datarhei/core/v16/log" "github.com/datarhei/core/v16/net" "github.com/datarhei/core/v16/process" - "github.com/datarhei/core/v16/resources/psutil" + "github.com/datarhei/core/v16/resources" "github.com/datarhei/core/v16/session" ) @@ -64,7 +64,7 @@ type Config struct { ValidatorOutput Validator Portrange net.Portranger Collector session.Collector - PSUtil psutil.Util + Resource resources.Resources } type ffmpeg struct { @@ -83,17 +83,17 @@ type ffmpeg struct { states process.States statesLock sync.RWMutex - psutil psutil.Util + resources resources.Resources } func New(config Config) (FFmpeg, error) { f := &ffmpeg{} - if config.PSUtil == nil { - return nil, fmt.Errorf("psutils required") + if config.Resource == nil { + return nil, fmt.Errorf("resources are required") } - f.psutil = config.PSUtil + f.resources = config.Resource binary, err := exec.LookPath(config.Binary) if err != nil { @@ -194,7 +194,7 @@ func (f *ffmpeg) New(config ProcessConfig) (process.Process, error) { config.OnStateChange(from, to) } }, - PSUtil: f.psutil, + Resources: f.resources, }) return ffmpeg, err diff --git a/http/mock/mock.go b/http/mock/mock.go index 7487c9af..f6b516fc 100644 --- a/http/mock/mock.go +++ b/http/mock/mock.go @@ -17,6 +17,7 @@ import ( "github.com/datarhei/core/v16/http/validator" "github.com/datarhei/core/v16/internal/testhelper" "github.com/datarhei/core/v16/io/fs" + "github.com/datarhei/core/v16/resources" "github.com/datarhei/core/v16/resources/psutil" "github.com/datarhei/core/v16/restream" jsonstore "github.com/datarhei/core/v16/restream/store/json" @@ -50,11 +51,18 @@ func DummyRestreamer(pathPrefix string) (restream.Restreamer, error) { return nil, err } + resources, err := resources.New(resources.Config{ + PSUtil: psutil, + }) + if err != nil { + return nil, err + } + ffmpeg, err := ffmpeg.New(ffmpeg.Config{ Binary: binary, MaxLogLines: 100, LogHistoryLength: 3, - PSUtil: psutil, + Resource: resources, }) if err != nil { return nil, err diff --git a/process/limiter.go b/process/limiter.go index 8bb650dc..b30d4731 100644 --- a/process/limiter.go +++ b/process/limiter.go @@ -7,7 +7,7 @@ import ( "time" "github.com/datarhei/core/v16/log" - "github.com/datarhei/core/v16/resources/psutil" + "github.com/datarhei/core/v16/resources" ) type Usage struct { @@ -85,13 +85,13 @@ type LimiterConfig struct { WaitFor time.Duration // Duration for one of the limits has to be above the limit until OnLimit gets triggered. OnLimit LimitFunc // Function to be triggered if limits are exceeded. Mode LimitMode // How to limit CPU usage. - PSUtil psutil.Util + NCPU float64 // Number of available CPU Logger log.Logger } type Limiter interface { // Start starts the limiter with a psutil.Process. - Start(process psutil.Process) error + Start(process resources.Process) error // Stop stops the limiter. The limiter can be reused by calling Start() again Stop() @@ -226,11 +226,9 @@ func (x *metric[T]) IsExceeded(waitFor time.Duration, mode LimitMode) bool { } type limiter struct { - psutil psutil.Util - ncpu float64 ncpuFactor float64 - proc psutil.Process + proc resources.Process lock sync.RWMutex cancel context.CancelFunc onLimit LimitFunc @@ -259,13 +257,17 @@ type limiter struct { // NewLimiter returns a new Limiter func NewLimiter(config LimiterConfig) (Limiter, error) { l := &limiter{ + ncpu: config.NCPU, waitFor: config.WaitFor, onLimit: config.OnLimit, mode: config.Mode, - psutil: config.PSUtil, logger: config.Logger, } + if l.ncpu <= 0 { + l.ncpu = 1 + } + l.cpu.SetLimit(config.CPU / 100) l.memory.SetLimit(config.Memory) l.gpu.memory.SetLimit(config.GPUMemory) @@ -277,16 +279,6 @@ func NewLimiter(config LimiterConfig) (Limiter, error) { l.logger = log.New("") } - if l.psutil == nil { - return nil, fmt.Errorf("no psutil provided") - } - - if ncpu, err := l.psutil.CPUCounts(); err != nil { - l.ncpu = 1 - } else { - l.ncpu = ncpu - } - l.lastUsage.CPU.NCPU = l.ncpu l.lastUsage.CPU.Limit = l.cpu.Limit() * 100 * l.ncpu l.lastUsage.Memory.Limit = l.memory.Limit() @@ -333,7 +325,7 @@ func (l *limiter) reset() { l.gpu.decoder.Reset() } -func (l *limiter) Start(process psutil.Process) error { +func (l *limiter) Start(process resources.Process) error { l.lock.Lock() defer l.lock.Unlock() @@ -396,28 +388,24 @@ func (l *limiter) collect() { return } - mstat, merr := proc.Memory() - cpustat, cerr := proc.CPU() - gstat, gerr := proc.GPU() + pinfo, err := proc.Info() + + //mstat, merr := proc.Memory() + //cpustat, cerr := proc.CPU() + //gstat, gerr := proc.GPU() gindex := -1 l.lock.Lock() defer l.lock.Unlock() - if merr == nil { - l.memory.Update(mstat) - } - - if cerr == nil { - l.cpu.Update((cpustat.System + cpustat.User + cpustat.Other) / 100) - } - - if gerr == nil { - l.gpu.memory.Update(gstat.MemoryUsed) - l.gpu.usage.Update(gstat.Usage / 100) - l.gpu.encoder.Update(gstat.Encoder / 100) - l.gpu.decoder.Update(gstat.Decoder / 100) - gindex = gstat.Index + if err == nil { + l.memory.Update(pinfo.Memory) + l.cpu.Update((pinfo.CPU.System + pinfo.CPU.User + pinfo.CPU.Other) / 100) + l.gpu.memory.Update(pinfo.GPU.MemoryUsed) + l.gpu.usage.Update(pinfo.GPU.Usage / 100) + l.gpu.encoder.Update(pinfo.GPU.Encoder / 100) + l.gpu.decoder.Update(pinfo.GPU.Decoder / 100) + gindex = pinfo.GPU.Index } isLimitExceeded := false diff --git a/process/limiter_test.go b/process/limiter_test.go index 5d93bb68..56813958 100644 --- a/process/limiter_test.go +++ b/process/limiter_test.go @@ -5,41 +5,39 @@ import ( "testing" "time" - "github.com/datarhei/core/v16/resources/psutil" + "github.com/datarhei/core/v16/resources" "github.com/stretchr/testify/require" ) -type psproc struct{} +type proc struct{} -func (p *psproc) CPU() (*psutil.CPUInfo, error) { - return &psutil.CPUInfo{ - System: 50, - User: 0, - Idle: 0, - Other: 0, - }, nil +func (p *proc) Info() (resources.ProcessInfo, error) { + info := resources.ProcessInfo{ + CPU: resources.ProcessInfoCPU{ + System: 50, + User: 0, + Idle: 0, + Other: 0, + }, + Memory: 197, + GPU: resources.ProcessInfoGPU{ + Index: 0, + Name: "L4", + MemoryTotal: 128, + MemoryUsed: 91, + Usage: 3, + Encoder: 9, + Decoder: 5, + }, + } + + return info, nil } -func (p *psproc) Memory() (uint64, error) { - return 197, nil -} - -func (p *psproc) GPU() (*psutil.GPUInfo, error) { - return &psutil.GPUInfo{ - Index: 0, - Name: "L4", - MemoryTotal: 128, - MemoryUsed: 91, - Usage: 3, - Encoder: 9, - Decoder: 5, - }, nil -} - -func (p *psproc) Cancel() {} -func (p *psproc) Suspend() error { return nil } -func (p *psproc) Resume() error { return nil } +func (p *proc) Cancel() {} +func (p *proc) Suspend() error { return nil } +func (p *proc) Resume() error { return nil } func TestCPULimit(t *testing.T) { lock := sync.Mutex{} @@ -57,10 +55,9 @@ func TestCPULimit(t *testing.T) { OnLimit: func(float64, uint64, float64, float64, float64, uint64) { wg.Done() }, - PSUtil: newPSUtil(), }) - l.Start(&psproc{}) + l.Start(&proc{}) defer l.Stop() wg.Wait() @@ -95,10 +92,9 @@ func TestCPULimitWaitFor(t *testing.T) { OnLimit: func(float64, uint64, float64, float64, float64, uint64) { wg.Done() }, - PSUtil: newPSUtil(), }) - l.Start(&psproc{}) + l.Start(&proc{}) defer l.Stop() wg.Wait() @@ -132,10 +128,9 @@ func TestMemoryLimit(t *testing.T) { OnLimit: func(float64, uint64, float64, float64, float64, uint64) { wg.Done() }, - PSUtil: newPSUtil(), }) - l.Start(&psproc{}) + l.Start(&proc{}) defer l.Stop() wg.Wait() @@ -170,10 +165,9 @@ func TestMemoryLimitWaitFor(t *testing.T) { OnLimit: func(float64, uint64, float64, float64, float64, uint64) { wg.Done() }, - PSUtil: newPSUtil(), }) - l.Start(&psproc{}) + l.Start(&proc{}) defer l.Stop() wg.Wait() @@ -207,10 +201,9 @@ func TestGPUMemoryLimit(t *testing.T) { OnLimit: func(float64, uint64, float64, float64, float64, uint64) { wg.Done() }, - PSUtil: newPSUtil(), }) - l.Start(&psproc{}) + l.Start(&proc{}) defer l.Stop() wg.Wait() @@ -245,10 +238,9 @@ func TestGPUMemoryLimitWaitFor(t *testing.T) { OnLimit: func(float64, uint64, float64, float64, float64, uint64) { wg.Done() }, - PSUtil: newPSUtil(), }) - l.Start(&psproc{}) + l.Start(&proc{}) defer l.Stop() wg.Wait() @@ -283,10 +275,9 @@ func TestMemoryLimitSoftMode(t *testing.T) { OnLimit: func(float64, uint64, float64, float64, float64, uint64) { wg.Done() }, - PSUtil: newPSUtil(), }) - l.Start(&psproc{}) + l.Start(&proc{}) defer l.Stop() l.Limit(false, true, false) @@ -323,10 +314,9 @@ func TestGPUMemoryLimitSoftMode(t *testing.T) { OnLimit: func(float64, uint64, float64, float64, float64, uint64) { wg.Done() }, - PSUtil: newPSUtil(), }) - l.Start(&psproc{}) + l.Start(&proc{}) defer l.Stop() l.Limit(false, false, true) diff --git a/process/process.go b/process/process.go index 0d5b50ca..be0a9854 100644 --- a/process/process.go +++ b/process/process.go @@ -18,7 +18,7 @@ import ( "unicode/utf8" "github.com/datarhei/core/v16/log" - "github.com/datarhei/core/v16/resources/psutil" + "github.com/datarhei/core/v16/resources" ) // Process represents a process and ways to control it @@ -71,7 +71,7 @@ type Config struct { OnStart func() // A callback which is called after the process started. OnExit func(state string) // A callback which is called after the process exited with the exit state. OnStateChange func(from, to string) // A callback which is called after a state changed. - PSUtil psutil.Util + Resources resources.Resources Logger log.Logger } @@ -245,7 +245,7 @@ type process struct { } limits Limiter scheduler Scheduler - psutil psutil.Util + resources resources.Resources } var _ Process = &process{} @@ -259,11 +259,11 @@ func New(config Config) (Process, error) { parser: config.Parser, logger: config.Logger, scheduler: config.Scheduler, - psutil: config.PSUtil, + resources: config.Resources, } - if p.psutil == nil { - return nil, fmt.Errorf("no psutils given") + if p.resources == nil { + return nil, fmt.Errorf("resources are required") } p.args = make([]string, len(config.Args)) @@ -276,10 +276,6 @@ func New(config Config) (Process, error) { return nil, fmt.Errorf("no valid binary given") } - if p.psutil == nil { - return nil, fmt.Errorf("no psutils provided") - } - if p.parser == nil { p.parser = NewNullParser() } @@ -308,8 +304,11 @@ func New(config Config) (Process, error) { p.callbacks.onExit = config.OnExit p.callbacks.onStateChange = config.OnStateChange + ncpu := p.resources.Info().CPU.NCPU + limits, err := NewLimiter(LimiterConfig{ CPU: config.LimitCPU, + NCPU: ncpu, Memory: config.LimitMemory, GPUUsage: config.LimitGPUUsage, GPUEncoder: config.LimitGPUEncoder, @@ -333,7 +332,6 @@ func New(config Config) (Process, error) { }).Warn().Log("Killed because limits are exceeded") p.Kill(false, fmt.Sprintf("Killed because limits are exceeded (mode: %s, tolerance: %s): %.2f (%.2f) CPU, %d (%d) bytes memory, %.2f/%.2f/%.2f (%.2f) GPU usage, %d (%d) bytes GPU memory", config.LimitMode.String(), config.LimitDuration.String(), cpu, config.LimitCPU, memory, config.LimitMemory, gpuusage, gpuencoder, gpudecoder, config.LimitGPUUsage, gpumemory, config.LimitGPUMemory)) }, - PSUtil: p.psutil, }) if err != nil { return nil, fmt.Errorf("failed to initialize limiter") @@ -719,7 +717,7 @@ func (p *process) start() error { p.pid = int32(p.cmd.Process.Pid) - if proc, err := p.psutil.Process(p.pid); err == nil { + if proc, err := p.resources.Process(p.pid); err == nil { p.limits.Start(proc) } diff --git a/process/process_test.go b/process/process_test.go index 88045b9d..ce4cc01b 100644 --- a/process/process_test.go +++ b/process/process_test.go @@ -10,13 +10,18 @@ import ( "github.com/datarhei/core/v16/internal/testhelper" "github.com/datarhei/core/v16/math/rand" + "github.com/datarhei/core/v16/resources" "github.com/datarhei/core/v16/resources/psutil" "github.com/stretchr/testify/require" ) -func newPSUtil() psutil.Util { +func newResources() resources.Resources { util, _ := psutil.New("", nil) - return util + res, _ := resources.New(resources.Config{ + PSUtil: util, + }) + + return res } func TestProcess(t *testing.T) { @@ -27,7 +32,7 @@ func TestProcess(t *testing.T) { }, Reconnect: false, StaleTimeout: 0, - PSUtil: newPSUtil(), + Resources: newResources(), }) require.Equal(t, "finished", p.Status().State) @@ -66,7 +71,7 @@ func TestReconnectProcess(t *testing.T) { OnExit: func(string) { wg.Done() }, - PSUtil: newPSUtil(), + Resources: newResources(), }) p.Start() @@ -112,7 +117,7 @@ func TestStaleProcess(t *testing.T) { }, Reconnect: false, StaleTimeout: 2 * time.Second, - PSUtil: newPSUtil(), + Resources: newResources(), }) p.Start() @@ -135,7 +140,7 @@ func TestStaleReconnectProcess(t *testing.T) { Reconnect: true, ReconnectDelay: 2 * time.Second, StaleTimeout: 3 * time.Second, - PSUtil: newPSUtil(), + Resources: newResources(), }) p.Start() @@ -166,7 +171,7 @@ func TestNonExistingProcess(t *testing.T) { Reconnect: false, ReconnectDelay: 5 * time.Second, StaleTimeout: 0, - PSUtil: newPSUtil(), + Resources: newResources(), }) p.Start() @@ -191,7 +196,7 @@ func TestNonExistingReconnectProcess(t *testing.T) { Reconnect: true, ReconnectDelay: 2 * time.Second, StaleTimeout: 0, - PSUtil: newPSUtil(), + Resources: newResources(), }) p.Start() @@ -215,7 +220,7 @@ func TestProcessFailed(t *testing.T) { }, Reconnect: false, StaleTimeout: 0, - PSUtil: newPSUtil(), + Resources: newResources(), }) p.Start() @@ -241,7 +246,7 @@ func TestFFmpegWaitStop(t *testing.T) { OnExit: func(state string) { time.Sleep(3 * time.Second) }, - PSUtil: newPSUtil(), + Resources: newResources(), }) err = p.Start() @@ -269,7 +274,7 @@ func TestFFmpegKill(t *testing.T) { Args: []string{}, Reconnect: false, StaleTimeout: 0, - PSUtil: newPSUtil(), + Resources: newResources(), }) err = p.Start() @@ -295,7 +300,7 @@ func TestProcessForceKill(t *testing.T) { Args: []string{}, Reconnect: false, StaleTimeout: 0, - PSUtil: newPSUtil(), + Resources: newResources(), }) err = p.Start() @@ -325,10 +330,10 @@ func TestProcessDuration(t *testing.T) { require.NoError(t, err, "Failed to build helper program") p, err := New(Config{ - Binary: binary, - Args: []string{}, - Timeout: 3 * time.Second, - PSUtil: newPSUtil(), + Binary: binary, + Args: []string{}, + Timeout: 3 * time.Second, + Resources: newResources(), }) require.NoError(t, err) @@ -375,7 +380,7 @@ func TestProcessSchedulePointInTime(t *testing.T) { }, Reconnect: false, Scheduler: s, - PSUtil: newPSUtil(), + Resources: newResources(), }) status := p.Status() @@ -417,7 +422,7 @@ func TestProcessSchedulePointInTimeGone(t *testing.T) { }, Reconnect: false, Scheduler: s, - PSUtil: newPSUtil(), + Resources: newResources(), }) status := p.Status() @@ -443,7 +448,7 @@ func TestProcessScheduleCron(t *testing.T) { }, Reconnect: false, Scheduler: s, - PSUtil: newPSUtil(), + Resources: newResources(), }) status := p.Status() @@ -474,7 +479,7 @@ func TestProcessDelayNoScheduler(t *testing.T) { Binary: "sleep", Reconnect: false, ReconnectDelay: 5 * time.Second, - PSUtil: newPSUtil(), + Resources: newResources(), }) px := p.(*process) @@ -491,7 +496,7 @@ func TestProcessDelayNoScheduler(t *testing.T) { Binary: "sleep", Reconnect: true, ReconnectDelay: 5 * time.Second, - PSUtil: newPSUtil(), + Resources: newResources(), }) px = p.(*process) @@ -515,7 +520,7 @@ func TestProcessDelaySchedulerNoReconnect(t *testing.T) { Reconnect: false, ReconnectDelay: 1 * time.Second, Scheduler: s, - PSUtil: newPSUtil(), + Resources: newResources(), }) px := p.(*process) @@ -537,7 +542,7 @@ func TestProcessDelaySchedulerNoReconnect(t *testing.T) { Reconnect: false, ReconnectDelay: 1 * time.Second, Scheduler: s, - PSUtil: newPSUtil(), + Resources: newResources(), }) px = p.(*process) @@ -561,7 +566,7 @@ func TestProcessDelaySchedulerReconnect(t *testing.T) { Reconnect: true, ReconnectDelay: 1 * time.Second, Scheduler: s, - PSUtil: newPSUtil(), + Resources: newResources(), }) px := p.(*process) @@ -583,7 +588,7 @@ func TestProcessDelaySchedulerReconnect(t *testing.T) { Reconnect: true, ReconnectDelay: 1 * time.Second, Scheduler: s, - PSUtil: newPSUtil(), + Resources: newResources(), }) px = p.(*process) @@ -605,7 +610,7 @@ func TestProcessDelaySchedulerReconnect(t *testing.T) { Reconnect: true, ReconnectDelay: 10 * time.Second, Scheduler: s, - PSUtil: newPSUtil(), + Resources: newResources(), }) px = p.(*process) @@ -663,7 +668,7 @@ func TestProcessCallbacks(t *testing.T) { onState = append(onState, from+"/"+to) }, - PSUtil: newPSUtil(), + Resources: newResources(), }) require.NoError(t, err) @@ -706,7 +711,7 @@ func TestProcessCallbacksOnBeforeStart(t *testing.T) { OnBeforeStart: func(a []string) ([]string, error) { return a, fmt.Errorf("no, not now") }, - PSUtil: newPSUtil(), + Resources: newResources(), }) require.NoError(t, err) diff --git a/resources/resources.go b/resources/resources.go index 2130b1f4..17c77e4e 100644 --- a/resources/resources.go +++ b/resources/resources.go @@ -139,6 +139,8 @@ type Resources interface { Disk(path string) (*DiskInfo, error) Network() ([]NetworkInfo, error) + + Process(pid int32) (Process, error) } type Config struct { @@ -622,3 +624,108 @@ func (r *resources) Network() ([]NetworkInfo, error) { return info, nil } + +func (r *resources) Process(pid int32) (Process, error) { + proc, err := r.psutil.Process(pid) + if err != nil { + return nil, err + } + + p := &process{ + proc: proc, + } + + return p, nil +} + +type Process interface { + Info() (ProcessInfo, error) + + // Cancel will stop collecting CPU and memory data for this process. + Cancel() + + // Suspend will send SIGSTOP to the process. + Suspend() error + + // Resume will send SIGCONT to the process. + Resume() error +} + +type process struct { + proc psutil.Process +} + +type ProcessInfoCPU struct { + System float64 // percent 0-100 + User float64 // percent 0-100 + Idle float64 // percent 0-100 + Other float64 // percent 0-100 +} + +type ProcessInfoGPU struct { + Index int // Index of the GPU + Name string // Name of the GPU (not populated for a specific process) + + MemoryTotal uint64 // bytes (not populated for a specific process) + MemoryUsed uint64 // bytes + + Usage float64 // percent 0-100 + Encoder float64 // percent 0-100 + Decoder float64 // percent 0-100 +} + +type ProcessInfo struct { + CPU ProcessInfoCPU + Memory uint64 + GPU ProcessInfoGPU +} + +func (p *process) Info() (ProcessInfo, error) { + cpu, err := p.proc.CPU() + if err != nil { + return ProcessInfo{}, err + } + + mem, err := p.proc.Memory() + if err != nil { + return ProcessInfo{}, err + } + + gpu, err := p.proc.GPU() + if err != nil { + return ProcessInfo{}, err + } + + pi := ProcessInfo{ + CPU: ProcessInfoCPU{ + System: cpu.System, + User: cpu.User, + Idle: cpu.Idle, + Other: cpu.Other, + }, + Memory: mem, + GPU: ProcessInfoGPU{ + Index: gpu.Index, + Name: gpu.Name, + MemoryTotal: gpu.MemoryTotal, + MemoryUsed: gpu.MemoryUsed, + Usage: gpu.Usage, + Encoder: gpu.Encoder, + Decoder: gpu.Decoder, + }, + } + + return pi, nil +} + +func (p *process) Cancel() { + p.proc.Cancel() +} + +func (p *process) Suspend() error { + return p.proc.Suspend() +} + +func (p *process) Resume() error { + return p.proc.Resume() +} diff --git a/resources/resources_test.go b/resources/resources_test.go index 4b6b6bae..f590b881 100644 --- a/resources/resources_test.go +++ b/resources/resources_test.go @@ -11,7 +11,7 @@ import ( "github.com/stretchr/testify/require" ) -type util struct { +type mockUtil struct { lock sync.Mutex cpu psutil.CPUInfo @@ -19,8 +19,8 @@ type util struct { gpu []psutil.GPUInfo } -func newUtil(ngpu int) *util { - u := &util{ +func newMockUtil(ngpu int) *mockUtil { + u := &mockUtil{ cpu: psutil.CPUInfo{ System: 10, User: 50, @@ -49,14 +49,14 @@ func newUtil(ngpu int) *util { return u } -func (u *util) Start() {} -func (u *util) Cancel() {} +func (u *mockUtil) Start() {} +func (u *mockUtil) Cancel() {} -func (u *util) CPUCounts() (float64, error) { +func (u *mockUtil) CPUCounts() (float64, error) { return 2, nil } -func (u *util) CPU() (*psutil.CPUInfo, error) { +func (u *mockUtil) CPU() (*psutil.CPUInfo, error) { u.lock.Lock() defer u.lock.Unlock() @@ -65,11 +65,11 @@ func (u *util) CPU() (*psutil.CPUInfo, error) { return &cpu, nil } -func (u *util) Disk(path string) (*psutil.DiskInfo, error) { +func (u *mockUtil) Disk(path string) (*psutil.DiskInfo, error) { return &psutil.DiskInfo{}, nil } -func (u *util) Memory() (*psutil.MemoryInfo, error) { +func (u *mockUtil) Memory() (*psutil.MemoryInfo, error) { u.lock.Lock() defer u.lock.Unlock() @@ -78,11 +78,11 @@ func (u *util) Memory() (*psutil.MemoryInfo, error) { return &mem, nil } -func (u *util) Network() ([]psutil.NetworkInfo, error) { +func (u *mockUtil) Network() ([]psutil.NetworkInfo, error) { return nil, nil } -func (u *util) GPU() ([]psutil.GPUInfo, error) { +func (u *mockUtil) GPU() ([]psutil.GPUInfo, error) { u.lock.Lock() defer u.lock.Unlock() @@ -93,13 +93,13 @@ func (u *util) GPU() ([]psutil.GPUInfo, error) { return gpu, nil } -func (u *util) Process(pid int32) (psutil.Process, error) { - return &process{}, nil +func (u *mockUtil) Process(pid int32) (psutil.Process, error) { + return &mockProcess{}, nil } -type process struct{} +type mockProcess struct{} -func (p *process) CPU() (*psutil.CPUInfo, error) { +func (p *mockProcess) CPU() (*psutil.CPUInfo, error) { s := &psutil.CPUInfo{ System: 1, User: 2, @@ -110,8 +110,8 @@ func (p *process) CPU() (*psutil.CPUInfo, error) { return s, nil } -func (p *process) Memory() (uint64, error) { return 42, nil } -func (p *process) GPU() (*psutil.GPUInfo, error) { +func (p *mockProcess) Memory() (uint64, error) { return 42, nil } +func (p *mockProcess) GPU() (*psutil.GPUInfo, error) { return &psutil.GPUInfo{ Index: 0, Name: "L4", @@ -122,13 +122,13 @@ func (p *process) GPU() (*psutil.GPUInfo, error) { Decoder: 7, }, nil } -func (p *process) Cancel() {} -func (p *process) Suspend() error { return nil } -func (p *process) Resume() error { return nil } +func (p *mockProcess) Cancel() {} +func (p *mockProcess) Suspend() error { return nil } +func (p *mockProcess) Resume() error { return nil } func TestConfigNoLimits(t *testing.T) { _, err := New(Config{ - PSUtil: newUtil(0), + PSUtil: newMockUtil(0), }) require.NoError(t, err) } @@ -137,7 +137,7 @@ func TestConfigWrongLimits(t *testing.T) { _, err := New(Config{ MaxCPU: 102, MaxMemory: 573, - PSUtil: newUtil(0), + PSUtil: newMockUtil(0), }) require.Error(t, err) @@ -146,7 +146,7 @@ func TestConfigWrongLimits(t *testing.T) { MaxMemory: 0, MaxGPU: 101, MaxGPUMemory: 103, - PSUtil: newUtil(0), + PSUtil: newMockUtil(0), }) require.NoError(t, err) @@ -155,7 +155,7 @@ func TestConfigWrongLimits(t *testing.T) { MaxMemory: 0, MaxGPU: 101, MaxGPUMemory: 103, - PSUtil: newUtil(1), + PSUtil: newMockUtil(1), }) require.Error(t, err) } @@ -164,7 +164,7 @@ func TestMemoryLimit(t *testing.T) { r, err := New(Config{ MaxCPU: 100, MaxMemory: 150. / 200. * 100, - PSUtil: newUtil(0), + PSUtil: newMockUtil(0), Logger: nil, }) require.NoError(t, err) @@ -209,7 +209,7 @@ func TestMemoryLimit(t *testing.T) { } func TestMemoryUnlimit(t *testing.T) { - util := newUtil(0) + util := newMockUtil(0) r, err := New(Config{ MaxCPU: 100, @@ -296,7 +296,7 @@ func TestCPULimit(t *testing.T) { r, err := New(Config{ MaxCPU: 50., MaxMemory: 100, - PSUtil: newUtil(0), + PSUtil: newMockUtil(0), Logger: nil, }) require.NoError(t, err) @@ -341,7 +341,7 @@ func TestCPULimit(t *testing.T) { } func TestCPUUnlimit(t *testing.T) { - util := newUtil(0) + util := newMockUtil(0) r, err := New(Config{ MaxCPU: 50., @@ -430,7 +430,7 @@ func TestGPULimitMemory(t *testing.T) { MaxMemory: 100, MaxGPU: 100, MaxGPUMemory: 20, - PSUtil: newUtil(2), + PSUtil: newMockUtil(2), Logger: nil, }) require.NoError(t, err) @@ -475,7 +475,7 @@ func TestGPULimitMemory(t *testing.T) { } func TestGPUUnlimitMemory(t *testing.T) { - util := newUtil(2) + util := newMockUtil(2) r, err := New(Config{ MaxCPU: 100, @@ -564,7 +564,7 @@ func TestGPULimitMemorySome(t *testing.T) { MaxMemory: 100, MaxGPU: 100, MaxGPUMemory: 14. / 24. * 100., - PSUtil: newUtil(4), + PSUtil: newMockUtil(4), Logger: nil, }) require.NoError(t, err) @@ -614,7 +614,7 @@ func TestGPULimitUsage(t *testing.T) { MaxMemory: 100, MaxGPU: 40, MaxGPUMemory: 100, - PSUtil: newUtil(3), + PSUtil: newMockUtil(3), Logger: nil, }) require.NoError(t, err) @@ -662,7 +662,7 @@ func TestGPULimitUsage(t *testing.T) { } func TestGPUUnlimitUsage(t *testing.T) { - util := newUtil(3) + util := newMockUtil(3) r, err := New(Config{ MaxCPU: 100, @@ -749,7 +749,7 @@ func TestGPUUnlimitUsage(t *testing.T) { func TestRequestCPU(t *testing.T) { r, err := New(Config{ MaxCPU: 70., - PSUtil: newUtil(0), + PSUtil: newMockUtil(0), }) require.NoError(t, err) @@ -766,7 +766,7 @@ func TestRequestCPU(t *testing.T) { func TestRequestMemory(t *testing.T) { r, err := New(Config{ MaxMemory: 170. / 200. * 100, - PSUtil: newUtil(0), + PSUtil: newMockUtil(0), }) require.NoError(t, err) @@ -784,7 +784,7 @@ func TestRequestNoGPU(t *testing.T) { r, err := New(Config{ MaxCPU: 100, MaxMemory: 100, - PSUtil: newUtil(0), + PSUtil: newMockUtil(0), }) require.NoError(t, err) @@ -796,7 +796,7 @@ func TestRequestInvalidGPURequest(t *testing.T) { r, err := New(Config{ MaxCPU: 100, MaxMemory: 100, - PSUtil: newUtil(1), + PSUtil: newMockUtil(1), }) require.NoError(t, err) @@ -813,7 +813,7 @@ func TestRequestGPULimitsOneGPU(t *testing.T) { MaxMemory: 100, MaxGPU: 50, MaxGPUMemory: 60, - PSUtil: newUtil(1), + PSUtil: newMockUtil(1), }) require.NoError(t, err) @@ -840,7 +840,7 @@ func TestRequestGPULimitsMoreGPU(t *testing.T) { MaxMemory: 100, MaxGPU: 60, MaxGPUMemory: 60, - PSUtil: newUtil(2), + PSUtil: newMockUtil(2), }) require.NoError(t, err) @@ -856,7 +856,7 @@ func TestHasLimits(t *testing.T) { r, err := New(Config{ MaxCPU: 70., MaxMemory: 170. / 200. * 100, - PSUtil: newUtil(0), + PSUtil: newMockUtil(0), Logger: nil, }) require.NoError(t, err) @@ -866,7 +866,7 @@ func TestHasLimits(t *testing.T) { r, err = New(Config{ MaxCPU: 100, MaxMemory: 100, - PSUtil: newUtil(0), + PSUtil: newMockUtil(0), Logger: nil, }) require.NoError(t, err) @@ -876,7 +876,7 @@ func TestHasLimits(t *testing.T) { r, err = New(Config{ MaxCPU: 0, MaxMemory: 0, - PSUtil: newUtil(0), + PSUtil: newMockUtil(0), Logger: nil, }) require.NoError(t, err) @@ -887,7 +887,7 @@ func TestHasLimits(t *testing.T) { MaxCPU: 0, MaxMemory: 0, MaxGPU: 10, - PSUtil: newUtil(1), + PSUtil: newMockUtil(1), Logger: nil, }) require.NoError(t, err) @@ -898,7 +898,7 @@ func TestHasLimits(t *testing.T) { MaxCPU: 0, MaxMemory: 0, MaxGPU: 10, - PSUtil: newUtil(0), + PSUtil: newMockUtil(0), Logger: nil, }) require.NoError(t, err) @@ -912,7 +912,7 @@ func TestInfo(t *testing.T) { MaxMemory: 90, MaxGPU: 11, MaxGPUMemory: 50, - PSUtil: newUtil(2), + PSUtil: newMockUtil(2), }) require.NoError(t, err) diff --git a/restream/core_test.go b/restream/core_test.go index d1b644ee..1924849b 100644 --- a/restream/core_test.go +++ b/restream/core_test.go @@ -38,13 +38,20 @@ func getDummyRestreamer(portrange net.Portranger, validatorIn, validatorOut ffmp return nil, err } + resources, err := resources.New(resources.Config{ + PSUtil: psutil, + }) + if err != nil { + return nil, err + } + ffmpeg, err := ffmpeg.New(ffmpeg.Config{ Binary: binary, LogHistoryLength: 3, Portrange: portrange, ValidatorInput: validatorIn, ValidatorOutput: validatorOut, - PSUtil: psutil, + Resource: resources, }) if err != nil { return nil, err @@ -88,13 +95,6 @@ func getDummyRestreamer(portrange net.Portranger, validatorIn, validatorOut ffmp return nil, err } - resources, err := resources.New(resources.Config{ - PSUtil: psutil, - }) - if err != nil { - return nil, err - } - rs, err := New(Config{ FFmpeg: ffmpeg, Replace: replacer,