diff --git a/app/api/api.go b/app/api/api.go index 26abea14..b9d2c010 100644 --- a/app/api/api.go +++ b/app/api/api.go @@ -471,20 +471,21 @@ func (a *api) start() error { } cluster, err := cluster.New(cluster.ClusterConfig{ - ID: cfg.ID, - Name: cfg.Name, - Path: filepath.Join(cfg.DB.Dir, "cluster"), - Bootstrap: cfg.Cluster.Bootstrap, - Recover: cfg.Cluster.Recover, - Address: cfg.Cluster.Address, - Peers: peers, - SyncInterval: time.Duration(cfg.Cluster.SyncInterval) * time.Second, - NodeRecoverTimeout: time.Duration(cfg.Cluster.NodeRecoverTimeout) * time.Second, - CoreAPIAddress: scheme + gonet.JoinHostPort(host, port), - CoreAPIUsername: cfg.API.Auth.Username, - CoreAPIPassword: cfg.API.Auth.Password, - IPLimiter: a.sessionsLimiter, - Logger: a.log.logger.core.WithComponent("Cluster"), + ID: cfg.ID, + Name: cfg.Name, + Path: filepath.Join(cfg.DB.Dir, "cluster"), + Bootstrap: cfg.Cluster.Bootstrap, + Recover: cfg.Cluster.Recover, + Address: cfg.Cluster.Address, + Peers: peers, + SyncInterval: time.Duration(cfg.Cluster.SyncInterval) * time.Second, + NodeRecoverTimeout: time.Duration(cfg.Cluster.NodeRecoverTimeout) * time.Second, + EmergencyLeaderTimeout: time.Duration(cfg.Cluster.EmergencyLeaderTimeout) * time.Second, + CoreAPIAddress: scheme + gonet.JoinHostPort(host, port), + CoreAPIUsername: cfg.API.Auth.Username, + CoreAPIPassword: cfg.API.Auth.Password, + IPLimiter: a.sessionsLimiter, + Logger: a.log.logger.core.WithComponent("Cluster"), }) if err != nil { return fmt.Errorf("unable to create cluster: %w", err) diff --git a/cluster/cluster.go b/cluster/cluster.go index 54a47b52..6d0cd131 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -80,8 +80,9 @@ type ClusterConfig struct { Address string // Listen address for the raft protocol Peers []Peer // Address of a member of a cluster to join - SyncInterval time.Duration // Interval between aligning the process in the cluster DB with the processes on the nodes - NodeRecoverTimeout time.Duration // Timeout for a node to recover before rebalancing the processes + SyncInterval time.Duration // Interval between aligning the process in the cluster DB with the processes on the nodes + NodeRecoverTimeout time.Duration // Timeout for a node to recover before rebalancing the processes + EmergencyLeaderTimeout time.Duration // Timeout for establishing the emergency leadership after lost contact to raft leader CoreAPIAddress string // Address of the core API CoreAPIUsername string // Username for the core API @@ -113,8 +114,9 @@ type cluster struct { shutdownCh chan struct{} shutdownLock sync.Mutex - syncInterval time.Duration - nodeRecoverTimeout time.Duration + syncInterval time.Duration + nodeRecoverTimeout time.Duration + emergencyLeaderTimeout time.Duration forwarder forwarder.Forwarder api API @@ -143,8 +145,9 @@ func New(config ClusterConfig) (Cluster, error) { shutdownCh: make(chan struct{}), - syncInterval: config.SyncInterval, - nodeRecoverTimeout: config.NodeRecoverTimeout, + syncInterval: config.SyncInterval, + nodeRecoverTimeout: config.NodeRecoverTimeout, + emergencyLeaderTimeout: config.EmergencyLeaderTimeout, nodes: map[string]proxy.Node{}, } @@ -971,15 +974,15 @@ func (c *cluster) sentinel() { c.logger.Debug().WithFields(log.Fields{ "state": stats.State, - "last_contact": stats.LastContact.String(), + "last_contact": stats.LastContact, "num_peers": stats.NumPeers, }).Log("Stats") - if stats.LastContact > 10*time.Second && !isEmergencyLeader { + if stats.LastContact > c.emergencyLeaderTimeout && !isEmergencyLeader { 
c.logger.Warn().Log("Force leadership due to lost contact to leader") c.raftEmergencyNotifyCh <- true isEmergencyLeader = true - } else if stats.LastContact <= 10*time.Second && isEmergencyLeader { + } else if stats.LastContact <= c.emergencyLeaderTimeout && isEmergencyLeader { c.logger.Warn().Log("Stop forced leadership due to contact to leader") c.raftEmergencyNotifyCh <- false isEmergencyLeader = false diff --git a/cluster/leader.go b/cluster/leader.go index 89a1a471..d32b68bf 100644 --- a/cluster/leader.go +++ b/cluster/leader.go @@ -241,7 +241,7 @@ RECONCILE: // Check if we need to handle initial leadership actions if !establishedLeader { - if err := c.establishLeadership(context.TODO()); err != nil { + if err := c.establishLeadership(context.TODO(), emergency); err != nil { c.logger.Error().WithError(err).Log("Establish leadership") // Immediately revoke leadership since we didn't successfully // establish leadership. @@ -286,15 +286,13 @@ WAIT: } } -func (c *cluster) establishLeadership(ctx context.Context) error { - c.logger.Debug().Log("Establishing leadership") - - // creating a map of which process runs where +func (c *cluster) establishLeadership(ctx context.Context, emergency bool) error { + c.logger.Debug().WithField("emergency", emergency).Log("Establishing leadership") ctx, cancel := context.WithCancel(ctx) c.cancelLeaderShip = cancel - go c.startRebalance(ctx, c.syncInterval) + go c.startSynchronizeAndRebalance(ctx, c.syncInterval, emergency) return nil } @@ -305,7 +303,26 @@ func (c *cluster) revokeLeadership() { c.cancelLeaderShip() } -func (c *cluster) startRebalance(ctx context.Context, interval time.Duration) { +// startSynchronizeAndRebalance synchronizes and rebalances the processes in a given interval. Synchronizing +// takes care that all processes in the cluster DB are running on one node. It writes the process->node mapping +// to the cluster DB such that when a new leader gets elected it knows where which process should be running. +// This is checked against the actual state. If a node is not reachable, the leader still knows which processes +// should be running on that node. For a certain duration (nodeRecoverTimeout) this is tolerated in case the +// node comes back. If not, the processes will be distributed to the remaining nodes. The actual observed state +// is stored back into the cluster DB. +// +// It follows the rebalancing which takes care that processes are taken from overloaded nodes. In each iteration +// only one process is taken away from a node. If a node is not reachable, its processes will be not part of the +// rebalancing and no attempt will be made to move processes from and to that node. +// +// All this happens if there's a leader. If there's no leader election possible, the node goes into the +// emergency leadership mode after a certain duration (emergencyLeaderTimeout). The synchronization phase will +// happen based on the last known list of processes from the cluster DB. Until nodeRecoverTimeout is reached, +// process that would run on unreachable nodes will not be moved to the node. Rebalancing will be disabled. +// +// The goal of synchronizing and rebalancing is to make as little moves as possible and to be tolerant for +// a while if a node is not reachable. 
+func (c *cluster) startSynchronizeAndRebalance(ctx context.Context, interval time.Duration, emergency bool) { ticker := time.NewTicker(interval) defer ticker.Stop() @@ -314,8 +331,11 @@ func (c *cluster) startRebalance(ctx context.Context, interval time.Duration) { case <-ctx.Done(): return case <-ticker.C: - c.doSynchronize() - c.doRebalance() + c.doSynchronize(emergency) + + if !emergency { + c.doRebalance(emergency) + } } } } @@ -377,6 +397,7 @@ func (c *cluster) applyOpStack(stack []interface{}) { }).Log("Adding process") break } + err = c.proxy.ProcessStart(v.nodeid, v.config.ProcessID()) if err != nil { c.logger.Info().WithError(err).WithFields(log.Fields{ @@ -398,6 +419,7 @@ func (c *cluster) applyOpStack(stack []interface{}) { }).Log("Updating process") break } + c.logger.Info().WithFields(log.Fields{ "processid": v.config.ID, "nodeid": v.nodeid, @@ -411,6 +433,7 @@ func (c *cluster) applyOpStack(stack []interface{}) { }).Log("Removing process") break } + c.logger.Info().WithFields(log.Fields{ "processid": v.processid, "nodeid": v.nodeid, @@ -425,6 +448,7 @@ func (c *cluster) applyOpStack(stack []interface{}) { }).Log("Moving process, adding process") break } + err = c.proxy.ProcessDelete(v.fromNodeid, v.config.ProcessID()) if err != nil { c.logger.Info().WithError(err).WithFields(log.Fields{ @@ -434,6 +458,7 @@ func (c *cluster) applyOpStack(stack []interface{}) { }).Log("Moving process, removing process") break } + err = c.proxy.ProcessStart(v.toNodeid, v.config.ProcessID()) if err != nil { c.logger.Info().WithError(err).WithFields(log.Fields{ @@ -443,6 +468,7 @@ func (c *cluster) applyOpStack(stack []interface{}) { }).Log("Moving process, starting process") break } + c.logger.Info().WithFields(log.Fields{ "processid": v.config.ID, "fromnodeid": v.fromNodeid, @@ -457,6 +483,7 @@ func (c *cluster) applyOpStack(stack []interface{}) { }).Log("Starting process") break } + c.logger.Info().WithFields(log.Fields{ "processid": v.processid, "nodeid": v.nodeid, @@ -474,39 +501,73 @@ func (c *cluster) applyOpStack(stack []interface{}) { } } -func (c *cluster) doSynchronize() { +func (c *cluster) doSynchronize(emergency bool) { + wish := c.store.GetProcessNodeMap() want := c.store.ProcessList() have := c.proxy.ListProcesses() - resources := c.proxy.Resources() + nodes := c.proxy.ListNodes() + + nodesMap := map[string]proxy.NodeAbout{} + + for _, node := range nodes { + about := node.About() + nodesMap[about.ID] = about + } c.logger.Debug().WithFields(log.Fields{ - "want": want, - "have": have, - "resources": resources, + "want": want, + "have": have, + "nodes": nodesMap, }).Log("Synchronize") - opStack := synchronize(want, have, resources) + opStack, _, reality := synchronize(wish, want, have, nodesMap, c.nodeRecoverTimeout) + + if !emergency { + cmd := &store.Command{ + Operation: store.OpSetProcessNodeMap, + Data: store.CommandSetProcessNodeMap{ + Map: reality, + }, + } + + c.applyCommand(cmd) + } c.applyOpStack(opStack) } -func (c *cluster) doRebalance() { +func (c *cluster) doRebalance(emergency bool) { have := c.proxy.ListProcesses() - resources := c.proxy.Resources() + nodes := c.proxy.ListNodes() + + nodesMap := map[string]proxy.NodeAbout{} + + for _, node := range nodes { + about := node.About() + nodesMap[about.ID] = about + } c.logger.Debug().WithFields(log.Fields{ - "have": have, - "resources": resources, + "have": have, + "nodes": nodes, }).Log("Rebalance") - opStack := rebalance(have, resources) + opStack, _ := rebalance(have, nodesMap) c.applyOpStack(opStack) } // synchronize 
returns a list of operations in order to adjust the "have" list to the "want" list // with taking the available resources on each node into account. -func synchronize(want []store.Process, have []proxy.Process, resources map[string]proxy.NodeResources) []interface{} { +func synchronize(wish map[string]string, want []store.Process, have []proxy.Process, nodes map[string]proxy.NodeAbout, nodeRecoverTimeout time.Duration) ([]interface{}, map[string]proxy.NodeResources, map[string]string) { + resources := map[string]proxy.NodeResources{} + for nodeid, about := range nodes { + resources[nodeid] = about.Resources + } + + // The same map as wish, but reflecting the actual situation. + reality := map[string]string{} + // A map from the process ID to the process config of the processes // we want to be running on the nodes. wantMap := map[string]store.Process{} @@ -519,31 +580,32 @@ func synchronize(want []store.Process, have []proxy.Process, resources map[strin // Now we iterate through the processes we actually have running on the nodes // and remove them from the wantMap. We also make sure that they are running. - // If a process is not on the wantMap, it will be deleted from the nodes. + // If a process cannot be found on the wantMap, it will be deleted from the nodes. haveAfterRemove := []proxy.Process{} - for _, p := range have { - pid := p.Config.ProcessID().String() + for _, haveP := range have { + pid := haveP.Config.ProcessID().String() if wantP, ok := wantMap[pid]; !ok { + // The process is not on the wantMap. Delete it and adjust the resources. opStack = append(opStack, processOpDelete{ - nodeid: p.NodeID, - processid: p.Config.ProcessID(), + nodeid: haveP.NodeID, + processid: haveP.Config.ProcessID(), }) - // Adjust the resources - r, ok := resources[p.NodeID] + r, ok := resources[haveP.NodeID] if ok { - r.CPU -= p.CPU - r.Mem -= p.Mem - resources[p.NodeID] = r + r.CPU -= haveP.CPU + r.Mem -= haveP.Mem + resources[haveP.NodeID] = r } continue } else { - if wantP.UpdatedAt.After(p.UpdatedAt) { + // The process is on the wantMap. Update the process if the configurations differ. + if !wantP.Config.Equal(haveP.Config) { opStack = append(opStack, processOpUpdate{ - nodeid: p.NodeID, - processid: p.Config.ProcessID(), + nodeid: haveP.NodeID, - processid: haveP.Config.ProcessID(), + processid: haveP.Config.ProcessID(), config: wantP.Config, metadata: wantP.Metadata, }) @@ -551,24 +613,44 @@ func synchronize(want []store.Process, have []proxy.Process, resources map[strin } delete(wantMap, pid) + reality[pid] = haveP.NodeID - if p.Order != "start" { + if haveP.Order != "start" { opStack = append(opStack, processOpStart{ - nodeid: p.NodeID, - processid: p.Config.ProcessID(), + nodeid: haveP.NodeID, + processid: haveP.Config.ProcessID(), }) } - haveAfterRemove = append(haveAfterRemove, p) + haveAfterRemove = append(haveAfterRemove, haveP) } have = haveAfterRemove - // A map from the process reference to the node it is running on + // In case a node didn't respond, the wantMap still contains the PIDs that would run on + // the currently unresponsive nodes. We use the wish map to assign them to their nodes. + // If a node has been unavailable for too long, keep its processes on the wantMap so they get + // redistributed; otherwise remove them and hope that the node reappears within the nodeRecoverTimeout. + for pid := range wantMap { + // Check if this PID is assigned to a node. + if nodeid, ok := wish[pid]; ok { + // Check whether the node still exists and for how long it hasn't been contacted.
+ if node, ok := nodes[nodeid]; ok { + if time.Since(node.LastContact) <= nodeRecoverTimeout { + reality[pid] = nodeid + delete(wantMap, pid) + } + } + } + } + + // The wantMap now contains only those processes that need to be installed on a node. + + // A map from the process reference to the node it is running on. haveReferenceAffinityMap := createReferenceAffinityMap(have) - // Now all remaining processes in the wantMap must be added to one of the nodes - for _, process := range wantMap { + // Now all remaining processes in the wantMap must be added to one of the nodes. + for pid, process := range wantMap { // If a process doesn't have any limits defined, reject that process if process.Config.LimitCPU <= 0 || process.Config.LimitMemory <= 0 { opStack = append(opStack, processOpReject{ @@ -580,19 +662,19 @@ func synchronize(want []store.Process, have []proxy.Process, resources map[strin } // Check if there are already processes with the same reference, and if so - // chose this node. Then check the node if it has enough resources left. If + // choose this node. Then check the node if it has enough resources left. If // not, then select a node with the most available resources. nodeid := "" // Try to add the process to a node where other processes with the same // reference currently reside. if len(process.Config.Reference) != 0 { - for _, count := range haveReferenceAffinityMap[process.Config.Reference] { + for _, count := range haveReferenceAffinityMap[process.Config.Reference+"@"+process.Config.Domain] { r := resources[count.nodeid] cpu := process.Config.LimitCPU mem := process.Config.LimitMemory - if r.CPU+cpu < r.CPULimit && r.Mem+mem < r.MemLimit { + if r.CPU+cpu < r.CPULimit && r.Mem+mem < r.MemLimit && !r.IsThrottling { nodeid = count.nodeid break } @@ -606,7 +688,7 @@ func synchronize(want []store.Process, have []proxy.Process, resources map[strin mem := process.Config.LimitMemory if len(nodeid) == 0 { - if r.CPU+cpu < r.CPULimit && r.Mem+mem < r.MemLimit { + if r.CPU+cpu < r.CPULimit && r.Mem+mem < r.MemLimit && !r.IsThrottling { nodeid = id } @@ -633,6 +715,8 @@ func synchronize(want []store.Process, have []proxy.Process, resources map[strin r.Mem += process.Config.LimitMemory resources[nodeid] = r } + + reality[pid] = nodeid } else { opStack = append(opStack, processOpReject{ processid: process.Config.ProcessID(), @@ -641,7 +725,7 @@ func synchronize(want []store.Process, have []proxy.Process, resources map[strin } } - return opStack + return opStack, resources, reality } type referenceAffinityNodeCount struct { @@ -649,6 +733,8 @@ type referenceAffinityNodeCount struct { count uint64 } +// createReferenceAffinityMap returns a map of references (per domain) to an array of nodes this reference +// is found on and their count. The array is sorted by the count, the highest first. func createReferenceAffinityMap(processes []proxy.Process) map[string][]referenceAffinityNodeCount { referenceAffinityMap := map[string][]referenceAffinityNodeCount{} for _, p := range processes { @@ -656,11 +742,13 @@ func createReferenceAffinityMap(processes []proxy.Process) map[string][]referenc continue } + ref := p.Config.Reference + "@" + p.Config.Domain + // Here we count how often a reference is present on a node. When // moving processes to a different node, the node with the highest // count of same references will be the first candidate. 
found := false - arr := referenceAffinityMap[p.Config.Reference] + arr := referenceAffinityMap[ref] for i, count := range arr { if count.nodeid == p.NodeID { count.count++ @@ -677,7 +765,7 @@ func createReferenceAffinityMap(processes []proxy.Process) map[string][]referenc }) } - referenceAffinityMap[p.Config.Reference] = arr + referenceAffinityMap[ref] = arr } // Sort every reference count in decreasing order for each reference @@ -692,32 +780,15 @@ func createReferenceAffinityMap(processes []proxy.Process) map[string][]referenc return referenceAffinityMap } -// rebalance returns a list of operations that will move running processes away from nodes -// that are overloaded. -func rebalance(have []proxy.Process, resources map[string]proxy.NodeResources) []interface{} { - // Group the processes by node - processNodeMap := map[string][]proxy.Process{} - - for _, p := range have { - processNodeMap[p.NodeID] = append(processNodeMap[p.NodeID], p) +// rebalance returns a list of operations that will move running processes away from nodes that are overloaded. +func rebalance(have []proxy.Process, nodes map[string]proxy.NodeAbout) ([]interface{}, map[string]proxy.NodeResources) { + resources := map[string]proxy.NodeResources{} + for nodeid, about := range nodes { + resources[nodeid] = about.Resources } - // Sort the processes by their runtime (if they are running) for each node - for nodeid, processes := range processNodeMap { - sort.SliceStable(processes, func(a, b int) bool { - if processes[a].State == "running" { - if processes[b].State != "running" { - return false - } - - return processes[a].Runtime < processes[b].Runtime - } - - return false - }) - - processNodeMap[nodeid] = processes - } + // Group the processes by node and sort them + nodeProcessMap := createNodeProcessMap(have) // A map from the process reference to the nodes it is running on haveReferenceAffinityMap := createReferenceAffinityMap(have) @@ -725,15 +796,17 @@ func rebalance(have []proxy.Process, resources map[string]proxy.NodeResources) [ opStack := []interface{}{} // Check if any of the nodes is overloaded - for id, r := range resources { + for id, node := range nodes { + r := node.Resources + // Check if node is overloaded - if r.CPU < r.CPULimit && r.Mem < r.MemLimit { + if r.CPU < r.CPULimit && r.Mem < r.MemLimit && !r.IsThrottling { continue } // Move processes from this noed to another node with enough free resources. // The processes are ordered ascending by their runtime. - processes := processNodeMap[id] + processes := nodeProcessMap[id] if len(processes) == 0 { // If there are no processes on that node, we can't do anything continue @@ -752,13 +825,13 @@ func rebalance(have []proxy.Process, resources map[string]proxy.NodeResources) [ // Try to move the process to a node where other processes with the same // reference currently reside. 
if len(p.Config.Reference) != 0 { - for _, count := range haveReferenceAffinityMap[p.Config.Reference] { + for _, count := range haveReferenceAffinityMap[p.Config.Reference+"@"+p.Config.Domain] { if count.nodeid == overloadedNodeid { continue } r := resources[count.nodeid] - if r.CPU+p.CPU < r.CPULimit && r.Mem+p.Mem < r.MemLimit { + if r.CPU+p.CPU < r.CPULimit && r.Mem+p.Mem < r.MemLimit && !r.IsThrottling { availableNodeid = count.nodeid break } @@ -767,13 +840,15 @@ func rebalance(have []proxy.Process, resources map[string]proxy.NodeResources) [ // Find another node with enough resources available if len(availableNodeid) == 0 { - for id, r := range resources { + for id, node := range nodes { if id == overloadedNodeid { // Skip the overloaded node continue } - if r.CPU+p.CPU < r.CPULimit && r.Mem+p.Mem < r.MemLimit { + r := node.Resources + + if r.CPU+p.CPU < r.CPULimit && r.Mem+p.Mem < r.MemLimit && !r.IsThrottling { availableNodeid = id break } @@ -812,12 +887,40 @@ func rebalance(have []proxy.Process, resources map[string]proxy.NodeResources) [ r.Mem -= p.Mem resources[overloadedNodeid] = r - // If this node is not anymore overloaded, stop moving processes around - if r.CPU < r.CPULimit && r.Mem < r.MemLimit { - break - } + // Move only one process at a time + break } } - return opStack + return opStack, resources +} + +// createNodeProcessMap takes a list of processes and groups them by the nodeid they +// are running on. Each group gets sorted by their preference to be moved somewhere +// else, decreasing. +func createNodeProcessMap(processes []proxy.Process) map[string][]proxy.Process { + nodeProcessMap := map[string][]proxy.Process{} + + for _, p := range processes { + nodeProcessMap[p.NodeID] = append(nodeProcessMap[p.NodeID], p) + } + + // Sort the processes by their runtime (if they are running) for each node + for nodeid, processes := range nodeProcessMap { + sort.SliceStable(processes, func(a, b int) bool { + if processes[a].State == "running" { + if processes[b].State != "running" { + return false + } + + return processes[a].Runtime < processes[b].Runtime + } + + return false + }) + + nodeProcessMap[nodeid] = processes + } + + return nodeProcessMap } diff --git a/cluster/leader_test.go b/cluster/leader_test.go index 6e41a8df..0845ec0f 100644 --- a/cluster/leader_test.go +++ b/cluster/leader_test.go @@ -12,6 +12,8 @@ import ( ) func TestSynchronizeAdd(t *testing.T) { + wish := map[string]string{} + want := []store.Process{ { UpdatedAt: time.Now(), @@ -25,24 +27,30 @@ func TestSynchronizeAdd(t *testing.T) { have := []proxy.Process{} - resources := map[string]proxy.NodeResources{ + nodes := map[string]proxy.NodeAbout{ "node1": { - NCPU: 1, - CPU: 7, - Mem: 35, - CPULimit: 90, - MemLimit: 90, + LastContact: time.Now(), + Resources: proxy.NodeResources{ + NCPU: 1, + CPU: 7, + Mem: 35, + CPULimit: 90, + MemLimit: 90, + }, }, "node2": { - NCPU: 1, - CPU: 85, - Mem: 11, - CPULimit: 90, - MemLimit: 90, + LastContact: time.Now(), + Resources: proxy.NodeResources{ + NCPU: 1, + CPU: 85, + Mem: 11, + CPULimit: 90, + MemLimit: 90, + }, }, } - stack := synchronize(want, have, resources) + stack, resources, reality := synchronize(wish, want, have, nodes, 2*time.Minute) require.Equal(t, []interface{}{ processOpAdd{ @@ -55,6 +63,10 @@ func TestSynchronizeAdd(t *testing.T) { }, }, stack) + require.Equal(t, map[string]string{ + "foobar@": "node1", + }, reality) + require.Equal(t, map[string]proxy.NodeResources{ "node1": { NCPU: 1, @@ -74,6 +86,10 @@ func TestSynchronizeAdd(t *testing.T) { } 
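// Editor's note: illustrative sketch, not part of the patch. The keys of the
// expected "reality" map ("foobar@") are ProcessID strings, i.e. ID + "@" + Domain
// (see restream/app/process.go further down); the tests use an empty domain.
// A minimal standalone illustration of that key format:
func processKey(id, domain string) string {
	return id + "@" + domain // mirrors app.ProcessID.String()
}
// processKey("foobar", "") == "foobar@"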
func TestSynchronizeAddReferenceAffinity(t *testing.T) { + wish := map[string]string{ + "foobar@": "node2", + } + now := time.Now() want := []store.Process{ @@ -107,30 +123,38 @@ func TestSynchronizeAddReferenceAffinity(t *testing.T) { Runtime: 42, UpdatedAt: now, Config: &app.Config{ - ID: "foobar", - Reference: "barfoo", + ID: "foobar", + Reference: "barfoo", + LimitCPU: 10, + LimitMemory: 20, }, }, } - resources := map[string]proxy.NodeResources{ + nodes := map[string]proxy.NodeAbout{ "node1": { - NCPU: 1, - CPU: 1, - Mem: 1, - CPULimit: 90, - MemLimit: 90, + LastContact: time.Now(), + Resources: proxy.NodeResources{ + NCPU: 1, + CPU: 1, + Mem: 1, + CPULimit: 90, + MemLimit: 90, + }, }, "node2": { - NCPU: 1, - CPU: 1, - Mem: 1, - CPULimit: 90, - MemLimit: 90, + LastContact: time.Now(), + Resources: proxy.NodeResources{ + NCPU: 1, + CPU: 1, + Mem: 1, + CPULimit: 90, + MemLimit: 90, + }, }, } - stack := synchronize(want, have, resources) + stack, _, reality := synchronize(wish, want, have, nodes, 2*time.Minute) require.Equal(t, []interface{}{ processOpAdd{ @@ -143,9 +167,16 @@ func TestSynchronizeAddReferenceAffinity(t *testing.T) { }, }, }, stack) + + require.Equal(t, map[string]string{ + "foobar@": "node2", + "foobar2@": "node2", + }, reality) } func TestSynchronizeAddLimit(t *testing.T) { + wish := map[string]string{} + want := []store.Process{ { UpdatedAt: time.Now(), @@ -159,24 +190,30 @@ func TestSynchronizeAddLimit(t *testing.T) { have := []proxy.Process{} - resources := map[string]proxy.NodeResources{ + nodes := map[string]proxy.NodeAbout{ "node1": { - NCPU: 1, - CPU: 81, - Mem: 72, - CPULimit: 90, - MemLimit: 90, + LastContact: time.Now(), + Resources: proxy.NodeResources{ + NCPU: 1, + CPU: 81, + Mem: 72, + CPULimit: 90, + MemLimit: 90, + }, }, "node2": { - NCPU: 1, - CPU: 79, - Mem: 72, - CPULimit: 90, - MemLimit: 90, + LastContact: time.Now(), + Resources: proxy.NodeResources{ + NCPU: 1, + CPU: 79, + Mem: 72, + CPULimit: 90, + MemLimit: 90, + }, }, } - stack := synchronize(want, have, resources) + stack, resources, reality := synchronize(wish, want, have, nodes, 2*time.Minute) require.Equal(t, []interface{}{ processOpAdd{ @@ -189,6 +226,10 @@ func TestSynchronizeAddLimit(t *testing.T) { }, }, stack) + require.Equal(t, map[string]string{ + "foobar@": "node2", + }, reality) + require.Equal(t, map[string]proxy.NodeResources{ "node1": { NCPU: 1, @@ -208,6 +249,8 @@ func TestSynchronizeAddLimit(t *testing.T) { } func TestSynchronizeAddNoResourcesCPU(t *testing.T) { + wish := map[string]string{} + want := []store.Process{ { UpdatedAt: time.Now(), @@ -221,24 +264,30 @@ func TestSynchronizeAddNoResourcesCPU(t *testing.T) { have := []proxy.Process{} - resources := map[string]proxy.NodeResources{ + nodes := map[string]proxy.NodeAbout{ "node1": { - NCPU: 1, - CPU: 81, - Mem: 72, - CPULimit: 90, - MemLimit: 90, + LastContact: time.Now(), + Resources: proxy.NodeResources{ + NCPU: 1, + CPU: 81, + Mem: 72, + CPULimit: 90, + MemLimit: 90, + }, }, "node2": { - NCPU: 1, - CPU: 79, - Mem: 72, - CPULimit: 90, - MemLimit: 90, + LastContact: time.Now(), + Resources: proxy.NodeResources{ + NCPU: 1, + CPU: 79, + Mem: 72, + CPULimit: 90, + MemLimit: 90, + }, }, } - stack := synchronize(want, have, resources) + stack, _, _ := synchronize(wish, want, have, nodes, 2*time.Minute) require.Equal(t, []interface{}{ processOpReject{ @@ -249,6 +298,8 @@ func TestSynchronizeAddNoResourcesCPU(t *testing.T) { } func TestSynchronizeAddNoResourcesMemory(t *testing.T) { + wish := map[string]string{} + want := 
[]store.Process{ { UpdatedAt: time.Now(), @@ -262,24 +313,30 @@ func TestSynchronizeAddNoResourcesMemory(t *testing.T) { have := []proxy.Process{} - resources := map[string]proxy.NodeResources{ + nodes := map[string]proxy.NodeAbout{ "node1": { - NCPU: 1, - CPU: 81, - Mem: 72, - CPULimit: 90, - MemLimit: 90, + LastContact: time.Now(), + Resources: proxy.NodeResources{ + NCPU: 1, + CPU: 81, + Mem: 72, + CPULimit: 90, + MemLimit: 90, + }, }, "node2": { - NCPU: 1, - CPU: 79, - Mem: 72, - CPULimit: 90, - MemLimit: 90, + LastContact: time.Now(), + Resources: proxy.NodeResources{ + NCPU: 1, + CPU: 79, + Mem: 72, + CPULimit: 90, + MemLimit: 90, + }, }, } - stack := synchronize(want, have, resources) + stack, _, _ := synchronize(wish, want, have, nodes, 2*time.Minute) require.Equal(t, []interface{}{ processOpReject{ @@ -290,6 +347,8 @@ func TestSynchronizeAddNoResourcesMemory(t *testing.T) { } func TestSynchronizeAddNoLimits(t *testing.T) { + wish := map[string]string{} + want := []store.Process{ { UpdatedAt: time.Now(), @@ -301,24 +360,30 @@ func TestSynchronizeAddNoLimits(t *testing.T) { have := []proxy.Process{} - resources := map[string]proxy.NodeResources{ + nodes := map[string]proxy.NodeAbout{ "node1": { - NCPU: 1, - CPU: 81, - Mem: 72, - CPULimit: 90, - MemLimit: 90, + LastContact: time.Now(), + Resources: proxy.NodeResources{ + NCPU: 1, + CPU: 81, + Mem: 72, + CPULimit: 90, + MemLimit: 90, + }, }, "node2": { - NCPU: 1, - CPU: 79, - Mem: 72, - CPULimit: 90, - MemLimit: 90, + LastContact: time.Now(), + Resources: proxy.NodeResources{ + NCPU: 1, + CPU: 79, + Mem: 72, + CPULimit: 90, + MemLimit: 90, + }, }, } - stack := synchronize(want, have, resources) + stack, _, _ := synchronize(wish, want, have, nodes, 2*time.Minute) require.Equal(t, []interface{}{ processOpReject{ @@ -329,6 +394,10 @@ func TestSynchronizeAddNoLimits(t *testing.T) { } func TestSynchronizeRemove(t *testing.T) { + wish := map[string]string{ + "foobar@": "node2", + } + want := []store.Process{} have := []proxy.Process{ @@ -345,24 +414,30 @@ func TestSynchronizeRemove(t *testing.T) { }, } - resources := map[string]proxy.NodeResources{ + nodes := map[string]proxy.NodeAbout{ "node1": { - NCPU: 1, - CPU: 7, - Mem: 65, - CPULimit: 90, - MemLimit: 90, + LastContact: time.Now(), + Resources: proxy.NodeResources{ + NCPU: 1, + CPU: 7, + Mem: 65, + CPULimit: 90, + MemLimit: 90, + }, }, "node2": { - NCPU: 1, - CPU: 85, - Mem: 11, - CPULimit: 90, - MemLimit: 90, + LastContact: time.Now(), + Resources: proxy.NodeResources{ + NCPU: 1, + CPU: 85, + Mem: 11, + CPULimit: 90, + MemLimit: 90, + }, }, } - stack := synchronize(want, have, resources) + stack, resources, reality := synchronize(wish, want, have, nodes, 2*time.Minute) require.Equal(t, []interface{}{ processOpDelete{ @@ -387,9 +462,15 @@ func TestSynchronizeRemove(t *testing.T) { MemLimit: 90, }, }, resources) + + require.Equal(t, map[string]string{}, reality) } func TestSynchronizeAddRemove(t *testing.T) { + wish := map[string]string{ + "foobar2@": "node2", + } + want := []store.Process{ { UpdatedAt: time.Now(), @@ -415,24 +496,30 @@ func TestSynchronizeAddRemove(t *testing.T) { }, } - resources := map[string]proxy.NodeResources{ + nodes := map[string]proxy.NodeAbout{ "node1": { - NCPU: 1, - CPU: 7, - Mem: 35, - CPULimit: 90, - MemLimit: 90, + LastContact: time.Now(), + Resources: proxy.NodeResources{ + NCPU: 1, + CPU: 7, + Mem: 35, + CPULimit: 90, + MemLimit: 90, + }, }, "node2": { - NCPU: 1, - CPU: 85, - Mem: 65, - CPULimit: 90, - MemLimit: 90, + LastContact: time.Now(), + 
Resources: proxy.NodeResources{ + NCPU: 1, + CPU: 85, + Mem: 65, + CPULimit: 90, + MemLimit: 90, + }, }, } - stack := synchronize(want, have, resources) + stack, resources, reality := synchronize(wish, want, have, nodes, 2*time.Minute) require.Equal(t, []interface{}{ processOpDelete{ @@ -465,6 +552,10 @@ func TestSynchronizeAddRemove(t *testing.T) { MemLimit: 90, }, }, resources) + + require.Equal(t, map[string]string{ + "foobar1@": "node1", + }, reality) } func TestRebalanceNothingToDo(t *testing.T) { @@ -493,24 +584,30 @@ func TestRebalanceNothingToDo(t *testing.T) { }, } - resources := map[string]proxy.NodeResources{ + nodes := map[string]proxy.NodeAbout{ "node1": { - NCPU: 1, - CPU: 42, - Mem: 35, - CPULimit: 90, - MemLimit: 90, + LastContact: time.Now(), + Resources: proxy.NodeResources{ + NCPU: 1, + CPU: 42, + Mem: 35, + CPULimit: 90, + MemLimit: 90, + }, }, "node2": { - NCPU: 1, - CPU: 37, - Mem: 11, - CPULimit: 90, - MemLimit: 90, + LastContact: time.Now(), + Resources: proxy.NodeResources{ + NCPU: 1, + CPU: 37, + Mem: 11, + CPULimit: 90, + MemLimit: 90, + }, }, } - opStack := rebalance(processes, resources) + opStack, _ := rebalance(processes, nodes) require.Empty(t, opStack) } @@ -552,24 +649,30 @@ func TestRebalanceOverload(t *testing.T) { }, } - resources := map[string]proxy.NodeResources{ + nodes := map[string]proxy.NodeAbout{ "node1": { - NCPU: 1, - CPU: 91, - Mem: 35, - CPULimit: 90, - MemLimit: 90, + LastContact: time.Now(), + Resources: proxy.NodeResources{ + NCPU: 1, + CPU: 91, + Mem: 35, + CPULimit: 90, + MemLimit: 90, + }, }, "node2": { - NCPU: 1, - CPU: 15, - Mem: 11, - CPULimit: 90, - MemLimit: 90, + LastContact: time.Now(), + Resources: proxy.NodeResources{ + NCPU: 1, + CPU: 15, + Mem: 11, + CPULimit: 90, + MemLimit: 90, + }, }, } - opStack := rebalance(processes, resources) + opStack, resources := rebalance(processes, nodes) require.NotEmpty(t, opStack) @@ -638,24 +741,30 @@ func TestRebalanceSkip(t *testing.T) { }, } - resources := map[string]proxy.NodeResources{ + nodes := map[string]proxy.NodeAbout{ "node1": { - NCPU: 1, - CPU: 91, - Mem: 35, - CPULimit: 90, - MemLimit: 90, + LastContact: time.Now(), + Resources: proxy.NodeResources{ + NCPU: 1, + CPU: 91, + Mem: 35, + CPULimit: 90, + MemLimit: 90, + }, }, "node2": { - NCPU: 1, - CPU: 15, - Mem: 92, - CPULimit: 90, - MemLimit: 90, + LastContact: time.Now(), + Resources: proxy.NodeResources{ + NCPU: 1, + CPU: 15, + Mem: 92, + CPULimit: 90, + MemLimit: 90, + }, }, } - opStack := rebalance(processes, resources) + opStack, resources := rebalance(processes, nodes) require.NotEmpty(t, opStack) @@ -758,31 +867,40 @@ func TestRebalanceReferenceAffinity(t *testing.T) { }, } - resources := map[string]proxy.NodeResources{ + nodes := map[string]proxy.NodeAbout{ "node1": { - NCPU: 1, - CPU: 90, - Mem: 90, - CPULimit: 90, - MemLimit: 90, + LastContact: time.Now(), + Resources: proxy.NodeResources{ + NCPU: 1, + CPU: 90, + Mem: 90, + CPULimit: 90, + MemLimit: 90, + }, }, "node2": { - NCPU: 1, - CPU: 1, - Mem: 1, - CPULimit: 90, - MemLimit: 90, + LastContact: time.Now(), + Resources: proxy.NodeResources{ + NCPU: 1, + CPU: 1, + Mem: 1, + CPULimit: 90, + MemLimit: 90, + }, }, "node3": { - NCPU: 1, - CPU: 1, - Mem: 1, - CPULimit: 90, - MemLimit: 90, + LastContact: time.Now(), + Resources: proxy.NodeResources{ + NCPU: 1, + CPU: 1, + Mem: 1, + CPULimit: 90, + MemLimit: 90, + }, }, } - opStack := rebalance(processes, resources) + opStack, resources := rebalance(processes, nodes) require.NotEmpty(t, opStack) @@ -822,6 +940,164 @@ func 
TestRebalanceReferenceAffinity(t *testing.T) { }, resources) } +func TestCreateNodeProcessMap(t *testing.T) { + processes := []proxy.Process{ + { + NodeID: "node1", + Order: "start", + State: "running", + CPU: 1, + Mem: 1, + Runtime: 42, + Config: &app.Config{ + ID: "foobar1", + }, + }, + { + NodeID: "node1", + Order: "start", + State: "running", + CPU: 1, + Mem: 1, + Runtime: 1, + Config: &app.Config{ + ID: "foobar2", + Reference: "ref1", + }, + }, + { + NodeID: "node2", + Order: "start", + State: "running", + CPU: 1, + Mem: 1, + Runtime: 67, + Config: &app.Config{ + ID: "foobar3", + Reference: "ref3", + }, + }, + { + NodeID: "node2", + Order: "start", + State: "running", + CPU: 1, + Mem: 1, + Runtime: 42, + Config: &app.Config{ + ID: "foobar3", + Reference: "ref2", + }, + }, + { + NodeID: "node3", + Order: "start", + State: "running", + CPU: 1, + Mem: 1, + Runtime: 41, + Config: &app.Config{ + ID: "foobar4", + Reference: "ref1", + }, + }, + { + NodeID: "node3", + Order: "start", + State: "running", + CPU: 1, + Mem: 1, + Runtime: 42, + Config: &app.Config{ + ID: "foobar5", + Reference: "ref1", + }, + }, + } + + nodeProcessMap := createNodeProcessMap(processes) + + require.Equal(t, map[string][]proxy.Process{ + "node1": { + { + NodeID: "node1", + Order: "start", + State: "running", + CPU: 1, + Mem: 1, + Runtime: 1, + Config: &app.Config{ + ID: "foobar2", + Reference: "ref1", + }, + }, + { + NodeID: "node1", + Order: "start", + State: "running", + CPU: 1, + Mem: 1, + Runtime: 42, + Config: &app.Config{ + ID: "foobar1", + }, + }, + }, + "node2": { + { + NodeID: "node2", + Order: "start", + State: "running", + CPU: 1, + Mem: 1, + Runtime: 42, + Config: &app.Config{ + ID: "foobar3", + Reference: "ref2", + }, + }, + { + NodeID: "node2", + Order: "start", + State: "running", + CPU: 1, + Mem: 1, + Runtime: 67, + Config: &app.Config{ + ID: "foobar3", + Reference: "ref3", + }, + }, + }, + "node3": { + { + NodeID: "node3", + Order: "start", + State: "running", + CPU: 1, + Mem: 1, + Runtime: 41, + Config: &app.Config{ + ID: "foobar4", + Reference: "ref1", + }, + }, + { + NodeID: "node3", + Order: "start", + State: "running", + CPU: 1, + Mem: 1, + Runtime: 42, + Config: &app.Config{ + ID: "foobar5", + Reference: "ref1", + }, + }, + }, + }, nodeProcessMap) +} + func TestCreateReferenceAffinityNodeMap(t *testing.T) { processes := []proxy.Process{ { @@ -900,7 +1176,7 @@ func TestCreateReferenceAffinityNodeMap(t *testing.T) { affinityMap := createReferenceAffinityMap(processes) require.Equal(t, map[string][]referenceAffinityNodeCount{ - "ref1": { + "ref1@": { { nodeid: "node3", count: 2, @@ -910,13 +1186,13 @@ func TestCreateReferenceAffinityNodeMap(t *testing.T) { count: 1, }, }, - "ref2": { + "ref2@": { { nodeid: "node2", count: 1, }, }, - "ref3": { + "ref3@": { { nodeid: "node2", count: 1, diff --git a/cluster/proxy/node.go b/cluster/proxy/node.go index f860f2a4..4c4449ad 100644 --- a/cluster/proxy/node.go +++ b/cluster/proxy/node.go @@ -41,6 +41,7 @@ type NodeReader interface { IPs() []string About() NodeAbout Version() NodeVersion + Resources() NodeResources Files() NodeFiles ProcessList() ([]Process, error) @@ -495,6 +496,22 @@ func (n *node) About() NodeAbout { return nodeAbout } +func (n *node) Resources() NodeResources { + n.stateLock.RLock() + defer n.stateLock.RUnlock() + + r := NodeResources{ + IsThrottling: n.resources.throttling, + NCPU: n.resources.ncpu, + CPU: n.resources.cpu, + CPULimit: n.resources.cpuLimit, + Mem: n.resources.mem, + MemLimit: n.resources.memLimit, + } + + return r +} + 
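// Editor's note: illustrative sketch, not part of the patch. The IsThrottling
// flag exposed by Resources() above feeds the placement checks in cluster/leader.go,
// which now skip a node that is throttling even if it still has CPU and memory
// headroom. The helper is hypothetical; the patch inlines this condition in
// synchronize() and rebalance().
func canHost(r NodeResources, p Process) bool {
	return r.CPU+p.CPU < r.CPULimit && r.Mem+p.Mem < r.MemLimit && !r.IsThrottling
}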
func (n *node) Version() NodeVersion { about, err := n.AboutPeer() if err != nil { diff --git a/cluster/proxy/proxy.go b/cluster/proxy/proxy.go index 8d6655f6..8036e44e 100644 --- a/cluster/proxy/proxy.go +++ b/cluster/proxy/proxy.go @@ -239,9 +239,8 @@ func (p *proxy) Resources() map[string]NodeResources { p.lock.RLock() defer p.lock.RUnlock() - for _, node := range p.nodes { - about := node.About() - resources[about.ID] = about.Resources + for id, node := range p.nodes { + resources[id] = node.Resources() } return resources diff --git a/cluster/store/store.go b/cluster/store/store.go index 86a33ab4..f2816064 100644 --- a/cluster/store/store.go +++ b/cluster/store/store.go @@ -1,7 +1,6 @@ package store import ( - "bytes" "encoding/json" "fmt" "io" @@ -23,6 +22,7 @@ type Store interface { ProcessList() []Process GetProcess(id app.ProcessID) (Process, error) + GetProcessNodeMap() map[string]string UserList() Users GetUser(name string) Users @@ -68,6 +68,7 @@ const ( OpUpdateIdentity Operation = "updateIdentity" OpRemoveIdentity Operation = "removeIdentity" OpSetPolicies Operation = "setPolicies" + OpSetProcessNodeMap Operation = "setProcessNodeMap" ) type Command struct { @@ -112,10 +113,19 @@ type CommandSetPolicies struct { Policies []access.Policy } +type CommandSetProcessNodeMap struct { + Map map[string]string +} + // Implement a FSM type store struct { - lock sync.RWMutex - Process map[string]Process + lock sync.RWMutex + callback func(op Operation) + + logger log.Logger + + Process map[string]Process + ProcessNodeMap map[string]string Users struct { UpdatedAt time.Time @@ -126,10 +136,6 @@ type store struct { UpdatedAt time.Time Policies map[string][]access.Policy } - - callback func(op Operation) - - logger log.Logger } type Config struct { @@ -138,8 +144,9 @@ type Config struct { func NewStore(config Config) (Store, error) { s := &store{ - Process: map[string]Process{}, - logger: config.Logger, + Process: map[string]Process{}, + ProcessNodeMap: map[string]string{}, + logger: config.Logger, } s.Users.Users = map[string]identity.User{} @@ -219,6 +226,12 @@ func (s *store) Apply(entry *raft.Log) interface{} { json.Unmarshal(b, &cmd) err = s.setPolicies(cmd) + case OpSetProcessNodeMap: + b, _ := json.Marshal(c.Data) + cmd := CommandSetProcessNodeMap{} + json.Unmarshal(b, &cmd) + + err = s.setProcessNodeMap(cmd) default: s.logger.Warn().WithField("operation", c.Operation).Log("Unknown operation") return nil @@ -244,6 +257,10 @@ func (s *store) addProcess(cmd CommandAddProcess) error { id := cmd.Config.ProcessID().String() + if cmd.Config.LimitCPU <= 0 || cmd.Config.LimitMemory <= 0 { + return NewStoreError("the process with the ID '%s' must have limits defined", id) + } + _, ok := s.Process[id] if ok { return NewStoreError("the process with the ID '%s' already exists", id) @@ -283,15 +300,16 @@ func (s *store) updateProcess(cmd CommandUpdateProcess) error { srcid := cmd.ID.String() dstid := cmd.Config.ProcessID().String() + if cmd.Config.LimitCPU <= 0 || cmd.Config.LimitMemory <= 0 { + return NewStoreError("the process with the ID '%s' must have limits defined", dstid) + } + p, ok := s.Process[srcid] if !ok { return NewStoreError("the process with the ID '%s' doesn't exists", srcid) } - currentHash := p.Config.Hash() - replaceHash := cmd.Config.Hash() - - if bytes.Equal(currentHash, replaceHash) { + if p.Config.Equal(cmd.Config) { return nil } @@ -404,6 +422,15 @@ func (s *store) setPolicies(cmd CommandSetPolicies) error { return nil } +func (s *store) setProcessNodeMap(cmd 
CommandSetProcessNodeMap) error { + s.lock.Lock() + defer s.lock.Unlock() + + s.ProcessNodeMap = cmd.Map + + return nil +} + func (s *store) OnApply(fn func(op Operation)) { s.lock.Lock() defer s.lock.Unlock() @@ -545,6 +572,19 @@ func (s *store) PolicyUserList(name string) Policies { return p } +func (s *store) GetProcessNodeMap() map[string]string { + s.lock.RLock() + defer s.lock.RUnlock() + + m := map[string]string{} + + for key, value := range s.ProcessNodeMap { + m[key] = value + } + + return m +} + type fsmSnapshot struct { data []byte } diff --git a/config/config.go b/config/config.go index 94d8ec86..6e32efc4 100644 --- a/config/config.go +++ b/config/config.go @@ -291,6 +291,7 @@ func (d *Config) init() { d.vars.Register(value.NewClusterPeerList(&d.Cluster.Peers, []string{""}, ","), "cluster.peers", "CORE_CLUSTER_PEERS", nil, "Raft addresses of cores that are part of the cluster", false, false) d.vars.Register(value.NewInt64(&d.Cluster.SyncInterval, 5), "cluster.sync_interval", "CORE_CLUSTER_SYNC_INTERVAL", nil, "Interval between aligning the process in the cluster DB with the processes on the nodes", true, false) d.vars.Register(value.NewInt64(&d.Cluster.NodeRecoverTimeout, 120), "cluster.node_recover_timeout", "CORE_CLUSTER_NODE_RECOVER_TIMEOUT", nil, "Timeout for a node to recover before rebalancing the processes", true, false) + d.vars.Register(value.NewInt64(&d.Cluster.EmergencyLeaderTimeout, 10), "cluster.emergency_leader_timeout", "CORE_CLUSTER_EMERGENCY_LEADER_TIMEOUT", nil, "Timeout for establishing the emergency leadership after lost contact to raft leader", true, false) } // Validate validates the current state of the Config for completeness and sanity. Errors are diff --git a/config/data.go b/config/data.go index 520b5a09..17234092 100644 --- a/config/data.go +++ b/config/data.go @@ -173,14 +173,15 @@ type Data struct { MaxMemoryUsage float64 `json:"max_memory_usage"` // percent 0-100 } `json:"resources"` Cluster struct { - Enable bool `json:"enable"` - Bootstrap bool `json:"bootstrap"` - Recover bool `json:"recover"` - Debug bool `json:"debug"` - Address string `json:"address"` // ip:port - Peers []string `json:"peers"` - SyncInterval int64 `json:"sync_interval" format:"int64"` // seconds - NodeRecoverTimeout int64 `json:"node_recover_timeout" format:"int64"` // seconds + Enable bool `json:"enable"` + Bootstrap bool `json:"bootstrap"` + Recover bool `json:"recover"` + Debug bool `json:"debug"` + Address string `json:"address"` // ip:port + Peers []string `json:"peers"` + SyncInterval int64 `json:"sync_interval" format:"int64"` // seconds + NodeRecoverTimeout int64 `json:"node_recover_timeout" format:"int64"` // seconds + EmergencyLeaderTimeout int64 `json:"emergency_leader_timeout" format:"int64"` // seconds } `json:"cluster"` } diff --git a/log/format.go b/log/format.go index 70b0fb75..a99ad010 100644 --- a/log/format.go +++ b/log/format.go @@ -96,15 +96,25 @@ func (f *consoleFormatter) String(e *Event) string { value := e.Data[key] switch val := value.(type) { + case bool: + if val { + v = "true" + } else { + v = "false" + } case string: v = f.quote(val) case error: v = f.quote(val.Error()) default: - if jsonvalue, err := json.Marshal(value); err == nil { - v = string(jsonvalue) + if str, ok := val.(fmt.Stringer); ok { + v = f.quote(str.String()) } else { - v = f.quote(err.Error()) + if jsonvalue, err := json.Marshal(value); err == nil { + v = string(jsonvalue) + } else { + v = f.quote(err.Error()) + } } } diff --git a/restream/app/process.go 
b/restream/app/process.go index ee81764a..09fea2a5 100644 --- a/restream/app/process.go +++ b/restream/app/process.go @@ -176,6 +176,10 @@ func (config *Config) Hash() []byte { return sum[:] } +func (c *Config) Equal(a *Config) bool { + return bytes.Equal(c.Hash(), a.Hash()) +} + func (c *Config) ProcessID() ProcessID { return ProcessID{ ID: c.ID, @@ -293,7 +297,7 @@ func (p ProcessID) String() string { return p.ID + "@" + p.Domain } -func (p ProcessID) Equals(b ProcessID) bool { +func (p ProcessID) Equal(b ProcessID) bool { if p.ID == b.ID && p.Domain == b.Domain { return true } diff --git a/restream/restream.go b/restream/restream.go index a0fe9c5c..01870d4c 100644 --- a/restream/restream.go +++ b/restream/restream.go @@ -1,7 +1,6 @@ package restream import ( - "bytes" "context" "errors" "fmt" @@ -1177,17 +1176,14 @@ func (r *restream) UpdateProcess(id app.ProcessID, config *app.Config) error { return err } - currentHash := task.config.Hash() - replaceHash := t.config.Hash() - // If the new config has the same hash as the current config, do nothing. - if bytes.Equal(currentHash, replaceHash) { + if task.config.Equal(t.config) { return nil } tid := t.ID() - if !tid.Equals(id) { + if !tid.Equal(id) { _, ok := r.tasks[tid] if ok { return ErrProcessExists
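// Editor's note: illustrative sketch, not part of the patch. The new Config.Equal
// (a comparison of the two config hashes) and ProcessID.Equal replace the manual
// bytes.Equal of hashes shown above. A hypothetical usage inside package app:
func configNeedsUpdate(current, next *Config) bool {
	// Equal compares Config.Hash() of both configs, so any change in a hashed
	// field is detected while identical configs are skipped.
	return !current.Equal(next)
}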