diff --git a/app/api/api.go b/app/api/api.go index 042354e4..05689098 100644 --- a/app/api/api.go +++ b/app/api/api.go @@ -36,8 +36,8 @@ import ( "github.com/datarhei/core/v16/monitor" "github.com/datarhei/core/v16/net" "github.com/datarhei/core/v16/prometheus" - "github.com/datarhei/core/v16/psutil" "github.com/datarhei/core/v16/resources" + "github.com/datarhei/core/v16/resources/psutil" "github.com/datarhei/core/v16/restream" restreamapp "github.com/datarhei/core/v16/restream/app" "github.com/datarhei/core/v16/restream/replace" @@ -127,8 +127,6 @@ type api struct { state string undoMaxprocs func() - - process psutil.Process } // ErrConfigReload is an error returned to indicate that a reload of @@ -370,12 +368,18 @@ func (a *api) start(ctx context.Context) error { debug.SetMemoryLimit(math.MaxInt64) } + psutil, err := psutil.New("", nil) + if err != nil { + return fmt.Errorf("failed to initialize psutils: %w", err) + } + resources, err := resources.New(resources.Config{ MaxCPU: cfg.Resources.MaxCPUUsage, MaxMemory: cfg.Resources.MaxMemoryUsage, MaxGPU: cfg.Resources.MaxGPUUsage, MaxGPUMemory: cfg.Resources.MaxGPUMemoryUsage, Logger: a.log.logger.core.WithComponent("Resources"), + PSUtil: psutil, }) if err != nil { return fmt.Errorf("failed to initialize resource manager: %w", err) @@ -509,6 +513,7 @@ func (a *api) start(ctx context.Context) error { ValidatorOutput: validatorOut, Portrange: portrange, Collector: a.sessions.Collector("ffmpeg"), + PSUtil: psutil, }) if err != nil { return fmt.Errorf("unable to create ffmpeg: %w", err) @@ -1230,8 +1235,8 @@ func (a *api) start(ctx context.Context) error { metrics.Register(monitor.NewUptimeCollector()) metrics.Register(monitor.NewCPUCollector(a.resources)) metrics.Register(monitor.NewMemCollector(a.resources)) - metrics.Register(monitor.NewNetCollector()) - metrics.Register(monitor.NewDiskCollector(a.diskfs.Metadata("base"))) + metrics.Register(monitor.NewNetCollector(a.resources)) + metrics.Register(monitor.NewDiskCollector(a.diskfs.Metadata("base"), a.resources)) metrics.Register(monitor.NewFilesystemCollector("diskfs", a.diskfs)) metrics.Register(monitor.NewFilesystemCollector("memfs", a.memfs)) for name, fs := range a.s3fs { @@ -1888,11 +1893,6 @@ func (a *api) stop() { a.service = nil } - if a.process != nil { - a.process.Stop() - a.process = nil - } - // Unregister all collectors if a.metrics != nil { a.metrics.UnregisterAll() diff --git a/ffmpeg/ffmpeg.go b/ffmpeg/ffmpeg.go index 1b3c96af..5e64fd1e 100644 --- a/ffmpeg/ffmpeg.go +++ b/ffmpeg/ffmpeg.go @@ -12,6 +12,7 @@ import ( "github.com/datarhei/core/v16/log" "github.com/datarhei/core/v16/net" "github.com/datarhei/core/v16/process" + "github.com/datarhei/core/v16/resources/psutil" "github.com/datarhei/core/v16/session" ) @@ -63,6 +64,7 @@ type Config struct { ValidatorOutput Validator Portrange net.Portranger Collector session.Collector + PSUtil psutil.Util } type ffmpeg struct { @@ -80,11 +82,19 @@ type ffmpeg struct { states process.States statesLock sync.RWMutex + + psutil psutil.Util } func New(config Config) (FFmpeg, error) { f := &ffmpeg{} + if config.PSUtil == nil { + return nil, fmt.Errorf("psutils required") + } + + f.psutil = config.PSUtil + binary, err := exec.LookPath(config.Binary) if err != nil { return nil, fmt.Errorf("invalid ffmpeg binary given: %w", err) @@ -184,6 +194,7 @@ func (f *ffmpeg) New(config ProcessConfig) (process.Process, error) { config.OnStateChange(from, to) } }, + PSUtil: f.psutil, }) return ffmpeg, err diff --git a/http/api/process.go b/http/api/process.go index 43a9fce9..c7cae209 100644 --- a/http/api/process.go +++ b/http/api/process.go @@ -155,13 +155,13 @@ type ProcessConfigIOCleanup struct { } type ProcessConfigLimits struct { - CPU float64 `json:"cpu_usage" jsonschema:"minimum=0"` - Memory uint64 `json:"memory_mbytes" jsonschema:"minimum=0" format:"uint64"` - GPUUsage float64 `json:"gpu_usage" jsonschema:"minimum=0"` - GPUEncoder float64 `json:"gpu_encoder" jsonschema:"minimum=0"` - GPUDecoder float64 `json:"gpu_decoder" jsonschema:"minimum=0"` - GPUMemory uint64 `json:"gpu_memory_mbytes" jsonschema:"minimum=0" format:"uint64"` - WaitFor uint64 `json:"waitfor_seconds" jsonschema:"minimum=0" format:"uint64"` + CPU float64 `json:"cpu_usage" jsonschema:"minimum=0"` // percent 0-100*ncpu + Memory uint64 `json:"memory_mbytes" jsonschema:"minimum=0" format:"uint64"` // megabytes + GPUUsage float64 `json:"gpu_usage" jsonschema:"minimum=0"` // percent 0-100 + GPUEncoder float64 `json:"gpu_encoder" jsonschema:"minimum=0"` // percent 0-100 + GPUDecoder float64 `json:"gpu_decoder" jsonschema:"minimum=0"` // percent 0-100 + GPUMemory uint64 `json:"gpu_memory_mbytes" jsonschema:"minimum=0" format:"uint64"` // megabytes + WaitFor uint64 `json:"waitfor_seconds" jsonschema:"minimum=0" format:"uint64"` // seconds } // ProcessConfig represents the configuration of an ffmpeg process diff --git a/http/mock/mock.go b/http/mock/mock.go index 51e3b8a0..7487c9af 100644 --- a/http/mock/mock.go +++ b/http/mock/mock.go @@ -17,6 +17,7 @@ import ( "github.com/datarhei/core/v16/http/validator" "github.com/datarhei/core/v16/internal/testhelper" "github.com/datarhei/core/v16/io/fs" + "github.com/datarhei/core/v16/resources/psutil" "github.com/datarhei/core/v16/restream" jsonstore "github.com/datarhei/core/v16/restream/store/json" @@ -44,10 +45,16 @@ func DummyRestreamer(pathPrefix string) (restream.Restreamer, error) { return nil, err } + psutil, err := psutil.New("", nil) + if err != nil { + return nil, err + } + ffmpeg, err := ffmpeg.New(ffmpeg.Config{ Binary: binary, MaxLogLines: 100, LogHistoryLength: 3, + PSUtil: psutil, }) if err != nil { return nil, err diff --git a/monitor/disk.go b/monitor/disk.go index fda2f24d..562d5539 100644 --- a/monitor/disk.go +++ b/monitor/disk.go @@ -2,19 +2,21 @@ package monitor import ( "github.com/datarhei/core/v16/monitor/metric" - "github.com/datarhei/core/v16/psutil" + "github.com/datarhei/core/v16/resources" ) type diskCollector struct { - path string + path string + resources resources.Resources totalDescr *metric.Description usageDescr *metric.Description } -func NewDiskCollector(path string) metric.Collector { +func NewDiskCollector(path string, rsc resources.Resources) metric.Collector { c := &diskCollector{ - path: path, + path: path, + resources: rsc, } c.totalDescr = metric.NewDesc("disk_total", "Total size of the disk in bytes", []string{"path"}) @@ -37,7 +39,7 @@ func (c *diskCollector) Describe() []*metric.Description { func (c *diskCollector) Collect() metric.Metrics { metrics := metric.NewMetrics() - stat, err := psutil.Disk(c.path) + stat, err := c.resources.Disk(c.path) if err != nil { return metrics } diff --git a/monitor/net.go b/monitor/net.go index 270e0948..f961aa1e 100644 --- a/monitor/net.go +++ b/monitor/net.go @@ -2,16 +2,20 @@ package monitor import ( "github.com/datarhei/core/v16/monitor/metric" - "github.com/datarhei/core/v16/psutil" + "github.com/datarhei/core/v16/resources" ) type netCollector struct { rxDescr *metric.Description txDescr *metric.Description + + resources resources.Resources } -func NewNetCollector() metric.Collector { - c := &netCollector{} +func NewNetCollector(rsc resources.Resources) metric.Collector { + c := &netCollector{ + resources: rsc, + } c.rxDescr = metric.NewDesc("net_rx", "Number of received bytes", []string{"interface"}) c.txDescr = metric.NewDesc("net_tx", "Number of transmitted bytes", []string{"interface"}) @@ -33,7 +37,7 @@ func (c *netCollector) Describe() []*metric.Description { func (c *netCollector) Collect() metric.Metrics { metrics := metric.NewMetrics() - devs, err := psutil.Network() + devs, err := c.resources.Network() if err != nil { return metrics } diff --git a/process/limiter.go b/process/limiter.go index 699294dc..cd9cc1ec 100644 --- a/process/limiter.go +++ b/process/limiter.go @@ -7,7 +7,7 @@ import ( "time" "github.com/datarhei/core/v16/log" - "github.com/datarhei/core/v16/psutil" + "github.com/datarhei/core/v16/resources/psutil" ) type Usage struct { @@ -257,7 +257,7 @@ type limiter struct { } // NewLimiter returns a new Limiter -func NewLimiter(config LimiterConfig) Limiter { +func NewLimiter(config LimiterConfig) (Limiter, error) { l := &limiter{ waitFor: config.WaitFor, onLimit: config.OnLimit, @@ -278,7 +278,7 @@ func NewLimiter(config LimiterConfig) Limiter { } if l.psutil == nil { - l.psutil = psutil.DefaultUtil + return nil, fmt.Errorf("no psutil provided") } if ncpu, err := l.psutil.CPUCounts(); err != nil { @@ -318,7 +318,7 @@ func NewLimiter(config LimiterConfig) Limiter { "mode": mode, }) - return l + return l, nil } func (l *limiter) reset() { diff --git a/process/limiter_test.go b/process/limiter_test.go index 0ec98333..535b8ccd 100644 --- a/process/limiter_test.go +++ b/process/limiter_test.go @@ -5,7 +5,7 @@ import ( "testing" "time" - "github.com/datarhei/core/v16/psutil" + "github.com/datarhei/core/v16/resources/psutil" "github.com/stretchr/testify/require" ) @@ -52,11 +52,12 @@ func TestCPULimit(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) - l := NewLimiter(LimiterConfig{ + l, _ := NewLimiter(LimiterConfig{ CPU: 42, OnLimit: func(float64, uint64, float64, float64, float64, uint64) { wg.Done() }, + PSUtil: newPSUtil(), }) l.Start(&psproc{}) @@ -88,12 +89,13 @@ func TestCPULimitWaitFor(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) - l := NewLimiter(LimiterConfig{ + l, _ := NewLimiter(LimiterConfig{ CPU: 42, WaitFor: 3 * time.Second, OnLimit: func(float64, uint64, float64, float64, float64, uint64) { wg.Done() }, + PSUtil: newPSUtil(), }) l.Start(&psproc{}) @@ -125,11 +127,12 @@ func TestMemoryLimit(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) - l := NewLimiter(LimiterConfig{ + l, _ := NewLimiter(LimiterConfig{ Memory: 42, OnLimit: func(float64, uint64, float64, float64, float64, uint64) { wg.Done() }, + PSUtil: newPSUtil(), }) l.Start(&psproc{}) @@ -161,12 +164,13 @@ func TestMemoryLimitWaitFor(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) - l := NewLimiter(LimiterConfig{ + l, _ := NewLimiter(LimiterConfig{ Memory: 42, WaitFor: 3 * time.Second, OnLimit: func(float64, uint64, float64, float64, float64, uint64) { wg.Done() }, + PSUtil: newPSUtil(), }) l.Start(&psproc{}) @@ -198,11 +202,12 @@ func TestGPUMemoryLimit(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) - l := NewLimiter(LimiterConfig{ + l, _ := NewLimiter(LimiterConfig{ GPUMemory: 42, OnLimit: func(float64, uint64, float64, float64, float64, uint64) { wg.Done() }, + PSUtil: newPSUtil(), }) l.Start(&psproc{}) @@ -234,12 +239,13 @@ func TestGPUMemoryLimitWaitFor(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) - l := NewLimiter(LimiterConfig{ + l, _ := NewLimiter(LimiterConfig{ GPUMemory: 42, WaitFor: 3 * time.Second, OnLimit: func(float64, uint64, float64, float64, float64, uint64) { wg.Done() }, + PSUtil: newPSUtil(), }) l.Start(&psproc{}) @@ -271,12 +277,13 @@ func TestMemoryLimitSoftMode(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) - l := NewLimiter(LimiterConfig{ + l, _ := NewLimiter(LimiterConfig{ Memory: 42, Mode: LimitModeSoft, OnLimit: func(float64, uint64, float64, float64, float64, uint64) { wg.Done() }, + PSUtil: newPSUtil(), }) l.Start(&psproc{}) @@ -310,12 +317,13 @@ func TestGPUMemoryLimitSoftMode(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) - l := NewLimiter(LimiterConfig{ + l, _ := NewLimiter(LimiterConfig{ GPUMemory: 42, Mode: LimitModeSoft, OnLimit: func(float64, uint64, float64, float64, float64, uint64) { wg.Done() }, + PSUtil: newPSUtil(), }) l.Start(&psproc{}) diff --git a/process/process.go b/process/process.go index 916ca5c9..0d5b50ca 100644 --- a/process/process.go +++ b/process/process.go @@ -18,7 +18,7 @@ import ( "unicode/utf8" "github.com/datarhei/core/v16/log" - "github.com/datarhei/core/v16/psutil" + "github.com/datarhei/core/v16/resources/psutil" ) // Process represents a process and ways to control it @@ -71,6 +71,7 @@ type Config struct { OnStart func() // A callback which is called after the process started. OnExit func(state string) // A callback which is called after the process exited with the exit state. OnStateChange func(from, to string) // A callback which is called after a state changed. + PSUtil psutil.Util Logger log.Logger } @@ -244,6 +245,7 @@ type process struct { } limits Limiter scheduler Scheduler + psutil psutil.Util } var _ Process = &process{} @@ -257,6 +259,11 @@ func New(config Config) (Process, error) { parser: config.Parser, logger: config.Logger, scheduler: config.Scheduler, + psutil: config.PSUtil, + } + + if p.psutil == nil { + return nil, fmt.Errorf("no psutils given") } p.args = make([]string, len(config.Args)) @@ -269,6 +276,10 @@ func New(config Config) (Process, error) { return nil, fmt.Errorf("no valid binary given") } + if p.psutil == nil { + return nil, fmt.Errorf("no psutils provided") + } + if p.parser == nil { p.parser = NewNullParser() } @@ -297,7 +308,7 @@ func New(config Config) (Process, error) { p.callbacks.onExit = config.OnExit p.callbacks.onStateChange = config.OnStateChange - p.limits = NewLimiter(LimiterConfig{ + limits, err := NewLimiter(LimiterConfig{ CPU: config.LimitCPU, Memory: config.LimitMemory, GPUUsage: config.LimitGPUUsage, @@ -322,7 +333,12 @@ func New(config Config) (Process, error) { }).Warn().Log("Killed because limits are exceeded") p.Kill(false, fmt.Sprintf("Killed because limits are exceeded (mode: %s, tolerance: %s): %.2f (%.2f) CPU, %d (%d) bytes memory, %.2f/%.2f/%.2f (%.2f) GPU usage, %d (%d) bytes GPU memory", config.LimitMode.String(), config.LimitDuration.String(), cpu, config.LimitCPU, memory, config.LimitMemory, gpuusage, gpuencoder, gpudecoder, config.LimitGPUUsage, gpumemory, config.LimitGPUMemory)) }, + PSUtil: p.psutil, }) + if err != nil { + return nil, fmt.Errorf("failed to initialize limiter") + } + p.limits = limits p.logger.Info().Log("Created") p.debuglogger.Debug().Log("Created") @@ -703,7 +719,7 @@ func (p *process) start() error { p.pid = int32(p.cmd.Process.Pid) - if proc, err := psutil.NewProcess(p.pid); err == nil { + if proc, err := p.psutil.Process(p.pid); err == nil { p.limits.Start(proc) } diff --git a/process/process_test.go b/process/process_test.go index 6ddba58a..88045b9d 100644 --- a/process/process_test.go +++ b/process/process_test.go @@ -10,9 +10,15 @@ import ( "github.com/datarhei/core/v16/internal/testhelper" "github.com/datarhei/core/v16/math/rand" + "github.com/datarhei/core/v16/resources/psutil" "github.com/stretchr/testify/require" ) +func newPSUtil() psutil.Util { + util, _ := psutil.New("", nil) + return util +} + func TestProcess(t *testing.T) { p, _ := New(Config{ Binary: "sleep", @@ -21,6 +27,7 @@ func TestProcess(t *testing.T) { }, Reconnect: false, StaleTimeout: 0, + PSUtil: newPSUtil(), }) require.Equal(t, "finished", p.Status().State) @@ -59,6 +66,7 @@ func TestReconnectProcess(t *testing.T) { OnExit: func(string) { wg.Done() }, + PSUtil: newPSUtil(), }) p.Start() @@ -104,6 +112,7 @@ func TestStaleProcess(t *testing.T) { }, Reconnect: false, StaleTimeout: 2 * time.Second, + PSUtil: newPSUtil(), }) p.Start() @@ -126,6 +135,7 @@ func TestStaleReconnectProcess(t *testing.T) { Reconnect: true, ReconnectDelay: 2 * time.Second, StaleTimeout: 3 * time.Second, + PSUtil: newPSUtil(), }) p.Start() @@ -156,6 +166,7 @@ func TestNonExistingProcess(t *testing.T) { Reconnect: false, ReconnectDelay: 5 * time.Second, StaleTimeout: 0, + PSUtil: newPSUtil(), }) p.Start() @@ -180,6 +191,7 @@ func TestNonExistingReconnectProcess(t *testing.T) { Reconnect: true, ReconnectDelay: 2 * time.Second, StaleTimeout: 0, + PSUtil: newPSUtil(), }) p.Start() @@ -203,6 +215,7 @@ func TestProcessFailed(t *testing.T) { }, Reconnect: false, StaleTimeout: 0, + PSUtil: newPSUtil(), }) p.Start() @@ -228,6 +241,7 @@ func TestFFmpegWaitStop(t *testing.T) { OnExit: func(state string) { time.Sleep(3 * time.Second) }, + PSUtil: newPSUtil(), }) err = p.Start() @@ -255,6 +269,7 @@ func TestFFmpegKill(t *testing.T) { Args: []string{}, Reconnect: false, StaleTimeout: 0, + PSUtil: newPSUtil(), }) err = p.Start() @@ -280,6 +295,7 @@ func TestProcessForceKill(t *testing.T) { Args: []string{}, Reconnect: false, StaleTimeout: 0, + PSUtil: newPSUtil(), }) err = p.Start() @@ -312,6 +328,7 @@ func TestProcessDuration(t *testing.T) { Binary: binary, Args: []string{}, Timeout: 3 * time.Second, + PSUtil: newPSUtil(), }) require.NoError(t, err) @@ -358,6 +375,7 @@ func TestProcessSchedulePointInTime(t *testing.T) { }, Reconnect: false, Scheduler: s, + PSUtil: newPSUtil(), }) status := p.Status() @@ -399,6 +417,7 @@ func TestProcessSchedulePointInTimeGone(t *testing.T) { }, Reconnect: false, Scheduler: s, + PSUtil: newPSUtil(), }) status := p.Status() @@ -424,6 +443,7 @@ func TestProcessScheduleCron(t *testing.T) { }, Reconnect: false, Scheduler: s, + PSUtil: newPSUtil(), }) status := p.Status() @@ -454,6 +474,7 @@ func TestProcessDelayNoScheduler(t *testing.T) { Binary: "sleep", Reconnect: false, ReconnectDelay: 5 * time.Second, + PSUtil: newPSUtil(), }) px := p.(*process) @@ -470,6 +491,7 @@ func TestProcessDelayNoScheduler(t *testing.T) { Binary: "sleep", Reconnect: true, ReconnectDelay: 5 * time.Second, + PSUtil: newPSUtil(), }) px = p.(*process) @@ -493,6 +515,7 @@ func TestProcessDelaySchedulerNoReconnect(t *testing.T) { Reconnect: false, ReconnectDelay: 1 * time.Second, Scheduler: s, + PSUtil: newPSUtil(), }) px := p.(*process) @@ -514,6 +537,7 @@ func TestProcessDelaySchedulerNoReconnect(t *testing.T) { Reconnect: false, ReconnectDelay: 1 * time.Second, Scheduler: s, + PSUtil: newPSUtil(), }) px = p.(*process) @@ -537,6 +561,7 @@ func TestProcessDelaySchedulerReconnect(t *testing.T) { Reconnect: true, ReconnectDelay: 1 * time.Second, Scheduler: s, + PSUtil: newPSUtil(), }) px := p.(*process) @@ -558,6 +583,7 @@ func TestProcessDelaySchedulerReconnect(t *testing.T) { Reconnect: true, ReconnectDelay: 1 * time.Second, Scheduler: s, + PSUtil: newPSUtil(), }) px = p.(*process) @@ -579,6 +605,7 @@ func TestProcessDelaySchedulerReconnect(t *testing.T) { Reconnect: true, ReconnectDelay: 10 * time.Second, Scheduler: s, + PSUtil: newPSUtil(), }) px = p.(*process) @@ -636,6 +663,7 @@ func TestProcessCallbacks(t *testing.T) { onState = append(onState, from+"/"+to) }, + PSUtil: newPSUtil(), }) require.NoError(t, err) @@ -678,6 +706,7 @@ func TestProcessCallbacksOnBeforeStart(t *testing.T) { OnBeforeStart: func(a []string) ([]string, error) { return a, fmt.Errorf("no, not now") }, + PSUtil: newPSUtil(), }) require.NoError(t, err) diff --git a/psutil/fixtures/cgroup-limited/cpu/cpu.cfs_period_us b/resources/psutil/fixtures/cgroup-limited/cpu/cpu.cfs_period_us similarity index 100% rename from psutil/fixtures/cgroup-limited/cpu/cpu.cfs_period_us rename to resources/psutil/fixtures/cgroup-limited/cpu/cpu.cfs_period_us diff --git a/psutil/fixtures/cgroup-limited/cpu/cpu.cfs_quota_us b/resources/psutil/fixtures/cgroup-limited/cpu/cpu.cfs_quota_us similarity index 100% rename from psutil/fixtures/cgroup-limited/cpu/cpu.cfs_quota_us rename to resources/psutil/fixtures/cgroup-limited/cpu/cpu.cfs_quota_us diff --git a/psutil/fixtures/cgroup-limited/cpuacct/cpuacct.usage b/resources/psutil/fixtures/cgroup-limited/cpuacct/cpuacct.usage similarity index 100% rename from psutil/fixtures/cgroup-limited/cpuacct/cpuacct.usage rename to resources/psutil/fixtures/cgroup-limited/cpuacct/cpuacct.usage diff --git a/psutil/fixtures/cgroup-limited/memory/memory.limit_in_bytes b/resources/psutil/fixtures/cgroup-limited/memory/memory.limit_in_bytes similarity index 100% rename from psutil/fixtures/cgroup-limited/memory/memory.limit_in_bytes rename to resources/psutil/fixtures/cgroup-limited/memory/memory.limit_in_bytes diff --git a/psutil/fixtures/cgroup-limited/memory/memory.usage_in_bytes b/resources/psutil/fixtures/cgroup-limited/memory/memory.usage_in_bytes similarity index 100% rename from psutil/fixtures/cgroup-limited/memory/memory.usage_in_bytes rename to resources/psutil/fixtures/cgroup-limited/memory/memory.usage_in_bytes diff --git a/psutil/fixtures/cgroup/cpu/cpu.cfs_period_us b/resources/psutil/fixtures/cgroup/cpu/cpu.cfs_period_us similarity index 100% rename from psutil/fixtures/cgroup/cpu/cpu.cfs_period_us rename to resources/psutil/fixtures/cgroup/cpu/cpu.cfs_period_us diff --git a/psutil/fixtures/cgroup/cpu/cpu.cfs_quota_us b/resources/psutil/fixtures/cgroup/cpu/cpu.cfs_quota_us similarity index 100% rename from psutil/fixtures/cgroup/cpu/cpu.cfs_quota_us rename to resources/psutil/fixtures/cgroup/cpu/cpu.cfs_quota_us diff --git a/psutil/fixtures/cgroup/cpuacct/cpuacct.usage b/resources/psutil/fixtures/cgroup/cpuacct/cpuacct.usage similarity index 100% rename from psutil/fixtures/cgroup/cpuacct/cpuacct.usage rename to resources/psutil/fixtures/cgroup/cpuacct/cpuacct.usage diff --git a/psutil/fixtures/cgroup/memory/memory.limit_in_bytes b/resources/psutil/fixtures/cgroup/memory/memory.limit_in_bytes similarity index 100% rename from psutil/fixtures/cgroup/memory/memory.limit_in_bytes rename to resources/psutil/fixtures/cgroup/memory/memory.limit_in_bytes diff --git a/psutil/fixtures/cgroup/memory/memory.usage_in_bytes b/resources/psutil/fixtures/cgroup/memory/memory.usage_in_bytes similarity index 100% rename from psutil/fixtures/cgroup/memory/memory.usage_in_bytes rename to resources/psutil/fixtures/cgroup/memory/memory.usage_in_bytes diff --git a/psutil/fixtures/cgroup2-limited/cpu.max b/resources/psutil/fixtures/cgroup2-limited/cpu.max similarity index 100% rename from psutil/fixtures/cgroup2-limited/cpu.max rename to resources/psutil/fixtures/cgroup2-limited/cpu.max diff --git a/psutil/fixtures/cgroup2-limited/cpu.stat b/resources/psutil/fixtures/cgroup2-limited/cpu.stat similarity index 100% rename from psutil/fixtures/cgroup2-limited/cpu.stat rename to resources/psutil/fixtures/cgroup2-limited/cpu.stat diff --git a/psutil/fixtures/cgroup2-limited/memory.current b/resources/psutil/fixtures/cgroup2-limited/memory.current similarity index 100% rename from psutil/fixtures/cgroup2-limited/memory.current rename to resources/psutil/fixtures/cgroup2-limited/memory.current diff --git a/psutil/fixtures/cgroup2-limited/memory.max b/resources/psutil/fixtures/cgroup2-limited/memory.max similarity index 100% rename from psutil/fixtures/cgroup2-limited/memory.max rename to resources/psutil/fixtures/cgroup2-limited/memory.max diff --git a/psutil/fixtures/cgroup2/cpu.max b/resources/psutil/fixtures/cgroup2/cpu.max similarity index 100% rename from psutil/fixtures/cgroup2/cpu.max rename to resources/psutil/fixtures/cgroup2/cpu.max diff --git a/psutil/fixtures/cgroup2/cpu.stat b/resources/psutil/fixtures/cgroup2/cpu.stat similarity index 100% rename from psutil/fixtures/cgroup2/cpu.stat rename to resources/psutil/fixtures/cgroup2/cpu.stat diff --git a/psutil/fixtures/cgroup2/memory.current b/resources/psutil/fixtures/cgroup2/memory.current similarity index 100% rename from psutil/fixtures/cgroup2/memory.current rename to resources/psutil/fixtures/cgroup2/memory.current diff --git a/psutil/fixtures/cgroup2/memory.max b/resources/psutil/fixtures/cgroup2/memory.max similarity index 100% rename from psutil/fixtures/cgroup2/memory.max rename to resources/psutil/fixtures/cgroup2/memory.max diff --git a/psutil/gpu/gpu.go b/resources/psutil/gpu/gpu.go similarity index 70% rename from psutil/gpu/gpu.go rename to resources/psutil/gpu/gpu.go index cb8dcf00..dc7f5634 100644 --- a/psutil/gpu/gpu.go +++ b/resources/psutil/gpu/gpu.go @@ -43,3 +43,14 @@ type GPU interface { } var ErrProcessNotFound = errors.New("process not found") + +type dummy struct{} + +func (d *dummy) Count() (int, error) { return 0, nil } +func (d *dummy) Stats() ([]Stats, error) { return nil, nil } +func (d *dummy) Process(pid int32) (Process, error) { return Process{}, ErrProcessNotFound } +func (d *dummy) Close() {} + +func NewNilGPU() GPU { + return &dummy{} +} diff --git a/psutil/gpu/nvidia/fixtures/process.txt b/resources/psutil/gpu/nvidia/fixtures/process.txt similarity index 100% rename from psutil/gpu/nvidia/fixtures/process.txt rename to resources/psutil/gpu/nvidia/fixtures/process.txt diff --git a/psutil/gpu/nvidia/fixtures/query1.xml b/resources/psutil/gpu/nvidia/fixtures/query1.xml similarity index 100% rename from psutil/gpu/nvidia/fixtures/query1.xml rename to resources/psutil/gpu/nvidia/fixtures/query1.xml diff --git a/psutil/gpu/nvidia/fixtures/query2.xml b/resources/psutil/gpu/nvidia/fixtures/query2.xml similarity index 100% rename from psutil/gpu/nvidia/fixtures/query2.xml rename to resources/psutil/gpu/nvidia/fixtures/query2.xml diff --git a/psutil/gpu/nvidia/fixtures/query3.xml b/resources/psutil/gpu/nvidia/fixtures/query3.xml similarity index 100% rename from psutil/gpu/nvidia/fixtures/query3.xml rename to resources/psutil/gpu/nvidia/fixtures/query3.xml diff --git a/psutil/gpu/nvidia/nvidia.go b/resources/psutil/gpu/nvidia/nvidia.go similarity index 98% rename from psutil/gpu/nvidia/nvidia.go rename to resources/psutil/gpu/nvidia/nvidia.go index 98ad1520..a2aaf92f 100644 --- a/psutil/gpu/nvidia/nvidia.go +++ b/resources/psutil/gpu/nvidia/nvidia.go @@ -12,15 +12,9 @@ import ( "sync" "time" - "github.com/datarhei/core/v16/psutil/gpu" + "github.com/datarhei/core/v16/resources/psutil/gpu" ) -var Default gpu.GPU - -func init() { - Default = New("") -} - type Megabytes uint64 func (m *Megabytes) UnmarshalText(text []byte) error { diff --git a/psutil/gpu/nvidia/nvidia_test.go b/resources/psutil/gpu/nvidia/nvidia_test.go similarity index 96% rename from psutil/gpu/nvidia/nvidia_test.go rename to resources/psutil/gpu/nvidia/nvidia_test.go index 51954eb8..ddc48722 100644 --- a/psutil/gpu/nvidia/nvidia_test.go +++ b/resources/psutil/gpu/nvidia/nvidia_test.go @@ -9,7 +9,7 @@ import ( "time" "github.com/datarhei/core/v16/internal/testhelper" - "github.com/datarhei/core/v16/psutil/gpu" + "github.com/datarhei/core/v16/resources/psutil/gpu" "github.com/stretchr/testify/require" ) @@ -280,7 +280,7 @@ func TestWriterProcess(t *testing.T) { } func TestNvidiaGPUCount(t *testing.T) { - binary, err := testhelper.BuildBinary("nvidia-smi", "../../../internal/testhelper") + binary, err := testhelper.BuildBinary("nvidia-smi", "../../../../internal/testhelper") require.NoError(t, err, "Failed to build helper program") nv := New(binary) @@ -299,7 +299,7 @@ func TestNvidiaGPUCount(t *testing.T) { } func TestNvidiaGPUStats(t *testing.T) { - binary, err := testhelper.BuildBinary("nvidia-smi", "../../../internal/testhelper") + binary, err := testhelper.BuildBinary("nvidia-smi", "../../../../internal/testhelper") require.NoError(t, err, "Failed to build helper program") nv := New(binary) @@ -400,7 +400,7 @@ func TestNvidiaGPUStats(t *testing.T) { } func TestNvidiaGPUProcess(t *testing.T) { - binary, err := testhelper.BuildBinary("nvidia-smi", "../../../internal/testhelper") + binary, err := testhelper.BuildBinary("nvidia-smi", "../../../../internal/testhelper") require.NoError(t, err, "Failed to build helper program") nv := New(binary) diff --git a/psutil/process.go b/resources/psutil/process.go similarity index 95% rename from psutil/process.go rename to resources/psutil/process.go index ee17489f..636f85b6 100644 --- a/psutil/process.go +++ b/resources/psutil/process.go @@ -5,7 +5,7 @@ import ( "sync" "time" - "github.com/datarhei/core/v16/psutil/gpu/nvidia" + "github.com/datarhei/core/v16/resources/psutil/gpu" psprocess "github.com/shirou/gopsutil/v3/process" ) @@ -46,6 +46,8 @@ type process struct { statPreviousTime time.Time nTicks uint64 memRSS uint64 + + gpu gpu.GPU } func (u *util) Process(pid int32) (Process, error) { @@ -54,6 +56,7 @@ func (u *util) Process(pid int32) (Process, error) { hasCgroup: u.hasCgroup, cpuLimit: u.cpuLimit, ncpu: u.ncpu, + gpu: u.gpu, } proc, err := psprocess.NewProcess(pid) @@ -71,10 +74,6 @@ func (u *util) Process(pid int32) (Process, error) { return p, nil } -func NewProcess(pid int32) (Process, error) { - return DefaultUtil.Process(pid) -} - func (p *process) tickCPU(ctx context.Context, interval time.Duration) { ticker := time.NewTicker(interval) defer ticker.Stop() @@ -202,7 +201,7 @@ func (p *process) GPU() (*GPUInfo, error) { Index: -1, } - proc, err := nvidia.Default.Process(p.pid) + proc, err := p.gpu.Process(p.pid) if err != nil { return info, nil } diff --git a/psutil/process_linux.go b/resources/psutil/process_linux.go similarity index 100% rename from psutil/process_linux.go rename to resources/psutil/process_linux.go diff --git a/psutil/process_other.go b/resources/psutil/process_other.go similarity index 100% rename from psutil/process_other.go rename to resources/psutil/process_other.go diff --git a/psutil/psutil.go b/resources/psutil/psutil.go similarity index 95% rename from psutil/psutil.go rename to resources/psutil/psutil.go index 079e933d..34507534 100644 --- a/psutil/psutil.go +++ b/resources/psutil/psutil.go @@ -13,7 +13,7 @@ import ( "sync" "time" - "github.com/datarhei/core/v16/psutil/gpu/nvidia" + psutilgpu "github.com/datarhei/core/v16/resources/psutil/gpu" "github.com/shirou/gopsutil/v3/cpu" "github.com/shirou/gopsutil/v3/disk" @@ -41,12 +41,6 @@ var cgroup2Files = []string{ // https://access.redhat.com/documentation/en-us/red_hat_enterprise_linux/6/html/resource_management_guide/sect-cpu-example_usage // https://www.kernel.org/doc/html/latest/admin-guide/cgroup-v2.html -var DefaultUtil Util - -func init() { - DefaultUtil, _ = New("/sys/fs/cgroup") -} - type DiskInfo struct { Path string Fstype string @@ -141,10 +135,16 @@ type util struct { statPreviousTime time.Time nTicks uint64 mem MemoryInfo + + gpu psutilgpu.GPU } // New returns a new util, it will be started automatically -func New(root string) (Util, error) { +func New(root string, gpu psutilgpu.GPU) (Util, error) { + if len(root) == 0 { + root = "/sys/fs/cgroup" + } + u := &util{ root: os.DirFS(root), } @@ -173,6 +173,11 @@ func New(root string) (Util, error) { u.mem = *mem + u.gpu = gpu + if u.gpu == nil { + u.gpu = psutilgpu.NewNilGPU() + } + u.stopOnce.Do(func() {}) u.Start() @@ -353,10 +358,6 @@ func (u *util) CPUCounts() (float64, error) { return float64(ncpu), nil } -func CPUCounts() (float64, error) { - return DefaultUtil.CPUCounts() -} - // cpuTimes returns the current cpu usage times in seconds. func (u *util) cpuTimes() (*cpuTimesStat, error) { if u.hasCgroup && u.cpuLimit > 0 { @@ -439,10 +440,6 @@ func (u *util) CPU() (*CPUInfo, error) { return s, nil } -func CPUPercent() (*CPUInfo, error) { - return DefaultUtil.CPU() -} - func (u *util) cgroupCPUTimes(version int) (*cpuTimesStat, error) { info := &cpuTimesStat{} @@ -494,10 +491,6 @@ func (u *util) Disk(path string) (*DiskInfo, error) { return info, nil } -func Disk(path string) (*DiskInfo, error) { - return DefaultUtil.Disk(path) -} - func (u *util) virtualMemory() (*MemoryInfo, error) { info, err := mem.VirtualMemory() if err != nil { @@ -533,10 +526,6 @@ func (u *util) Memory() (*MemoryInfo, error) { return stat, nil } -func Memory() (*MemoryInfo, error) { - return DefaultUtil.Memory() -} - func (u *util) cgroupVirtualMemory(version int) (*MemoryInfo, error) { info := &MemoryInfo{} @@ -612,10 +601,6 @@ func (u *util) Network() ([]NetworkInfo, error) { return info, nil } -func Network() ([]NetworkInfo, error) { - return DefaultUtil.Network() -} - func (u *util) readFile(path string) ([]string, error) { file, err := u.root.Open(path) if err != nil { @@ -653,7 +638,7 @@ func cpuTotal(c *cpu.TimesStat) float64 { } func (u *util) GPU() ([]GPUInfo, error) { - nvstats, err := nvidia.Default.Stats() + nvstats, err := u.gpu.Stats() if err != nil { return nil, err } @@ -673,7 +658,3 @@ func (u *util) GPU() ([]GPUInfo, error) { return stats, nil } - -func GPU() ([]GPUInfo, error) { - return DefaultUtil.GPU() -} diff --git a/psutil/psutil_test.go b/resources/psutil/psutil_test.go similarity index 98% rename from psutil/psutil_test.go rename to resources/psutil/psutil_test.go index b4aaae9f..ae5b3095 100644 --- a/psutil/psutil_test.go +++ b/resources/psutil/psutil_test.go @@ -8,7 +8,7 @@ import ( ) func getUtil(path string) *util { - u, _ := New(path) + u, _ := New(path, nil) return u.(*util) } diff --git a/resources/resources.go b/resources/resources.go index 5a4043d5..c95f92a7 100644 --- a/resources/resources.go +++ b/resources/resources.go @@ -8,7 +8,7 @@ import ( "time" "github.com/datarhei/core/v16/log" - "github.com/datarhei/core/v16/psutil" + "github.com/datarhei/core/v16/resources/psutil" "github.com/datarhei/core/v16/slices" ) @@ -18,6 +18,21 @@ type Info struct { GPU GPUInfo } +type DiskInfo struct { + Path string + Fstype string + Total uint64 + Used uint64 + InodesTotal uint64 + InodesUsed uint64 +} + +type NetworkInfo struct { + Name string // interface name + BytesSent uint64 // number of bytes sent + BytesRecv uint64 // number of bytes received +} + type MemoryInfo struct { Total uint64 // bytes Available uint64 // bytes @@ -123,6 +138,9 @@ type Resources interface { // Info returns the current resource usage. Info() Info + + Disk(path string) (*DiskInfo, error) + Network() ([]NetworkInfo, error) } type Config struct { @@ -136,7 +154,11 @@ type Config struct { func New(config Config) (Resources, error) { if config.PSUtil == nil { - config.PSUtil = psutil.DefaultUtil + psutil, err := psutil.New("", nil) + if err != nil { + return nil, fmt.Errorf("unable to initialize psutils: %w", err) + } + config.PSUtil = psutil } gpu, err := config.PSUtil.GPU() @@ -572,3 +594,40 @@ func (r *resources) Info() Info { return i } + +func (r *resources) Disk(path string) (*DiskInfo, error) { + info, err := r.psutil.Disk(path) + if err != nil { + return nil, err + } + + diskinfo := &DiskInfo{ + Path: info.Path, + Fstype: info.Fstype, + Total: info.Total, + Used: info.Used, + InodesTotal: info.InodesTotal, + InodesUsed: info.InodesUsed, + } + + return diskinfo, nil +} + +func (r *resources) Network() ([]NetworkInfo, error) { + netio, err := r.psutil.Network() + if err != nil { + return nil, err + } + + info := []NetworkInfo{} + + for _, io := range netio { + info = append(info, NetworkInfo{ + Name: io.Name, + BytesSent: io.BytesSent, + BytesRecv: io.BytesRecv, + }) + } + + return info, nil +} diff --git a/resources/resources_test.go b/resources/resources_test.go index a1ee4244..84293817 100644 --- a/resources/resources_test.go +++ b/resources/resources_test.go @@ -6,7 +6,7 @@ import ( "testing" "time" - "github.com/datarhei/core/v16/psutil" + "github.com/datarhei/core/v16/resources/psutil" "github.com/stretchr/testify/require" ) diff --git a/restream/core_test.go b/restream/core_test.go index 48d79d89..d1b644ee 100644 --- a/restream/core_test.go +++ b/restream/core_test.go @@ -16,7 +16,8 @@ import ( "github.com/datarhei/core/v16/internal/testhelper" "github.com/datarhei/core/v16/io/fs" "github.com/datarhei/core/v16/net" - "github.com/datarhei/core/v16/psutil" + "github.com/datarhei/core/v16/resources" + "github.com/datarhei/core/v16/resources/psutil" "github.com/datarhei/core/v16/restream/app" rfs "github.com/datarhei/core/v16/restream/fs" "github.com/datarhei/core/v16/restream/replace" @@ -32,12 +33,18 @@ func getDummyRestreamer(portrange net.Portranger, validatorIn, validatorOut ffmp return nil, fmt.Errorf("failed to build helper program: %w", err) } + psutil, err := psutil.New("", nil) + if err != nil { + return nil, err + } + ffmpeg, err := ffmpeg.New(ffmpeg.Config{ Binary: binary, LogHistoryLength: 3, Portrange: portrange, ValidatorInput: validatorIn, ValidatorOutput: validatorOut, + PSUtil: psutil, }) if err != nil { return nil, err @@ -81,11 +88,19 @@ func getDummyRestreamer(portrange net.Portranger, validatorIn, validatorOut ffmp return nil, err } + resources, err := resources.New(resources.Config{ + PSUtil: psutil, + }) + if err != nil { + return nil, err + } + rs, err := New(Config{ FFmpeg: ffmpeg, Replace: replacer, Filesystems: []fs.Filesystem{memfs}, Rewrite: rewriter, + Resources: resources, }) if err != nil { return nil, err @@ -1531,8 +1546,7 @@ func TestProcessLimit(t *testing.T) { status := task.ffmpeg.Status() - ncpu, err := psutil.CPUCounts() - require.NoError(t, err) + ncpu := rs.resources.Info().CPU.NCPU require.Equal(t, ncpu*process.LimitCPU, status.CPU.Limit) require.Equal(t, process.LimitMemory, status.Memory.Limit)