Add SyncInterval and NodeRecoverTimeout to cluster config

This commit is contained in:
Ingo Oppermann
2023-06-07 10:18:23 +02:00
parent ae84fd1d21
commit bd75a5ad0f
5 changed files with 35 additions and 23 deletions

View File

@@ -471,18 +471,20 @@ func (a *api) start() error {
} }
cluster, err := cluster.New(cluster.ClusterConfig{ cluster, err := cluster.New(cluster.ClusterConfig{
ID: cfg.ID, ID: cfg.ID,
Name: cfg.Name, Name: cfg.Name,
Path: filepath.Join(cfg.DB.Dir, "cluster"), Path: filepath.Join(cfg.DB.Dir, "cluster"),
Bootstrap: cfg.Cluster.Bootstrap, Bootstrap: cfg.Cluster.Bootstrap,
Recover: cfg.Cluster.Recover, Recover: cfg.Cluster.Recover,
Address: cfg.Cluster.Address, Address: cfg.Cluster.Address,
Peers: peers, Peers: peers,
CoreAPIAddress: scheme + gonet.JoinHostPort(host, port), SyncInterval: time.Duration(cfg.Cluster.SyncInterval) * time.Second,
CoreAPIUsername: cfg.API.Auth.Username, NodeRecoverTimeout: time.Duration(cfg.Cluster.NodeRecoverTimeout) * time.Second,
CoreAPIPassword: cfg.API.Auth.Password, CoreAPIAddress: scheme + gonet.JoinHostPort(host, port),
IPLimiter: a.sessionsLimiter, CoreAPIUsername: cfg.API.Auth.Username,
Logger: a.log.logger.core.WithComponent("Cluster"), CoreAPIPassword: cfg.API.Auth.Password,
IPLimiter: a.sessionsLimiter,
Logger: a.log.logger.core.WithComponent("Cluster"),
}) })
if err != nil { if err != nil {
return fmt.Errorf("unable to create cluster: %w", err) return fmt.Errorf("unable to create cluster: %w", err)

View File

@@ -113,6 +113,9 @@ type cluster struct {
shutdownCh chan struct{} shutdownCh chan struct{}
shutdownLock sync.Mutex shutdownLock sync.Mutex
syncInterval time.Duration
nodeRecoverTimeout time.Duration
forwarder forwarder.Forwarder forwarder forwarder.Forwarder
api API api API
proxy proxy.Proxy proxy proxy.Proxy
@@ -140,6 +143,9 @@ func New(config ClusterConfig) (Cluster, error) {
shutdownCh: make(chan struct{}), shutdownCh: make(chan struct{}),
syncInterval: config.SyncInterval,
nodeRecoverTimeout: config.NodeRecoverTimeout,
nodes: map[string]proxy.Node{}, nodes: map[string]proxy.Node{},
} }

View File

@@ -294,7 +294,7 @@ func (c *cluster) establishLeadership(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
c.cancelLeaderShip = cancel c.cancelLeaderShip = cancel
go c.startRebalance(ctx) go c.startRebalance(ctx, c.syncInterval)
return nil return nil
} }
@@ -305,8 +305,8 @@ func (c *cluster) revokeLeadership() {
c.cancelLeaderShip() c.cancelLeaderShip()
} }
func (c *cluster) startRebalance(ctx context.Context) { func (c *cluster) startRebalance(ctx context.Context, interval time.Duration) {
ticker := time.NewTicker(5 * time.Second) ticker := time.NewTicker(interval)
defer ticker.Stop() defer ticker.Stop()
for { for {

View File

@@ -287,8 +287,10 @@ func (d *Config) init() {
d.vars.Register(value.NewBool(&d.Cluster.Bootstrap, false), "cluster.bootstrap", "CORE_CLUSTER_BOOTSTRAP", nil, "Bootstrap a cluster", false, false) d.vars.Register(value.NewBool(&d.Cluster.Bootstrap, false), "cluster.bootstrap", "CORE_CLUSTER_BOOTSTRAP", nil, "Bootstrap a cluster", false, false)
d.vars.Register(value.NewBool(&d.Cluster.Recover, false), "cluster.recover", "CORE_CLUSTER_RECOVER", nil, "Recover a cluster", false, false) d.vars.Register(value.NewBool(&d.Cluster.Recover, false), "cluster.recover", "CORE_CLUSTER_RECOVER", nil, "Recover a cluster", false, false)
d.vars.Register(value.NewBool(&d.Cluster.Debug, false), "cluster.debug", "CORE_CLUSTER_DEBUG", nil, "Switch to debug mode, not for production", false, false) d.vars.Register(value.NewBool(&d.Cluster.Debug, false), "cluster.debug", "CORE_CLUSTER_DEBUG", nil, "Switch to debug mode, not for production", false, false)
d.vars.Register(value.NewClusterAddress(&d.Cluster.Address, "127.0.0.1:8000"), "cluster.address", "CORE_CLUSTER_ADDRESS", nil, "Raft listen address", false, true) d.vars.Register(value.NewClusterAddress(&d.Cluster.Address, "127.0.0.1:8000"), "cluster.address", "CORE_CLUSTER_ADDRESS", nil, "Raft listen address", true, false)
d.vars.Register(value.NewClusterPeerList(&d.Cluster.Peers, []string{""}, ","), "cluster.peers", "CORE_CLUSTER_PEERS", nil, "Raft address of a cores that are part of the cluster", false, true) d.vars.Register(value.NewClusterPeerList(&d.Cluster.Peers, []string{""}, ","), "cluster.peers", "CORE_CLUSTER_PEERS", nil, "Raft addresses of cores that are part of the cluster", false, false)
d.vars.Register(value.NewInt64(&d.Cluster.SyncInterval, 5), "cluster.sync_interval", "CORE_CLUSTER_SYNC_INTERVAL", nil, "Interval between aligning the process in the cluster DB with the processes on the nodes", true, false)
d.vars.Register(value.NewInt64(&d.Cluster.NodeRecoverTimeout, 120), "cluster.node_recover_timeout", "CORE_CLUSTER_NODE_RECOVER_TIMEOUT", nil, "Timeout for a node to recover before rebalancing the processes", true, false)
} }
// Validate validates the current state of the Config for completeness and sanity. Errors are // Validate validates the current state of the Config for completeness and sanity. Errors are

View File

@@ -173,12 +173,14 @@ type Data struct {
MaxMemoryUsage float64 `json:"max_memory_usage"` // percent 0-100 MaxMemoryUsage float64 `json:"max_memory_usage"` // percent 0-100
} `json:"resources"` } `json:"resources"`
Cluster struct { Cluster struct {
Enable bool `json:"enable"` Enable bool `json:"enable"`
Bootstrap bool `json:"bootstrap"` Bootstrap bool `json:"bootstrap"`
Recover bool `json:"recover"` Recover bool `json:"recover"`
Debug bool `json:"debug"` Debug bool `json:"debug"`
Address string `json:"address"` // ip:port Address string `json:"address"` // ip:port
Peers []string `json:"peers"` Peers []string `json:"peers"`
SyncInterval int64 `json:"sync_interval" format:"int64"` // seconds
NodeRecoverTimeout int64 `json:"node_recover_timeout" format:"int64"` // seconds
} `json:"cluster"` } `json:"cluster"`
} }