Don't use a map for the process list

This commit is contained in:
Ingo Oppermann
2023-07-24 10:32:51 +02:00
parent 3e842f2b2f
commit bb5b580561
2 changed files with 21 additions and 40 deletions

View File

@@ -37,7 +37,7 @@ type ProxyReader interface {
FindNodeFromProcess(id app.ProcessID) (string, error) FindNodeFromProcess(id app.ProcessID) (string, error)
Resources() map[string]NodeResources Resources() map[string]NodeResources
ListProcesses(ProcessListOptions) map[string][]clientapi.Process ListProcesses(ProcessListOptions) []clientapi.Process
ListProxyProcesses() []Process ListProxyProcesses() []Process
ProbeProcess(nodeid string, id app.ProcessID) (clientapi.Probe, error) ProbeProcess(nodeid string, id app.ProcessID) (clientapi.Probe, error)
@@ -367,7 +367,7 @@ type ProcessListOptions struct {
} }
func (p *proxy) ListProxyProcesses() []Process { func (p *proxy) ListProxyProcesses() []Process {
processChan := make(chan Process, 64) processChan := make(chan []Process, 64)
processList := []Process{} processList := []Process{}
wgList := sync.WaitGroup{} wgList := sync.WaitGroup{}
@@ -376,8 +376,8 @@ func (p *proxy) ListProxyProcesses() []Process {
go func() { go func() {
defer wgList.Done() defer wgList.Done()
for process := range processChan { for list := range processChan {
processList = append(processList, process) processList = append(processList, list...)
} }
}() }()
@@ -387,7 +387,7 @@ func (p *proxy) ListProxyProcesses() []Process {
for _, node := range p.nodes { for _, node := range p.nodes {
wg.Add(1) wg.Add(1)
go func(node Node, p chan<- Process) { go func(node Node, p chan<- []Process) {
defer wg.Done() defer wg.Done()
processes, err := node.ProxyProcessList() processes, err := node.ProxyProcessList()
@@ -395,9 +395,7 @@ func (p *proxy) ListProxyProcesses() []Process {
return return
} }
for _, process := range processes { p <- processes
p <- process
}
}(node, processChan) }(node, processChan)
} }
p.nodesLock.RUnlock() p.nodesLock.RUnlock()
@@ -432,14 +430,9 @@ func (p *proxy) FindNodeFromProcess(id app.ProcessID) (string, error) {
return nodeid, nil return nodeid, nil
} }
type processList struct { func (p *proxy) ListProcesses(options ProcessListOptions) []clientapi.Process {
nodeid string processChan := make(chan []clientapi.Process, 64)
processes []clientapi.Process processList := []clientapi.Process{}
}
func (p *proxy) ListProcesses(options ProcessListOptions) map[string][]clientapi.Process {
processChan := make(chan processList, 64)
processMap := map[string][]clientapi.Process{}
wgList := sync.WaitGroup{} wgList := sync.WaitGroup{}
wgList.Add(1) wgList.Add(1)
@@ -448,7 +441,7 @@ func (p *proxy) ListProcesses(options ProcessListOptions) map[string][]clientapi
defer wgList.Done() defer wgList.Done()
for list := range processChan { for list := range processChan {
processMap[list.nodeid] = list.processes processList = append(processList, list...)
} }
}() }()
@@ -458,7 +451,7 @@ func (p *proxy) ListProcesses(options ProcessListOptions) map[string][]clientapi
for _, node := range p.nodes { for _, node := range p.nodes {
wg.Add(1) wg.Add(1)
go func(node Node, p chan<- processList) { go func(node Node, p chan<- []clientapi.Process) {
defer wg.Done() defer wg.Done()
processes, err := node.ProcessList(options) processes, err := node.ProcessList(options)
@@ -466,11 +459,7 @@ func (p *proxy) ListProcesses(options ProcessListOptions) map[string][]clientapi
return return
} }
p <- processList{ p <- processes
nodeid: node.About().ID,
processes: processes,
}
}(node, processChan) }(node, processChan)
} }
p.nodesLock.RUnlock() p.nodesLock.RUnlock()
@@ -481,7 +470,7 @@ func (p *proxy) ListProcesses(options ProcessListOptions) map[string][]clientapi
wgList.Wait() wgList.Wait()
return processMap return processList
} }
func (p *proxy) AddProcess(nodeid string, config *app.Config, metadata map[string]interface{}) error { func (p *proxy) AddProcess(nodeid string, config *app.Config, metadata map[string]interface{}) error {

View File

@@ -50,7 +50,7 @@ func (h *ClusterHandler) GetAllProcesses(c echo.Context) error {
ownerpattern := util.DefaultQuery(c, "ownerpattern", "") ownerpattern := util.DefaultQuery(c, "ownerpattern", "")
domainpattern := util.DefaultQuery(c, "domainpattern", "") domainpattern := util.DefaultQuery(c, "domainpattern", "")
procsMap := h.proxy.ListProcesses(proxy.ProcessListOptions{ procs := h.proxy.ListProcesses(proxy.ProcessListOptions{
ID: wantids, ID: wantids,
Filter: filter.Slice(), Filter: filter.Slice(),
Domain: domain, Domain: domain,
@@ -64,7 +64,6 @@ func (h *ClusterHandler) GetAllProcesses(c echo.Context) error {
processes := []clientapi.Process{} processes := []clientapi.Process{}
pmap := map[app.ProcessID]struct{}{} pmap := map[app.ProcessID]struct{}{}
for _, procs := range procsMap {
for _, p := range procs { for _, p := range procs {
if !h.iam.Enforce(ctxuser, domain, "process:"+p.ID, "read") { if !h.iam.Enforce(ctxuser, domain, "process:"+p.ID, "read") {
continue continue
@@ -73,7 +72,6 @@ func (h *ClusterHandler) GetAllProcesses(c echo.Context) error {
processes = append(processes, p) processes = append(processes, p)
pmap[app.NewProcessID(p.ID, p.Domain)] = struct{}{} pmap[app.NewProcessID(p.ID, p.Domain)] = struct{}{}
} }
}
missing := []api.Process{} missing := []api.Process{}
@@ -319,18 +317,12 @@ func (h *ClusterHandler) GetProcess(c echo.Context) error {
return api.Err(http.StatusForbidden, "") return api.Err(http.StatusForbidden, "")
} }
procsMap := h.proxy.ListProcesses(proxy.ProcessListOptions{ procs := h.proxy.ListProcesses(proxy.ProcessListOptions{
ID: []string{id}, ID: []string{id},
Filter: filter.Slice(), Filter: filter.Slice(),
Domain: domain, Domain: domain,
}) })
procs := []clientapi.Process{}
for _, processes := range procsMap {
procs = append(procs, processes...)
}
if len(procs) == 0 { if len(procs) == 0 {
// Check the store in the store for an undeployed process // Check the store in the store for an undeployed process
p, err := h.cluster.GetProcess(app.NewProcessID(id, domain)) p, err := h.cluster.GetProcess(app.NewProcessID(id, domain))