Parallelize reading out process information

This commit is contained in:
Ingo Oppermann
2024-04-30 14:15:40 +02:00
parent 43ae0c149b
commit 3ee4876290
2 changed files with 86 additions and 24 deletions

View File

@@ -2,9 +2,11 @@ package api
import ( import (
"net/http" "net/http"
"runtime"
"sort" "sort"
"strconv" "strconv"
"strings" "strings"
"sync"
"time" "time"
"github.com/datarhei/core/v16/http/api" "github.com/datarhei/core/v16/http/api"
@@ -127,40 +129,67 @@ func (h *ProcessHandler) GetAll(c echo.Context) error {
ownerpattern := util.DefaultQuery(c, "ownerpattern", "") ownerpattern := util.DefaultQuery(c, "ownerpattern", "")
domainpattern := util.DefaultQuery(c, "domainpattern", "") domainpattern := util.DefaultQuery(c, "domainpattern", "")
preids := h.restream.GetProcessIDs(idpattern, refpattern, ownerpattern, domainpattern) ids := h.restream.GetProcessIDs(idpattern, refpattern, ownerpattern, domainpattern)
ids := []app.ProcessID{}
for _, id := range preids { if len(wantids) != 0 {
if !h.iam.Enforce(ctxuser, domain, "process", id.ID, "read") { filteredIds := []app.ProcessID{}
continue
}
ids = append(ids, id)
}
processes := []api.Process{}
if len(wantids) == 0 || len(reference) != 0 {
for _, id := range ids {
if p, err := h.getProcess(id, filter); err == nil {
if len(reference) != 0 && p.Reference != reference {
continue
}
processes = append(processes, p)
}
}
} else {
for _, id := range ids { for _, id := range ids {
for _, wantid := range wantids { for _, wantid := range wantids {
if wantid == id.ID { if wantid == id.ID {
if p, err := h.getProcess(id, filter); err == nil { filteredIds = append(filteredIds, id)
processes = append(processes, p) break
}
} }
} }
} }
ids = filteredIds
} }
processes := make([]*api.Process, 0, len(ids))
hasReference := len(reference) != 0 && len(wantids) == 0
processesMu := sync.Mutex{}
idChan := make(chan app.ProcessID, len(ids))
wg := sync.WaitGroup{}
for i := 0; i < runtime.NumCPU(); i++ {
wg.Add(1)
go func(idChan <-chan app.ProcessID) {
defer wg.Done()
for id := range idChan {
if !h.iam.Enforce(ctxuser, domain, "process", id.ID, "read") {
continue
}
process, err := h.getProcess(id, filter)
if err != nil {
continue
}
if hasReference && process.Reference != reference {
continue
}
processesMu.Lock()
processes = append(processes, &process)
processesMu.Unlock()
}
}(idChan)
}
for _, id := range ids {
idChan <- id
}
close(idChan)
wg.Wait()
return c.JSON(http.StatusOK, processes) return c.JSON(http.StatusOK, processes)
} }

View File

@@ -740,3 +740,36 @@ func TestMetadata(t *testing.T) {
response = mock.Request(t, http.StatusOK, router, "GET", "/metadata", nil) response = mock.Request(t, http.StatusOK, router, "GET", "/metadata", nil)
require.Equal(t, nil, response.Data) require.Equal(t, nil, response.Data)
} }
// BenchmarkAllProcesses measures the latency of listing all processes via
// GET "/" after seeding the router with 1000 processes. The seeding phase
// is excluded from the measurement via b.ResetTimer.
func BenchmarkAllProcesses(b *testing.B) {
	router, err := getDummyRestreamRouter()
	require.NoError(b, err)

	// Load the process config fixture once; it is re-marshaled with a
	// fresh ID for every seeded process below.
	data := bytes.Buffer{}
	_, err = data.ReadFrom(mock.Read(b, "./fixtures/addProcess.json"))
	require.NoError(b, err)

	process := api.ProcessConfig{}
	err = json.Unmarshal(data.Bytes(), &process)
	require.NoError(b, err)

	// Seed 1000 processes with unique IDs so the listing endpoint has a
	// realistic amount of work to do.
	for i := 0; i < 1000; i++ {
		process.ID = "test_" + strconv.Itoa(i)

		encoded, err := json.Marshal(&process)
		require.NoError(b, err)

		// Reuse the buffer as the request body for each POST.
		data.Reset()
		_, err = data.Write(encoded)
		require.NoError(b, err)

		mock.Request(b, http.StatusOK, router, "POST", "/", &data)
	}

	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		response := mock.RequestEx(b, http.StatusOK, router, "GET", "/", nil, false)
		// require.Equal takes (t, expected, actual) — expected value first.
		require.Equal(b, 200, response.Code)
	}
}