diff --git a/cluster/leader.go b/cluster/leader.go index 8bf4bd26..1ed844af 100644 --- a/cluster/leader.go +++ b/cluster/leader.go @@ -998,6 +998,10 @@ func synchronize(wish map[string]string, want []store.Process, have []proxy.Proc if nodeid, ok := wish[pid]; ok { // Check for how long the node hasn't been contacted, or if it still exists. if node, ok := nodes[nodeid]; ok { + if node.State != "disconnected" { + continue + } + if time.Since(node.LastContact) <= nodeRecoverTimeout { reality[pid] = nodeid delete(wantMap, pid) diff --git a/cluster/leader_test.go b/cluster/leader_test.go index fbabb195..9cf814eb 100644 --- a/cluster/leader_test.go +++ b/cluster/leader_test.go @@ -39,6 +39,7 @@ func TestSynchronizeAdd(t *testing.T) { nodes := map[string]proxy.NodeAbout{ "node1": { + State: "connected", LastContact: time.Now(), Resources: proxy.NodeResources{ NCPU: 1, @@ -49,6 +50,7 @@ func TestSynchronizeAdd(t *testing.T) { }, }, "node2": { + State: "connected", LastContact: time.Now(), Resources: proxy.NodeResources{ NCPU: 1, @@ -106,6 +108,70 @@ func TestSynchronizeAdd(t *testing.T) { }, resources) } +func TestSynchronizeAddDeleted(t *testing.T) { + wish := map[string]string{ + "foobar@": "node1", + } + + now := time.Now() + + want := []store.Process{ + { + UpdatedAt: now, + Config: &app.Config{ + ID: "foobar", + LimitCPU: 10, + LimitMemory: 20, + }, + Order: "stop", + }, + } + + have := []proxy.Process{} + + nodes := map[string]proxy.NodeAbout{ + "node1": { + State: "connected", + LastContact: time.Now(), + Resources: proxy.NodeResources{ + NCPU: 1, + CPU: 1, + Mem: 1, + CPULimit: 90, + MemLimit: 90, + }, + }, + } + + stack, resources, reality := synchronize(wish, want, have, nodes, 2*time.Minute) + + require.Equal(t, []interface{}{ + processOpAdd{ + nodeid: "node1", + config: &app.Config{ + ID: "foobar", + LimitCPU: 10, + LimitMemory: 20, + }, + order: "stop", + }, + }, stack) + + require.Equal(t, map[string]string{ + "foobar@": "node1", + }, reality) + + require.Equal(t, map[string]proxy.NodeResources{ + "node1": { + NCPU: 1, + CPU: 11, + Mem: 21, + CPULimit: 90, + MemLimit: 90, + }, + }, resources) +} + func TestSynchronizeOrderStop(t *testing.T) { wish := map[string]string{ "foobar@": "node1", @@ -144,6 +210,7 @@ func TestSynchronizeOrderStop(t *testing.T) { nodes := map[string]proxy.NodeAbout{ "node1": { + State: "connected", LastContact: time.Now(), Resources: proxy.NodeResources{ NCPU: 1, @@ -154,6 +221,7 @@ func TestSynchronizeOrderStop(t *testing.T) { }, }, "node2": { + State: "connected", LastContact: time.Now(), Resources: proxy.NodeResources{ NCPU: 1, @@ -234,6 +302,7 @@ func TestSynchronizeOrderStart(t *testing.T) { nodes := map[string]proxy.NodeAbout{ "node1": { + State: "connected", LastContact: time.Now(), Resources: proxy.NodeResources{ NCPU: 1, @@ -244,6 +313,7 @@ func TestSynchronizeOrderStart(t *testing.T) { }, }, "node2": { + State: "connected", LastContact: time.Now(), Resources: proxy.NodeResources{ NCPU: 1, @@ -336,6 +406,7 @@ func TestSynchronizeAddReferenceAffinity(t *testing.T) { nodes := map[string]proxy.NodeAbout{ "node1": { + State: "connected", LastContact: time.Now(), Resources: proxy.NodeResources{ NCPU: 1, @@ -346,6 +417,7 @@ func TestSynchronizeAddReferenceAffinity(t *testing.T) { }, }, "node2": { + State: "connected", LastContact: time.Now(), Resources: proxy.NodeResources{ NCPU: 1, @@ -436,6 +508,7 @@ func TestSynchronizeAddReferenceAffinityMultiple(t *testing.T) { nodes := map[string]proxy.NodeAbout{ "node1": { + State: "connected", LastContact: time.Now(), Resources: proxy.NodeResources{ NCPU: 1, @@ -446,6 +519,7 @@ func TestSynchronizeAddReferenceAffinityMultiple(t *testing.T) { }, }, "node2": { + State: "connected", LastContact: time.Now(), Resources: proxy.NodeResources{ NCPU: 1, @@ -531,6 +605,7 @@ func TestSynchronizeAddReferenceAffinityMultipleEmptyNodes(t *testing.T) { nodes := map[string]proxy.NodeAbout{ "node1": { + State: "connected", LastContact: time.Now(), Resources: proxy.NodeResources{ NCPU: 1, @@ -541,6 +616,7 @@ func TestSynchronizeAddReferenceAffinityMultipleEmptyNodes(t *testing.T) { }, }, "node2": { + State: "connected", LastContact: time.Now(), Resources: proxy.NodeResources{ NCPU: 1, @@ -584,6 +660,7 @@ func TestSynchronizeAddLimit(t *testing.T) { nodes := map[string]proxy.NodeAbout{ "node1": { + State: "connected", LastContact: time.Now(), Resources: proxy.NodeResources{ NCPU: 1, @@ -594,6 +671,7 @@ func TestSynchronizeAddLimit(t *testing.T) { }, }, "node2": { + State: "connected", LastContact: time.Now(), Resources: proxy.NodeResources{ NCPU: 1, @@ -660,6 +738,7 @@ func TestSynchronizeAddNoResourcesCPU(t *testing.T) { nodes := map[string]proxy.NodeAbout{ "node1": { + State: "connected", LastContact: time.Now(), Resources: proxy.NodeResources{ NCPU: 1, @@ -670,6 +749,7 @@ func TestSynchronizeAddNoResourcesCPU(t *testing.T) { }, }, "node2": { + State: "connected", LastContact: time.Now(), Resources: proxy.NodeResources{ NCPU: 1, @@ -710,6 +790,7 @@ func TestSynchronizeAddNoResourcesMemory(t *testing.T) { nodes := map[string]proxy.NodeAbout{ "node1": { + State: "connected", LastContact: time.Now(), Resources: proxy.NodeResources{ NCPU: 1, @@ -720,6 +801,7 @@ func TestSynchronizeAddNoResourcesMemory(t *testing.T) { }, }, "node2": { + State: "connected", LastContact: time.Now(), Resources: proxy.NodeResources{ NCPU: 1, @@ -758,6 +840,7 @@ func TestSynchronizeAddNoLimits(t *testing.T) { nodes := map[string]proxy.NodeAbout{ "node1": { + State: "connected", LastContact: time.Now(), Resources: proxy.NodeResources{ NCPU: 1, @@ -768,6 +851,7 @@ func TestSynchronizeAddNoLimits(t *testing.T) { }, }, "node2": { + State: "connected", LastContact: time.Now(), Resources: proxy.NodeResources{ NCPU: 1, @@ -812,6 +896,7 @@ func TestSynchronizeRemove(t *testing.T) { nodes := map[string]proxy.NodeAbout{ "node1": { + State: "connected", LastContact: time.Now(), Resources: proxy.NodeResources{ NCPU: 1, @@ -822,6 +907,7 @@ func TestSynchronizeRemove(t *testing.T) { }, }, "node2": { + State: "connected", LastContact: time.Now(), Resources: proxy.NodeResources{ NCPU: 1, @@ -895,6 +981,7 @@ func TestSynchronizeAddRemove(t *testing.T) { nodes := map[string]proxy.NodeAbout{ "node1": { + State: "connected", LastContact: time.Now(), Resources: proxy.NodeResources{ NCPU: 1, @@ -905,6 +992,7 @@ func TestSynchronizeAddRemove(t *testing.T) { }, }, "node2": { + State: "connected", LastContact: time.Now(), Resources: proxy.NodeResources{ NCPU: 1, @@ -993,6 +1081,7 @@ func TestSynchronizeNoUpdate(t *testing.T) { nodes := map[string]proxy.NodeAbout{ "node1": { + State: "connected", LastContact: time.Now(), Resources: proxy.NodeResources{ NCPU: 1, @@ -1003,6 +1092,7 @@ func TestSynchronizeNoUpdate(t *testing.T) { }, }, "node2": { + State: "connected", LastContact: time.Now(), Resources: proxy.NodeResources{ NCPU: 1, @@ -1060,6 +1150,7 @@ func TestSynchronizeUpdate(t *testing.T) { nodes := map[string]proxy.NodeAbout{ "node1": { + State: "connected", LastContact: time.Now(), Resources: proxy.NodeResources{ NCPU: 1, @@ -1070,6 +1161,7 @@ func TestSynchronizeUpdate(t *testing.T) { }, }, "node2": { + State: "connected", LastContact: time.Now(), Resources: proxy.NodeResources{ NCPU: 1, @@ -1142,6 +1234,7 @@ func TestSynchronizeUpdateMetadata(t *testing.T) { nodes := map[string]proxy.NodeAbout{ "node1": { + State: "connected", LastContact: time.Now(), Resources: proxy.NodeResources{ NCPU: 1, @@ -1152,6 +1245,7 @@ func TestSynchronizeUpdateMetadata(t *testing.T) { }, }, "node2": { + State: "connected", LastContact: time.Now(), Resources: proxy.NodeResources{ NCPU: 1, @@ -1237,6 +1331,7 @@ func TestSynchronizeWaitDisconnectedNode(t *testing.T) { nodes := map[string]proxy.NodeAbout{ "node1": { + State: "connected", LastContact: time.Now(), Resources: proxy.NodeResources{ NCPU: 1, @@ -1247,6 +1342,7 @@ func TestSynchronizeWaitDisconnectedNode(t *testing.T) { }, }, "node2": { + State: "disconnected", LastContact: time.Now().Add(-time.Minute), Resources: proxy.NodeResources{ IsThrottling: true, @@ -1319,6 +1415,7 @@ func TestSynchronizeWaitDisconnectedNodeNoWish(t *testing.T) { nodes := map[string]proxy.NodeAbout{ "node1": { + State: "connected", LastContact: time.Now(), Resources: proxy.NodeResources{ NCPU: 1, @@ -1329,6 +1426,7 @@ func TestSynchronizeWaitDisconnectedNodeNoWish(t *testing.T) { }, }, "node2": { + State: "disconnected", LastContact: time.Now().Add(-time.Minute), Resources: proxy.NodeResources{ IsThrottling: true, @@ -1413,6 +1511,7 @@ func TestSynchronizeWaitDisconnectedNodeUnrealisticWish(t *testing.T) { nodes := map[string]proxy.NodeAbout{ "node1": { + State: "connected", LastContact: time.Now(), Resources: proxy.NodeResources{ NCPU: 1, @@ -1423,6 +1522,7 @@ func TestSynchronizeWaitDisconnectedNodeUnrealisticWish(t *testing.T) { }, }, "node2": { + State: "disconnected", LastContact: time.Now().Add(-time.Minute), Resources: proxy.NodeResources{ IsThrottling: true, @@ -1507,6 +1607,7 @@ func TestSynchronizeTimeoutDisconnectedNode(t *testing.T) { nodes := map[string]proxy.NodeAbout{ "node1": { + State: "connected", LastContact: time.Now(), Resources: proxy.NodeResources{ NCPU: 1, @@ -1517,6 +1618,7 @@ func TestSynchronizeTimeoutDisconnectedNode(t *testing.T) { }, }, "node2": { + State: "disconnected", LastContact: time.Now().Add(-3 * time.Minute), Resources: proxy.NodeResources{ IsThrottling: true,