Use reference affinity when distributing processes

This commit is contained in:
Ingo Oppermann
2023-05-10 19:59:15 +02:00
parent 7a2d0a7ad9
commit 862c36c9e6
2 changed files with 108 additions and 47 deletions

View File

@@ -521,8 +521,6 @@ func normalizeProcessesAndResources(processes []ProcessConfig, resources map[str
// synchronize returns a list of operations in order to adjust the "have" list to the "want" list
// with taking the available resources on each node into account.
func synchronize(want []app.Config, have []ProcessConfig, resources map[string]NodeResources) []interface{} {
opStack := []interface{}{}
normalizeProcessesAndResources(have, resources)
// A map from the process ID to the process config of the processes
@@ -532,11 +530,13 @@ func synchronize(want []app.Config, have []ProcessConfig, resources map[string]N
wantMap[config.ID] = &config
}
haveAfterRemove := []ProcessConfig{}
opStack := []interface{}{}
// Now we iterate through the processes we actually have running on the nodes
// and remove them from the wantMap. We also make sure that they are running.
// If a process is not on the wantMap, it will be deleted from the nodes.
haveAfterRemove := []ProcessConfig{}
for _, p := range have {
if _, ok := wantMap[p.Config.ID]; !ok {
opStack = append(opStack, processOpDelete{
@@ -569,18 +569,10 @@ func synchronize(want []app.Config, have []ProcessConfig, resources map[string]N
have = haveAfterRemove
// A map from the process reference to the node it is running on
haveReferenceAffinityMap := map[string]string{}
for _, p := range have {
if len(p.Config.Reference) == 0 {
continue
}
createReferenceAffinityMap(have)
// This is a simplification because a reference could be on several nodes,
// but here take into consideration on which node the reference has been
// seen the last. This is good enough for now.
haveReferenceAffinityMap[p.Config.Reference] = p.NodeID
}
// A map from the process reference to the node it is running on
haveReferenceAffinityMap := createReferenceAffinityMap(have)
// Now all remaining processes in the wantMap must be added to one of the nodes
for _, config := range wantMap {
@@ -597,49 +589,50 @@ func synchronize(want []app.Config, have []ProcessConfig, resources map[string]N
// Check if there are already processes with the same reference, and if so
// chose this node. Then check the node if it has enough resources left. If
// not, then select a node with the most available resources.
reference := config.Reference
nodeid := haveReferenceAffinityMap[reference]
nodeid := ""
if len(nodeid) != 0 {
cpu := config.LimitCPU / resources[nodeid].NCPU
mem := float64(config.LimitMemory) / float64(resources[nodeid].MemTotal) * 100
// Try to add the process to a node where other processes with the same
// reference currently reside.
if len(config.Reference) != 0 {
for _, count := range haveReferenceAffinityMap[config.Reference] {
r := resources[count.nodeid]
cpu := config.LimitCPU / r.NCPU
mem := float64(config.LimitMemory) / r.MemTotal * 100
if resources[nodeid].CPU+cpu < resources[nodeid].CPULimit && resources[nodeid].Mem+mem < resources[nodeid].MemLimit {
opStack = append(opStack, processOpAdd{
nodeid: nodeid,
config: config,
})
continue
if r.CPU+cpu < r.CPULimit && r.Mem+mem < r.MemLimit {
nodeid = count.nodeid
break
}
}
}
// Find the node with the most resources available
nodeid = ""
for id, r := range resources {
cpu := config.LimitCPU / r.NCPU
mem := float64(config.LimitMemory) / float64(r.MemTotal) * 100
if len(nodeid) == 0 {
for id, r := range resources {
cpu := config.LimitCPU / r.NCPU
mem := float64(config.LimitMemory) / float64(r.MemTotal) * 100
if len(nodeid) == 0 {
if r.CPU+cpu < r.CPULimit && r.Mem+mem < r.MemLimit {
nodeid = id
if len(nodeid) == 0 {
if r.CPU+cpu < r.CPULimit && r.Mem+mem < r.MemLimit {
nodeid = id
}
continue
}
continue
}
if r.CPU+r.Mem < resources[nodeid].CPU+resources[nodeid].Mem {
nodeid = id
}
/*
if r.CPU < resources[nodeid].CPU && r.Mem < resources[nodeid].Mem {
nodeid = id
} else if r.Mem < resources[nodeid].Mem {
nodeid = id
} else if r.CPU < resources[nodeid].CPU {
if r.CPU+r.Mem < resources[nodeid].CPU+resources[nodeid].Mem {
nodeid = id
}
*/
/*
if r.CPU < resources[nodeid].CPU && r.Mem < resources[nodeid].Mem {
nodeid = id
} else if r.Mem < resources[nodeid].Mem {
nodeid = id
} else if r.CPU < resources[nodeid].CPU {
nodeid = id
}
*/
}
}
if len(nodeid) != 0 {
@@ -717,6 +710,8 @@ func createReferenceAffinityMap(processes []ProcessConfig) map[string][]referenc
// rebalance returns a list of operations that will move running processes away from nodes
// that are overloaded.
func rebalance(have []ProcessConfig, resources map[string]NodeResources) []interface{} {
normalizeProcessesAndResources(have, resources)
// Group the processes by node
processNodeMap := map[string][]ProcessConfig{}

View File

@@ -4,6 +4,7 @@ import (
"testing"
"github.com/datarhei/core/v16/restream/app"
"github.com/stretchr/testify/require"
)
@@ -164,6 +165,71 @@ func TestSynchronizeAdd(t *testing.T) {
}, resources)
}
func TestSynchronizeAddReferenceAffinity(t *testing.T) {
want := []app.Config{
{
ID: "foobar",
Reference: "barfoo",
LimitCPU: 10,
LimitMemory: 50 * 1024 * 1024,
},
{
ID: "foobar2",
Reference: "barfoo",
LimitCPU: 10,
LimitMemory: 50 * 1024 * 1024,
},
}
have := []ProcessConfig{
{
NodeID: "node2",
Order: "start",
State: "running",
CPU: 12,
Mem: 5,
Runtime: 42,
Config: &app.Config{
ID: "foobar",
Reference: "barfoo",
},
},
}
resources := map[string]NodeResources{
"node1": {
NCPU: 1,
CPU: 1,
Mem: 1,
MemTotal: 4 * 1024 * 1024 * 1024,
CPULimit: 90,
MemLimit: 90,
},
"node2": {
NCPU: 1,
CPU: 1,
Mem: 1,
MemTotal: 4 * 1024 * 1024 * 1024,
CPULimit: 90,
MemLimit: 90,
},
}
stack := synchronize(want, have, resources)
require.Equal(t, []interface{}{
processOpAdd{
nodeid: "node2",
config: &app.Config{
ID: "foobar2",
Reference: "barfoo",
LimitCPU: 10,
LimitMemory: 50 * 1024 * 1024,
},
},
}, stack)
}
func TestSynchronizeAddLimit(t *testing.T) {
want := []app.Config{
{
@@ -689,7 +755,7 @@ func TestRebalanceSkip(t *testing.T) {
require.NotEmpty(t, opStack)
require.Equal(t, []interface{}{
require.ElementsMatch(t, []interface{}{
processOpSkip{
nodeid: "node1",
processid: "foobar3",