Add process synchronization in leader role

Ingo Oppermann
2023-05-09 20:48:30 +02:00
parent ab86b8fd5d
commit ae04dc50c7
7 changed files with 924 additions and 65 deletions

View File

@@ -116,6 +116,7 @@ type cluster struct {
store Store
reassertLeaderCh chan chan error
cancelLeaderShip context.CancelFunc
leaveCh chan struct{}

View File

@@ -7,6 +7,7 @@ import (
"time" "time"
"github.com/datarhei/core/v16/log" "github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/restream/app"
) )
const NOTIFY_FOLLOWER = 0 const NOTIFY_FOLLOWER = 0
@@ -279,57 +280,395 @@ WAIT:
return
case <-interval:
goto RECONCILE
case errCh := <-c.reassertLeaderCh:
// we can get into this state when the initial
// establishLeadership has failed, as well as the
// follow-up leadershipTransfer. Afterwards we will be
// waiting for the interval to trigger a reconciliation
// and can potentially end up here. There is no point in
// reasserting, because this agent was never the leader
// in the first place.
if !establishedLeader {
errCh <- fmt.Errorf("leadership has not been established")
continue
}
// continue to reassert only if we previously were the
// leader, which means revokeLeadership followed by an
// establishLeadership().
c.revokeLeadership()
err := c.establishLeadership(context.TODO())
errCh <- err
// in case establishLeadership failed, we will try to
// transfer leadership. At this time raft thinks we are
// the leader, but we disagree.
if err != nil {
if err := c.leadershipTransfer(); err != nil {
// establishedLeader was true before, but it no
// longer is, since we revoked leadership and the
// leadership transfer also failed. That is why we
// stay in the leaderLoop, but establishedLeader
// now needs to be set to false.
establishedLeader = false
interval = time.After(5 * time.Second)
goto WAIT
}
// leadershipTransfer was successful and it is
// time to leave the leaderLoop.
return
}
}
}
}
func (c *cluster) establishLeadership(ctx context.Context) error {
c.logger.Debug().Log("establishing leadership")
ctx, cancel := context.WithCancel(ctx)
c.cancelLeaderShip = cancel
go c.startRebalance(ctx)
return nil
}
func (c *cluster) revokeLeadership() {
c.logger.Debug().Log("revoking leadership")
c.cancelLeaderShip()
}
func (c *cluster) startRebalance(ctx context.Context) {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
c.doSynchronize()
//c.doRebalance()
}
}
}
func (c *cluster) applyOpStack(stack []interface{}) {
for _, op := range stack {
switch v := op.(type) {
case processOpAdd:
err := c.proxy.ProcessAdd(v.nodeid, v.config)
if err != nil {
c.logger.Info().WithError(err).WithFields(log.Fields{
"processid": v.config.ID,
"nodeid": v.nodeid,
}).Log("Adding process failed")
break
}
err = c.proxy.ProcessStart(v.nodeid, v.config.ID)
if err != nil {
c.logger.Info().WithError(err).WithFields(log.Fields{
"processid": v.config.ID,
"nodeid": v.nodeid,
}).Log("Starting process failed")
break
}
c.logger.Info().WithFields(log.Fields{
"processid": v.config.ID,
"nodeid": v.nodeid,
}).Log("Adding process")
case processOpDelete:
err := c.proxy.ProcessDelete(v.nodeid, v.processid)
if err != nil {
c.logger.Info().WithError(err).WithFields(log.Fields{
"processid": v.processid,
"nodeid": v.nodeid,
}).Log("Removing process failed")
break
}
c.logger.Info().WithFields(log.Fields{
"processid": v.processid,
"nodeid": v.nodeid,
}).Log("Removing process")
case processOpMove:
err := c.proxy.ProcessAdd(v.toNodeid, v.config)
if err != nil {
c.logger.Info().WithError(err).WithFields(log.Fields{
"processid": v.config.ID,
"fromnodeid": v.fromNodeid,
"tonodeid": v.toNodeid,
}).Log("Moving process, adding process failed")
break
}
err = c.proxy.ProcessDelete(v.fromNodeid, v.config.ID)
if err != nil {
c.logger.Info().WithError(err).WithFields(log.Fields{
"processid": v.config.ID,
"fromnodeid": v.fromNodeid,
"tonodeid": v.toNodeid,
}).Log("Moving process, removing process failed")
break
}
err = c.proxy.ProcessStart(v.toNodeid, v.config.ID)
if err != nil {
c.logger.Info().WithError(err).WithFields(log.Fields{
"processid": v.config.ID,
"fromnodeid": v.fromNodeid,
"tonodeid": v.toNodeid,
}).Log("Moving process, starting process failed")
break
}
c.logger.Info().WithFields(log.Fields{
"processid": v.config.ID,
"fromnodeid": v.fromNodeid,
"tonodeid": v.toNodeid,
}).Log("Moving process")
case processOpStart:
err := c.proxy.ProcessStart(v.nodeid, v.processid)
if err != nil {
c.logger.Info().WithError(err).WithFields(log.Fields{
"processid": v.processid,
"nodeid": v.nodeid,
}).Log("Starting process failed")
break
}
c.logger.Info().WithFields(log.Fields{
"processid": v.processid,
"nodeid": v.nodeid,
}).Log("Starting process")
case processOpSkip:
c.logger.Warn().WithField("processid", v.processid).Log("Not enough resources to deploy process")
default:
c.logger.Warn().Log("Unknown operation on stack: %+v", v)
}
}
}
func (c *cluster) doSynchronize() {
want := c.store.ProcessList()
have := c.proxy.ProcessList()
resources := c.proxy.Resources()
opStack := synchronize(want, have, resources)
c.applyOpStack(opStack)
}
func (c *cluster) doRebalance() {
have := c.proxy.ProcessList()
resources := c.proxy.Resources()
opStack := c.rebalance(have, resources)
c.applyOpStack(opStack)
}
type processOpDelete struct {
nodeid string
processid string
}
type processOpMove struct {
fromNodeid string
toNodeid string
config *app.Config
}
type processOpStart struct {
nodeid string
processid string
}
type processOpAdd struct {
nodeid string
config *app.Config
}
type processOpSkip struct {
processid string
}
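The leader encodes every scheduling decision as one of the small structs above and collects them in a plain []interface{}, which applyOpStack then dispatches with a type switch. As an illustration only (not part of this commit; the helper name and the node/process IDs are made up), building and applying such a stack by hand could look like this, assuming the types from this package:

package cluster

import "github.com/datarhei/core/v16/restream/app"

// exampleApplyOps is a hypothetical helper showing how operations are queued
// and handed to applyOpStack. The IDs are placeholders.
func exampleApplyOps(c *cluster) {
	ops := []interface{}{
		processOpAdd{nodeid: "node1", config: &app.Config{ID: "stream-a"}},
		processOpStart{nodeid: "node2", processid: "stream-b"},
		processOpDelete{nodeid: "node3", processid: "stream-c"},
	}
	c.applyOpStack(ops)
}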
// normalizeProcessesAndResources normalizes the CPU and memory consumption of the processes and resources in-place.
func normalizeProcessesAndResources(processes []ProcessConfig, resources map[string]NodeResources) {
maxNCPU := .0
maxMemTotal := .0
for _, r := range resources {
if r.NCPU > maxNCPU {
maxNCPU = r.NCPU
}
if r.MemTotal > maxMemTotal {
maxMemTotal = r.MemTotal
}
}
for id, r := range resources {
factor := maxNCPU / r.NCPU
r.CPU = 100 - (100-r.CPU)/factor
factor = maxMemTotal / r.MemTotal
r.Mem = 100 - (100-r.Mem)/factor
resources[id] = r
}
for i, p := range processes {
r, ok := resources[p.NodeID]
if !ok {
// Unknown node: mark the process as fully loaded and skip the rescaling.
p.CPU = 100
p.Mem = 100
processes[i] = p
continue
}
factor := maxNCPU / r.NCPU
p.CPU = 100 - (100-p.CPU)/factor
factor = maxMemTotal / r.MemTotal
p.Mem = 100 - (100-p.Mem)/factor
processes[i] = p
}
for id, r := range resources {
r.NCPU = maxNCPU
r.MemTotal = maxMemTotal
resources[id] = r
}
}
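A quick worked example of the normalization above, using the same numbers as TestNormalize further down: maxNCPU is 2 (node1) and maxMemTotal is 4 GB (node2). node2 runs on a single core, so its CPU load is rescaled by a factor of 2: 75 becomes 100 - (100-75)/2 = 87.5. node1 has only 2 GB of memory, so its memory load is rescaled the same way: 35 becomes 100 - (100-35)/2 = 67.5. A standalone sketch of that arithmetic (a hypothetical helper, not part of the commit):

package main

import "fmt"

// rescale maps a utilization percentage measured against a node's own
// capacity onto the cluster-wide maximum capacity, mirroring the formula in
// normalizeProcessesAndResources.
func rescale(percent, have, max float64) float64 {
	factor := max / have
	return 100 - (100-percent)/factor
}

func main() {
	fmt.Println(rescale(75, 1, 2)) // node2 CPU: 87.5
	fmt.Println(rescale(35, 2, 4)) // node1 Mem: 67.5
}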
// synchronize returns a list of operations needed to adjust the "have" list to the
// "want" list, taking the available resources on each node into account.
func synchronize(want []app.Config, have []ProcessConfig, resources map[string]NodeResources) []interface{} {
opStack := []interface{}{}
normalizeProcessesAndResources(have, resources)
// A map from the process ID to the process config of the processes
// we want to be running on the nodes.
wantMap := map[string]*app.Config{}
for _, config := range want {
config := config // copy, otherwise every map entry points at the loop variable
wantMap[config.ID] = &config
}
haveAfterRemove := []ProcessConfig{}
// Now we iterate through the processes we actually have running on the nodes
// and remove them from the wantMap. We also make sure that they are running.
// If a process is not in the wantMap, it will be deleted from the nodes.
for _, p := range have {
if _, ok := wantMap[p.Config.ID]; !ok {
opStack = append(opStack, processOpDelete{
nodeid: p.NodeID,
processid: p.Config.ID,
})
r, ok := resources[p.NodeID]
if ok {
r.CPU -= p.CPU
r.Mem -= p.Mem
resources[p.NodeID] = r
}
continue
}
delete(wantMap, p.Config.ID)
if p.Order != "start" {
opStack = append(opStack, processOpStart{
nodeid: p.NodeID,
processid: p.Config.ID,
})
}
haveAfterRemove = append(haveAfterRemove, p)
}
have = haveAfterRemove
// A map from the process reference to the node it is running on
haveReferenceAffinityMap := map[string]string{}
for _, p := range have {
haveReferenceAffinityMap[p.Config.Reference] = p.NodeID
}
// Now all remaining processes in the wantMap must be added to one of the nodes
for _, config := range wantMap {
// Check if there are already processes with the same reference, and if so,
// choose that node. Then check whether the node has enough resources left;
// if not, select the node with the most available resources.
reference := config.Reference
nodeid := haveReferenceAffinityMap[reference]
if len(nodeid) != 0 {
cpu := config.LimitCPU / resources[nodeid].NCPU
mem := float64(config.LimitMemory) / float64(resources[nodeid].MemTotal) * 100 // as percent of total memory
if resources[nodeid].CPU+cpu < 90 && resources[nodeid].Mem+mem < 90 {
opStack = append(opStack, processOpAdd{
nodeid: nodeid,
config: config,
})
continue
}
}
// Find the node with the most resources available
nodeid = ""
for id, r := range resources {
cpu := config.LimitCPU / r.NCPU
mem := float64(config.LimitMemory) / float64(r.MemTotal) * 100 // as percent of total memory
if len(nodeid) == 0 {
if r.CPU+cpu < 90 && r.Mem+mem < 90 {
nodeid = id
}
continue
}
if r.CPU+r.Mem < resources[nodeid].CPU+resources[nodeid].Mem {
nodeid = id
}
/*
if r.CPU < resources[nodeid].CPU && r.Mem < resources[nodeid].Mem {
nodeid = id
} else if r.Mem < resources[nodeid].Mem {
nodeid = id
} else if r.CPU < resources[nodeid].CPU {
nodeid = id
}
*/
}
if len(nodeid) != 0 {
opStack = append(opStack, processOpAdd{
nodeid: nodeid,
config: config,
})
} else {
opStack = append(opStack, processOpSkip{
processid: config.ID,
})
}
}
return opStack
}
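The capacity check in synchronize works in normalized percentages: a process costs LimitCPU spread across the node's cores plus LimitMemory as a share of the node's total memory, and a node is only considered while the projected load stays below 90 percent. With the numbers from TestSynchronizeAddLimit below, a 10 percent CPU limit on a single-core node at 81 percent is rejected (81 + 10 = 91), while the node at 79 percent is accepted (79 + 10 = 89). A small standalone sketch of that check (hypothetical helper, not part of the commit):

package main

import "fmt"

// fits mirrors the headroom check in synchronize: the CPU limit is divided by
// the node's core count, the memory limit is taken as a percentage of the
// node's total memory, and the projected load must stay below 90%.
func fits(nodeCPU, nodeMem, ncpu, memTotal, limitCPU, limitMem float64) bool {
	cpu := limitCPU / ncpu
	mem := limitMem / memTotal * 100
	return nodeCPU+cpu < 90 && nodeMem+mem < 90
}

func main() {
	memTotal := 4.0 * 1024 * 1024 * 1024 // 4 GB
	limitMem := 50.0 * 1024 * 1024       // 50 MB

	fmt.Println(fits(81, 72, 1, memTotal, 10, limitMem)) // false: 81 + 10 >= 90
	fmt.Println(fits(79, 72, 1, memTotal, 10, limitMem)) // true:  79 + 10 < 90
}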
// rebalance returns a list of operations that relieves overloaded nodes
func (c *cluster) rebalance(have []ProcessConfig, resources map[string]NodeResources) []interface{} {
processNodeMap := map[string][]ProcessConfig{}
for _, p := range have {
processNodeMap[p.NodeID] = append(processNodeMap[p.NodeID], p)
}
opStack := []interface{}{}
// Check if any of the nodes is overloaded
for id, r := range resources {
if r.CPU >= 90 || r.Mem >= 90 {
// Node is overloaded
// Find another node with more resources available
nodeid := id
for id, r := range resources {
if id == nodeid {
continue
}
if r.CPU < resources[nodeid].CPU && r.Mem < resources[nodeid].Mem {
nodeid = id
}
}
if nodeid != id {
// there's another node with more resources available
diff_cpu := r.CPU - resources[nodeid].CPU
diff_mem := r.Mem - resources[nodeid].Mem
// All processes that could be moved should be collected in an array sorted
// by the process runtime, so that the one with the least runtime is chosen
// first. Repeat as needed (see the sketch after this function).
for _, p := range processNodeMap[id] {
if p.CPU < diff_cpu && p.Mem < diff_mem {
opStack = append(opStack, processOpMove{
fromNodeid: id,
toNodeid: nodeid,
config: p.Config,
})
break
}
}
}
}
}
return opStack
}
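The comment in rebalance notes that the movable processes should be collected and sorted by runtime so that the process with the least runtime is moved first; that sorting step is not implemented in this commit. A minimal sketch of what it could look like, assuming the ProcessConfig type from this package (the helper name is made up):

package cluster

import "sort"

// sortByRuntime orders candidate processes by ascending runtime, so the
// process with the least runtime is considered first when moving load off an
// overloaded node. Illustrative only; not part of the commit.
func sortByRuntime(processes []ProcessConfig) {
	sort.Slice(processes, func(i, j int) bool {
		return processes[i].Runtime < processes[j].Runtime
	})
}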

cluster/leader_test.go (new file, 291 lines)
View File

@@ -0,0 +1,291 @@
package cluster
import (
"testing"
"github.com/datarhei/core/v16/restream/app"
"github.com/stretchr/testify/require"
)
func TestNormalize(t *testing.T) {
have := []ProcessConfig{
{
NodeID: "node2",
Order: "start",
State: "running",
CPU: 12,
Mem: 5,
Runtime: 42,
Config: &app.Config{
ID: "foobar",
},
},
}
resources := map[string]NodeResources{
"node1": {
NCPU: 2,
CPU: 7,
Mem: 35,
MemTotal: 2 * 1024 * 1024 * 1024, // 2GB
},
"node2": {
NCPU: 1,
CPU: 75,
Mem: 11,
MemTotal: 4 * 1024 * 1024 * 1024, // 4GB
},
}
normalizeProcessesAndResources(have, resources)
require.Equal(t, []ProcessConfig{
{
NodeID: "node2",
Order: "start",
State: "running",
CPU: 56,
Mem: 5,
Runtime: 42,
Config: &app.Config{
ID: "foobar",
},
},
}, have)
require.Equal(t, map[string]NodeResources{
"node1": {
NCPU: 2,
CPU: 7,
Mem: 67.5,
MemTotal: 4 * 1024 * 1024 * 1024,
},
"node2": {
NCPU: 2,
CPU: 87.5,
Mem: 11,
MemTotal: 4 * 1024 * 1024 * 1024,
},
}, resources)
// test idempotency
normalizeProcessesAndResources(have, resources)
require.Equal(t, []ProcessConfig{
{
NodeID: "node2",
Order: "start",
State: "running",
CPU: 56,
Mem: 5,
Runtime: 42,
Config: &app.Config{
ID: "foobar",
},
},
}, have)
require.Equal(t, map[string]NodeResources{
"node1": {
NCPU: 2,
CPU: 7,
Mem: 67.5,
MemTotal: 4 * 1024 * 1024 * 1024,
},
"node2": {
NCPU: 2,
CPU: 87.5,
Mem: 11,
MemTotal: 4 * 1024 * 1024 * 1024,
},
}, resources)
}
func TestSynchronizeAdd(t *testing.T) {
want := []app.Config{
{
ID: "foobar",
LimitCPU: 10,
LimitMemory: 50 * 1024 * 1024,
},
}
have := []ProcessConfig{}
resources := map[string]NodeResources{
"node1": {
NCPU: 1,
CPU: 7,
Mem: 67.5,
MemTotal: 4 * 1024 * 1024 * 1024,
},
"node2": {
NCPU: 1,
CPU: 87.5,
Mem: 11,
MemTotal: 4 * 1024 * 1024 * 1024,
},
}
stack := synchronize(want, have, resources)
require.Equal(t, []interface{}{
processOpAdd{
nodeid: "node1",
config: &app.Config{
ID: "foobar",
LimitCPU: 10,
LimitMemory: 50 * 1024 * 1024,
},
},
}, stack)
}
func TestSynchronizeAddLimit(t *testing.T) {
want := []app.Config{
{
ID: "foobar",
LimitCPU: 10,
LimitMemory: 50 * 1024 * 1024,
},
}
have := []ProcessConfig{}
resources := map[string]NodeResources{
"node1": {
NCPU: 1,
CPU: 81,
Mem: 72,
MemTotal: 4 * 1024 * 1024 * 1024,
},
"node2": {
NCPU: 1,
CPU: 79,
Mem: 72,
MemTotal: 4 * 1024 * 1024 * 1024,
},
}
stack := synchronize(want, have, resources)
require.Equal(t, []interface{}{
processOpAdd{
nodeid: "node2",
config: &app.Config{
ID: "foobar",
LimitCPU: 10,
LimitMemory: 50 * 1024 * 1024,
},
},
}, stack)
}
func TestSynchronizeRemove(t *testing.T) {
want := []app.Config{}
have := []ProcessConfig{
{
NodeID: "node2",
Order: "start",
State: "running",
CPU: 12,
Mem: 5,
Runtime: 42,
Config: &app.Config{
ID: "foobar",
},
},
}
resources := map[string]NodeResources{
"node1": {
NCPU: 1,
CPU: 7,
Mem: 67.5,
MemTotal: 4 * 1024 * 1024 * 1024,
},
"node2": {
NCPU: 1,
CPU: 87.5,
Mem: 11,
MemTotal: 4 * 1024 * 1024 * 1024,
},
}
stack := synchronize(want, have, resources)
require.Equal(t, map[string]NodeResources{
"node1": {
NCPU: 1,
CPU: 7,
Mem: 67.5,
MemTotal: 4 * 1024 * 1024 * 1024,
},
"node2": {
NCPU: 1,
CPU: 75.5,
Mem: 6,
MemTotal: 4 * 1024 * 1024 * 1024,
},
}, resources)
require.Equal(t, []interface{}{
processOpDelete{
nodeid: "node2",
processid: "foobar",
},
}, stack)
}
func TestSynchronizeAddRemove(t *testing.T) {
want := []app.Config{
{
ID: "foobar1",
},
}
have := []ProcessConfig{
{
NodeID: "node2",
Order: "start",
State: "running",
CPU: 12,
Mem: 5,
Runtime: 42,
Config: &app.Config{
ID: "foobar2",
},
},
}
resources := map[string]NodeResources{
"node1": {
NCPU: 1,
CPU: 7,
Mem: 67.5,
MemTotal: 4 * 1024 * 1024 * 1024,
},
"node2": {
NCPU: 1,
CPU: 87.5,
Mem: 11,
MemTotal: 4 * 1024 * 1024 * 1024,
},
}
stack := synchronize(want, have, resources)
require.Equal(t, []interface{}{
processOpDelete{
nodeid: "node2",
processid: "foobar2",
},
processOpAdd{
nodeid: "node1",
config: &app.Config{
ID: "foobar1",
},
},
}, stack)
}
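One path the tests above do not exercise is the skip case, where no node has enough headroom left for a new process. Sticking to the same style, an additional test could look roughly like this (illustrative only, not part of the commit):

func TestSynchronizeSkip(t *testing.T) {
	want := []app.Config{
		{
			ID:          "foobar",
			LimitCPU:    30,
			LimitMemory: 50 * 1024 * 1024,
		},
	}

	have := []ProcessConfig{}

	resources := map[string]NodeResources{
		"node1": {
			NCPU:     1,
			CPU:      75,
			Mem:      10,
			MemTotal: 4 * 1024 * 1024 * 1024,
		},
		"node2": {
			NCPU:     1,
			CPU:      80,
			Mem:      10,
			MemTotal: 4 * 1024 * 1024 * 1024,
		},
	}

	// Neither node can take another 30% CPU without crossing the 90% limit,
	// so the process is expected to be skipped rather than deployed.
	stack := synchronize(want, have, resources)

	require.Equal(t, []interface{}{
		processOpSkip{
			processid: "foobar",
		},
	}, stack)
}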

View File

@@ -14,6 +14,7 @@ import (
"github.com/datarhei/core/v16/client" "github.com/datarhei/core/v16/client"
httpapi "github.com/datarhei/core/v16/http/api" httpapi "github.com/datarhei/core/v16/http/api"
"github.com/datarhei/core/v16/restream/app"
) )
type Node interface { type Node interface {
@@ -26,8 +27,8 @@ type Node interface {
GetURL(path string) (string, error)
GetFile(path string) (io.ReadCloser, error)
- ProcessGetAll() ([]string, error)
ProcessList() ([]ProcessConfig, error)
- ProcessSet() error
ProcessAdd(*app.Config) error
ProcessStart(id string) error
ProcessStop(id string) error
ProcessDelete(id string) error
@@ -49,13 +50,19 @@ type NodeFiles struct {
LastUpdate time.Time
}
type NodeResources struct {
NCPU float64
CPU float64
Mem float64
MemTotal float64
}
type NodeState struct {
ID string
State string
LastContact time.Time
Latency time.Duration
- CPU float64
- Mem float64
Resources NodeResources
}
type nodeState string
@@ -80,8 +87,10 @@ type node struct {
lastContact time.Time
resources struct {
- cpu float64
- mem float64
ncpu float64
cpu float64
mem float64
memTotal float64
}
state nodeState
@@ -233,6 +242,9 @@ func (n *node) Connect() error {
// Metrics
metrics, err := n.peer.Metrics(httpapi.MetricsQuery{
Metrics: []httpapi.MetricsQueryMetric{
{
Name: "cpu_ncpu",
},
{
Name: "cpu_idle",
},
@@ -247,10 +259,13 @@ func (n *node) Connect() error {
if err != nil {
n.stateLock.Lock()
n.resources.cpu = 100
n.resources.ncpu = 1
n.resources.mem = 100
n.resources.memTotal = 1
n.stateLock.Unlock()
}
cpu_ncpu := .0
cpu_idle := .0
mem_total := .0
mem_free := .0
@@ -258,6 +273,8 @@ func (n *node) Connect() error {
for _, x := range metrics.Metrics {
if x.Name == "cpu_idle" {
cpu_idle = x.Values[0].Value
} else if x.Name == "cpu_ncpu" {
cpu_ncpu = x.Values[0].Value
} else if x.Name == "mem_total" {
mem_total = x.Values[0].Value
} else if x.Name == "mem_free" {
@@ -266,11 +283,14 @@ func (n *node) Connect() error {
}
n.stateLock.Lock()
n.resources.ncpu = cpu_ncpu
n.resources.cpu = 100 - cpu_idle
if mem_total != 0 {
n.resources.mem = (mem_total - mem_free) / mem_total * 100
n.resources.memTotal = mem_total
} else {
n.resources.mem = 100
n.resources.memTotal = 1
}
n.lastContact = time.Now()
n.stateLock.Unlock()
@@ -382,8 +402,12 @@ func (n *node) State() NodeState {
LastContact: n.lastContact,
State: n.state.String(),
Latency: time.Duration(n.latency * float64(time.Second)),
- CPU: n.resources.cpu,
- Mem: n.resources.mem,
Resources: NodeResources{
NCPU: n.resources.ncpu,
CPU: n.resources.cpu,
Mem: n.resources.mem,
MemTotal: n.resources.memTotal,
},
}
return state
@@ -551,23 +575,41 @@ func (n *node) GetFile(path string) (io.ReadCloser, error) {
return nil, fmt.Errorf("unknown prefix")
}
- func (n *node) ProcessGetAll() ([]string, error) {
func (n *node) ProcessList() ([]ProcessConfig, error) {
- processes, err := n.peer.ProcessList(nil, nil)
list, err := n.peer.ProcessList(nil, nil)
if err != nil {
return nil, err
}
- list := []string{}
processes := []ProcessConfig{}
- for _, p := range processes {
for _, p := range list {
- list = append(list, p.ID)
process := ProcessConfig{
NodeID: n.ID(),
Order: p.State.Order,
State: p.State.State,
Mem: float64(p.State.Memory) / float64(n.resources.memTotal) * 100, // percent of the node's total memory
Runtime: time.Duration(p.State.Runtime) * time.Second,
Config: p.Config.Marshal(),
}
if x, err := p.State.CPU.Float64(); err == nil {
process.CPU = x / n.resources.ncpu
} else {
process.CPU = 100
}
processes = append(processes, process)
}
- return list, nil
return processes, nil
}
- func (n *node) ProcessSet() error {
func (n *node) ProcessAdd(config *app.Config) error {
- return nil
cfg := httpapi.ProcessConfig{}
cfg.Unmarshal(config)
return n.peer.ProcessAdd(cfg)
}
func (n *node) ProcessStart(id string) error {

View File

@@ -9,6 +9,7 @@ import (
"github.com/datarhei/core/v16/log" "github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/net" "github.com/datarhei/core/v16/net"
"github.com/datarhei/core/v16/restream/app"
) )
type Proxy interface { type Proxy interface {
@@ -19,8 +20,63 @@ type Proxy interface {
RemoveNode(id string) error
ProxyReader
Reader() ProxyReader
ProxyProcessor
Processor() ProxyProcessor
}
type ProxyProcessor interface {
Resources() map[string]NodeResources
ProcessList() []ProcessConfig
ProcessAdd(nodeid string, config *app.Config) error
ProcessDelete(nodeid string, id string) error
ProcessStart(nodeid string, id string) error
}
type proxyProcessor struct {
proxy *proxy
}
func (p *proxyProcessor) Resources() map[string]NodeResources {
if p.proxy == nil {
return nil
}
return p.proxy.Resources()
}
func (p *proxyProcessor) ProcessList() []ProcessConfig {
if p.proxy == nil {
return nil
}
return p.proxy.ProcessList()
}
func (p *proxyProcessor) ProcessAdd(nodeid string, config *app.Config) error {
if p.proxy == nil {
return fmt.Errorf("no proxy provided")
}
return p.proxy.ProcessAdd(nodeid, config)
}
func (p *proxyProcessor) ProcessDelete(nodeid string, id string) error {
if p.proxy == nil {
return fmt.Errorf("no proxy provided")
}
return p.proxy.ProcessDelete(nodeid, id)
}
func (p *proxyProcessor) ProcessStart(nodeid string, id string) error {
if p.proxy == nil {
return fmt.Errorf("no proxy provided")
}
return p.proxy.ProcessStart(nodeid, id)
}
type ProxyReader interface {
@@ -205,6 +261,25 @@ func (p *proxy) Reader() ProxyReader {
}
}
func (p *proxy) Processor() ProxyProcessor {
return &proxyProcessor{
proxy: p,
}
}
func (p *proxy) Resources() map[string]NodeResources {
resources := map[string]NodeResources{}
p.lock.RLock()
defer p.lock.RUnlock()
for _, node := range p.nodes {
resources[node.ID()] = node.State().Resources
}
return resources
}
func (p *proxy) AddNode(id string, node Node) (string, error) {
if id != node.ID() {
return "", fmt.Errorf("the provided (%s) and retrieved (%s) ID's don't match", id, node.ID())
@@ -370,3 +445,114 @@ func (p *proxy) GetFile(path string) (io.ReadCloser, error) {
return data, nil
}
type ProcessConfig struct {
NodeID string
Order string
State string
CPU float64
Mem float64
Runtime time.Duration
Config *app.Config
}
func (p *proxy) ProcessList() []ProcessConfig {
processChan := make(chan ProcessConfig, 64)
processList := []ProcessConfig{}
wgList := sync.WaitGroup{}
wgList.Add(1)
go func() {
defer wgList.Done()
for process := range processChan {
processList = append(processList, process)
}
}()
wg := sync.WaitGroup{}
p.lock.RLock()
for _, node := range p.nodes {
wg.Add(1)
go func(node Node, p chan<- ProcessConfig) {
defer wg.Done()
processes, err := node.ProcessList()
if err != nil {
return
}
for _, process := range processes {
p <- process
}
}(node, processChan)
}
p.lock.RUnlock()
wg.Wait()
close(processChan)
wgList.Wait()
return processList
}
func (p *proxy) ProcessAdd(nodeid string, config *app.Config) error {
p.lock.RLock()
defer p.lock.RUnlock()
node, ok := p.nodes[nodeid]
if !ok {
return fmt.Errorf("node not found")
}
err := node.ProcessAdd(config)
if err != nil {
return err
}
err = node.ProcessStart(config.ID)
if err != nil {
return err
}
return nil
}
func (p *proxy) ProcessDelete(nodeid string, id string) error {
p.lock.RLock()
defer p.lock.RUnlock()
node, ok := p.nodes[nodeid]
if !ok {
return fmt.Errorf("node not found")
}
err := node.ProcessDelete(id)
if err != nil {
return err
}
return nil
}
func (p *proxy) ProcessStart(nodeid string, id string) error {
p.lock.RLock()
defer p.lock.RUnlock()
node, ok := p.nodes[nodeid]
if !ok {
return fmt.Errorf("node not found")
}
err := node.ProcessStart(id)
if err != nil {
return err
}
return nil
}

View File

@@ -14,7 +14,7 @@ import (
type Store interface {
raft.FSM
- ListProcesses() []app.Config
ProcessList() []app.Config
GetProcess(id string) (app.Config, error)
}
@@ -126,7 +126,7 @@ func (s *store) Restore(snapshot io.ReadCloser) error {
return nil
}
- func (s *store) ListProcesses() []app.Config {
func (s *store) ProcessList() []app.Config {
s.lock.RLock()
defer s.lock.RUnlock()

View File

@@ -49,8 +49,8 @@ func (h *ClusterHandler) GetProxyNodes(c echo.Context) error {
LastContact: state.LastContact.Unix(),
Latency: state.Latency.Seconds() * 1000,
State: state.State,
- CPU: state.CPU,
CPU: state.Resources.CPU,
- Mem: state.Mem,
Mem: state.Resources.Mem,
}
list = append(list, n)
@@ -86,8 +86,8 @@ func (h *ClusterHandler) GetProxyNode(c echo.Context) error {
LastContact: state.LastContact.Unix(),
Latency: state.Latency.Seconds() * 1000,
State: state.State,
- CPU: state.CPU,
CPU: state.Resources.CPU,
- Mem: state.Mem,
Mem: state.Resources.Mem,
}
return c.JSON(http.StatusOK, node)