Fix cluster process update on metadata change

This commit is contained in:
Ingo Oppermann
2023-06-28 16:26:36 +02:00
parent a6d454b03f
commit 2b58c11bb1
16 changed files with 612 additions and 25 deletions

View File

@@ -1,7 +1,9 @@
package cluster
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"sort"
@@ -571,7 +573,7 @@ func (c *cluster) doSynchronize(emergency bool) {
opStack, _, reality := synchronize(wish, want, have, nodesMap, c.nodeRecoverTimeout)
if !emergency {
if !emergency && len(opStack) != 0 {
cmd := &store.Command{
Operation: store.OpSetProcessNodeMap,
Data: store.CommandSetProcessNodeMap{
@@ -606,6 +608,55 @@ func (c *cluster) doRebalance(emergency bool) {
c.applyOpStack(opStack)
}
// isMetadataUpdateRequired compares two metadata. It relies on the documents property that json.Marshal
// sorts the map keys prior encoding.
func isMetadataUpdateRequired(wantMap map[string]interface{}, haveMap map[string]interface{}) (bool, map[string]interface{}) {
hasChanges := false
changeMap := map[string]interface{}{}
haveMapKeys := map[string]struct{}{}
for key := range haveMap {
haveMapKeys[key] = struct{}{}
}
for key, wantMapValue := range wantMap {
haveMapValue, ok := haveMap[key]
if !ok {
// A key in map1 exists, that doesn't exist in map2, we need to update
hasChanges = true
}
// Compare the values
changesData, err := json.Marshal(wantMapValue)
if err != nil {
continue
}
completeData, err := json.Marshal(haveMapValue)
if err != nil {
continue
}
if !bytes.Equal(changesData, completeData) {
// The values are not equal, we need to update
hasChanges = true
}
delete(haveMapKeys, key)
changeMap[key] = wantMapValue
}
for key := range haveMapKeys {
// If there keys in map2 that are not in map1, we have to update
hasChanges = true
changeMap[key] = nil
}
return hasChanges, changeMap
}
// synchronize returns a list of operations in order to adjust the "have" list to the "want" list
// with taking the available resources on each node into account.
func synchronize(wish map[string]string, want []store.Process, have []proxy.Process, nodes map[string]proxy.NodeAbout, nodeRecoverTimeout time.Duration) ([]interface{}, map[string]proxy.NodeResources, map[string]string) {
@@ -654,13 +705,15 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc
continue
} else {
// The process is on the wantMap. Update the process if the configuration differ.
if !wantP.Config.Equal(haveP.Config) {
// The process is on the wantMap. Update the process if the configuration and/or metadata differ.
hasConfigChanges := !wantP.Config.Equal(haveP.Config)
hasMetadataChanges, metadata := isMetadataUpdateRequired(wantP.Metadata, haveP.Metadata)
if hasConfigChanges || hasMetadataChanges {
opStack = append(opStack, processOpUpdate{
nodeid: haveP.NodeID,
processid: haveP.Config.ProcessID(),
config: wantP.Config,
metadata: wantP.Metadata,
metadata: metadata,
})
}
}