Add POST /cluster/process/probe endpoint

This commit is contained in:
Ingo Oppermann
2023-08-09 14:10:16 +03:00
parent 15d317a1cd
commit d19010944d
13 changed files with 189 additions and 21 deletions

View File

@@ -32,6 +32,7 @@ type Node interface {
DeleteProcess(id app.ProcessID) error DeleteProcess(id app.ProcessID) error
UpdateProcess(id app.ProcessID, config *app.Config, metadata map[string]interface{}) error UpdateProcess(id app.ProcessID, config *app.Config, metadata map[string]interface{}) error
ProbeProcess(id app.ProcessID) (clientapi.Probe, error) ProbeProcess(id app.ProcessID) (clientapi.Probe, error)
ProbeProcessConfig(config *app.Config) (clientapi.Probe, error)
NodeReader NodeReader
} }
@@ -1082,16 +1083,38 @@ func (n *node) UpdateProcess(id app.ProcessID, config *app.Config, metadata map[
func (n *node) ProbeProcess(id app.ProcessID) (clientapi.Probe, error) { func (n *node) ProbeProcess(id app.ProcessID) (clientapi.Probe, error) {
n.peerLock.RLock() n.peerLock.RLock()
defer n.peerLock.RUnlock() peer := n.peer
n.peerLock.RUnlock()
if n.peer == nil { if peer == nil {
probe := clientapi.Probe{ probe := clientapi.Probe{
Log: []string{fmt.Sprintf("the node %s where the process %s resides, is not connected", n.id, id.String())}, Log: []string{fmt.Sprintf("the node %s where the process %s resides, is not connected", n.id, id.String())},
} }
return probe, ErrNoPeer return probe, ErrNoPeer
} }
probe, err := n.peer.ProcessProbe(client.NewProcessID(id.ID, id.Domain)) probe, err := peer.ProcessProbe(client.NewProcessID(id.ID, id.Domain))
probe.Log = append([]string{fmt.Sprintf("probed on node: %s", n.id)}, probe.Log...)
return probe, err
}
func (n *node) ProbeProcessConfig(config *app.Config) (clientapi.Probe, error) {
n.peerLock.RLock()
peer := n.peer
n.peerLock.RUnlock()
if peer == nil {
probe := clientapi.Probe{
Log: []string{fmt.Sprintf("the node %s where the process config should be probed, is not connected", n.id)},
}
return probe, ErrNoPeer
}
cfg := convertConfig(config, nil)
probe, err := peer.ProcessProbeConfig(cfg)
probe.Log = append([]string{fmt.Sprintf("probed on node: %s", n.id)}, probe.Log...) probe.Log = append([]string{fmt.Sprintf("probed on node: %s", n.id)}, probe.Log...)

View File

@@ -35,12 +35,14 @@ type ProxyReader interface {
GetNodeReader(id string) (NodeReader, error) GetNodeReader(id string) (NodeReader, error)
FindNodeFromProcess(id app.ProcessID) (string, error) FindNodeFromProcess(id app.ProcessID) (string, error)
FindNodeFromResources(nodeid string, cpu float64, memory uint64) string
Resources() map[string]NodeResources Resources() map[string]NodeResources
ListProcesses(ProcessListOptions) []clientapi.Process ListProcesses(ProcessListOptions) []clientapi.Process
ListProxyProcesses() []Process ListProxyProcesses() []Process
ProbeProcess(nodeid string, id app.ProcessID) (clientapi.Probe, error) ProbeProcess(nodeid string, id app.ProcessID) (clientapi.Probe, error)
ProbeProcessConfig(nodeid string, config *app.Config) (clientapi.Probe, error)
ListFiles(storage, patter string) []clientapi.FileInfo ListFiles(storage, patter string) []clientapi.FileInfo
@@ -476,6 +478,30 @@ func (p *proxy) FindNodeFromProcess(id app.ProcessID) (string, error) {
return nodeid, nil return nodeid, nil
} }
func (p *proxy) FindNodeFromResources(nodeid string, cpu float64, memory uint64) string {
p.nodesLock.RLock()
defer p.nodesLock.RUnlock()
if len(nodeid) != 0 {
node, ok := p.nodes[nodeid]
if ok {
r := node.Resources()
if r.CPU+cpu <= r.CPULimit && r.Mem+memory <= r.MemLimit && !r.IsThrottling {
return nodeid
}
}
}
for nodeid, node := range p.nodes {
r := node.Resources()
if r.CPU+cpu <= r.CPULimit && r.Mem+memory <= r.MemLimit && !r.IsThrottling {
return nodeid
}
}
return ""
}
func (p *proxy) ListProcesses(options ProcessListOptions) []clientapi.Process { func (p *proxy) ListProcesses(options ProcessListOptions) []clientapi.Process {
processChan := make(chan []clientapi.Process, 64) processChan := make(chan []clientapi.Process, 64)
processList := []clientapi.Process{} processList := []clientapi.Process{}
@@ -579,3 +605,15 @@ func (p *proxy) ProbeProcess(nodeid string, id app.ProcessID) (clientapi.Probe,
return node.ProbeProcess(id) return node.ProbeProcess(id)
} }
func (p *proxy) ProbeProcessConfig(nodeid string, config *app.Config) (clientapi.Probe, error) {
node, err := p.GetNode(nodeid)
if err != nil {
probe := clientapi.Probe{
Log: []string{fmt.Sprintf("the node %s where the process config should be probed on, doesn't exist", nodeid)},
}
return probe, fmt.Errorf("node not found: %w", err)
}
return node.ProbeProcessConfig(config)
}

2
go.mod
View File

@@ -9,7 +9,7 @@ require (
github.com/atrox/haikunatorgo/v2 v2.0.1 github.com/atrox/haikunatorgo/v2 v2.0.1
github.com/caddyserver/certmagic v0.19.1 github.com/caddyserver/certmagic v0.19.1
github.com/casbin/casbin/v2 v2.73.1 github.com/casbin/casbin/v2 v2.73.1
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230725151052-26252f73d23b github.com/datarhei/core-client-go/v16 v16.11.1-0.20230809104853-391c13f9d400
github.com/datarhei/gosrt v0.5.4 github.com/datarhei/gosrt v0.5.4
github.com/datarhei/joy4 v0.0.0-20230505074825-fde05957445a github.com/datarhei/joy4 v0.0.0-20230505074825-fde05957445a
github.com/fujiwara/shapeio v1.0.0 github.com/fujiwara/shapeio v1.0.0

11
go.sum
View File

@@ -8,6 +8,8 @@ github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc
github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE= github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE=
github.com/Masterminds/semver/v3 v3.2.1 h1:RN9w6+7QoMeJVGyfmbcgs28Br8cvmnucEXnY0rYXWg0= github.com/Masterminds/semver/v3 v3.2.1 h1:RN9w6+7QoMeJVGyfmbcgs28Br8cvmnucEXnY0rYXWg0=
github.com/Masterminds/semver/v3 v3.2.1/go.mod h1:qvl/7zhW3nngYb5+80sSMF+FG2BjYrf8m9wsX0PNOMQ= github.com/Masterminds/semver/v3 v3.2.1/go.mod h1:qvl/7zhW3nngYb5+80sSMF+FG2BjYrf8m9wsX0PNOMQ=
github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
github.com/adhocore/gronx v1.6.5 h1:/pryEagBKz3WqUgpgvtL51eBN2rJLXowuW7rpS+jrew= github.com/adhocore/gronx v1.6.5 h1:/pryEagBKz3WqUgpgvtL51eBN2rJLXowuW7rpS+jrew=
github.com/adhocore/gronx v1.6.5/go.mod h1:7oUY1WAU8rEJWmAxXR2DN0JaO4gi9khSgKjiRypqteg= github.com/adhocore/gronx v1.6.5/go.mod h1:7oUY1WAU8rEJWmAxXR2DN0JaO4gi9khSgKjiRypqteg=
github.com/agnivade/levenshtein v1.1.1 h1:QY8M92nrzkmr798gCo3kmMyqXFzdQVpxLlGPRBij0P8= github.com/agnivade/levenshtein v1.1.1 h1:QY8M92nrzkmr798gCo3kmMyqXFzdQVpxLlGPRBij0P8=
@@ -48,6 +50,10 @@ github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46t
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230725151052-26252f73d23b h1:b8TWP11Tx9QwZ8lEPoyv9L2hfkH8YaUkAc0bGxRLYmg= github.com/datarhei/core-client-go/v16 v16.11.1-0.20230725151052-26252f73d23b h1:b8TWP11Tx9QwZ8lEPoyv9L2hfkH8YaUkAc0bGxRLYmg=
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230725151052-26252f73d23b/go.mod h1:3eKfwhPKoW7faTn+luShRVNMqcIskvlIKjRJ7ShjyL8= github.com/datarhei/core-client-go/v16 v16.11.1-0.20230725151052-26252f73d23b/go.mod h1:3eKfwhPKoW7faTn+luShRVNMqcIskvlIKjRJ7ShjyL8=
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230809080523-222b4b4efd5a h1:24dPnxb6eFMzyX2BjP2iZn4q7SxVBN41v5OUDRpXGWY=
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230809080523-222b4b4efd5a/go.mod h1:3eKfwhPKoW7faTn+luShRVNMqcIskvlIKjRJ7ShjyL8=
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230809104853-391c13f9d400 h1:xk4R13HF2vdG3aXKQ3Bce7hBMDnjBGC+HU4daun4wJI=
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230809104853-391c13f9d400/go.mod h1:3eKfwhPKoW7faTn+luShRVNMqcIskvlIKjRJ7ShjyL8=
github.com/datarhei/gosrt v0.5.4 h1:dE3mmSB+n1GeviGM8xQAW3+UD3mKeFmd84iefDul5Vs= github.com/datarhei/gosrt v0.5.4 h1:dE3mmSB+n1GeviGM8xQAW3+UD3mKeFmd84iefDul5Vs=
github.com/datarhei/gosrt v0.5.4/go.mod h1:MiUCwCG+LzFMzLM/kTA+3wiTtlnkVvGbW/F0XzyhtG8= github.com/datarhei/gosrt v0.5.4/go.mod h1:MiUCwCG+LzFMzLM/kTA+3wiTtlnkVvGbW/F0XzyhtG8=
github.com/datarhei/joy4 v0.0.0-20230505074825-fde05957445a h1:Tf4DSHY1xruBglr+yYP5Wct7czM86GKMYgbXH8a7OFo= github.com/datarhei/joy4 v0.0.0-20230505074825-fde05957445a h1:Tf4DSHY1xruBglr+yYP5Wct7czM86GKMYgbXH8a7OFo=
@@ -71,6 +77,7 @@ github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2
github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
@@ -157,11 +164,13 @@ github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
@@ -232,6 +241,7 @@ github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3Rllmb
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY= github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
@@ -283,6 +293,7 @@ github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFt
github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ= github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ=
github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU= github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU=
github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k= github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=

View File

@@ -618,7 +618,7 @@ func (h *ClusterHandler) GetProcessMetadata(c echo.Context) error {
// Probe probes a process in the cluster // Probe probes a process in the cluster
// @Summary Probe a process in the cluster // @Summary Probe a process in the cluster
// @Description Probe an existing process to get a detailed stream information on the inputs. // @Description Probe an existing process to get a detailed stream information on the inputs. The probe is executed on the same node as the process.
// @Tags v16.?.? // @Tags v16.?.?
// @ID cluster-3-process-probe // @ID cluster-3-process-probe
// @Produce json // @Produce json
@@ -654,6 +654,64 @@ func (h *ClusterHandler) ProbeProcess(c echo.Context) error {
return c.JSON(http.StatusOK, probe) return c.JSON(http.StatusOK, probe)
} }
// Probe probes a process in the cluster
// @Summary Probe a process in the cluster
// @Description Probe a process config to get a detailed stream information on the inputs.
// @Tags v16.?.?
// @ID cluster-3-probe-process-config
// @Accept json
// @Produce json
// @Param config body api.ProcessConfig true "Process config"
// @Param coreid query string false "Core to execute the probe on"
// @Success 200 {object} api.Probe
// @Failure 400 {object} api.Error
// @Failure 403 {object} api.Error
// @Failure 500 {object} api.Error
// @Security ApiKeyAuth
// @Router /api/v3/cluster/process/probe [post]
func (h *ClusterHandler) ProbeProcessConfig(c echo.Context) error {
ctxuser := util.DefaultContext(c, "user", "")
coreid := util.DefaultQuery(c, "coreid", "")
process := api.ProcessConfig{
ID: shortuuid.New(),
Owner: ctxuser,
Type: "ffmpeg",
Autostart: true,
}
if err := util.ShouldBindJSON(c, &process); err != nil {
return api.Err(http.StatusBadRequest, "", "invalid JSON: %s", err.Error())
}
if !h.iam.Enforce(ctxuser, process.Domain, "process:"+process.ID, "write") {
return api.Err(http.StatusForbidden, "", "API user %s is not allowed to write this process in domain %s", ctxuser, process.Domain)
}
if process.Type != "ffmpeg" {
return api.Err(http.StatusBadRequest, "", "unsupported process type: supported process types are: ffmpeg")
}
if len(process.Input) == 0 {
return api.Err(http.StatusBadRequest, "", "At least one input must be defined")
}
if process.Limits.CPU <= 0 || process.Limits.Memory == 0 {
return api.Err(http.StatusBadRequest, "", "Resource limit must be defined")
}
config, _ := process.Marshal()
coreid = h.proxy.FindNodeFromResources(coreid, config.LimitCPU, config.LimitMemory)
if len(coreid) == 0 {
return api.Err(http.StatusInternalServerError, "", "Not enough available resources available to execute probe")
}
probe, _ := h.proxy.ProbeProcessConfig(coreid, config)
return c.JSON(http.StatusOK, probe)
}
// Delete deletes the process with the given ID from the cluster // Delete deletes the process with the given ID from the cluster
// @Summary Delete a process by its ID // @Summary Delete a process by its ID
// @Description Delete a process by its ID // @Description Delete a process by its ID

View File

@@ -706,6 +706,10 @@ func (h *RestreamHandler) ProbeConfig(c echo.Context) error {
return api.Err(http.StatusBadRequest, "", "unsupported process type, supported process types are: ffmpeg") return api.Err(http.StatusBadRequest, "", "unsupported process type, supported process types are: ffmpeg")
} }
if len(process.Input) == 0 {
return api.Err(http.StatusBadRequest, "", "At least one input must be defined")
}
config, _ := process.Marshal() config, _ := process.Marshal()
probe := h.restream.Probe(config, 20*time.Second) probe := h.restream.Probe(config, 20*time.Second)

View File

@@ -734,6 +734,7 @@ func (s *server) setRoutesV3(v3 *echo.Group) {
v3.PUT("/cluster/leave", s.v3handler.cluster.Leave) v3.PUT("/cluster/leave", s.v3handler.cluster.Leave)
v3.POST("/cluster/process", s.v3handler.cluster.AddProcess) v3.POST("/cluster/process", s.v3handler.cluster.AddProcess)
v3.POST("/cluster/process/probe", s.v3handler.cluster.ProbeProcessConfig)
v3.PUT("/cluster/process/:id", s.v3handler.cluster.UpdateProcess) v3.PUT("/cluster/process/:id", s.v3handler.cluster.UpdateProcess)
v3.GET("/cluster/process/:id/probe", s.v3handler.cluster.ProbeProcess) v3.GET("/cluster/process/:id/probe", s.v3handler.cluster.ProbeProcess)
v3.DELETE("/cluster/process/:id", s.v3handler.cluster.DeleteProcess) v3.DELETE("/cluster/process/:id", s.v3handler.cluster.DeleteProcess)

View File

@@ -2,6 +2,7 @@ package api
type IAMUser struct { type IAMUser struct {
Name string `json:"name"` Name string `json:"name"`
Alias string `json:"alias"`
Superuser bool `json:"superuser"` Superuser bool `json:"superuser"`
Auth IAMUserAuth `json:"auth"` Auth IAMUserAuth `json:"auth"`
Policies []IAMPolicy `json:"policies"` Policies []IAMPolicy `json:"policies"`

View File

@@ -3,8 +3,8 @@ package api
// SessionStats are the accumulated numbers for the session summary // SessionStats are the accumulated numbers for the session summary
type SessionStats struct { type SessionStats struct {
TotalSessions uint64 `json:"sessions" format:"uint64"` TotalSessions uint64 `json:"sessions" format:"uint64"`
TotalRxBytes uint64 `json:"traffic_rx_mb" format:"uint64"` TotalRxBytes uint64 `json:"traffic_rx_mb" format:"uint64"` // megabytes
TotalTxBytes uint64 `json:"traffic_tx_mb" format:"uint64"` TotalTxBytes uint64 `json:"traffic_tx_mb" format:"uint64"` // megabytes
} }
// SessionPeers is for the grouping by peers in the summary // SessionPeers is for the grouping by peers in the summary
@@ -21,7 +21,7 @@ type Session struct {
CreatedAt int64 `json:"created_at" format:"int64"` CreatedAt int64 `json:"created_at" format:"int64"`
Location string `json:"local"` Location string `json:"local"`
Peer string `json:"remote"` Peer string `json:"remote"`
Extra string `json:"extra"` Extra map[string]interface{} `json:"extra"`
RxBytes uint64 `json:"bytes_rx" format:"uint64"` RxBytes uint64 `json:"bytes_rx" format:"uint64"`
TxBytes uint64 `json:"bytes_tx" format:"uint64"` TxBytes uint64 `json:"bytes_tx" format:"uint64"`
RxBitrate float64 `json:"bandwidth_rx_kbit" swaggertype:"number" jsonschema:"type=number"` // kbit/s RxBitrate float64 `json:"bandwidth_rx_kbit" swaggertype:"number" jsonschema:"type=number"` // kbit/s

View File

@@ -88,6 +88,7 @@ type RestClient interface {
ProcessDelete(id ProcessID) error // DELETE /v3/process/{id} ProcessDelete(id ProcessID) error // DELETE /v3/process/{id}
ProcessCommand(id ProcessID, command string) error // PUT /v3/process/{id}/command ProcessCommand(id ProcessID, command string) error // PUT /v3/process/{id}/command
ProcessProbe(id ProcessID) (api.Probe, error) // GET /v3/process/{id}/probe ProcessProbe(id ProcessID) (api.Probe, error) // GET /v3/process/{id}/probe
ProcessProbeConfig(config api.ProcessConfig) (api.Probe, error) // POST /v3/process/probe
ProcessConfig(id ProcessID) (api.ProcessConfig, error) // GET /v3/process/{id}/config ProcessConfig(id ProcessID) (api.ProcessConfig, error) // GET /v3/process/{id}/config
ProcessReport(id ProcessID) (api.ProcessReport, error) // GET /v3/process/{id}/report ProcessReport(id ProcessID) (api.ProcessReport, error) // GET /v3/process/{id}/report
ProcessState(id ProcessID) (api.ProcessState, error) // GET /v3/process/{id}/state ProcessState(id ProcessID) (api.ProcessState, error) // GET /v3/process/{id}/state
@@ -441,6 +442,10 @@ func New(config Config) (RestClient, error) {
path: mustNewGlob("/v3/events"), path: mustNewGlob("/v3/events"),
constraint: mustNewConstraint("^16.14.0"), constraint: mustNewConstraint("^16.14.0"),
}, },
{
path: mustNewGlob("/v3/process/probe"),
constraint: mustNewConstraint("^16.14.0"),
},
}, },
"PUT": { "PUT": {
{ {

View File

@@ -250,6 +250,29 @@ func (r *restclient) processProbe(where string, id ProcessID) (api.Probe, error)
return p, err return p, err
} }
func (r *restclient) processProbeConfig(where string, config api.ProcessConfig) (api.Probe, error) {
var p api.Probe
path := "/v3/process/probe"
if where == "cluster" {
path = "/v3/cluster/process/probe"
}
var buf bytes.Buffer
e := json.NewEncoder(&buf)
e.Encode(config)
data, err := r.call("POST", path, nil, nil, "application/json", &buf)
if err != nil {
return p, err
}
err = json.Unmarshal(data, &p)
return p, err
}
func (r *restclient) ProcessList(opts ProcessListOptions) ([]api.Process, error) { func (r *restclient) ProcessList(opts ProcessListOptions) ([]api.Process, error) {
return r.processList("", opts) return r.processList("", opts)
} }
@@ -286,6 +309,10 @@ func (r *restclient) ProcessProbe(id ProcessID) (api.Probe, error) {
return r.processProbe("", id) return r.processProbe("", id)
} }
func (r *restclient) ProcessProbeConfig(config api.ProcessConfig) (api.Probe, error) {
return r.processProbeConfig("", config)
}
func (r *restclient) ProcessConfig(id ProcessID) (api.ProcessConfig, error) { func (r *restclient) ProcessConfig(id ProcessID) (api.ProcessConfig, error) {
var p api.ProcessConfig var p api.ProcessConfig

View File

@@ -15,7 +15,7 @@ func (r *restclient) Sessions(collectors []string) (api.SessionsSummary, error)
query := &url.Values{} query := &url.Values{}
query.Set("collectors", strings.Join(collectors, ",")) query.Set("collectors", strings.Join(collectors, ","))
data, err := r.call("GET", "/v3/sessions", query, nil, "", nil) data, err := r.call("GET", "/v3/session", query, nil, "", nil)
if err != nil { if err != nil {
return sessions, err return sessions, err
} }
@@ -31,7 +31,7 @@ func (r *restclient) SessionsActive(collectors []string) (api.SessionsActive, er
query := &url.Values{} query := &url.Values{}
query.Set("collectors", strings.Join(collectors, ",")) query.Set("collectors", strings.Join(collectors, ","))
data, err := r.call("GET", "/v3/sessions/active", query, nil, "", nil) data, err := r.call("GET", "/v3/session/active", query, nil, "", nil)
if err != nil { if err != nil {
return sessions, err return sessions, err
} }
@@ -48,7 +48,7 @@ func (r *restclient) SessionToken(name string, req []api.SessionTokenRequest) ([
e := json.NewEncoder(&buf) e := json.NewEncoder(&buf)
e.Encode(req) e.Encode(req)
data, err := r.call("PUT", "/session/token/"+url.PathEscape(name), nil, nil, "application/json", &buf) data, err := r.call("PUT", "/v3/session/token/"+url.PathEscape(name), nil, nil, "application/json", &buf)
if err != nil { if err != nil {
return tokens, err return tokens, err
} }

2
vendor/modules.txt vendored
View File

@@ -78,7 +78,7 @@ github.com/cespare/xxhash/v2
# github.com/cpuguy83/go-md2man/v2 v2.0.2 # github.com/cpuguy83/go-md2man/v2 v2.0.2
## explicit; go 1.11 ## explicit; go 1.11
github.com/cpuguy83/go-md2man/v2/md2man github.com/cpuguy83/go-md2man/v2/md2man
# github.com/datarhei/core-client-go/v16 v16.11.1-0.20230725151052-26252f73d23b # github.com/datarhei/core-client-go/v16 v16.11.1-0.20230809104853-391c13f9d400
## explicit; go 1.18 ## explicit; go 1.18
github.com/datarhei/core-client-go/v16 github.com/datarhei/core-client-go/v16
github.com/datarhei/core-client-go/v16/api github.com/datarhei/core-client-go/v16/api