Read out GPU specs at util start

This commit is contained in:
Ingo Oppermann
2024-10-30 17:12:29 +01:00
parent 22a94e1089
commit 55015bcf6f
5 changed files with 171 additions and 90 deletions

View File

@@ -6,6 +6,7 @@ import (
"github.com/datarhei/core/v16/http/api"
"github.com/datarhei/core/v16/http/mock"
"github.com/datarhei/core/v16/internal/mock/resources"
"github.com/datarhei/core/v16/internal/mock/restream"
"github.com/stretchr/testify/require"
@@ -20,7 +21,7 @@ func getDummyAboutRouter() (*echo.Echo, error) {
return nil, err
}
handler := NewAbout(rs, nil, func() []string { return []string{} })
handler := NewAbout(rs, resources.New(), func() []string { return []string{} })
router.Add("GET", "/", handler.About)

View File

@@ -5,6 +5,7 @@ import (
"fmt"
"os"
"os/signal"
"slices"
"time"
)
@@ -931,41 +932,53 @@ func main() {
}
ctx, cancel := context.WithCancel(context.Background())
wait := false
if os.Args[1] == "pmon" {
go func(ctx context.Context) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
if slices.Contains(os.Args[1:], "-c") {
fmt.Fprintf(os.Stdout, "%s\n", pmondata)
} else {
go func(ctx context.Context) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
fmt.Fprintf(os.Stdout, "%s\n", pmondata)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
fmt.Fprintf(os.Stdout, "%s\n", pmondata)
}
}
}
}(ctx)
}(ctx)
}
} else {
go func(ctx context.Context) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
if !slices.Contains(os.Args[1:], "-l") {
fmt.Fprintf(os.Stdout, "%s\n", querydata)
} else {
wait = true
go func(ctx context.Context) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
fmt.Fprintf(os.Stdout, "%s\n", querydata)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
fmt.Fprintf(os.Stdout, "%s\n", querydata)
}
}
}
}(ctx)
}(ctx)
}
}
// Wait for interrupt signal to gracefully shutdown the app
quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt)
<-quit
if wait {
// Wait for interrupt signal to gracefully shutdown the app
quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt)
<-quit
}
cancel()

View File

@@ -0,0 +1,3 @@
# gpu pid type sm mem enc dec fb command
# Idx # C/G % % % % MB name
0 - - - - - - - -

View File

@@ -114,7 +114,7 @@ func (w *writerQuery) Write(data []byte) (int, error) {
break
}
s, err := w.parse(content)
s, err := parseQuery(content)
if err != nil {
continue
}
@@ -125,7 +125,7 @@ func (w *writerQuery) Write(data []byte) (int, error) {
return n, nil
}
func (w *writerQuery) parse(data []byte) (Stats, error) {
func parseQuery(data []byte) (Stats, error) {
nv := Stats{}
err := xml.Unmarshal(data, &nv)
@@ -139,7 +139,6 @@ func (w *writerQuery) parse(data []byte) (Stats, error) {
type writerProcess struct {
buf bytes.Buffer
ch chan Process
re *regexp.Regexp
terminator []byte
}
@@ -161,7 +160,7 @@ func (w *writerProcess) Write(data []byte) (int, error) {
break
}
s, err := w.parse(content)
s, err := parseProcess(content)
if err != nil {
continue
}
@@ -172,7 +171,19 @@ func (w *writerProcess) Write(data []byte) (int, error) {
return n, nil
}
func (w *writerProcess) parse(data []byte) (Process, error) {
const processMatcher = `^\s*([0-9]+)\s+([0-9]+)\s+[A-Z]\s+([0-9-]+)\s+[0-9-]+\s+([0-9-]+)\s+([0-9-]+)\s+([0-9]+).*`
// # gpu pid type sm mem enc dec fb command
// # Idx # C/G % % % % MB name
//
// 0 7372 C 2 0 2 - 136 ffmpeg
// 0 12176 C 5 2 3 7 782 ffmpeg
// 0 20035 C 8 2 4 1 1145 ffmpeg
// 0 20141 C 2 1 1 3 429 ffmpeg
// 0 29591 C 2 1 - 2 435 ffmpeg
var reProcessMatcher = regexp.MustCompile(processMatcher)
func parseProcess(data []byte) (Process, error) {
p := Process{}
if len(data) == 0 {
@@ -183,7 +194,7 @@ func (w *writerProcess) parse(data []byte) (Process, error) {
return p, fmt.Errorf("comment")
}
matches := w.re.FindStringSubmatch(string(data))
matches := reProcessMatcher.FindStringSubmatch(string(data))
if matches == nil {
return p, fmt.Errorf("no matches found")
}
@@ -236,31 +247,38 @@ func New(path string) gpu.GPU {
}
n := &nvidia{
wrQuery: &writerQuery{
ch: make(chan Stats, 1),
terminator: []byte("</nvidia_smi_log>\n"),
},
wrProcess: &writerProcess{
ch: make(chan Process, 32),
// # gpu pid type sm mem enc dec fb command
// # Idx # C/G % % % % MB name
// 0 7372 C 2 0 2 - 136 ffmpeg
// 0 12176 C 5 2 3 7 782 ffmpeg
// 0 20035 C 8 2 4 1 1145 ffmpeg
// 0 20141 C 2 1 1 3 429 ffmpeg
// 0 29591 C 2 1 - 2 435 ffmpeg
re: regexp.MustCompile(`^\s*([0-9]+)\s+([0-9]+)\s+[A-Z]\s+([0-9-]+)\s+[0-9-]+\s+([0-9-]+)\s+([0-9-]+)\s+([0-9]+).*`),
terminator: []byte("\n"),
},
process: map[int32]Process{},
}
stats, err := n.runQueryOnce(path)
if err != nil {
return &dummy{}
}
n.stats = stats
process, err := n.runProcessOnce(path)
if err != nil {
return &dummy{}
}
n.process = process
n.wrQuery = &writerQuery{
ch: make(chan Stats, 1),
terminator: []byte("</nvidia_smi_log>\n"),
}
n.wrProcess = &writerProcess{
ch: make(chan Process, 32),
terminator: []byte("\n"),
}
ctx, cancel := context.WithCancel(context.Background())
n.cancel = cancel
go n.reader(ctx)
go n.runnerQuery(ctx, path)
go n.runnerProcess(ctx, path)
go n.reader(ctx)
return n
}
@@ -289,6 +307,32 @@ func (n *nvidia) reader(ctx context.Context) {
}
}
func (n *nvidia) runQueryOnce(path string) (Stats, error) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
data := &bytes.Buffer{}
cmd := exec.CommandContext(ctx, path, "-q", "-x")
cmd.Stdout = data
err := cmd.Start()
if err != nil {
return Stats{}, err
}
err = cmd.Wait()
if err != nil {
return Stats{}, err
}
stats, err := parseQuery(data.Bytes())
if err != nil {
return Stats{}, err
}
return stats, nil
}
func (n *nvidia) runnerQuery(ctx context.Context, path string) {
for {
cmd := exec.CommandContext(ctx, path, "-q", "-x", "-l", "1")
@@ -317,6 +361,40 @@ func (n *nvidia) runnerQuery(ctx context.Context, path string) {
}
}
func (n *nvidia) runProcessOnce(path string) (map[int32]Process, error) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
data := &bytes.Buffer{}
cmd := exec.CommandContext(ctx, path, "pmon", "-s", "um", "-c", "1")
cmd.Stdout = data
err := cmd.Start()
if err != nil {
return nil, err
}
err = cmd.Wait()
if err != nil {
return nil, err
}
lines := bytes.Split(data.Bytes(), []byte{'\n'})
process := map[int32]Process{}
for _, line := range lines {
p, err := parseProcess(line)
if err != nil {
continue
}
process[p.PID] = p
}
return process, nil
}
func (n *nvidia) runnerProcess(ctx context.Context, path string) {
for {
cmd := exec.CommandContext(ctx, path, "pmon", "-s", "um", "-d", "5")

View File

@@ -3,10 +3,8 @@ package nvidia
import (
"bytes"
"os"
"regexp"
"sync"
"testing"
"time"
"github.com/datarhei/core/v16/internal/testhelper"
"github.com/datarhei/core/v16/resources/psutil/gpu"
@@ -17,9 +15,7 @@ func TestParseQuery(t *testing.T) {
data, err := os.ReadFile("./fixtures/query1.xml")
require.NoError(t, err)
wr := &writerQuery{}
nv, err := wr.parse(data)
nv, err := parseQuery(data)
require.NoError(t, err)
require.Equal(t, Stats{
@@ -40,7 +36,7 @@ func TestParseQuery(t *testing.T) {
data, err = os.ReadFile("./fixtures/query2.xml")
require.NoError(t, err)
nv, err = wr.parse(data)
nv, err = parseQuery(data)
require.NoError(t, err)
require.Equal(t, Stats{
@@ -71,7 +67,7 @@ func TestParseQuery(t *testing.T) {
data, err = os.ReadFile("./fixtures/query3.xml")
require.NoError(t, err)
nv, err = wr.parse(data)
nv, err = parseQuery(data)
require.NoError(t, err)
require.Equal(t, Stats{
@@ -93,15 +89,11 @@ func TestParseProcess(t *testing.T) {
data, err := os.ReadFile("./fixtures/process.txt")
require.NoError(t, err)
wr := &writerProcess{
re: regexp.MustCompile(`^\s*([0-9]+)\s+([0-9]+)\s+[A-Z]\s+([0-9-]+)\s+[0-9-]+\s+([0-9-]+)\s+([0-9-]+)\s+([0-9]+).*`),
}
lines := bytes.Split(data, []byte("\n"))
process := map[int32]Process{}
for _, line := range lines {
p, err := wr.parse(line)
p, err := parseProcess(line)
if err != nil {
continue
}
@@ -153,6 +145,25 @@ func TestParseProcess(t *testing.T) {
}, process)
}
func TestParseProcessNoProcesses(t *testing.T) {
data, err := os.ReadFile("./fixtures/process_noprocesses.txt")
require.NoError(t, err)
lines := bytes.Split(data, []byte("\n"))
process := map[int32]Process{}
for _, line := range lines {
p, err := parseProcess(line)
if err != nil {
continue
}
process[p.PID] = p
}
require.Equal(t, map[int32]Process{}, process)
}
func TestWriterQuery(t *testing.T) {
data, err := os.ReadFile("./fixtures/query2.xml")
require.NoError(t, err)
@@ -213,7 +224,6 @@ func TestWriterProcess(t *testing.T) {
wr := &writerProcess{
ch: make(chan Process, 32),
re: regexp.MustCompile(`^\s*([0-9]+)\s+([0-9]+)\s+[A-Z]\s+([0-9-]+)\s+[0-9-]+\s+([0-9-]+)\s+([0-9-]+)\s+([0-9]+).*`),
terminator: []byte("\n"),
}
@@ -292,10 +302,9 @@ func TestNvidiaGPUCount(t *testing.T) {
_, ok := nv.(*dummy)
require.False(t, ok)
require.Eventually(t, func() bool {
count, _ := nv.Count()
return count != 0
}, 5*time.Second, time.Second)
count, err := nv.Count()
require.NoError(t, err)
require.NotEqual(t, 0, count)
}
func TestNvidiaGPUStats(t *testing.T) {
@@ -311,24 +320,6 @@ func TestNvidiaGPUStats(t *testing.T) {
_, ok := nv.(*dummy)
require.False(t, ok)
require.Eventually(t, func() bool {
stats, _ := nv.Stats()
if len(stats) != 2 {
return false
}
if len(stats[0].Process) != 3 {
return false
}
if len(stats[1].Process) != 2 {
return false
}
return true
}, 5*time.Second, time.Second)
stats, err := nv.Stats()
require.NoError(t, err)
require.Equal(t, []gpu.Stats{
@@ -412,11 +403,6 @@ func TestNvidiaGPUProcess(t *testing.T) {
_, ok := nv.(*dummy)
require.False(t, ok)
require.Eventually(t, func() bool {
_, err := nv.Process(12176)
return err == nil
}, 5*time.Second, time.Second)
proc, err := nv.Process(12176)
require.NoError(t, err)
require.Equal(t, gpu.Process{