From 55015bcf6f14a9b64336c832c3aab15e377e2587 Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Wed, 30 Oct 2024 17:12:29 +0100 Subject: [PATCH] Read out GPU specs at util start --- http/handler/api/about_test.go | 3 +- internal/testhelper/nvidia-smi/nvidia-smi.go | 65 +++++---- .../nvidia/fixtures/process_noprocesses.txt | 3 + resources/psutil/gpu/nvidia/nvidia.go | 124 ++++++++++++++---- resources/psutil/gpu/nvidia/nvidia_test.go | 66 ++++------ 5 files changed, 171 insertions(+), 90 deletions(-) create mode 100644 resources/psutil/gpu/nvidia/fixtures/process_noprocesses.txt diff --git a/http/handler/api/about_test.go b/http/handler/api/about_test.go index 2ac0877a..5ef491fa 100644 --- a/http/handler/api/about_test.go +++ b/http/handler/api/about_test.go @@ -6,6 +6,7 @@ import ( "github.com/datarhei/core/v16/http/api" "github.com/datarhei/core/v16/http/mock" + "github.com/datarhei/core/v16/internal/mock/resources" "github.com/datarhei/core/v16/internal/mock/restream" "github.com/stretchr/testify/require" @@ -20,7 +21,7 @@ func getDummyAboutRouter() (*echo.Echo, error) { return nil, err } - handler := NewAbout(rs, nil, func() []string { return []string{} }) + handler := NewAbout(rs, resources.New(), func() []string { return []string{} }) router.Add("GET", "/", handler.About) diff --git a/internal/testhelper/nvidia-smi/nvidia-smi.go b/internal/testhelper/nvidia-smi/nvidia-smi.go index 36f6a78c..7bab74a3 100644 --- a/internal/testhelper/nvidia-smi/nvidia-smi.go +++ b/internal/testhelper/nvidia-smi/nvidia-smi.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "os/signal" + "slices" "time" ) @@ -931,41 +932,53 @@ func main() { } ctx, cancel := context.WithCancel(context.Background()) + wait := false if os.Args[1] == "pmon" { - go func(ctx context.Context) { - ticker := time.NewTicker(time.Second) - defer ticker.Stop() + if slices.Contains(os.Args[1:], "-c") { + fmt.Fprintf(os.Stdout, "%s\n", pmondata) + } else { + go func(ctx context.Context) { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - fmt.Fprintf(os.Stdout, "%s\n", pmondata) + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + fmt.Fprintf(os.Stdout, "%s\n", pmondata) + } } - } - }(ctx) + }(ctx) + } } else { - go func(ctx context.Context) { - ticker := time.NewTicker(time.Second) - defer ticker.Stop() + if !slices.Contains(os.Args[1:], "-l") { + fmt.Fprintf(os.Stdout, "%s\n", querydata) + } else { + wait = true + go func(ctx context.Context) { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - fmt.Fprintf(os.Stdout, "%s\n", querydata) + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + fmt.Fprintf(os.Stdout, "%s\n", querydata) + } } - } - }(ctx) + }(ctx) + } } - // Wait for interrupt signal to gracefully shutdown the app - quit := make(chan os.Signal, 1) - signal.Notify(quit, os.Interrupt) - <-quit + if wait { + // Wait for interrupt signal to gracefully shutdown the app + quit := make(chan os.Signal, 1) + signal.Notify(quit, os.Interrupt) + <-quit + } cancel() diff --git a/resources/psutil/gpu/nvidia/fixtures/process_noprocesses.txt b/resources/psutil/gpu/nvidia/fixtures/process_noprocesses.txt new file mode 100644 index 00000000..7d9f98ed --- /dev/null +++ b/resources/psutil/gpu/nvidia/fixtures/process_noprocesses.txt @@ -0,0 +1,3 @@ +# gpu pid type sm mem enc dec fb command +# Idx # C/G % % % % MB name + 0 - - - - - - - - \ No newline at end of file diff --git a/resources/psutil/gpu/nvidia/nvidia.go b/resources/psutil/gpu/nvidia/nvidia.go index a2aaf92f..2618023a 100644 --- a/resources/psutil/gpu/nvidia/nvidia.go +++ b/resources/psutil/gpu/nvidia/nvidia.go @@ -114,7 +114,7 @@ func (w *writerQuery) Write(data []byte) (int, error) { break } - s, err := w.parse(content) + s, err := parseQuery(content) if err != nil { continue } @@ -125,7 +125,7 @@ func (w *writerQuery) Write(data []byte) (int, error) { return n, nil } -func (w *writerQuery) parse(data []byte) (Stats, error) { +func parseQuery(data []byte) (Stats, error) { nv := Stats{} err := xml.Unmarshal(data, &nv) @@ -139,7 +139,6 @@ func (w *writerQuery) parse(data []byte) (Stats, error) { type writerProcess struct { buf bytes.Buffer ch chan Process - re *regexp.Regexp terminator []byte } @@ -161,7 +160,7 @@ func (w *writerProcess) Write(data []byte) (int, error) { break } - s, err := w.parse(content) + s, err := parseProcess(content) if err != nil { continue } @@ -172,7 +171,19 @@ func (w *writerProcess) Write(data []byte) (int, error) { return n, nil } -func (w *writerProcess) parse(data []byte) (Process, error) { +const processMatcher = `^\s*([0-9]+)\s+([0-9]+)\s+[A-Z]\s+([0-9-]+)\s+[0-9-]+\s+([0-9-]+)\s+([0-9-]+)\s+([0-9]+).*` + +// # gpu pid type sm mem enc dec fb command +// # Idx # C/G % % % % MB name +// +// 0 7372 C 2 0 2 - 136 ffmpeg +// 0 12176 C 5 2 3 7 782 ffmpeg +// 0 20035 C 8 2 4 1 1145 ffmpeg +// 0 20141 C 2 1 1 3 429 ffmpeg +// 0 29591 C 2 1 - 2 435 ffmpeg +var reProcessMatcher = regexp.MustCompile(processMatcher) + +func parseProcess(data []byte) (Process, error) { p := Process{} if len(data) == 0 { @@ -183,7 +194,7 @@ func (w *writerProcess) parse(data []byte) (Process, error) { return p, fmt.Errorf("comment") } - matches := w.re.FindStringSubmatch(string(data)) + matches := reProcessMatcher.FindStringSubmatch(string(data)) if matches == nil { return p, fmt.Errorf("no matches found") } @@ -236,31 +247,38 @@ func New(path string) gpu.GPU { } n := &nvidia{ - wrQuery: &writerQuery{ - ch: make(chan Stats, 1), - terminator: []byte("\n"), - }, - wrProcess: &writerProcess{ - ch: make(chan Process, 32), - // # gpu pid type sm mem enc dec fb command - // # Idx # C/G % % % % MB name - // 0 7372 C 2 0 2 - 136 ffmpeg - // 0 12176 C 5 2 3 7 782 ffmpeg - // 0 20035 C 8 2 4 1 1145 ffmpeg - // 0 20141 C 2 1 1 3 429 ffmpeg - // 0 29591 C 2 1 - 2 435 ffmpeg - re: regexp.MustCompile(`^\s*([0-9]+)\s+([0-9]+)\s+[A-Z]\s+([0-9-]+)\s+[0-9-]+\s+([0-9-]+)\s+([0-9-]+)\s+([0-9]+).*`), - terminator: []byte("\n"), - }, process: map[int32]Process{}, } + stats, err := n.runQueryOnce(path) + if err != nil { + return &dummy{} + } + + n.stats = stats + + process, err := n.runProcessOnce(path) + if err != nil { + return &dummy{} + } + + n.process = process + + n.wrQuery = &writerQuery{ + ch: make(chan Stats, 1), + terminator: []byte("\n"), + } + n.wrProcess = &writerProcess{ + ch: make(chan Process, 32), + terminator: []byte("\n"), + } + ctx, cancel := context.WithCancel(context.Background()) n.cancel = cancel + go n.reader(ctx) go n.runnerQuery(ctx, path) go n.runnerProcess(ctx, path) - go n.reader(ctx) return n } @@ -289,6 +307,32 @@ func (n *nvidia) reader(ctx context.Context) { } } +func (n *nvidia) runQueryOnce(path string) (Stats, error) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + data := &bytes.Buffer{} + + cmd := exec.CommandContext(ctx, path, "-q", "-x") + cmd.Stdout = data + err := cmd.Start() + if err != nil { + return Stats{}, err + } + + err = cmd.Wait() + if err != nil { + return Stats{}, err + } + + stats, err := parseQuery(data.Bytes()) + if err != nil { + return Stats{}, err + } + + return stats, nil +} + func (n *nvidia) runnerQuery(ctx context.Context, path string) { for { cmd := exec.CommandContext(ctx, path, "-q", "-x", "-l", "1") @@ -317,6 +361,40 @@ func (n *nvidia) runnerQuery(ctx context.Context, path string) { } } +func (n *nvidia) runProcessOnce(path string) (map[int32]Process, error) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + data := &bytes.Buffer{} + + cmd := exec.CommandContext(ctx, path, "pmon", "-s", "um", "-c", "1") + cmd.Stdout = data + err := cmd.Start() + if err != nil { + return nil, err + } + + err = cmd.Wait() + if err != nil { + return nil, err + } + + lines := bytes.Split(data.Bytes(), []byte{'\n'}) + + process := map[int32]Process{} + + for _, line := range lines { + p, err := parseProcess(line) + if err != nil { + continue + } + + process[p.PID] = p + } + + return process, nil +} + func (n *nvidia) runnerProcess(ctx context.Context, path string) { for { cmd := exec.CommandContext(ctx, path, "pmon", "-s", "um", "-d", "5") diff --git a/resources/psutil/gpu/nvidia/nvidia_test.go b/resources/psutil/gpu/nvidia/nvidia_test.go index 37f18690..cad4d1a0 100644 --- a/resources/psutil/gpu/nvidia/nvidia_test.go +++ b/resources/psutil/gpu/nvidia/nvidia_test.go @@ -3,10 +3,8 @@ package nvidia import ( "bytes" "os" - "regexp" "sync" "testing" - "time" "github.com/datarhei/core/v16/internal/testhelper" "github.com/datarhei/core/v16/resources/psutil/gpu" @@ -17,9 +15,7 @@ func TestParseQuery(t *testing.T) { data, err := os.ReadFile("./fixtures/query1.xml") require.NoError(t, err) - wr := &writerQuery{} - - nv, err := wr.parse(data) + nv, err := parseQuery(data) require.NoError(t, err) require.Equal(t, Stats{ @@ -40,7 +36,7 @@ func TestParseQuery(t *testing.T) { data, err = os.ReadFile("./fixtures/query2.xml") require.NoError(t, err) - nv, err = wr.parse(data) + nv, err = parseQuery(data) require.NoError(t, err) require.Equal(t, Stats{ @@ -71,7 +67,7 @@ func TestParseQuery(t *testing.T) { data, err = os.ReadFile("./fixtures/query3.xml") require.NoError(t, err) - nv, err = wr.parse(data) + nv, err = parseQuery(data) require.NoError(t, err) require.Equal(t, Stats{ @@ -93,15 +89,11 @@ func TestParseProcess(t *testing.T) { data, err := os.ReadFile("./fixtures/process.txt") require.NoError(t, err) - wr := &writerProcess{ - re: regexp.MustCompile(`^\s*([0-9]+)\s+([0-9]+)\s+[A-Z]\s+([0-9-]+)\s+[0-9-]+\s+([0-9-]+)\s+([0-9-]+)\s+([0-9]+).*`), - } - lines := bytes.Split(data, []byte("\n")) process := map[int32]Process{} for _, line := range lines { - p, err := wr.parse(line) + p, err := parseProcess(line) if err != nil { continue } @@ -153,6 +145,25 @@ func TestParseProcess(t *testing.T) { }, process) } +func TestParseProcessNoProcesses(t *testing.T) { + data, err := os.ReadFile("./fixtures/process_noprocesses.txt") + require.NoError(t, err) + + lines := bytes.Split(data, []byte("\n")) + process := map[int32]Process{} + + for _, line := range lines { + p, err := parseProcess(line) + if err != nil { + continue + } + + process[p.PID] = p + } + + require.Equal(t, map[int32]Process{}, process) +} + func TestWriterQuery(t *testing.T) { data, err := os.ReadFile("./fixtures/query2.xml") require.NoError(t, err) @@ -213,7 +224,6 @@ func TestWriterProcess(t *testing.T) { wr := &writerProcess{ ch: make(chan Process, 32), - re: regexp.MustCompile(`^\s*([0-9]+)\s+([0-9]+)\s+[A-Z]\s+([0-9-]+)\s+[0-9-]+\s+([0-9-]+)\s+([0-9-]+)\s+([0-9]+).*`), terminator: []byte("\n"), } @@ -292,10 +302,9 @@ func TestNvidiaGPUCount(t *testing.T) { _, ok := nv.(*dummy) require.False(t, ok) - require.Eventually(t, func() bool { - count, _ := nv.Count() - return count != 0 - }, 5*time.Second, time.Second) + count, err := nv.Count() + require.NoError(t, err) + require.NotEqual(t, 0, count) } func TestNvidiaGPUStats(t *testing.T) { @@ -311,24 +320,6 @@ func TestNvidiaGPUStats(t *testing.T) { _, ok := nv.(*dummy) require.False(t, ok) - require.Eventually(t, func() bool { - stats, _ := nv.Stats() - - if len(stats) != 2 { - return false - } - - if len(stats[0].Process) != 3 { - return false - } - - if len(stats[1].Process) != 2 { - return false - } - - return true - }, 5*time.Second, time.Second) - stats, err := nv.Stats() require.NoError(t, err) require.Equal(t, []gpu.Stats{ @@ -412,11 +403,6 @@ func TestNvidiaGPUProcess(t *testing.T) { _, ok := nv.(*dummy) require.False(t, ok) - require.Eventually(t, func() bool { - _, err := nv.Process(12176) - return err == nil - }, 5*time.Second, time.Second) - proc, err := nv.Process(12176) require.NoError(t, err) require.Equal(t, gpu.Process{