From fbf62bf7e5078a05ca8f1dac942a5715d87c1ea5 Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Mon, 28 Oct 2024 17:12:31 +0100 Subject: [PATCH] Remove Start() function, rename Stop() to Cancel() --- app/api/api.go | 4 +--- process/limiter.go | 2 +- process/limiter_test.go | 2 +- resources/psutil/process.go | 6 +++--- resources/psutil/psutil.go | 23 +++++++------------- resources/resources.go | 37 +++++++++++++------------------- resources/resources_test.go | 42 +++++++++++-------------------------- 7 files changed, 40 insertions(+), 76 deletions(-) diff --git a/app/api/api.go b/app/api/api.go index 05689098..da0496fe 100644 --- a/app/api/api.go +++ b/app/api/api.go @@ -385,8 +385,6 @@ func (a *api) start(ctx context.Context) error { return fmt.Errorf("failed to initialize resource manager: %w", err) } - resources.Start() - a.resources = resources if cfg.Sessions.Enable { @@ -1915,7 +1913,7 @@ func (a *api) stop() { // Stop resource observer if a.resources != nil { - a.resources.Stop() + a.resources.Cancel() } // Stop the session tracker diff --git a/process/limiter.go b/process/limiter.go index cd9cc1ec..8bb650dc 100644 --- a/process/limiter.go +++ b/process/limiter.go @@ -367,7 +367,7 @@ func (l *limiter) Stop() { l.cancel() - l.proc.Stop() + l.proc.Cancel() l.proc = nil l.reset() diff --git a/process/limiter_test.go b/process/limiter_test.go index 535b8ccd..5d93bb68 100644 --- a/process/limiter_test.go +++ b/process/limiter_test.go @@ -37,7 +37,7 @@ func (p *psproc) GPU() (*psutil.GPUInfo, error) { }, nil } -func (p *psproc) Stop() {} +func (p *psproc) Cancel() {} func (p *psproc) Suspend() error { return nil } func (p *psproc) Resume() error { return nil } diff --git a/resources/psutil/process.go b/resources/psutil/process.go index 636f85b6..bb2f9064 100644 --- a/resources/psutil/process.go +++ b/resources/psutil/process.go @@ -20,8 +20,8 @@ type Process interface { // GPU returns the current GPU memory in bytes and usage in percent (0-100) of this process only. GPU() (*GPUInfo, error) - // Stop will stop collecting CPU and memory data for this process. - Stop() + // Cancel will stop collecting CPU and memory data for this process. + Cancel() // Suspend will send SIGSTOP to the process. Suspend() error @@ -133,7 +133,7 @@ func (p *process) collectMemory() uint64 { return info.RSS } -func (p *process) Stop() { +func (p *process) Cancel() { p.stopTicker() } diff --git a/resources/psutil/psutil.go b/resources/psutil/psutil.go index 34507534..c231792b 100644 --- a/resources/psutil/psutil.go +++ b/resources/psutil/psutil.go @@ -90,8 +90,7 @@ type cpuTimesStat struct { } type Util interface { - Start() - Stop() + Cancel() // CPUCounts returns the number of cores, either logical or physical. CPUCounts() (float64, error) @@ -178,24 +177,18 @@ func New(root string, gpu psutilgpu.GPU) (Util, error) { u.gpu = psutilgpu.NewNilGPU() } - u.stopOnce.Do(func() {}) + ctx, cancel := context.WithCancel(context.Background()) + u.stopTicker = cancel - u.Start() + go u.tickCPU(ctx, time.Second) + go u.tickMemory(ctx, time.Second) + + u.stopOnce = sync.Once{} return u, nil } -func (u *util) Start() { - u.startOnce.Do(func() { - ctx, cancel := context.WithCancel(context.Background()) - u.stopTicker = cancel - - go u.tickCPU(ctx, time.Second) - go u.tickMemory(ctx, time.Second) - }) -} - -func (u *util) Stop() { +func (u *util) Cancel() { u.stopOnce.Do(func() { u.stopTicker() diff --git a/resources/resources.go b/resources/resources.go index c95f92a7..2130b1f4 100644 --- a/resources/resources.go +++ b/resources/resources.go @@ -113,16 +113,14 @@ type resources struct { cancelObserver context.CancelFunc - lock sync.RWMutex - startOnce sync.Once - stopOnce sync.Once + lock sync.RWMutex + stopOnce sync.Once logger log.Logger } type Resources interface { - Start() - Stop() + Cancel() // HasLimits returns whether any limits have been set. HasLimits() bool @@ -243,30 +241,23 @@ func New(config Config) (Resources, error) { r.logger.Debug().Log("Created") - r.stopOnce.Do(func() {}) + ctx, cancel := context.WithCancel(context.Background()) + r.cancelObserver = cancel + + go r.observe(ctx, time.Second) + + r.stopOnce = sync.Once{} + + r.logger.Info().Log("Started") return r, nil } -func (r *resources) Start() { - r.startOnce.Do(func() { - ctx, cancel := context.WithCancel(context.Background()) - r.cancelObserver = cancel - - go r.observe(ctx, time.Second) - - r.stopOnce = sync.Once{} - - r.logger.Info().Log("Started") - }) -} - -func (r *resources) Stop() { +func (r *resources) Cancel() { r.stopOnce.Do(func() { r.cancelObserver() - r.self.Stop() - - r.startOnce = sync.Once{} + r.psutil.Cancel() + r.self.Cancel() r.logger.Info().Log("Stopped") }) diff --git a/resources/resources_test.go b/resources/resources_test.go index 84293817..4b6b6bae 100644 --- a/resources/resources_test.go +++ b/resources/resources_test.go @@ -49,8 +49,8 @@ func newUtil(ngpu int) *util { return u } -func (u *util) Start() {} -func (u *util) Stop() {} +func (u *util) Start() {} +func (u *util) Cancel() {} func (u *util) CPUCounts() (float64, error) { return 2, nil @@ -122,7 +122,7 @@ func (p *process) GPU() (*psutil.GPUInfo, error) { Decoder: 7, }, nil } -func (p *process) Stop() {} +func (p *process) Cancel() {} func (p *process) Suspend() error { return nil } func (p *process) Resume() error { return nil } @@ -198,8 +198,6 @@ func TestMemoryLimit(t *testing.T) { } }() - r.Start() - wg.Wait() require.True(t, limit) @@ -207,7 +205,7 @@ func TestMemoryLimit(t *testing.T) { _, err = r.Request(Request{CPU: 5, Memory: 10}) require.Error(t, err) - r.Stop() + r.Cancel() } func TestMemoryUnlimit(t *testing.T) { @@ -250,8 +248,6 @@ func TestMemoryUnlimit(t *testing.T) { } }() - r.Start() - wg.Wait() require.True(t, limit) @@ -293,7 +289,7 @@ func TestMemoryUnlimit(t *testing.T) { require.False(t, limit) - r.Stop() + r.Cancel() } func TestCPULimit(t *testing.T) { @@ -334,8 +330,6 @@ func TestCPULimit(t *testing.T) { } }() - r.Start() - wg.Wait() require.True(t, limit) @@ -343,7 +337,7 @@ func TestCPULimit(t *testing.T) { _, err = r.Request(Request{CPU: 5, Memory: 10}) require.Error(t, err) - r.Stop() + r.Cancel() } func TestCPUUnlimit(t *testing.T) { @@ -386,8 +380,6 @@ func TestCPUUnlimit(t *testing.T) { } }() - r.Start() - wg.Wait() require.True(t, limit) @@ -429,7 +421,7 @@ func TestCPUUnlimit(t *testing.T) { require.False(t, limit) - r.Stop() + r.Cancel() } func TestGPULimitMemory(t *testing.T) { @@ -472,8 +464,6 @@ func TestGPULimitMemory(t *testing.T) { } }() - r.Start() - wg.Wait() require.Contains(t, limit, true) @@ -481,7 +471,7 @@ func TestGPULimitMemory(t *testing.T) { _, err = r.Request(Request{CPU: 5, Memory: 10, GPUUsage: 10, GPUMemory: 10}) require.Error(t, err) - r.Stop() + r.Cancel() } func TestGPUUnlimitMemory(t *testing.T) { @@ -526,8 +516,6 @@ func TestGPUUnlimitMemory(t *testing.T) { } }() - r.Start() - wg.Wait() require.Contains(t, limit, true) @@ -567,7 +555,7 @@ func TestGPUUnlimitMemory(t *testing.T) { require.NotContains(t, limit, true) - r.Stop() + r.Cancel() } func TestGPULimitMemorySome(t *testing.T) { @@ -610,8 +598,6 @@ func TestGPULimitMemorySome(t *testing.T) { } }() - r.Start() - wg.Wait() require.Equal(t, []bool{false, false, true, true}, limit) @@ -619,7 +605,7 @@ func TestGPULimitMemorySome(t *testing.T) { _, err = r.Request(Request{CPU: 5, Memory: 10, GPUUsage: 10, GPUMemory: 10}) require.NoError(t, err) - r.Stop() + r.Cancel() } func TestGPULimitUsage(t *testing.T) { @@ -662,8 +648,6 @@ func TestGPULimitUsage(t *testing.T) { } }() - r.Start() - wg.Wait() require.Equal(t, []bool{true, false, false}, limit) @@ -674,7 +658,7 @@ func TestGPULimitUsage(t *testing.T) { _, err = r.Request(Request{CPU: 5, Memory: 10, GPUEncoder: 10, GPUMemory: 10}) require.NoError(t, err) - r.Stop() + r.Cancel() } func TestGPUUnlimitUsage(t *testing.T) { @@ -719,8 +703,6 @@ func TestGPUUnlimitUsage(t *testing.T) { } }() - r.Start() - wg.Wait() require.Equal(t, []bool{true, false, false}, limit) @@ -761,7 +743,7 @@ func TestGPUUnlimitUsage(t *testing.T) { require.Equal(t, []bool{false, false, false}, limit) - r.Stop() + r.Cancel() } func TestRequestCPU(t *testing.T) {