From 9e52f19a66b93a541f2738266b3d25d76e01a79b Mon Sep 17 00:00:00 2001
From: Ingo Oppermann
Date: Mon, 22 Jul 2024 09:25:23 +0200
Subject: [PATCH] Introduce synchronize budget, experimental

---
 cluster/leader_synchronize.go | 34 ++++++++++++++++++++++++++--------
 1 file changed, 26 insertions(+), 8 deletions(-)

diff --git a/cluster/leader_synchronize.go b/cluster/leader_synchronize.go
index f618f769..0ddd5ac4 100644
--- a/cluster/leader_synchronize.go
+++ b/cluster/leader_synchronize.go
@@ -160,6 +160,11 @@ func synchronize(wish map[string]string, want []store.Process, have []node.Proce
 	}
 
 	opStack := []interface{}{}
+	opStackStart := []interface{}{}
+	opStackDelete := []interface{}{}
+	opStackUpdate := []interface{}{}
+	opStackAdd := []interface{}{}
+	opBudget := 100
 
 	// Now we iterate through the processes we actually have running on the nodes
 	// and remove them from the wantMap. We also make sure that they have the correct order.
@@ -172,7 +177,7 @@ func synchronize(wish map[string]string, want []store.Process, have []node.Proce
 		wantP, ok := wantMap[pid]
 		if !ok {
 			// The process is not on the wantMap. Delete it and adjust the resources.
-			opStack = append(opStack, processOpDelete{
+			opStackDelete = append(opStackDelete, processOpDelete{
 				nodeid:    haveP.NodeID,
 				processid: haveP.Config.ProcessID(),
 			})
@@ -185,16 +190,18 @@ func synchronize(wish map[string]string, want []store.Process, have []node.Proce
 		// The process is on the wantMap. Update the process if the configuration and/or metadata differ.
 		hasConfigChanges := !wantP.Config.Equal(haveP.Config)
 		hasMetadataChanges, metadata := isMetadataUpdateRequired(wantP.Metadata, haveP.Metadata)
-		if hasConfigChanges || hasMetadataChanges {
+		if (hasConfigChanges || hasMetadataChanges) && opBudget > 0 {
 			// TODO: When the required resources increase, should we move this process to a node
 			// that has them available? Otherwise, this node might start throttling. However, this
 			// will result in rebalancing.
-			opStack = append(opStack, processOpUpdate{
+			opStackUpdate = append(opStackUpdate, processOpUpdate{
 				nodeid:    haveP.NodeID,
 				processid: haveP.Config.ProcessID(),
 				config:    wantP.Config,
 				metadata:  metadata,
 			})
+
+			opBudget -= 3
 		}
 
 		delete(wantMap, pid)
@@ -267,10 +274,16 @@ func synchronize(wish map[string]string, want []store.Process, have []node.Proce
 			}
 		*/
 
-		opStack = append(opStack, processOpStart{
+		opStackStart = append(opStackStart, processOpStart{
 			nodeid:    nodeid,
 			processid: haveP.Config.ProcessID(),
 		})
+
+		opBudget -= 3
+
+		if opBudget <= 0 {
+			break
+		}
 	}
 
 	have = haveAfterRemove
@@ -348,14 +361,16 @@ func synchronize(wish map[string]string, want []store.Process, have []node.Proce
 			}
 		}
 
-		if len(nodeid) != 0 {
-			opStack = append(opStack, processOpAdd{
+		if len(nodeid) != 0 && opBudget > 0 {
+			opStackAdd = append(opStackAdd, processOpAdd{
 				nodeid:   nodeid,
 				config:   wantP.Config,
 				metadata: wantP.Metadata,
 				order:    wantP.Order,
 			})
 
+			opBudget -= 3
+
 			// Consume the resources
 			resources.Add(nodeid, wantP.Config.LimitCPU, wantP.Config.LimitMemory)
 
@@ -368,9 +383,12 @@ func synchronize(wish map[string]string, want []store.Process, have []node.Proce
 				err: errNotEnoughResourcesForDeployment,
 			})
 		}
-
-		//break
 	}
 
+	opStack = append(opStack, opStackDelete...)
+	opStack = append(opStack, opStackUpdate...)
+	opStack = append(opStack, opStackStart...)
+	opStack = append(opStack, opStackAdd...)
+
 	return opStack, resources.Map(), reality
 }
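
The budget mechanism in a nutshell: each synchronize pass starts with a budget of 100; queueing an update, start, or add costs 3 units, while deletes are always queued since they free resources; the per-kind stacks are then concatenated so deletes run before updates, starts, and adds. The standalone sketch below models just that pattern under stated assumptions: the op type and plan function are hypothetical simplifications, not the processOp* structs from cluster/leader_synchronize.go, and it gates all three budgeted kinds with the same opBudget > 0 check, whereas the patch additionally breaks out of the start loop once the budget is exhausted.

// A minimal standalone sketch of the budget mechanism this patch introduces,
// for illustration only. The budget of 100, the cost of 3 per operation, and
// the delete/update/start/add ordering are taken from the diff; the op type
// and the plan helper are simplified stand-ins for the real code.
package main

import "fmt"

type op struct {
	kind      string // "delete", "update", "start", or "add"
	processid string
}

// plan partitions candidate operations into per-kind stacks while a shared
// budget lasts, then concatenates them so that deletes come first, followed
// by updates, starts, and adds, mirroring the final append order in the
// patch. Deletes are not gated by the budget; the other kinds each consume
// 3 units and are dropped once the budget is used up.
func plan(candidates []op) []op {
	budget := 100

	var deletes, updates, starts, adds []op

	for _, o := range candidates {
		switch o.kind {
		case "delete":
			// Deletes free resources and are always queued.
			deletes = append(deletes, o)
		case "update":
			if budget > 0 {
				updates = append(updates, o)
				budget -= 3
			}
		case "start":
			if budget > 0 {
				starts = append(starts, o)
				budget -= 3
			}
		case "add":
			if budget > 0 {
				adds = append(adds, o)
				budget -= 3
			}
		}
	}

	stack := append([]op{}, deletes...)
	stack = append(stack, updates...)
	stack = append(stack, starts...)
	stack = append(stack, adds...)

	return stack
}

func main() {
	// 50 candidate adds at 3 units each exceed the budget of 100, so only
	// part of them are queued; the rest wait for the next synchronize pass.
	var candidates []op
	for i := 0; i < 50; i++ {
		candidates = append(candidates, op{kind: "add", processid: fmt.Sprintf("p%d", i)})
	}

	fmt.Println(len(plan(candidates))) // 34
}

Note that because the budget is only checked before queueing an op (opBudget > 0) and decremented afterwards, it can go slightly negative; that is why 34 adds are queued above rather than a clean 100/3 = 33. The effect of the whole scheme is to cap how much mutation a single leader synchronize pass can generate, spreading large reconciliations over several cycles instead of flooding the nodes at once.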