diff --git a/cluster/cluster.go b/cluster/cluster.go index 0b20f44b..fbf11ba3 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -116,6 +116,7 @@ type cluster struct { store Store reassertLeaderCh chan chan error + cancelLeaderShip context.CancelFunc leaveCh chan struct{} diff --git a/cluster/leader.go b/cluster/leader.go index 3fbd8a19..bb28b4bc 100644 --- a/cluster/leader.go +++ b/cluster/leader.go @@ -7,6 +7,7 @@ import ( "time" "github.com/datarhei/core/v16/log" + "github.com/datarhei/core/v16/restream/app" ) const NOTIFY_FOLLOWER = 0 @@ -279,57 +280,395 @@ WAIT: return case <-interval: goto RECONCILE - case errCh := <-c.reassertLeaderCh: - // we can get into this state when the initial - // establishLeadership has failed as well as the follow - // up leadershipTransfer. Afterwards we will be waiting - // for the interval to trigger a reconciliation and can - // potentially end up here. There is no point to - // reassert because this agent was never leader in the - // first place. - if !establishedLeader { - errCh <- fmt.Errorf("leadership has not been established") - continue - } - - // continue to reassert only if we previously were the - // leader, which means revokeLeadership followed by an - // establishLeadership(). - c.revokeLeadership() - err := c.establishLeadership(context.TODO()) - errCh <- err - - // in case establishLeadership failed, we will try to - // transfer leadership. At this time raft thinks we are - // the leader, but we disagree. - if err != nil { - if err := c.leadershipTransfer(); err != nil { - // establishedLeader was true before, - // but it no longer is since it revoked - // leadership and Leadership transfer - // also failed. Which is why it stays - // in the leaderLoop, but now - // establishedLeader needs to be set to - // false. - establishedLeader = false - interval = time.After(5 * time.Second) - goto WAIT - } - - // leadershipTransfer was successful and it is - // time to leave the leaderLoop. 
- return - } - } } } func (c *cluster) establishLeadership(ctx context.Context) error { c.logger.Debug().Log("establishing leadership") + + ctx, cancel := context.WithCancel(ctx) + c.cancelLeaderShip = cancel + + go c.startRebalance(ctx) + return nil } func (c *cluster) revokeLeadership() { c.logger.Debug().Log("revoking leadership") + + c.cancelLeaderShip() +} + +func (c *cluster) startRebalance(ctx context.Context) { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + c.doSynchronize() + //c.doRebalance() + } + } +} + +func (c *cluster) applyOpStack(stack []interface{}) { + for _, op := range stack { + switch v := op.(type) { + case processOpAdd: + err := c.proxy.ProcessAdd(v.nodeid, v.config) + if err != nil { + c.logger.Info().WithError(err).WithFields(log.Fields{ + "processid": v.config.ID, + "nodeid": v.nodeid, + }).Log("Adding process failed") + break + } + err = c.proxy.ProcessStart(v.nodeid, v.config.ID) + if err != nil { + c.logger.Info().WithError(err).WithFields(log.Fields{ + "processid": v.config.ID, + "nodeid": v.nodeid, + }).Log("Starting process failed") + break + } + c.logger.Info().WithFields(log.Fields{ + "processid": v.config.ID, + "nodeid": v.nodeid, + }).Log("Adding process") + case processOpDelete: + err := c.proxy.ProcessDelete(v.nodeid, v.processid) + if err != nil { + c.logger.Info().WithError(err).WithFields(log.Fields{ + "processid": v.processid, + "nodeid": v.nodeid, + }).Log("Removing process failed") + break + } + c.logger.Info().WithFields(log.Fields{ + "processid": v.processid, + "nodeid": v.nodeid, + }).Log("Removing process") + case processOpMove: + err := c.proxy.ProcessAdd(v.toNodeid, v.config) + if err != nil { + c.logger.Info().WithError(err).WithFields(log.Fields{ + "processid": v.config.ID, + "fromnodeid": v.fromNodeid, + "tonodeid": v.toNodeid, + }).Log("Moving process, adding process failed") + break + } + err = c.proxy.ProcessDelete(v.fromNodeid, v.config.ID) + if err != nil { + c.logger.Info().WithError(err).WithFields(log.Fields{ + "processid": v.config.ID, + "fromnodeid": v.fromNodeid, + "tonodeid": v.toNodeid, + }).Log("Moving process, removing process failed") + break + } + err = c.proxy.ProcessStart(v.toNodeid, v.config.ID) + if err != nil { + c.logger.Info().WithError(err).WithFields(log.Fields{ + "processid": v.config.ID, + "fromnodeid": v.fromNodeid, + "tonodeid": v.toNodeid, + }).Log("Moving process, starting process failed") + break + } + c.logger.Info().WithFields(log.Fields{ + "processid": v.config.ID, + "fromnodeid": v.fromNodeid, + "tonodeid": v.toNodeid, + }).Log("Moving process") + case processOpStart: + err := c.proxy.ProcessStart(v.nodeid, v.processid) + if err != nil { + c.logger.Info().WithError(err).WithFields(log.Fields{ + "processid": v.processid, + "nodeid": v.nodeid, + }).Log("Starting process failed") + break + } + c.logger.Info().WithFields(log.Fields{ + "processid": v.processid, + "nodeid": v.nodeid, + }).Log("Starting process") + case processOpSkip: + c.logger.Warn().WithField("processid", v.processid).Log("Not enough resources to deploy process") + default: + c.logger.Warn().Log("Unknown operation on stack: %+v", v) + } + } +} + +func (c *cluster) doSynchronize() { + want := c.store.ProcessList() + have := c.proxy.ProcessList() + resources := c.proxy.Resources() + + opStack := synchronize(want, have, resources) + + c.applyOpStack(opStack) +} + +func (c *cluster) doRebalance() { + have := c.proxy.ProcessList() + resources := 
c.proxy.Resources()
+
+	opStack := c.rebalance(have, resources)
+
+	c.applyOpStack(opStack)
+}
+
+type processOpDelete struct {
+	nodeid    string
+	processid string
+}
+
+type processOpMove struct {
+	fromNodeid string
+	toNodeid   string
+	config     *app.Config
+}
+
+type processOpStart struct {
+	nodeid    string
+	processid string
+}
+
+type processOpAdd struct {
+	nodeid string
+	config *app.Config
+}
+
+type processOpSkip struct {
+	processid string
+}
+
+// normalizeProcessesAndResources normalizes the CPU and memory consumption of the processes and resources in-place.
+// All values are rescaled as if every node had the largest NCPU and MemTotal in the cluster, so that the
+// resulting percentages are comparable across nodes of different sizes.
+func normalizeProcessesAndResources(processes []ProcessConfig, resources map[string]NodeResources) {
+	maxNCPU := 0.0
+	maxMemTotal := 0.0
+
+	for _, r := range resources {
+		if r.NCPU > maxNCPU {
+			maxNCPU = r.NCPU
+		}
+		if r.MemTotal > maxMemTotal {
+			maxMemTotal = r.MemTotal
+		}
+	}
+
+	for id, r := range resources {
+		// The idle headroom (100 - usage) shrinks by the capacity ratio.
+		factor := maxNCPU / r.NCPU
+		r.CPU = 100 - (100-r.CPU)/factor
+
+		factor = maxMemTotal / r.MemTotal
+		r.Mem = 100 - (100-r.Mem)/factor
+
+		resources[id] = r
+	}
+
+	for i, p := range processes {
+		r, ok := resources[p.NodeID]
+		if !ok {
+			// Unknown node: assume full consumption and don't divide by its unknown capacity.
+			p.CPU = 100
+			p.Mem = 100
+			processes[i] = p
+
+			continue
+		}
+
+		factor := maxNCPU / r.NCPU
+		p.CPU = 100 - (100-p.CPU)/factor
+
+		factor = maxMemTotal / r.MemTotal
+		p.Mem = 100 - (100-p.Mem)/factor
+
+		processes[i] = p
+	}
+
+	for id, r := range resources {
+		r.NCPU = maxNCPU
+		r.MemTotal = maxMemTotal
+
+		resources[id] = r
+	}
+}
+
+// synchronize returns a list of operations that adjusts the "have" list to the "want" list,
+// taking the available resources on each node into account.
+func synchronize(want []app.Config, have []ProcessConfig, resources map[string]NodeResources) []interface{} {
+	opStack := []interface{}{}
+
+	normalizeProcessesAndResources(have, resources)
+
+	// A map from the process ID to the process config of the processes
+	// we want to be running on the nodes.
+	wantMap := map[string]*app.Config{}
+	for _, config := range want {
+		config := config // pin the loop variable so that each map entry points to its own copy
+		wantMap[config.ID] = &config
+	}
+
+	haveAfterRemove := []ProcessConfig{}
+
+	// Now we iterate through the processes we actually have running on the nodes
+	// and remove them from the wantMap. We also make sure that they are running.
+	// If a process is not in the wantMap, it will be deleted from the nodes.
+	for _, p := range have {
+		if _, ok := wantMap[p.Config.ID]; !ok {
+			opStack = append(opStack, processOpDelete{
+				nodeid:    p.NodeID,
+				processid: p.Config.ID,
+			})
+
+			r, ok := resources[p.NodeID]
+			if ok {
+				r.CPU -= p.CPU
+				r.Mem -= p.Mem
+				resources[p.NodeID] = r
+			}
+
+			continue
+		}
+
+		delete(wantMap, p.Config.ID)
+
+		if p.Order != "start" {
+			opStack = append(opStack, processOpStart{
+				nodeid:    p.NodeID,
+				processid: p.Config.ID,
+			})
+		}
+
+		haveAfterRemove = append(haveAfterRemove, p)
+	}
+
+	have = haveAfterRemove
+
+	// A map from the process reference to the node it is running on
+	haveReferenceAffinityMap := map[string]string{}
+	for _, p := range have {
+		haveReferenceAffinityMap[p.Config.Reference] = p.NodeID
+	}
+
+	// Now all remaining processes in the wantMap must be added to one of the nodes.
+	for _, config := range wantMap {
+		// Check whether processes with the same reference already run on a node and,
+		// if so, choose that node. Then check whether that node has enough resources
+		// left. If not, select the node with the most available resources.
+		reference := config.Reference
+		nodeid := haveReferenceAffinityMap[reference]
+
+		if len(nodeid) != 0 {
+			cpu := config.LimitCPU / resources[nodeid].NCPU
+			mem := float64(config.LimitMemory) / float64(resources[nodeid].MemTotal)
+
+			if resources[nodeid].CPU+cpu < 90 && resources[nodeid].Mem+mem < 90 {
+				opStack = append(opStack, processOpAdd{
+					nodeid: nodeid,
+					config: config,
+				})
+
+				continue
+			}
+		}
+
+		// Find the node with the most available resources
+		nodeid = ""
+		for id, r := range resources {
+			cpu := config.LimitCPU / r.NCPU
+			mem := float64(config.LimitMemory) / float64(r.MemTotal)
+
+			if len(nodeid) == 0 {
+				if r.CPU+cpu < 90 && r.Mem+mem < 90 {
+					nodeid = id
+				}
+
+				continue
+			}
+
+			if r.CPU+r.Mem < resources[nodeid].CPU+resources[nodeid].Mem {
+				nodeid = id
+			}
+			/*
+				if r.CPU < resources[nodeid].CPU && r.Mem < resources[nodeid].Mem {
+					nodeid = id
+				} else if r.Mem < resources[nodeid].Mem {
+					nodeid = id
+				} else if r.CPU < resources[nodeid].CPU {
+					nodeid = id
+				}
+			*/
+		}
+
+		if len(nodeid) != 0 {
+			opStack = append(opStack, processOpAdd{
+				nodeid: nodeid,
+				config: config,
+			})
+		} else {
+			opStack = append(opStack, processOpSkip{
+				processid: config.ID,
+			})
+		}
+	}
+
+	return opStack
+}
+
+// rebalance returns a list of operations that moves processes off overloaded nodes.
+func (c *cluster) rebalance(have []ProcessConfig, resources map[string]NodeResources) []interface{} {
+	processNodeMap := map[string][]ProcessConfig{}
+
+	for _, p := range have {
+		processNodeMap[p.NodeID] = append(processNodeMap[p.NodeID], p)
+	}
+
+	opStack := []interface{}{}
+
+	// Check if any of the nodes is overloaded
+	for id, r := range resources {
+		if r.CPU >= 90 || r.Mem >= 90 {
+			// Node is overloaded
+
+			// Find another node with more resources available
+			nodeid := id
+			for id, r := range resources {
+				if id == nodeid {
+					continue
+				}
+
+				if r.CPU < resources[nodeid].CPU && r.Mem < resources[nodeid].Mem {
+					nodeid = id
+				}
+			}
+
+			if nodeid != id {
+				// There's another node with more resources available.
+				diff_cpu := r.CPU - resources[nodeid].CPU
+				diff_mem := r.Mem - resources[nodeid].Mem
+
+				// All processes that could be moved should be collected in an array,
+				// sorted by the processes' runtime, so that the one with the least
+				// runtime is chosen first; repeat as needed.
+ for _, p := range processNodeMap[id] { + if p.CPU < diff_cpu && p.Mem < diff_mem { + opStack = append(opStack, processOpMove{ + fromNodeid: id, + toNodeid: nodeid, + config: p.Config, + }) + + break + } + } + } + } + } + + return opStack } diff --git a/cluster/leader_test.go b/cluster/leader_test.go new file mode 100644 index 00000000..0721a3ea --- /dev/null +++ b/cluster/leader_test.go @@ -0,0 +1,291 @@ +package cluster + +import ( + "testing" + + "github.com/datarhei/core/v16/restream/app" + "github.com/stretchr/testify/require" +) + +func TestNormalize(t *testing.T) { + have := []ProcessConfig{ + { + NodeID: "node2", + Order: "start", + State: "running", + CPU: 12, + Mem: 5, + Runtime: 42, + Config: &app.Config{ + ID: "foobar", + }, + }, + } + + resources := map[string]NodeResources{ + "node1": { + NCPU: 2, + CPU: 7, + Mem: 35, + MemTotal: 2 * 1024 * 1024 * 1024, // 2GB + }, + "node2": { + NCPU: 1, + CPU: 75, + Mem: 11, + MemTotal: 4 * 1024 * 1024 * 1024, // 4GB + }, + } + + normalizeProcessesAndResources(have, resources) + + require.Equal(t, []ProcessConfig{ + { + NodeID: "node2", + Order: "start", + State: "running", + CPU: 56, + Mem: 5, + Runtime: 42, + Config: &app.Config{ + ID: "foobar", + }, + }, + }, have) + + require.Equal(t, map[string]NodeResources{ + "node1": { + NCPU: 2, + CPU: 7, + Mem: 67.5, + MemTotal: 4 * 1024 * 1024 * 1024, + }, + "node2": { + NCPU: 2, + CPU: 87.5, + Mem: 11, + MemTotal: 4 * 1024 * 1024 * 1024, + }, + }, resources) + + // test idempotency + normalizeProcessesAndResources(have, resources) + + require.Equal(t, []ProcessConfig{ + { + NodeID: "node2", + Order: "start", + State: "running", + CPU: 56, + Mem: 5, + Runtime: 42, + Config: &app.Config{ + ID: "foobar", + }, + }, + }, have) + + require.Equal(t, map[string]NodeResources{ + "node1": { + NCPU: 2, + CPU: 7, + Mem: 67.5, + MemTotal: 4 * 1024 * 1024 * 1024, + }, + "node2": { + NCPU: 2, + CPU: 87.5, + Mem: 11, + MemTotal: 4 * 1024 * 1024 * 1024, + }, + }, resources) +} + +func TestSynchronizeAdd(t *testing.T) { + want := []app.Config{ + { + ID: "foobar", + LimitCPU: 10, + LimitMemory: 50 * 1024 * 1024, + }, + } + + have := []ProcessConfig{} + + resources := map[string]NodeResources{ + "node1": { + NCPU: 1, + CPU: 7, + Mem: 67.5, + MemTotal: 4 * 1024 * 1024 * 1024, + }, + "node2": { + NCPU: 1, + CPU: 87.5, + Mem: 11, + MemTotal: 4 * 1024 * 1024 * 1024, + }, + } + + stack := synchronize(want, have, resources) + + require.Equal(t, []interface{}{ + processOpAdd{ + nodeid: "node1", + config: &app.Config{ + ID: "foobar", + LimitCPU: 10, + LimitMemory: 50 * 1024 * 1024, + }, + }, + }, stack) +} + +func TestSynchronizeAddLimit(t *testing.T) { + want := []app.Config{ + { + ID: "foobar", + LimitCPU: 10, + LimitMemory: 50 * 1024 * 1024, + }, + } + + have := []ProcessConfig{} + + resources := map[string]NodeResources{ + "node1": { + NCPU: 1, + CPU: 81, + Mem: 72, + MemTotal: 4 * 1024 * 1024 * 1024, + }, + "node2": { + NCPU: 1, + CPU: 79, + Mem: 72, + MemTotal: 4 * 1024 * 1024 * 1024, + }, + } + + stack := synchronize(want, have, resources) + + require.Equal(t, []interface{}{ + processOpAdd{ + nodeid: "node2", + config: &app.Config{ + ID: "foobar", + LimitCPU: 10, + LimitMemory: 50 * 1024 * 1024, + }, + }, + }, stack) +} + +func TestSynchronizeRemove(t *testing.T) { + want := []app.Config{} + + have := []ProcessConfig{ + { + NodeID: "node2", + Order: "start", + State: "running", + CPU: 12, + Mem: 5, + Runtime: 42, + Config: &app.Config{ + ID: "foobar", + }, + }, + } + + resources := map[string]NodeResources{ + 
"node1": { + NCPU: 1, + CPU: 7, + Mem: 67.5, + MemTotal: 4 * 1024 * 1024 * 1024, + }, + "node2": { + NCPU: 1, + CPU: 87.5, + Mem: 11, + MemTotal: 4 * 1024 * 1024 * 1024, + }, + } + + stack := synchronize(want, have, resources) + + require.Equal(t, map[string]NodeResources{ + "node1": { + NCPU: 1, + CPU: 7, + Mem: 67.5, + MemTotal: 4 * 1024 * 1024 * 1024, + }, + "node2": { + NCPU: 1, + CPU: 75.5, + Mem: 6, + MemTotal: 4 * 1024 * 1024 * 1024, + }, + }, resources) + + require.Equal(t, []interface{}{ + processOpDelete{ + nodeid: "node2", + processid: "foobar", + }, + }, stack) +} + +func TestSynchronizeAddRemove(t *testing.T) { + want := []app.Config{ + { + ID: "foobar1", + }, + } + + have := []ProcessConfig{ + { + NodeID: "node2", + Order: "start", + State: "running", + CPU: 12, + Mem: 5, + Runtime: 42, + Config: &app.Config{ + ID: "foobar2", + }, + }, + } + + resources := map[string]NodeResources{ + "node1": { + NCPU: 1, + CPU: 7, + Mem: 67.5, + MemTotal: 4 * 1024 * 1024 * 1024, + }, + "node2": { + NCPU: 1, + CPU: 87.5, + Mem: 11, + MemTotal: 4 * 1024 * 1024 * 1024, + }, + } + + stack := synchronize(want, have, resources) + + require.Equal(t, []interface{}{ + processOpDelete{ + nodeid: "node2", + processid: "foobar2", + }, + processOpAdd{ + nodeid: "node1", + config: &app.Config{ + ID: "foobar1", + }, + }, + }, stack) +} diff --git a/cluster/node.go b/cluster/node.go index 62ab5ed2..9f2665f1 100644 --- a/cluster/node.go +++ b/cluster/node.go @@ -14,6 +14,7 @@ import ( "github.com/datarhei/core/v16/client" httpapi "github.com/datarhei/core/v16/http/api" + "github.com/datarhei/core/v16/restream/app" ) type Node interface { @@ -26,8 +27,8 @@ type Node interface { GetURL(path string) (string, error) GetFile(path string) (io.ReadCloser, error) - ProcessGetAll() ([]string, error) - ProcessSet() error + ProcessList() ([]ProcessConfig, error) + ProcessAdd(*app.Config) error ProcessStart(id string) error ProcessStop(id string) error ProcessDelete(id string) error @@ -49,13 +50,19 @@ type NodeFiles struct { LastUpdate time.Time } +type NodeResources struct { + NCPU float64 + CPU float64 + Mem float64 + MemTotal float64 +} + type NodeState struct { ID string State string LastContact time.Time Latency time.Duration - CPU float64 - Mem float64 + Resources NodeResources } type nodeState string @@ -80,8 +87,10 @@ type node struct { lastContact time.Time resources struct { - cpu float64 - mem float64 + ncpu float64 + cpu float64 + mem float64 + memTotal float64 } state nodeState @@ -233,6 +242,9 @@ func (n *node) Connect() error { // Metrics metrics, err := n.peer.Metrics(httpapi.MetricsQuery{ Metrics: []httpapi.MetricsQueryMetric{ + { + Name: "cpu_ncpu", + }, { Name: "cpu_idle", }, @@ -247,10 +259,13 @@ func (n *node) Connect() error { if err != nil { n.stateLock.Lock() n.resources.cpu = 100 + n.resources.ncpu = 1 n.resources.mem = 100 + n.resources.memTotal = 1 n.stateLock.Unlock() } + cpu_ncpu := .0 cpu_idle := .0 mem_total := .0 mem_free := .0 @@ -258,6 +273,8 @@ func (n *node) Connect() error { for _, x := range metrics.Metrics { if x.Name == "cpu_idle" { cpu_idle = x.Values[0].Value + } else if x.Name == "cpu_ncpu" { + cpu_ncpu = x.Values[0].Value } else if x.Name == "mem_total" { mem_total = x.Values[0].Value } else if x.Name == "mem_free" { @@ -266,11 +283,14 @@ func (n *node) Connect() error { } n.stateLock.Lock() + n.resources.ncpu = cpu_ncpu n.resources.cpu = 100 - cpu_idle if mem_total != 0 { n.resources.mem = (mem_total - mem_free) / mem_total * 100 + n.resources.memTotal = mem_total } else { 
n.resources.mem = 100 + n.resources.memTotal = 1 } n.lastContact = time.Now() n.stateLock.Unlock() @@ -382,8 +402,12 @@ func (n *node) State() NodeState { LastContact: n.lastContact, State: n.state.String(), Latency: time.Duration(n.latency * float64(time.Second)), - CPU: n.resources.cpu, - Mem: n.resources.mem, + Resources: NodeResources{ + NCPU: n.resources.ncpu, + CPU: n.resources.cpu, + Mem: n.resources.mem, + MemTotal: n.resources.memTotal, + }, } return state @@ -551,23 +575,41 @@ func (n *node) GetFile(path string) (io.ReadCloser, error) { return nil, fmt.Errorf("unknown prefix") } -func (n *node) ProcessGetAll() ([]string, error) { - processes, err := n.peer.ProcessList(nil, nil) +func (n *node) ProcessList() ([]ProcessConfig, error) { + list, err := n.peer.ProcessList(nil, nil) if err != nil { return nil, err } - list := []string{} + processes := []ProcessConfig{} - for _, p := range processes { - list = append(list, p.ID) + for _, p := range list { + process := ProcessConfig{ + NodeID: n.ID(), + Order: p.State.Order, + State: p.State.State, + Mem: float64(p.State.Memory) / float64(n.resources.memTotal), + Runtime: time.Duration(p.State.Runtime) * time.Second, + Config: p.Config.Marshal(), + } + + if x, err := p.State.CPU.Float64(); err == nil { + process.CPU = x / n.resources.ncpu + } else { + process.CPU = 100 + } + + processes = append(processes, process) } - return list, nil + return processes, nil } -func (n *node) ProcessSet() error { - return nil +func (n *node) ProcessAdd(config *app.Config) error { + cfg := httpapi.ProcessConfig{} + cfg.Unmarshal(config) + + return n.peer.ProcessAdd(cfg) } func (n *node) ProcessStart(id string) error { diff --git a/cluster/proxy.go b/cluster/proxy.go index 01b8e593..ced4b1f1 100644 --- a/cluster/proxy.go +++ b/cluster/proxy.go @@ -9,6 +9,7 @@ import ( "github.com/datarhei/core/v16/log" "github.com/datarhei/core/v16/net" + "github.com/datarhei/core/v16/restream/app" ) type Proxy interface { @@ -19,8 +20,63 @@ type Proxy interface { RemoveNode(id string) error ProxyReader - Reader() ProxyReader + + ProxyProcessor + Processor() ProxyProcessor +} + +type ProxyProcessor interface { + Resources() map[string]NodeResources + + ProcessList() []ProcessConfig + ProcessAdd(nodeid string, config *app.Config) error + ProcessDelete(nodeid string, id string) error + ProcessStart(nodeid string, id string) error +} + +type proxyProcessor struct { + proxy *proxy +} + +func (p *proxyProcessor) Resources() map[string]NodeResources { + if p.proxy == nil { + return nil + } + + return p.proxy.Resources() +} + +func (p *proxyProcessor) ProcessList() []ProcessConfig { + if p.proxy == nil { + return nil + } + + return p.proxy.ProcessList() +} + +func (p *proxyProcessor) ProcessAdd(nodeid string, config *app.Config) error { + if p.proxy == nil { + return fmt.Errorf("no proxy provided") + } + + return p.proxy.ProcessAdd(nodeid, config) +} + +func (p *proxyProcessor) ProcessDelete(nodeid string, id string) error { + if p.proxy == nil { + return fmt.Errorf("no proxy provided") + } + + return p.proxy.ProcessDelete(nodeid, id) +} + +func (p *proxyProcessor) ProcessStart(nodeid string, id string) error { + if p.proxy == nil { + return fmt.Errorf("no proxy provided") + } + + return p.proxy.ProcessStart(nodeid, id) } type ProxyReader interface { @@ -205,6 +261,25 @@ func (p *proxy) Reader() ProxyReader { } } +func (p *proxy) Processor() ProxyProcessor { + return &proxyProcessor{ + proxy: p, + } +} + +func (p *proxy) Resources() map[string]NodeResources { + resources := 
map[string]NodeResources{} + + p.lock.RLock() + defer p.lock.RUnlock() + + for _, node := range p.nodes { + resources[node.ID()] = node.State().Resources + } + + return resources +} + func (p *proxy) AddNode(id string, node Node) (string, error) { if id != node.ID() { return "", fmt.Errorf("the provided (%s) and retrieved (%s) ID's don't match", id, node.ID()) @@ -370,3 +445,114 @@ func (p *proxy) GetFile(path string) (io.ReadCloser, error) { return data, nil } + +type ProcessConfig struct { + NodeID string + Order string + State string + CPU float64 + Mem float64 + Runtime time.Duration + Config *app.Config +} + +func (p *proxy) ProcessList() []ProcessConfig { + processChan := make(chan ProcessConfig, 64) + processList := []ProcessConfig{} + + wgList := sync.WaitGroup{} + wgList.Add(1) + + go func() { + defer wgList.Done() + + for file := range processChan { + processList = append(processList, file) + } + }() + + wg := sync.WaitGroup{} + + p.lock.RLock() + for _, node := range p.nodes { + wg.Add(1) + + go func(node Node, p chan<- ProcessConfig) { + defer wg.Done() + + processes, err := node.ProcessList() + if err != nil { + return + } + + for _, process := range processes { + p <- process + } + }(node, processChan) + } + p.lock.RUnlock() + + wg.Wait() + + close(processChan) + + wgList.Wait() + + return processList +} + +func (p *proxy) ProcessAdd(nodeid string, config *app.Config) error { + p.lock.RLock() + defer p.lock.RUnlock() + + node, ok := p.nodes[nodeid] + if !ok { + return fmt.Errorf("node not found") + } + + err := node.ProcessAdd(config) + if err != nil { + return err + } + + err = node.ProcessStart(config.ID) + if err != nil { + return err + } + + return nil +} + +func (p *proxy) ProcessDelete(nodeid string, id string) error { + p.lock.RLock() + defer p.lock.RUnlock() + + node, ok := p.nodes[nodeid] + if !ok { + return fmt.Errorf("node not found") + } + + err := node.ProcessDelete(id) + if err != nil { + return err + } + + return nil +} + +func (p *proxy) ProcessStart(nodeid string, id string) error { + p.lock.RLock() + defer p.lock.RUnlock() + + node, ok := p.nodes[nodeid] + if !ok { + return fmt.Errorf("node not found") + } + + err := node.ProcessStart(id) + if err != nil { + return err + } + + return nil +} diff --git a/cluster/store.go b/cluster/store.go index 711f5945..04f615a6 100644 --- a/cluster/store.go +++ b/cluster/store.go @@ -14,7 +14,7 @@ import ( type Store interface { raft.FSM - ListProcesses() []app.Config + ProcessList() []app.Config GetProcess(id string) (app.Config, error) } @@ -126,7 +126,7 @@ func (s *store) Restore(snapshot io.ReadCloser) error { return nil } -func (s *store) ListProcesses() []app.Config { +func (s *store) ProcessList() []app.Config { s.lock.RLock() defer s.lock.RUnlock() diff --git a/http/handler/api/cluster.go b/http/handler/api/cluster.go index 506fe167..60198ddb 100644 --- a/http/handler/api/cluster.go +++ b/http/handler/api/cluster.go @@ -49,8 +49,8 @@ func (h *ClusterHandler) GetProxyNodes(c echo.Context) error { LastContact: state.LastContact.Unix(), Latency: state.Latency.Seconds() * 1000, State: state.State, - CPU: state.CPU, - Mem: state.Mem, + CPU: state.Resources.CPU, + Mem: state.Resources.Mem, } list = append(list, n) @@ -86,8 +86,8 @@ func (h *ClusterHandler) GetProxyNode(c echo.Context) error { LastContact: state.LastContact.Unix(), Latency: state.Latency.Seconds() * 1000, State: state.State, - CPU: state.CPU, - Mem: state.Mem, + CPU: state.Resources.CPU, + Mem: state.Resources.Mem, } return c.JSON(http.StatusOK, node)
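
Note on the normalization in normalizeProcessesAndResources: usage percentages are rescaled as if every node had the largest NCPU and MemTotal in the cluster, so that loads become comparable across differently sized nodes. The following standalone sketch is not part of the patch; the helper name is illustrative and the figures mirror TestNormalize.

package main

import "fmt"

// normalize rescales a usage percentage measured on a node with the given
// capacity onto a hypothetical node with maxCapacity. The idle headroom
// (100 - usage) shrinks by the capacity ratio maxCapacity/capacity.
func normalize(usage, capacity, maxCapacity float64) float64 {
	factor := maxCapacity / capacity
	return 100 - (100-usage)/factor
}

func main() {
	// Figures from TestNormalize: node1 has 2 GiB of the 4 GiB maximum and 35% memory usage.
	fmt.Println(normalize(35, 2, 4)) // 67.5
	// node2 has 1 of the maximum 2 cores and 75% CPU usage.
	fmt.Println(normalize(75, 1, 2)) // 87.5
}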