Files
core/resources/resources.go
2025-06-25 16:53:27 +02:00

741 lines
18 KiB
Go

package resources
import (
"context"
"fmt"
"os"
"sort"
"sync"
"time"
"github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/resources/psutil"
"github.com/datarhei/core/v16/slices"
)
// Info is a snapshot of the current memory, CPU, and GPU usage
// as reported by psutil.
type Info struct {
	Mem MemoryInfo
	CPU CPUInfo
	GPU GPUInfo
}
// DiskInfo describes usage of the filesystem that contains a path.
type DiskInfo struct {
	Path        string // path the info was queried for
	Fstype      string // filesystem type
	Total       uint64 // bytes
	Used        uint64 // bytes
	InodesTotal uint64
	InodesUsed  uint64
}
// NetworkInfo holds cumulative traffic counters for one network interface.
type NetworkInfo struct {
	Name      string // interface name
	BytesSent uint64 // number of bytes sent
	BytesRecv uint64 // number of bytes received
}
// MemoryInfo describes system memory usage, the configured limit,
// and whether memory is currently being limited.
type MemoryInfo struct {
	Total      uint64 // bytes
	Available  uint64 // bytes
	Used       uint64 // bytes
	Limit      uint64 // bytes
	Core       uint64 // bytes, memory used by this process itself
	Throttling bool   // whether the memory limiter is currently engaged
	Error      error  // error from querying the memory stats, if any
}
// CPUInfo describes system CPU usage, the configured limit,
// and whether the CPU is currently being limited.
type CPUInfo struct {
	NCPU       float64 // number of cpus
	System     float64 // percent 0-100
	User       float64 // percent 0-100
	Idle       float64 // percent 0-100
	Other      float64 // percent 0-100
	Limit      float64 // percent 0-100
	Core       float64 // percent 0-100, CPU used by this process itself
	Throttling bool    // whether the CPU limiter is currently engaged
	Error      error   // error from querying the CPU stats, if any
}
// GPUInfo describes the usage of all detected GPUs.
type GPUInfo struct {
	NGPU  float64 // number of gpus
	GPU   []GPUInfoStat
	Error error // error from querying the GPU stats, if any
}
// GPUInfoStat describes usage and limits of a single GPU.
type GPUInfoStat struct {
	Index int    // index of the GPU, used as hwdevice
	ID    string
	Name  string

	// Memory
	MemoryTotal     uint64 // bytes
	MemoryUsed      uint64 // bytes
	MemoryAvailable uint64 // bytes
	MemoryLimit     uint64 // bytes

	// GPU
	Usage      float64 // percent 0-100
	Encoder    float64 // percent 0-100
	Decoder    float64 // percent 0-100
	UsageLimit float64 // percent 0-100
	Throttling bool    // whether this GPU's limiter is currently engaged
}
// Request describes the resources a caller wants to acquire.
// GPU fields are optional; a zero value means no GPU resources
// are requested for that dimension.
type Request struct {
	CPU        float64 // percent 0-100*ncpu
	Memory     uint64  // bytes
	GPUUsage   float64 // percent 0-100
	GPUEncoder float64 // percent 0-100
	GPUDecoder float64 // percent 0-100
	GPUMemory  uint64  // bytes
}
// Response is the result of a resource request. GPU is -1 if no
// GPU was requested or selected.
type Response struct {
	GPU int // GPU number, hwdevice
}
// resources implements the Resources interface. It samples system
// usage via psutil; an observer goroutine updates the per-resource
// limiting flags, which are guarded by lock.
type resources struct {
	psutil psutil.Util

	ncpu      float64 // number of logical CPUs
	maxCPU    float64 // percent 0-100*ncpu
	maxMemory uint64  // bytes

	ngpu         int     // number of GPUs detected at startup
	maxGPU       float64 // general usage, percent 0-100
	maxGPUMemory float64 // memory usage, percent 0-100

	isUnlimited bool // true if no limits at all were configured

	isCPULimiting    bool
	isMemoryLimiting bool
	isGPULimiting    []bool // one entry per GPU

	self           psutil.Process // process observer for this process itself
	cancelObserver context.CancelFunc

	lock     sync.RWMutex // guards the is...Limiting fields
	stopOnce sync.Once    // ensures Cancel tears down only once

	logger log.Logger
}
// Resources provides access to system resource usage and enforces
// configured CPU, memory, and GPU limits.
type Resources interface {
	// Cancel stops the resource observer. Safe to call multiple times.
	Cancel()

	// HasLimits returns whether any limits have been set.
	HasLimits() bool

	// Limits returns the CPU (percent 0-100), memory (bytes) limits, and GPU limits (usage and memory each in percent 0-100).
	Limits() (float64, uint64, float64, float64)

	// ShouldLimit returns whether cpu, memory, and/or GPU is currently limited.
	ShouldLimit() (bool, bool, []bool)

	// Request checks whether the requested resources are available.
	Request(req Request) (Response, error)

	// Info returns the current resource usage.
	Info() Info

	// Disk returns usage info for the filesystem containing path.
	Disk(path string) (*DiskInfo, error)

	// Network returns traffic counters for all network interfaces.
	Network() ([]NetworkInfo, error)

	// Process returns an observer for the process with the given PID.
	Process(pid int32) (Process, error)
}
// Config holds the limits for a new Resources instance. A value of 0
// for any Max... field means that resource is not limited.
type Config struct {
	MaxCPU       float64 // percent 0-100
	MaxMemory    float64 // percent 0-100
	MaxGPU       float64 // general,encoder,decoder usage, percent 0-100
	MaxGPUMemory float64 // memory usage, percent 0-100
	PSUtil       psutil.Util // optional; a default is created if nil
	Logger       log.Logger  // optional; a default is created if nil
}
// New creates a Resources instance according to config. If no psutil
// implementation is provided, a default one is created. The observer
// goroutine is started immediately and samples usage once per second;
// call Cancel() on the returned value to stop it.
//
// It returns an error if psutil cannot be initialized or if any
// Max... value is outside the range 0-100.
func New(config Config) (Resources, error) {
	if config.PSUtil == nil {
		psutil, err := psutil.New("", nil)
		if err != nil {
			return nil, fmt.Errorf("unable to initialize psutils: %w", err)
		}

		config.PSUtil = psutil
	}

	gpu, err := config.PSUtil.GPU()
	if err != nil {
		return nil, fmt.Errorf("unable to determine number of GPUs: %w", err)
	}

	// Without any GPU present, GPU limits are meaningless.
	if len(gpu) == 0 {
		config.MaxGPU = 0
		config.MaxGPUMemory = 0
	}

	// If no limit at all is configured, the observer becomes a no-op.
	isUnlimited := config.MaxCPU <= 0 && config.MaxMemory <= 0 && config.MaxGPU <= 0 && config.MaxGPUMemory <= 0

	// Unset limits default to 100%, i.e. effectively unlimited.
	if config.MaxCPU <= 0 {
		config.MaxCPU = 100
	}

	if config.MaxMemory <= 0 {
		config.MaxMemory = 100
	}

	if config.MaxGPU <= 0 {
		config.MaxGPU = 100
	}

	if config.MaxGPUMemory <= 0 {
		config.MaxGPUMemory = 100
	}

	if config.MaxCPU > 100 || config.MaxMemory > 100 || config.MaxGPU > 100 || config.MaxGPUMemory > 100 {
		return nil, fmt.Errorf("all Max... values must have a range of 0-100")
	}

	r := &resources{
		maxCPU:        config.MaxCPU,
		maxGPU:        config.MaxGPU,
		maxGPUMemory:  config.MaxGPUMemory,
		psutil:        config.PSUtil,
		isUnlimited:   isUnlimited,
		ngpu:          len(gpu),
		isGPULimiting: make([]bool, len(gpu)),
		logger:        config.Logger,
	}

	if r.logger == nil {
		r.logger = log.New("")
	}

	vmstat, err := r.psutil.Memory()
	if err != nil {
		return nil, fmt.Errorf("unable to determine available memory: %w", err)
	}

	ncpu, err := r.psutil.CPUCounts()
	if err != nil {
		return nil, fmt.Errorf("unable to determine number of logical CPUs: %w", err)
	}

	r.ncpu = ncpu

	// Internally the CPU limit is kept on the 0-100*ncpu scale and the
	// memory limit in absolute bytes, matching what psutil reports.
	r.maxCPU *= r.ncpu
	r.maxMemory = uint64(float64(vmstat.Total) * config.MaxMemory / 100)

	r.logger = r.logger.WithFields(log.Fields{
		"ncpu":           r.ncpu,
		"max_cpu":        r.maxCPU,
		"max_memory":     r.maxMemory,
		"ngpu":           len(gpu),
		"max_gpu":        r.maxGPU,
		"max_gpu_memory": r.maxGPUMemory,
	})

	r.self, err = r.psutil.Process(int32(os.Getpid()), false)
	if err != nil {
		return nil, fmt.Errorf("unable to create process observer for self: %w", err)
	}

	r.logger.Debug().Log("Created")

	ctx, cancel := context.WithCancel(context.Background())
	r.cancelObserver = cancel

	go r.observe(ctx, time.Second)

	// Note: r.stopOnce needs no explicit initialization; the zero value
	// of sync.Once is ready to use.

	r.logger.Info().Log("Started")

	return r, nil
}
// Cancel stops the observer goroutine and cancels the psutil
// collectors. It is safe to call multiple times; only the first call
// has an effect.
func (r *resources) Cancel() {
	r.stopOnce.Do(func() {
		// Stop the observer before tearing down the collectors it uses.
		r.cancelObserver()
		r.psutil.Cancel()
		r.self.Cancel()

		r.logger.Info().Log("Stopped")
	})
}
// observe periodically samples system CPU, memory, and GPU usage and
// updates the limiting flags with hysteresis: a limit engages when
// usage reaches the configured maximum and releases only once usage
// drops below it again. It runs until ctx is canceled.
func (r *resources) observe(ctx context.Context, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	r.logger.Debug().Log("Observer started")

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if r.isUnlimited {
				// If there aren't any limits imposed, don't do anything
				continue
			}

			cpustat, err := r.psutil.CPU()
			if err != nil {
				r.logger.Warn().WithError(err).Log("Failed to determine system CPU usage")
				continue
			}

			// Scale the combined load (percent 0-100) to the
			// 0-100*ncpu scale that maxCPU is kept on.
			cpuload := (cpustat.User + cpustat.System + cpustat.Other) * r.ncpu

			vmstat, err := r.psutil.Memory()
			if err != nil {
				r.logger.Warn().WithError(err).Log("Failed to determine system memory usage")
				continue
			}

			gpustat, err := r.psutil.GPU()
			if err != nil {
				r.logger.Warn().WithError(err).Log("Failed to determine GPU usage")
				continue
			}

			r.logger.Debug().WithFields(log.Fields{
				"cur_cpu":    cpuload,
				"cur_memory": vmstat.Used,
			}).Log("Observation")

			doCPULimit := false

			if !r.isCPULimiting {
				if cpuload >= r.maxCPU {
					r.logger.Debug().WithField("cpu", cpuload).Log("CPU limit reached")
					doCPULimit = true
				}
			} else {
				doCPULimit = true
				if cpuload < r.maxCPU {
					r.logger.Debug().WithField("cpu", cpuload).Log("CPU limit released")
					doCPULimit = false
				}
			}

			doMemoryLimit := false

			if !r.isMemoryLimiting {
				if vmstat.Used >= r.maxMemory {
					r.logger.Debug().WithField("memory", vmstat.Used).Log("Memory limit reached")
					doMemoryLimit = true
				}
			} else {
				doMemoryLimit = true
				if vmstat.Used < r.maxMemory {
					r.logger.Debug().WithField("memory", vmstat.Used).Log("Memory limit released")
					doMemoryLimit = false
				}
			}

			doGPULimit := make([]bool, r.ngpu)

			for i, limiting := range r.isGPULimiting {
				if i >= len(gpustat) {
					// Defensive: psutil reported fewer GPUs than were
					// present at startup; don't index out of range.
					break
				}

				maxMemory := uint64(r.maxGPUMemory * float64(gpustat[i].MemoryTotal) / 100)

				if !limiting {
					if gpustat[i].MemoryUsed >= maxMemory || (gpustat[i].Usage >= r.maxGPU && gpustat[i].Encoder >= r.maxGPU && gpustat[i].Decoder >= r.maxGPU) {
						doGPULimit[i] = true
					}
				} else {
					doGPULimit[i] = true
					if gpustat[i].MemoryUsed < maxMemory && (gpustat[i].Usage < r.maxGPU || gpustat[i].Encoder < r.maxGPU || gpustat[i].Decoder < r.maxGPU) {
						doGPULimit[i] = false
					}
				}
			}

			r.lock.Lock()

			if r.isCPULimiting != doCPULimit {
				r.logger.Warn().WithFields(log.Fields{
					"enabled": doCPULimit,
				}).Log("Limiting CPU")
			}
			r.isCPULimiting = doCPULimit

			if r.isMemoryLimiting != doMemoryLimit {
				r.logger.Warn().WithFields(log.Fields{
					"enabled": doMemoryLimit,
				}).Log("Limiting memory")
			}
			r.isMemoryLimiting = doMemoryLimit

			for i, limiting := range r.isGPULimiting {
				if limiting != doGPULimit[i] {
					r.logger.Warn().WithFields(log.Fields{
						// Log this GPU's flag, not the whole slice.
						"enabled": doGPULimit[i],
						"index":   i,
					}).Log("Limiting GPU")
				}
			}
			r.isGPULimiting = doGPULimit

			r.lock.Unlock()
		}
	}
}
// HasLimits returns whether any limits have been set.
func (r *resources) HasLimits() bool {
	if r.isUnlimited {
		return false
	}

	return true
}
// Limits returns the configured limits: CPU in percent (0-100),
// memory in bytes, and GPU usage and GPU memory each in percent
// (0-100). The internal CPU limit is kept on the 0-100*ncpu scale,
// so it is converted back to 0-100 here.
func (r *resources) Limits() (float64, uint64, float64, float64) {
	cpu := r.maxCPU / r.ncpu

	return cpu, r.maxMemory, r.maxGPU, r.maxGPUMemory
}
// ShouldLimit returns whether CPU, memory, and/or each GPU is
// currently being limited. The GPU slice is a copy so callers can't
// mutate internal state.
func (r *resources) ShouldLimit() (bool, bool, []bool) {
	r.lock.RLock()
	cpu := r.isCPULimiting
	memory := r.isMemoryLimiting
	gpu := slices.Copy(r.isGPULimiting)
	r.lock.RUnlock()

	return cpu, memory, gpu
}
// Request checks whether the requested resources are available and, if
// GPU resources are requested, selects the least loaded GPU that can
// accommodate them. It returns an error if resources are currently
// being limited, if the request is invalid, or if granting the request
// would exceed a limit. On success the returned Response carries the
// index of the selected GPU, or -1 if no GPU was requested.
func (r *resources) Request(req Request) (Response, error) {
	res := Response{
		GPU: -1,
	}

	r.lock.RLock()
	defer r.lock.RUnlock()

	logger := r.logger.WithFields(log.Fields{
		"req_cpu":         req.CPU,
		"req_memory":      req.Memory,
		"req_gpu":         req.GPUUsage,
		"req_gpu_encoder": req.GPUEncoder,
		"req_gpu_decoder": req.GPUDecoder,
		"req_gpu_memory":  req.GPUMemory,
	})

	logger.Debug().Log("Request for acquiring resources")

	// Check if anything is currently limiting.
	if r.isCPULimiting || r.isMemoryLimiting {
		logger.Debug().Log("Rejected, currently limiting")
		// Fixed typo in the error message ("currenlty").
		return res, fmt.Errorf("resources are currently actively limited")
	}

	// Check if the requested resources are valid.
	if req.CPU <= 0 || req.Memory == 0 {
		logger.Debug().Log("Rejected, invalid values")
		return res, fmt.Errorf("the cpu and/or memory values are invalid. values > 0 are required: cpu=%f, memory=%d", req.CPU, req.Memory)
	}

	// Get current CPU and memory values.
	cpustat, err := r.psutil.CPU()
	if err != nil {
		r.logger.Warn().WithError(err).Log("Failed to determine system CPU usage")
		return res, fmt.Errorf("the system CPU usage couldn't be determined")
	}

	cpuload := (cpustat.User + cpustat.System + cpustat.Other) * r.ncpu

	vmstat, err := r.psutil.Memory()
	if err != nil {
		r.logger.Warn().WithError(err).Log("Failed to determine system memory usage")
		return res, fmt.Errorf("the system memory usage couldn't be determined")
	}

	// Check if enough resources are available
	if cpuload+req.CPU > r.maxCPU {
		logger.Debug().WithField("cur_cpu", cpuload).Log("Rejected, CPU limit exceeded")
		return res, fmt.Errorf("the CPU limit would be exceeded: %f + %f > %f", cpuload, req.CPU, r.maxCPU)
	}

	if vmstat.Used+req.Memory > r.maxMemory {
		logger.Debug().WithField("cur_memory", vmstat.Used).Log("Rejected, memory limit exceeded")
		return res, fmt.Errorf("the memory limit would be exceeded: %d + %d > %d", vmstat.Used, req.Memory, r.maxMemory)
	}

	// Check if any GPU resources are requested
	if req.GPUUsage > 0 || req.GPUEncoder > 0 || req.GPUDecoder > 0 || req.GPUMemory > 0 {
		if req.GPUUsage < 0 || req.GPUEncoder < 0 || req.GPUDecoder < 0 || req.GPUMemory == 0 {
			logger.Debug().Log("Rejected, invalid values")
			return res, fmt.Errorf("the gpu usage and memory values are invalid: usage=%f, encoder=%f, decoder=%f, memory=%d", req.GPUUsage, req.GPUEncoder, req.GPUDecoder, req.GPUMemory)
		}

		// Get current GPU values
		gpustat, err := r.psutil.GPU()
		if err != nil {
			r.logger.Warn().WithError(err).Log("Failed to determine GPU usage")
			return res, fmt.Errorf("the GPU usage couldn't be determined")
		}

		if len(gpustat) == 0 {
			// err is nil here, so there's no error to attach.
			r.logger.Debug().Log("GPU resources requested but no GPU available")
			return res, fmt.Errorf("some GPU resources requested but no GPU available")
		}

		// Collect all GPUs that can accommodate the request.
		fittingGPU := []psutil.GPUInfo{}

		for _, g := range gpustat {
			if req.GPUUsage > 0 && g.Usage+req.GPUUsage > r.maxGPU {
				logger.Debug().WithFields(log.Fields{"id": g.Index, "cur_gpu": g.Usage}).Log("Rejected, GPU usage limit exceeded")
				continue
			}

			// Fixed: log the encoder load, not the general usage.
			if req.GPUEncoder > 0 && g.Encoder+req.GPUEncoder > r.maxGPU {
				logger.Debug().WithFields(log.Fields{"id": g.Index, "cur_gpu_encoder": g.Encoder}).Log("Rejected, GPU encoder usage limit exceeded")
				continue
			}

			// Fixed: log the decoder load, not the general usage.
			if req.GPUDecoder > 0 && g.Decoder+req.GPUDecoder > r.maxGPU {
				logger.Debug().WithFields(log.Fields{"id": g.Index, "cur_gpu_decoder": g.Decoder}).Log("Rejected, GPU decoder usage limit exceeded")
				continue
			}

			// Memory is compared in percent of the GPU's total memory.
			gpuMemoryUsage := float64(g.MemoryUsed) / float64(g.MemoryTotal) * 100
			requestedGPUMemoryUsage := float64(req.GPUMemory) / float64(g.MemoryTotal) * 100

			if gpuMemoryUsage+requestedGPUMemoryUsage > r.maxGPUMemory {
				logger.Debug().WithFields(log.Fields{"id": g.Index, "cur_gpu_memory": gpuMemoryUsage}).Log("Rejected, GPU memory usage limit exceeded")
				continue
			}

			fittingGPU = append(fittingGPU, g)
		}

		if len(fittingGPU) == 0 {
			return res, fmt.Errorf("all GPU usage limits are exceeded")
		}

		// Pick the GPU with the lowest combined load.
		sort.SliceStable(fittingGPU, func(a, b int) bool {
			loadA := fittingGPU[a].Usage + fittingGPU[a].Encoder + fittingGPU[a].Decoder
			loadB := fittingGPU[b].Usage + fittingGPU[b].Encoder + fittingGPU[b].Decoder

			return loadA < loadB
		})

		foundGPU := fittingGPU[0]

		// Fixed: attach fields to the logger (was logger.Debug().WithFields,
		// which pinned the logger to an event level prematurely).
		logger = logger.WithFields(log.Fields{
			"cur_gpu":         foundGPU.Index,
			"cur_gpu_general": foundGPU.Usage,
			"cur_gpu_encoder": foundGPU.Encoder,
			"cur_gpu_decoder": foundGPU.Decoder,
			"cur_gpu_memory":  float64(foundGPU.MemoryUsed) / float64(foundGPU.MemoryTotal) * 100,
		})

		res.GPU = foundGPU.Index
	}

	logger.Debug().WithFields(log.Fields{
		"cur_cpu":    cpuload,
		"cur_memory": vmstat.Used,
	}).Log("Acquiring approved")

	return res, nil
}
// Info returns the current resource usage of the system and of this
// process, together with the configured limits and the current
// limiting state.
func (r *resources) Info() Info {
	cpulimit, memlimit, gpulimit, gpumemlimit := r.Limits()
	cputhrottling, memthrottling, gputhrottling := r.ShouldLimit()

	cpustat, cpuerr := r.psutil.CPU()
	memstat, memerr := r.psutil.Memory()
	gpustat, gpuerr := r.psutil.GPU()

	// Errors from self-observation are ignored; the zero values are used.
	selfcpu, _ := r.self.CPU()
	selfmem, _ := r.self.Memory()

	var gpus []GPUInfoStat

	for i, g := range gpustat {
		stat := GPUInfoStat{
			Index:           g.Index,
			ID:              g.ID,
			Name:            g.Name,
			MemoryTotal:     g.MemoryTotal,
			MemoryUsed:      g.MemoryUsed,
			MemoryAvailable: g.MemoryTotal - g.MemoryUsed,
			MemoryLimit:     uint64(float64(g.MemoryTotal) * gpumemlimit / 100),
			Usage:           g.Usage,
			Encoder:         g.Encoder,
			Decoder:         g.Decoder,
			UsageLimit:      gpulimit,
		}

		// The throttling slice is sized at startup and may be shorter
		// than the currently reported GPU list.
		if i < len(gputhrottling) {
			stat.Throttling = gputhrottling[i]
		}

		gpus = append(gpus, stat)
	}

	return Info{
		CPU: CPUInfo{
			NCPU:       r.ncpu,
			System:     cpustat.System,
			User:       cpustat.User,
			Idle:       cpustat.Idle,
			Other:      cpustat.Other,
			Limit:      cpulimit,
			Core:       selfcpu.System + selfcpu.User + selfcpu.Other,
			Throttling: cputhrottling,
			Error:      cpuerr,
		},
		Mem: MemoryInfo{
			Total:      memstat.Total,
			Available:  memstat.Available,
			Used:       memstat.Used,
			Limit:      memlimit,
			Core:       selfmem,
			Throttling: memthrottling,
			Error:      memerr,
		},
		GPU: GPUInfo{
			NGPU:  float64(len(gpustat)),
			GPU:   gpus,
			Error: gpuerr,
		},
	}
}
// Disk returns usage information for the filesystem that contains path.
func (r *resources) Disk(path string) (*DiskInfo, error) {
	stat, err := r.psutil.Disk(path)
	if err != nil {
		return nil, err
	}

	return &DiskInfo{
		Path:        stat.Path,
		Fstype:      stat.Fstype,
		Total:       stat.Total,
		Used:        stat.Used,
		InodesTotal: stat.InodesTotal,
		InodesUsed:  stat.InodesUsed,
	}, nil
}
// Network returns cumulative traffic counters for all network
// interfaces. On success the result is never nil, but may be empty.
func (r *resources) Network() ([]NetworkInfo, error) {
	stats, err := r.psutil.Network()
	if err != nil {
		return nil, err
	}

	info := make([]NetworkInfo, 0, len(stats))

	for _, stat := range stats {
		info = append(info, NetworkInfo{
			Name:      stat.Name,
			BytesSent: stat.BytesSent,
			BytesRecv: stat.BytesRecv,
		})
	}

	return info, nil
}
// Process returns an observer for the process with the given PID,
// including its GPU usage.
func (r *resources) Process(pid int32) (Process, error) {
	observed, err := r.psutil.Process(pid, true)
	if err != nil {
		return nil, err
	}

	return &process{proc: observed}, nil
}
// Process provides access to the resource usage of a single process
// and allows suspending and resuming it.
type Process interface {
	// Info returns the current CPU, memory, and GPU usage of the process.
	Info() (ProcessInfo, error)

	// Cancel will stop collecting CPU and memory data for this process.
	Cancel()

	// Suspend will send SIGSTOP to the process.
	Suspend() error

	// Resume will send SIGCONT to the process.
	Resume() error
}
// process implements the Process interface by delegating to a
// psutil process observer.
type process struct {
	proc psutil.Process
}
// ProcessInfoCPU describes the CPU usage of a single process.
type ProcessInfoCPU struct {
	System float64 // percent 0-100
	User   float64 // percent 0-100
	Idle   float64 // percent 0-100
	Other  float64 // percent 0-100
}
// ProcessInfoGPU describes the GPU usage of a single process.
type ProcessInfoGPU struct {
	Index      int    // Index of the GPU
	MemoryUsed uint64 // bytes
	Usage      float64 // percent 0-100
	Encoder    float64 // percent 0-100
	Decoder    float64 // percent 0-100
}
// ProcessInfo is a snapshot of the resource usage of a single process.
type ProcessInfo struct {
	CPU    ProcessInfoCPU
	Memory uint64 // bytes
	GPU    ProcessInfoGPU
}
// Info returns the current CPU, memory, and GPU usage of the observed
// process. It returns an error (and a zero ProcessInfo) if any of the
// underlying psutil queries fails.
func (p *process) Info() (ProcessInfo, error) {
	cpustat, err := p.proc.CPU()
	if err != nil {
		return ProcessInfo{}, err
	}

	memstat, err := p.proc.Memory()
	if err != nil {
		return ProcessInfo{}, err
	}

	gpustat, err := p.proc.GPU()
	if err != nil {
		return ProcessInfo{}, err
	}

	return ProcessInfo{
		CPU: ProcessInfoCPU{
			System: cpustat.System,
			User:   cpustat.User,
			Idle:   cpustat.Idle,
			Other:  cpustat.Other,
		},
		Memory: memstat,
		GPU: ProcessInfoGPU{
			Index:      gpustat.Index,
			MemoryUsed: gpustat.MemoryUsed,
			Usage:      gpustat.Usage,
			Encoder:    gpustat.Encoder,
			Decoder:    gpustat.Decoder,
		},
	}, nil
}
// Cancel stops collecting CPU and memory data for this process.
func (p *process) Cancel() {
	p.proc.Cancel()
}
// Suspend sends SIGSTOP to the process.
func (p *process) Suspend() error {
	return p.proc.Suspend()
}
// Resume sends SIGCONT to the process.
func (p *process) Resume() error {
	return p.proc.Resume()
}