Add EmergencyLeaderTimeout parameter, ignore throttling nodes, implement NodeRecoverTimeout, introduce processNodeMap in cluster DB

Ingo Oppermann
2023-06-07 22:08:07 +02:00
parent bd75a5ad0f
commit 7e7d1caca7
12 changed files with 746 additions and 295 deletions

View File

@@ -480,6 +480,7 @@ func (a *api) start() error {
Peers: peers,
SyncInterval: time.Duration(cfg.Cluster.SyncInterval) * time.Second,
NodeRecoverTimeout: time.Duration(cfg.Cluster.NodeRecoverTimeout) * time.Second,
EmergencyLeaderTimeout: time.Duration(cfg.Cluster.EmergencyLeaderTimeout) * time.Second,
CoreAPIAddress: scheme + gonet.JoinHostPort(host, port),
CoreAPIUsername: cfg.API.Auth.Username,
CoreAPIPassword: cfg.API.Auth.Password,

View File

@@ -82,6 +82,7 @@ type ClusterConfig struct {
SyncInterval time.Duration // Interval between aligning the process in the cluster DB with the processes on the nodes
NodeRecoverTimeout time.Duration // Timeout for a node to recover before rebalancing the processes
EmergencyLeaderTimeout time.Duration // Timeout for establishing emergency leadership after losing contact with the raft leader
CoreAPIAddress string // Address of the core API
CoreAPIUsername string // Username for the core API
@@ -115,6 +116,7 @@ type cluster struct {
syncInterval time.Duration
nodeRecoverTimeout time.Duration
emergencyLeaderTimeout time.Duration
forwarder forwarder.Forwarder
api API
@@ -145,6 +147,7 @@ func New(config ClusterConfig) (Cluster, error) {
syncInterval: config.SyncInterval,
nodeRecoverTimeout: config.NodeRecoverTimeout,
emergencyLeaderTimeout: config.EmergencyLeaderTimeout,
nodes: map[string]proxy.Node{},
}
@@ -971,15 +974,15 @@ func (c *cluster) sentinel() {
c.logger.Debug().WithFields(log.Fields{
"state": stats.State,
"last_contact": stats.LastContact.String(),
"last_contact": stats.LastContact,
"num_peers": stats.NumPeers,
}).Log("Stats")
if stats.LastContact > 10*time.Second && !isEmergencyLeader {
if stats.LastContact > c.emergencyLeaderTimeout && !isEmergencyLeader {
c.logger.Warn().Log("Force leadership due to lost contact to leader")
c.raftEmergencyNotifyCh <- true
isEmergencyLeader = true
} else if stats.LastContact <= 10*time.Second && isEmergencyLeader {
} else if stats.LastContact <= c.emergencyLeaderTimeout && isEmergencyLeader {
c.logger.Warn().Log("Stop forced leadership due to contact to leader")
c.raftEmergencyNotifyCh <- false
isEmergencyLeader = false
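
For context, a simplified sketch of the surrounding sentinel loop. Only the comparison against c.emergencyLeaderTimeout and the writes to c.raftEmergencyNotifyCh come from this change; the ticker, the stop channel, and the raftStats accessor are illustrative assumptions:

// Illustrative only: raftStats(), the one-second ticker, and the stop channel are
// assumptions and not part of this commit; the commit only changes the comparison
// against c.emergencyLeaderTimeout and keeps the writes to c.raftEmergencyNotifyCh.
func (c *cluster) sentinelSketch(stopCh <-chan struct{}) {
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()

    isEmergencyLeader := false

    for {
        select {
        case <-stopCh:
            return
        case <-ticker.C:
            stats := c.raftStats() // hypothetical accessor for State, LastContact, NumPeers

            if stats.LastContact > c.emergencyLeaderTimeout && !isEmergencyLeader {
                // No contact to the raft leader for longer than the configured
                // timeout: force emergency leadership.
                c.raftEmergencyNotifyCh <- true
                isEmergencyLeader = true
            } else if stats.LastContact <= c.emergencyLeaderTimeout && isEmergencyLeader {
                // Contact to the leader is back: give up emergency leadership.
                c.raftEmergencyNotifyCh <- false
                isEmergencyLeader = false
            }
        }
    }
}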

View File

@@ -241,7 +241,7 @@ RECONCILE:
// Check if we need to handle initial leadership actions
if !establishedLeader {
if err := c.establishLeadership(context.TODO()); err != nil {
if err := c.establishLeadership(context.TODO(), emergency); err != nil {
c.logger.Error().WithError(err).Log("Establish leadership")
// Immediately revoke leadership since we didn't successfully
// establish leadership.
@@ -286,15 +286,13 @@ WAIT:
}
}
func (c *cluster) establishLeadership(ctx context.Context) error {
c.logger.Debug().Log("Establishing leadership")
// creating a map of which process runs where
func (c *cluster) establishLeadership(ctx context.Context, emergency bool) error {
c.logger.Debug().WithField("emergency", emergency).Log("Establishing leadership")
ctx, cancel := context.WithCancel(ctx)
c.cancelLeaderShip = cancel
go c.startRebalance(ctx, c.syncInterval)
go c.startSynchronizeAndRebalance(ctx, c.syncInterval, emergency)
return nil
}
@@ -305,7 +303,26 @@ func (c *cluster) revokeLeadership() {
c.cancelLeaderShip()
}
func (c *cluster) startRebalance(ctx context.Context, interval time.Duration) {
// startSynchronizeAndRebalance synchronizes and rebalances the processes at a given interval. Synchronizing
// makes sure that every process in the cluster DB is running on one of the nodes. It writes the process->node mapping
// to the cluster DB such that a newly elected leader knows on which node each process should be running.
// This is checked against the actual state. If a node is not reachable, the leader still knows which processes
// should be running on that node. For a certain duration (nodeRecoverTimeout) this is tolerated in case the
// node comes back. If it doesn't, the processes are distributed to the remaining nodes. The actual observed state
// is stored back into the cluster DB.
//
// Rebalancing follows, which takes care that processes are moved away from overloaded nodes. In each iteration
// only one process is taken away from a node. If a node is not reachable, its processes are not part of the
// rebalancing and no attempt is made to move processes from or to that node.
//
// All of this happens only if there is a leader. If no leader election is possible, the node goes into
// emergency leadership mode after a certain duration (emergencyLeaderTimeout). The synchronization phase then
// happens based on the last known list of processes from the cluster DB. Until nodeRecoverTimeout is reached,
// processes that would run on unreachable nodes are not moved to this node. Rebalancing is disabled.
//
// The goal of synchronizing and rebalancing is to make as few moves as possible and to be tolerant for
// a while if a node is not reachable.
func (c *cluster) startSynchronizeAndRebalance(ctx context.Context, interval time.Duration, emergency bool) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
@@ -314,8 +331,11 @@ func (c *cluster) startRebalance(ctx context.Context, interval time.Duration) {
case <-ctx.Done():
return
case <-ticker.C:
c.doSynchronize()
c.doRebalance()
c.doSynchronize(emergency)
if !emergency {
c.doRebalance(emergency)
}
}
}
}
@@ -377,6 +397,7 @@ func (c *cluster) applyOpStack(stack []interface{}) {
}).Log("Adding process")
break
}
err = c.proxy.ProcessStart(v.nodeid, v.config.ProcessID())
if err != nil {
c.logger.Info().WithError(err).WithFields(log.Fields{
@@ -398,6 +419,7 @@ func (c *cluster) applyOpStack(stack []interface{}) {
}).Log("Updating process")
break
}
c.logger.Info().WithFields(log.Fields{
"processid": v.config.ID,
"nodeid": v.nodeid,
@@ -411,6 +433,7 @@ func (c *cluster) applyOpStack(stack []interface{}) {
}).Log("Removing process")
break
}
c.logger.Info().WithFields(log.Fields{
"processid": v.processid,
"nodeid": v.nodeid,
@@ -425,6 +448,7 @@ func (c *cluster) applyOpStack(stack []interface{}) {
}).Log("Moving process, adding process")
break
}
err = c.proxy.ProcessDelete(v.fromNodeid, v.config.ProcessID())
if err != nil {
c.logger.Info().WithError(err).WithFields(log.Fields{
@@ -434,6 +458,7 @@ func (c *cluster) applyOpStack(stack []interface{}) {
}).Log("Moving process, removing process")
break
}
err = c.proxy.ProcessStart(v.toNodeid, v.config.ProcessID())
if err != nil {
c.logger.Info().WithError(err).WithFields(log.Fields{
@@ -443,6 +468,7 @@ func (c *cluster) applyOpStack(stack []interface{}) {
}).Log("Moving process, starting process")
break
}
c.logger.Info().WithFields(log.Fields{
"processid": v.config.ID,
"fromnodeid": v.fromNodeid,
@@ -457,6 +483,7 @@ func (c *cluster) applyOpStack(stack []interface{}) {
}).Log("Starting process")
break
}
c.logger.Info().WithFields(log.Fields{
"processid": v.processid,
"nodeid": v.nodeid,
@@ -474,39 +501,73 @@ func (c *cluster) applyOpStack(stack []interface{}) {
}
}
func (c *cluster) doSynchronize() {
func (c *cluster) doSynchronize(emergency bool) {
wish := c.store.GetProcessNodeMap()
want := c.store.ProcessList()
have := c.proxy.ListProcesses()
resources := c.proxy.Resources()
nodes := c.proxy.ListNodes()
nodesMap := map[string]proxy.NodeAbout{}
for _, node := range nodes {
about := node.About()
nodesMap[about.ID] = about
}
c.logger.Debug().WithFields(log.Fields{
"want": want,
"have": have,
"resources": resources,
"nodes": nodesMap,
}).Log("Synchronize")
opStack := synchronize(want, have, resources)
opStack, _, reality := synchronize(wish, want, have, nodesMap, c.nodeRecoverTimeout)
if !emergency {
cmd := &store.Command{
Operation: store.OpSetProcessNodeMap,
Data: store.CommandSetProcessNodeMap{
Map: reality,
},
}
c.applyCommand(cmd)
}
c.applyOpStack(opStack)
}
func (c *cluster) doRebalance() {
func (c *cluster) doRebalance(emergency bool) {
have := c.proxy.ListProcesses()
resources := c.proxy.Resources()
nodes := c.proxy.ListNodes()
nodesMap := map[string]proxy.NodeAbout{}
for _, node := range nodes {
about := node.About()
nodesMap[about.ID] = about
}
c.logger.Debug().WithFields(log.Fields{
"have": have,
"resources": resources,
"nodes": nodes,
}).Log("Rebalance")
opStack := rebalance(have, resources)
opStack, _ := rebalance(have, nodesMap)
c.applyOpStack(opStack)
}
// synchronize returns a list of operations needed to adjust the "have" list to the "want" list,
// taking the available resources on each node into account.
func synchronize(want []store.Process, have []proxy.Process, resources map[string]proxy.NodeResources) []interface{} {
func synchronize(wish map[string]string, want []store.Process, have []proxy.Process, nodes map[string]proxy.NodeAbout, nodeRecoverTimeout time.Duration) ([]interface{}, map[string]proxy.NodeResources, map[string]string) {
resources := map[string]proxy.NodeResources{}
for nodeid, about := range nodes {
resources[nodeid] = about.Resources
}
// A map of the same form as wish, but reflecting the actual situation.
reality := map[string]string{}
// A map from the process ID to the process config of the processes
// we want to be running on the nodes.
wantMap := map[string]store.Process{}
@@ -519,31 +580,32 @@ func synchronize(want []store.Process, have []proxy.Process, resources map[strin
// Now we iterate through the processes we actually have running on the nodes
// and remove them from the wantMap. We also make sure that they are running.
// If a process is not on the wantMap, it will be deleted from the nodes.
// If a process cannot be found on the wantMap, it will be deleted from the nodes.
haveAfterRemove := []proxy.Process{}
for _, p := range have {
pid := p.Config.ProcessID().String()
for _, haveP := range have {
pid := haveP.Config.ProcessID().String()
if wantP, ok := wantMap[pid]; !ok {
// The process is not on the wantMap. Delete it and adjust the resources.
opStack = append(opStack, processOpDelete{
nodeid: p.NodeID,
processid: p.Config.ProcessID(),
nodeid: haveP.NodeID,
processid: haveP.Config.ProcessID(),
})
// Adjust the resources
r, ok := resources[p.NodeID]
r, ok := resources[haveP.NodeID]
if ok {
r.CPU -= p.CPU
r.Mem -= p.Mem
resources[p.NodeID] = r
r.CPU -= haveP.CPU
r.Mem -= haveP.Mem
resources[haveP.NodeID] = r
}
continue
} else {
if wantP.UpdatedAt.After(p.UpdatedAt) {
// The process is on the wantMap. Update the process if the configurations differ.
if !wantP.Config.Equal(haveP.Config) {
opStack = append(opStack, processOpUpdate{
nodeid: p.NodeID,
processid: p.Config.ProcessID(),
nodeid: haveP.NodeID,
processid: haveP.Config.ProcessID(),
config: wantP.Config,
metadata: wantP.Metadata,
})
@@ -551,24 +613,44 @@ func synchronize(want []store.Process, have []proxy.Process, resources map[strin
}
delete(wantMap, pid)
reality[pid] = haveP.NodeID
if p.Order != "start" {
if haveP.Order != "start" {
opStack = append(opStack, processOpStart{
nodeid: p.NodeID,
processid: p.Config.ProcessID(),
nodeid: haveP.NodeID,
processid: haveP.Config.ProcessID(),
})
}
haveAfterRemove = append(haveAfterRemove, p)
haveAfterRemove = append(haveAfterRemove, haveP)
}
have = haveAfterRemove
// A map from the process reference to the node it is running on
// In case a node didn't respond, some PIDs that would run on the currently unresponsive nodes are still
// in the wantMap. We use the wish map to assign them to their node. If the node has been unavailable for
// too long, keep these processes in the wantMap; otherwise remove them and hope that they reappear
// within the nodeRecoverTimeout.
for pid := range wantMap {
// Check if this PID is assigned to a node in the wish map.
if nodeid, ok := wish[pid]; ok {
// Check whether the node still exists and, if so, for how long it hasn't been contacted.
if node, ok := nodes[nodeid]; ok {
if time.Since(node.LastContact) <= nodeRecoverTimeout {
reality[pid] = nodeid
delete(wantMap, pid)
}
}
}
}
// The wantMap now contains only those processes that need to be installed on a node.
// A map from the process reference to the node it is running on.
haveReferenceAffinityMap := createReferenceAffinityMap(have)
// Now all remaining processes in the wantMap must be added to one of the nodes
for _, process := range wantMap {
// Now all remaining processes in the wantMap must be added to one of the nodes.
for pid, process := range wantMap {
// If a process doesn't have any limits defined, reject that process
if process.Config.LimitCPU <= 0 || process.Config.LimitMemory <= 0 {
opStack = append(opStack, processOpReject{
@@ -580,19 +662,19 @@ func synchronize(want []store.Process, have []proxy.Process, resources map[strin
}
// Check if there are already processes with the same reference, and if so
// chose this node. Then check the node if it has enough resources left. If
// choose this node. Then check the node if it has enough resources left. If
// not, then select a node with the most available resources.
nodeid := ""
// Try to add the process to a node where other processes with the same
// reference currently reside.
if len(process.Config.Reference) != 0 {
for _, count := range haveReferenceAffinityMap[process.Config.Reference] {
for _, count := range haveReferenceAffinityMap[process.Config.Reference+"@"+process.Config.Domain] {
r := resources[count.nodeid]
cpu := process.Config.LimitCPU
mem := process.Config.LimitMemory
if r.CPU+cpu < r.CPULimit && r.Mem+mem < r.MemLimit {
if r.CPU+cpu < r.CPULimit && r.Mem+mem < r.MemLimit && !r.IsThrottling {
nodeid = count.nodeid
break
}
@@ -606,7 +688,7 @@ func synchronize(want []store.Process, have []proxy.Process, resources map[strin
mem := process.Config.LimitMemory
if len(nodeid) == 0 {
if r.CPU+cpu < r.CPULimit && r.Mem+mem < r.MemLimit {
if r.CPU+cpu < r.CPULimit && r.Mem+mem < r.MemLimit && !r.IsThrottling {
nodeid = id
}
@@ -633,6 +715,8 @@ func synchronize(want []store.Process, have []proxy.Process, resources map[strin
r.Mem += process.Config.LimitMemory
resources[nodeid] = r
}
reality[pid] = nodeid
} else {
opStack = append(opStack, processOpReject{
processid: process.Config.ProcessID(),
@@ -641,7 +725,7 @@ func synchronize(want []store.Process, have []proxy.Process, resources map[strin
}
}
return opStack
return opStack, resources, reality
}
type referenceAffinityNodeCount struct {
@@ -649,6 +733,8 @@ type referenceAffinityNodeCount struct {
count uint64
}
// createReferenceAffinityMap returns a map of references (per domain) to an array of nodes this reference
// is found on, together with their counts. The array is sorted by count, highest first.
func createReferenceAffinityMap(processes []proxy.Process) map[string][]referenceAffinityNodeCount {
referenceAffinityMap := map[string][]referenceAffinityNodeCount{}
for _, p := range processes {
@@ -656,11 +742,13 @@ func createReferenceAffinityMap(processes []proxy.Process) map[string][]referenc
continue
}
ref := p.Config.Reference + "@" + p.Config.Domain
// Here we count how often a reference is present on a node. When
// moving processes to a different node, the node with the highest
// count of the same reference will be the first candidate.
found := false
arr := referenceAffinityMap[p.Config.Reference]
arr := referenceAffinityMap[ref]
for i, count := range arr {
if count.nodeid == p.NodeID {
count.count++
@@ -677,7 +765,7 @@ func createReferenceAffinityMap(processes []proxy.Process) map[string][]referenc
})
}
referenceAffinityMap[p.Config.Reference] = arr
referenceAffinityMap[ref] = arr
}
// Sort every reference count in decreasing order for each reference
@@ -692,32 +780,15 @@ func createReferenceAffinityMap(processes []proxy.Process) map[string][]referenc
return referenceAffinityMap
}
// rebalance returns a list of operations that will move running processes away from nodes
// that are overloaded.
func rebalance(have []proxy.Process, resources map[string]proxy.NodeResources) []interface{} {
// Group the processes by node
processNodeMap := map[string][]proxy.Process{}
for _, p := range have {
processNodeMap[p.NodeID] = append(processNodeMap[p.NodeID], p)
// rebalance returns a list of operations that will move running processes away from nodes that are overloaded.
func rebalance(have []proxy.Process, nodes map[string]proxy.NodeAbout) ([]interface{}, map[string]proxy.NodeResources) {
resources := map[string]proxy.NodeResources{}
for nodeid, about := range nodes {
resources[nodeid] = about.Resources
}
// Sort the processes by their runtime (if they are running) for each node
for nodeid, processes := range processNodeMap {
sort.SliceStable(processes, func(a, b int) bool {
if processes[a].State == "running" {
if processes[b].State != "running" {
return false
}
return processes[a].Runtime < processes[b].Runtime
}
return false
})
processNodeMap[nodeid] = processes
}
// Group the processes by node and sort them
nodeProcessMap := createNodeProcessMap(have)
// A map from the process reference to the nodes it is running on
haveReferenceAffinityMap := createReferenceAffinityMap(have)
@@ -725,15 +796,17 @@ func rebalance(have []proxy.Process, resources map[string]proxy.NodeResources) [
opStack := []interface{}{}
// Check if any of the nodes is overloaded
for id, r := range resources {
for id, node := range nodes {
r := node.Resources
// Check if node is overloaded
if r.CPU < r.CPULimit && r.Mem < r.MemLimit {
if r.CPU < r.CPULimit && r.Mem < r.MemLimit && !r.IsThrottling {
continue
}
// Move processes from this node to another node with enough free resources.
// The processes are ordered by their runtime, ascending.
processes := processNodeMap[id]
processes := nodeProcessMap[id]
if len(processes) == 0 {
// If there are no processes on that node, we can't do anything
continue
@@ -752,13 +825,13 @@ func rebalance(have []proxy.Process, resources map[string]proxy.NodeResources) [
// Try to move the process to a node where other processes with the same
// reference currently reside.
if len(p.Config.Reference) != 0 {
for _, count := range haveReferenceAffinityMap[p.Config.Reference] {
for _, count := range haveReferenceAffinityMap[p.Config.Reference+"@"+p.Config.Domain] {
if count.nodeid == overloadedNodeid {
continue
}
r := resources[count.nodeid]
if r.CPU+p.CPU < r.CPULimit && r.Mem+p.Mem < r.MemLimit {
if r.CPU+p.CPU < r.CPULimit && r.Mem+p.Mem < r.MemLimit && !r.IsThrottling {
availableNodeid = count.nodeid
break
}
@@ -767,13 +840,15 @@ func rebalance(have []proxy.Process, resources map[string]proxy.NodeResources) [
// Find another node with enough resources available
if len(availableNodeid) == 0 {
for id, r := range resources {
for id, node := range nodes {
if id == overloadedNodeid {
// Skip the overloaded node
continue
}
if r.CPU+p.CPU < r.CPULimit && r.Mem+p.Mem < r.MemLimit {
r := node.Resources
if r.CPU+p.CPU < r.CPULimit && r.Mem+p.Mem < r.MemLimit && !r.IsThrottling {
availableNodeid = id
break
}
@@ -812,12 +887,40 @@ func rebalance(have []proxy.Process, resources map[string]proxy.NodeResources) [
r.Mem -= p.Mem
resources[overloadedNodeid] = r
// If this node is not anymore overloaded, stop moving processes around
if r.CPU < r.CPULimit && r.Mem < r.MemLimit {
// Move only one process at a time
break
}
}
return opStack, resources
}
// createNodeProcessMap takes a list of processes and groups them by the node ID they
// are running on. Each group is sorted by the processes' preference to be moved
// somewhere else, in decreasing order.
func createNodeProcessMap(processes []proxy.Process) map[string][]proxy.Process {
nodeProcessMap := map[string][]proxy.Process{}
for _, p := range processes {
nodeProcessMap[p.NodeID] = append(nodeProcessMap[p.NodeID], p)
}
return opStack
// Sort the processes by their runtime (if they are running) for each node
for nodeid, processes := range nodeProcessMap {
sort.SliceStable(processes, func(a, b int) bool {
if processes[a].State == "running" {
if processes[b].State != "running" {
return false
}
return processes[a].Runtime < processes[b].Runtime
}
return false
})
nodeProcessMap[nodeid] = processes
}
return nodeProcessMap
}

View File

@@ -12,6 +12,8 @@ import (
)
func TestSynchronizeAdd(t *testing.T) {
wish := map[string]string{}
want := []store.Process{
{
UpdatedAt: time.Now(),
@@ -25,24 +27,30 @@ func TestSynchronizeAdd(t *testing.T) {
have := []proxy.Process{}
resources := map[string]proxy.NodeResources{
nodes := map[string]proxy.NodeAbout{
"node1": {
LastContact: time.Now(),
Resources: proxy.NodeResources{
NCPU: 1,
CPU: 7,
Mem: 35,
CPULimit: 90,
MemLimit: 90,
},
},
"node2": {
LastContact: time.Now(),
Resources: proxy.NodeResources{
NCPU: 1,
CPU: 85,
Mem: 11,
CPULimit: 90,
MemLimit: 90,
},
},
}
stack := synchronize(want, have, resources)
stack, resources, reality := synchronize(wish, want, have, nodes, 2*time.Minute)
require.Equal(t, []interface{}{
processOpAdd{
@@ -55,6 +63,10 @@ func TestSynchronizeAdd(t *testing.T) {
},
}, stack)
require.Equal(t, map[string]string{
"foobar@": "node1",
}, reality)
require.Equal(t, map[string]proxy.NodeResources{
"node1": {
NCPU: 1,
@@ -74,6 +86,10 @@ func TestSynchronizeAdd(t *testing.T) {
}
func TestSynchronizeAddReferenceAffinity(t *testing.T) {
wish := map[string]string{
"foobar@": "node2",
}
now := time.Now()
want := []store.Process{
@@ -109,28 +125,36 @@ func TestSynchronizeAddReferenceAffinity(t *testing.T) {
Config: &app.Config{
ID: "foobar",
Reference: "barfoo",
LimitCPU: 10,
LimitMemory: 20,
},
},
}
resources := map[string]proxy.NodeResources{
nodes := map[string]proxy.NodeAbout{
"node1": {
LastContact: time.Now(),
Resources: proxy.NodeResources{
NCPU: 1,
CPU: 1,
Mem: 1,
CPULimit: 90,
MemLimit: 90,
},
},
"node2": {
LastContact: time.Now(),
Resources: proxy.NodeResources{
NCPU: 1,
CPU: 1,
Mem: 1,
CPULimit: 90,
MemLimit: 90,
},
},
}
stack := synchronize(want, have, resources)
stack, _, reality := synchronize(wish, want, have, nodes, 2*time.Minute)
require.Equal(t, []interface{}{
processOpAdd{
@@ -143,9 +167,16 @@ func TestSynchronizeAddReferenceAffinity(t *testing.T) {
},
},
}, stack)
require.Equal(t, map[string]string{
"foobar@": "node2",
"foobar2@": "node2",
}, reality)
}
func TestSynchronizeAddLimit(t *testing.T) {
wish := map[string]string{}
want := []store.Process{
{
UpdatedAt: time.Now(),
@@ -159,24 +190,30 @@ func TestSynchronizeAddLimit(t *testing.T) {
have := []proxy.Process{}
resources := map[string]proxy.NodeResources{
nodes := map[string]proxy.NodeAbout{
"node1": {
LastContact: time.Now(),
Resources: proxy.NodeResources{
NCPU: 1,
CPU: 81,
Mem: 72,
CPULimit: 90,
MemLimit: 90,
},
},
"node2": {
LastContact: time.Now(),
Resources: proxy.NodeResources{
NCPU: 1,
CPU: 79,
Mem: 72,
CPULimit: 90,
MemLimit: 90,
},
},
}
stack := synchronize(want, have, resources)
stack, resources, reality := synchronize(wish, want, have, nodes, 2*time.Minute)
require.Equal(t, []interface{}{
processOpAdd{
@@ -189,6 +226,10 @@ func TestSynchronizeAddLimit(t *testing.T) {
},
}, stack)
require.Equal(t, map[string]string{
"foobar@": "node2",
}, reality)
require.Equal(t, map[string]proxy.NodeResources{
"node1": {
NCPU: 1,
@@ -208,6 +249,8 @@ func TestSynchronizeAddLimit(t *testing.T) {
}
func TestSynchronizeAddNoResourcesCPU(t *testing.T) {
wish := map[string]string{}
want := []store.Process{
{
UpdatedAt: time.Now(),
@@ -221,24 +264,30 @@ func TestSynchronizeAddNoResourcesCPU(t *testing.T) {
have := []proxy.Process{}
resources := map[string]proxy.NodeResources{
nodes := map[string]proxy.NodeAbout{
"node1": {
LastContact: time.Now(),
Resources: proxy.NodeResources{
NCPU: 1,
CPU: 81,
Mem: 72,
CPULimit: 90,
MemLimit: 90,
},
},
"node2": {
LastContact: time.Now(),
Resources: proxy.NodeResources{
NCPU: 1,
CPU: 79,
Mem: 72,
CPULimit: 90,
MemLimit: 90,
},
},
}
stack := synchronize(want, have, resources)
stack, _, _ := synchronize(wish, want, have, nodes, 2*time.Minute)
require.Equal(t, []interface{}{
processOpReject{
@@ -249,6 +298,8 @@ func TestSynchronizeAddNoResourcesCPU(t *testing.T) {
}
func TestSynchronizeAddNoResourcesMemory(t *testing.T) {
wish := map[string]string{}
want := []store.Process{
{
UpdatedAt: time.Now(),
@@ -262,24 +313,30 @@ func TestSynchronizeAddNoResourcesMemory(t *testing.T) {
have := []proxy.Process{}
resources := map[string]proxy.NodeResources{
nodes := map[string]proxy.NodeAbout{
"node1": {
LastContact: time.Now(),
Resources: proxy.NodeResources{
NCPU: 1,
CPU: 81,
Mem: 72,
CPULimit: 90,
MemLimit: 90,
},
},
"node2": {
LastContact: time.Now(),
Resources: proxy.NodeResources{
NCPU: 1,
CPU: 79,
Mem: 72,
CPULimit: 90,
MemLimit: 90,
},
},
}
stack := synchronize(want, have, resources)
stack, _, _ := synchronize(wish, want, have, nodes, 2*time.Minute)
require.Equal(t, []interface{}{
processOpReject{
@@ -290,6 +347,8 @@ func TestSynchronizeAddNoResourcesMemory(t *testing.T) {
}
func TestSynchronizeAddNoLimits(t *testing.T) {
wish := map[string]string{}
want := []store.Process{
{
UpdatedAt: time.Now(),
@@ -301,24 +360,30 @@ func TestSynchronizeAddNoLimits(t *testing.T) {
have := []proxy.Process{}
resources := map[string]proxy.NodeResources{
nodes := map[string]proxy.NodeAbout{
"node1": {
LastContact: time.Now(),
Resources: proxy.NodeResources{
NCPU: 1,
CPU: 81,
Mem: 72,
CPULimit: 90,
MemLimit: 90,
},
},
"node2": {
LastContact: time.Now(),
Resources: proxy.NodeResources{
NCPU: 1,
CPU: 79,
Mem: 72,
CPULimit: 90,
MemLimit: 90,
},
},
}
stack := synchronize(want, have, resources)
stack, _, _ := synchronize(wish, want, have, nodes, 2*time.Minute)
require.Equal(t, []interface{}{
processOpReject{
@@ -329,6 +394,10 @@ func TestSynchronizeAddNoLimits(t *testing.T) {
}
func TestSynchronizeRemove(t *testing.T) {
wish := map[string]string{
"foobar@": "node2",
}
want := []store.Process{}
have := []proxy.Process{
@@ -345,24 +414,30 @@ func TestSynchronizeRemove(t *testing.T) {
},
}
resources := map[string]proxy.NodeResources{
nodes := map[string]proxy.NodeAbout{
"node1": {
LastContact: time.Now(),
Resources: proxy.NodeResources{
NCPU: 1,
CPU: 7,
Mem: 65,
CPULimit: 90,
MemLimit: 90,
},
},
"node2": {
LastContact: time.Now(),
Resources: proxy.NodeResources{
NCPU: 1,
CPU: 85,
Mem: 11,
CPULimit: 90,
MemLimit: 90,
},
},
}
stack := synchronize(want, have, resources)
stack, resources, reality := synchronize(wish, want, have, nodes, 2*time.Minute)
require.Equal(t, []interface{}{
processOpDelete{
@@ -387,9 +462,15 @@ func TestSynchronizeRemove(t *testing.T) {
MemLimit: 90,
},
}, resources)
require.Equal(t, map[string]string{}, reality)
}
func TestSynchronizeAddRemove(t *testing.T) {
wish := map[string]string{
"foobar2@": "node2",
}
want := []store.Process{
{
UpdatedAt: time.Now(),
@@ -415,24 +496,30 @@ func TestSynchronizeAddRemove(t *testing.T) {
},
}
resources := map[string]proxy.NodeResources{
nodes := map[string]proxy.NodeAbout{
"node1": {
LastContact: time.Now(),
Resources: proxy.NodeResources{
NCPU: 1,
CPU: 7,
Mem: 35,
CPULimit: 90,
MemLimit: 90,
},
},
"node2": {
LastContact: time.Now(),
Resources: proxy.NodeResources{
NCPU: 1,
CPU: 85,
Mem: 65,
CPULimit: 90,
MemLimit: 90,
},
},
}
stack := synchronize(want, have, resources)
stack, resources, reality := synchronize(wish, want, have, nodes, 2*time.Minute)
require.Equal(t, []interface{}{
processOpDelete{
@@ -465,6 +552,10 @@ func TestSynchronizeAddRemove(t *testing.T) {
MemLimit: 90,
},
}, resources)
require.Equal(t, map[string]string{
"foobar1@": "node1",
}, reality)
}
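
The synchronize tests above only exercise nodes with fresh contact. A sketch of what a test for the nodeRecoverTimeout tolerance could look like, with the test name and the expected empty op stack inferred from the logic in the leader code (not part of this commit):

func TestSynchronizeNodeWithinRecoverTimeout(t *testing.T) {
    wish := map[string]string{
        "foobar@": "node2",
    }

    want := []store.Process{
        {
            UpdatedAt: time.Now(),
            Config: &app.Config{
                ID:          "foobar",
                LimitCPU:    10,
                LimitMemory: 20,
            },
        },
    }

    // node2 has not reported any processes.
    have := []proxy.Process{}

    nodes := map[string]proxy.NodeAbout{
        "node1": {
            LastContact: time.Now(),
            Resources: proxy.NodeResources{
                NCPU:     1,
                CPU:      7,
                Mem:      35,
                CPULimit: 90,
                MemLimit: 90,
            },
        },
        "node2": {
            // Silent for 30 seconds, still within the 2-minute recover timeout.
            LastContact: time.Now().Add(-30 * time.Second),
            Resources: proxy.NodeResources{
                NCPU:     1,
                CPU:      10,
                Mem:      10,
                CPULimit: 90,
                MemLimit: 90,
            },
        },
    }

    stack, _, reality := synchronize(wish, want, have, nodes, 2*time.Minute)

    // No operation is emitted for the process; it stays assigned to the
    // temporarily unreachable node2.
    require.Empty(t, stack)

    require.Equal(t, map[string]string{
        "foobar@": "node2",
    }, reality)
}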
func TestRebalanceNothingToDo(t *testing.T) {
@@ -493,24 +584,30 @@ func TestRebalanceNothingToDo(t *testing.T) {
},
}
resources := map[string]proxy.NodeResources{
nodes := map[string]proxy.NodeAbout{
"node1": {
LastContact: time.Now(),
Resources: proxy.NodeResources{
NCPU: 1,
CPU: 42,
Mem: 35,
CPULimit: 90,
MemLimit: 90,
},
},
"node2": {
LastContact: time.Now(),
Resources: proxy.NodeResources{
NCPU: 1,
CPU: 37,
Mem: 11,
CPULimit: 90,
MemLimit: 90,
},
},
}
opStack := rebalance(processes, resources)
opStack, _ := rebalance(processes, nodes)
require.Empty(t, opStack)
}
@@ -552,24 +649,30 @@ func TestRebalanceOverload(t *testing.T) {
},
}
resources := map[string]proxy.NodeResources{
nodes := map[string]proxy.NodeAbout{
"node1": {
LastContact: time.Now(),
Resources: proxy.NodeResources{
NCPU: 1,
CPU: 91,
Mem: 35,
CPULimit: 90,
MemLimit: 90,
},
},
"node2": {
LastContact: time.Now(),
Resources: proxy.NodeResources{
NCPU: 1,
CPU: 15,
Mem: 11,
CPULimit: 90,
MemLimit: 90,
},
},
}
opStack := rebalance(processes, resources)
opStack, resources := rebalance(processes, nodes)
require.NotEmpty(t, opStack)
@@ -638,24 +741,30 @@ func TestRebalanceSkip(t *testing.T) {
},
}
resources := map[string]proxy.NodeResources{
nodes := map[string]proxy.NodeAbout{
"node1": {
LastContact: time.Now(),
Resources: proxy.NodeResources{
NCPU: 1,
CPU: 91,
Mem: 35,
CPULimit: 90,
MemLimit: 90,
},
},
"node2": {
LastContact: time.Now(),
Resources: proxy.NodeResources{
NCPU: 1,
CPU: 15,
Mem: 92,
CPULimit: 90,
MemLimit: 90,
},
},
}
opStack := rebalance(processes, resources)
opStack, resources := rebalance(processes, nodes)
require.NotEmpty(t, opStack)
@@ -758,31 +867,40 @@ func TestRebalanceReferenceAffinity(t *testing.T) {
},
}
resources := map[string]proxy.NodeResources{
nodes := map[string]proxy.NodeAbout{
"node1": {
LastContact: time.Now(),
Resources: proxy.NodeResources{
NCPU: 1,
CPU: 90,
Mem: 90,
CPULimit: 90,
MemLimit: 90,
},
},
"node2": {
LastContact: time.Now(),
Resources: proxy.NodeResources{
NCPU: 1,
CPU: 1,
Mem: 1,
CPULimit: 90,
MemLimit: 90,
},
},
"node3": {
LastContact: time.Now(),
Resources: proxy.NodeResources{
NCPU: 1,
CPU: 1,
Mem: 1,
CPULimit: 90,
MemLimit: 90,
},
},
}
opStack := rebalance(processes, resources)
opStack, resources := rebalance(processes, nodes)
require.NotEmpty(t, opStack)
@@ -822,6 +940,164 @@ func TestRebalanceReferenceAffinity(t *testing.T) {
}, resources)
}
func TestCreateNodeProcessMap(t *testing.T) {
processes := []proxy.Process{
{
NodeID: "node1",
Order: "start",
State: "running",
CPU: 1,
Mem: 1,
Runtime: 42,
Config: &app.Config{
ID: "foobar1",
},
},
{
NodeID: "node1",
Order: "start",
State: "running",
CPU: 1,
Mem: 1,
Runtime: 1,
Config: &app.Config{
ID: "foobar2",
Reference: "ref1",
},
},
{
NodeID: "node2",
Order: "start",
State: "running",
CPU: 1,
Mem: 1,
Runtime: 67,
Config: &app.Config{
ID: "foobar3",
Reference: "ref3",
},
},
{
NodeID: "node2",
Order: "start",
State: "running",
CPU: 1,
Mem: 1,
Runtime: 42,
Config: &app.Config{
ID: "foobar3",
Reference: "ref2",
},
},
{
NodeID: "node3",
Order: "start",
State: "running",
CPU: 1,
Mem: 1,
Runtime: 41,
Config: &app.Config{
ID: "foobar4",
Reference: "ref1",
},
},
{
NodeID: "node3",
Order: "start",
State: "running",
CPU: 1,
Mem: 1,
Runtime: 42,
Config: &app.Config{
ID: "foobar5",
Reference: "ref1",
},
},
}
nodeProcessMap := createNodeProcessMap(processes)
require.Equal(t, map[string][]proxy.Process{
"node1": {
{
NodeID: "node1",
Order: "start",
State: "running",
CPU: 1,
Mem: 1,
Runtime: 1,
Config: &app.Config{
ID: "foobar2",
Reference: "ref1",
},
},
{
NodeID: "node1",
Order: "start",
State: "running",
CPU: 1,
Mem: 1,
Runtime: 42,
Config: &app.Config{
ID: "foobar1",
},
},
},
"node2": {
{
NodeID: "node2",
Order: "start",
State: "running",
CPU: 1,
Mem: 1,
Runtime: 42,
Config: &app.Config{
ID: "foobar3",
Reference: "ref2",
},
},
{
NodeID: "node2",
Order: "start",
State: "running",
CPU: 1,
Mem: 1,
Runtime: 67,
Config: &app.Config{
ID: "foobar3",
Reference: "ref3",
},
},
},
"node3": {
{
NodeID: "node3",
Order: "start",
State: "running",
CPU: 1,
Mem: 1,
Runtime: 41,
Config: &app.Config{
ID: "foobar4",
Reference: "ref1",
},
},
{
NodeID: "node3",
Order: "start",
State: "running",
CPU: 1,
Mem: 1,
Runtime: 42,
Config: &app.Config{
ID: "foobar5",
Reference: "ref1",
},
},
},
}, nodeProcessMap)
}
func TestCreateReferenceAffinityNodeMap(t *testing.T) {
processes := []proxy.Process{
{
@@ -900,7 +1176,7 @@ func TestCreateReferenceAffinityNodeMap(t *testing.T) {
affinityMap := createReferenceAffinityMap(processes)
require.Equal(t, map[string][]referenceAffinityNodeCount{
"ref1": {
"ref1@": {
{
nodeid: "node3",
count: 2,
@@ -910,13 +1186,13 @@ func TestCreateReferenceAffinityNodeMap(t *testing.T) {
count: 1,
},
},
"ref2": {
"ref2@": {
{
nodeid: "node2",
count: 1,
},
},
"ref3": {
"ref3@": {
{
nodeid: "node2",
count: 1,

View File

@@ -41,6 +41,7 @@ type NodeReader interface {
IPs() []string
About() NodeAbout
Version() NodeVersion
Resources() NodeResources
Files() NodeFiles
ProcessList() ([]Process, error)
@@ -495,6 +496,22 @@ func (n *node) About() NodeAbout {
return nodeAbout
}
func (n *node) Resources() NodeResources {
n.stateLock.RLock()
defer n.stateLock.RUnlock()
r := NodeResources{
IsThrottling: n.resources.throttling,
NCPU: n.resources.ncpu,
CPU: n.resources.cpu,
CPULimit: n.resources.cpuLimit,
Mem: n.resources.mem,
MemLimit: n.resources.memLimit,
}
return r
}
func (n *node) Version() NodeVersion {
about, err := n.AboutPeer()
if err != nil {

View File

@@ -239,9 +239,8 @@ func (p *proxy) Resources() map[string]NodeResources {
p.lock.RLock()
defer p.lock.RUnlock()
for _, node := range p.nodes {
about := node.About()
resources[about.ID] = about.Resources
for id, node := range p.nodes {
resources[id] = node.Resources()
}
return resources

View File

@@ -1,7 +1,6 @@
package store
import (
"bytes"
"encoding/json"
"fmt"
"io"
@@ -23,6 +22,7 @@ type Store interface {
ProcessList() []Process
GetProcess(id app.ProcessID) (Process, error)
GetProcessNodeMap() map[string]string
UserList() Users
GetUser(name string) Users
@@ -68,6 +68,7 @@ const (
OpUpdateIdentity Operation = "updateIdentity"
OpRemoveIdentity Operation = "removeIdentity"
OpSetPolicies Operation = "setPolicies"
OpSetProcessNodeMap Operation = "setProcessNodeMap"
)
type Command struct {
@@ -112,10 +113,19 @@ type CommandSetPolicies struct {
Policies []access.Policy
}
type CommandSetProcessNodeMap struct {
Map map[string]string
}
// Implement an FSM
type store struct {
lock sync.RWMutex
callback func(op Operation)
logger log.Logger
Process map[string]Process
ProcessNodeMap map[string]string
Users struct {
UpdatedAt time.Time
@@ -126,10 +136,6 @@ type store struct {
UpdatedAt time.Time
Policies map[string][]access.Policy
}
callback func(op Operation)
logger log.Logger
}
type Config struct {
@@ -139,6 +145,7 @@ type Config struct {
func NewStore(config Config) (Store, error) {
s := &store{
Process: map[string]Process{},
ProcessNodeMap: map[string]string{},
logger: config.Logger,
}
@@ -219,6 +226,12 @@ func (s *store) Apply(entry *raft.Log) interface{} {
json.Unmarshal(b, &cmd)
err = s.setPolicies(cmd)
case OpSetProcessNodeMap:
b, _ := json.Marshal(c.Data)
cmd := CommandSetProcessNodeMap{}
json.Unmarshal(b, &cmd)
err = s.setProcessNodeMap(cmd)
default:
s.logger.Warn().WithField("operation", c.Operation).Log("Unknown operation")
return nil
@@ -244,6 +257,10 @@ func (s *store) addProcess(cmd CommandAddProcess) error {
id := cmd.Config.ProcessID().String()
if cmd.Config.LimitCPU <= 0 || cmd.Config.LimitMemory <= 0 {
return NewStoreError("the process with the ID '%s' must have limits defined", id)
}
_, ok := s.Process[id]
if ok {
return NewStoreError("the process with the ID '%s' already exists", id)
@@ -283,15 +300,16 @@ func (s *store) updateProcess(cmd CommandUpdateProcess) error {
srcid := cmd.ID.String()
dstid := cmd.Config.ProcessID().String()
if cmd.Config.LimitCPU <= 0 || cmd.Config.LimitMemory <= 0 {
return NewStoreError("the process with the ID '%s' must have limits defined", dstid)
}
p, ok := s.Process[srcid]
if !ok {
return NewStoreError("the process with the ID '%s' doesn't exists", srcid)
}
currentHash := p.Config.Hash()
replaceHash := cmd.Config.Hash()
if bytes.Equal(currentHash, replaceHash) {
if p.Config.Equal(cmd.Config) {
return nil
}
@@ -404,6 +422,15 @@ func (s *store) setPolicies(cmd CommandSetPolicies) error {
return nil
}
func (s *store) setProcessNodeMap(cmd CommandSetProcessNodeMap) error {
s.lock.Lock()
defer s.lock.Unlock()
s.ProcessNodeMap = cmd.Map
return nil
}
func (s *store) OnApply(fn func(op Operation)) {
s.lock.Lock()
defer s.lock.Unlock()
@@ -545,6 +572,19 @@ func (s *store) PolicyUserList(name string) Policies {
return p
}
func (s *store) GetProcessNodeMap() map[string]string {
s.lock.RLock()
defer s.lock.RUnlock()
m := map[string]string{}
for key, value := range s.ProcessNodeMap {
m[key] = value
}
return m
}
type fsmSnapshot struct {
data []byte
}

View File

@@ -291,6 +291,7 @@ func (d *Config) init() {
d.vars.Register(value.NewClusterPeerList(&d.Cluster.Peers, []string{""}, ","), "cluster.peers", "CORE_CLUSTER_PEERS", nil, "Raft addresses of cores that are part of the cluster", false, false)
d.vars.Register(value.NewInt64(&d.Cluster.SyncInterval, 5), "cluster.sync_interval", "CORE_CLUSTER_SYNC_INTERVAL", nil, "Interval between aligning the process in the cluster DB with the processes on the nodes", true, false)
d.vars.Register(value.NewInt64(&d.Cluster.NodeRecoverTimeout, 120), "cluster.node_recover_timeout", "CORE_CLUSTER_NODE_RECOVER_TIMEOUT", nil, "Timeout for a node to recover before rebalancing the processes", true, false)
d.vars.Register(value.NewInt64(&d.Cluster.EmergencyLeaderTimeout, 10), "cluster.emergency_leader_timeout", "CORE_CLUSTER_EMERGENCY_LEADER_TIMEOUT", nil, "Timeout for establishing the emergency leadership after lost contact to raft leader", true, false)
}
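
Taken together, the cluster timings are configurable in seconds: cluster.sync_interval (default 5), cluster.node_recover_timeout (default 120), and the new cluster.emergency_leader_timeout (default 10), each with a corresponding CORE_CLUSTER_* environment variable.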
// Validate validates the current state of the Config for completeness and sanity. Errors are

View File

@@ -181,6 +181,7 @@ type Data struct {
Peers []string `json:"peers"`
SyncInterval int64 `json:"sync_interval" format:"int64"` // seconds
NodeRecoverTimeout int64 `json:"node_recover_timeout" format:"int64"` // seconds
EmergencyLeaderTimeout int64 `json:"emergency_leader_timeout" format:"int64"` // seconds
} `json:"cluster"`
}

View File

@@ -96,17 +96,27 @@ func (f *consoleFormatter) String(e *Event) string {
value := e.Data[key]
switch val := value.(type) {
case bool:
if val {
v = "true"
} else {
v = "false"
}
case string:
v = f.quote(val)
case error:
v = f.quote(val.Error())
default:
if str, ok := val.(fmt.Stringer); ok {
v = f.quote(str.String())
} else {
if jsonvalue, err := json.Marshal(value); err == nil {
v = string(jsonvalue)
} else {
v = f.quote(err.Error())
}
}
}
message += fmt.Sprintf(" %s", f.writeKV(key, v))
}

View File

@@ -176,6 +176,10 @@ func (config *Config) Hash() []byte {
return sum[:]
}
func (c *Config) Equal(a *Config) bool {
return bytes.Equal(c.Hash(), a.Hash())
}
func (c *Config) ProcessID() ProcessID {
return ProcessID{
ID: c.ID,
@@ -293,7 +297,7 @@ func (p ProcessID) String() string {
return p.ID + "@" + p.Domain
}
func (p ProcessID) Equals(b ProcessID) bool {
func (p ProcessID) Equal(b ProcessID) bool {
if p.ID == b.ID && p.Domain == b.Domain {
return true
}

View File

@@ -1,7 +1,6 @@
package restream
import (
"bytes"
"context"
"errors"
"fmt"
@@ -1177,17 +1176,14 @@ func (r *restream) UpdateProcess(id app.ProcessID, config *app.Config) error {
return err
}
currentHash := task.config.Hash()
replaceHash := t.config.Hash()
// If the new config has the same hash as the current config, do nothing.
if bytes.Equal(currentHash, replaceHash) {
if task.config.Equal(t.config) {
return nil
}
tid := t.ID()
if !tid.Equals(id) {
if !tid.Equal(id) {
_, ok := r.tasks[tid]
if ok {
return ErrProcessExists