diff --git a/app/api/api.go b/app/api/api.go index 7613c33f..26abea14 100644 --- a/app/api/api.go +++ b/app/api/api.go @@ -471,18 +471,20 @@ func (a *api) start() error { } cluster, err := cluster.New(cluster.ClusterConfig{ - ID: cfg.ID, - Name: cfg.Name, - Path: filepath.Join(cfg.DB.Dir, "cluster"), - Bootstrap: cfg.Cluster.Bootstrap, - Recover: cfg.Cluster.Recover, - Address: cfg.Cluster.Address, - Peers: peers, - CoreAPIAddress: scheme + gonet.JoinHostPort(host, port), - CoreAPIUsername: cfg.API.Auth.Username, - CoreAPIPassword: cfg.API.Auth.Password, - IPLimiter: a.sessionsLimiter, - Logger: a.log.logger.core.WithComponent("Cluster"), + ID: cfg.ID, + Name: cfg.Name, + Path: filepath.Join(cfg.DB.Dir, "cluster"), + Bootstrap: cfg.Cluster.Bootstrap, + Recover: cfg.Cluster.Recover, + Address: cfg.Cluster.Address, + Peers: peers, + SyncInterval: time.Duration(cfg.Cluster.SyncInterval) * time.Second, + NodeRecoverTimeout: time.Duration(cfg.Cluster.NodeRecoverTimeout) * time.Second, + CoreAPIAddress: scheme + gonet.JoinHostPort(host, port), + CoreAPIUsername: cfg.API.Auth.Username, + CoreAPIPassword: cfg.API.Auth.Password, + IPLimiter: a.sessionsLimiter, + Logger: a.log.logger.core.WithComponent("Cluster"), }) if err != nil { return fmt.Errorf("unable to create cluster: %w", err) diff --git a/cluster/cluster.go b/cluster/cluster.go index b85bed84..54a47b52 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -113,6 +113,9 @@ type cluster struct { shutdownCh chan struct{} shutdownLock sync.Mutex + syncInterval time.Duration + nodeRecoverTimeout time.Duration + forwarder forwarder.Forwarder api API proxy proxy.Proxy @@ -140,6 +143,9 @@ func New(config ClusterConfig) (Cluster, error) { shutdownCh: make(chan struct{}), + syncInterval: config.SyncInterval, + nodeRecoverTimeout: config.NodeRecoverTimeout, + nodes: map[string]proxy.Node{}, } diff --git a/cluster/leader.go b/cluster/leader.go index 5f7c79fd..89a1a471 100644 --- a/cluster/leader.go +++ b/cluster/leader.go @@ -294,7 +294,7 @@ func (c *cluster) establishLeadership(ctx context.Context) error { ctx, cancel := context.WithCancel(ctx) c.cancelLeaderShip = cancel - go c.startRebalance(ctx) + go c.startRebalance(ctx, c.syncInterval) return nil } @@ -305,8 +305,8 @@ func (c *cluster) revokeLeadership() { c.cancelLeaderShip() } -func (c *cluster) startRebalance(ctx context.Context) { - ticker := time.NewTicker(5 * time.Second) +func (c *cluster) startRebalance(ctx context.Context, interval time.Duration) { + ticker := time.NewTicker(interval) defer ticker.Stop() for { diff --git a/config/config.go b/config/config.go index 64da833e..94d8ec86 100644 --- a/config/config.go +++ b/config/config.go @@ -287,8 +287,10 @@ func (d *Config) init() { d.vars.Register(value.NewBool(&d.Cluster.Bootstrap, false), "cluster.bootstrap", "CORE_CLUSTER_BOOTSTRAP", nil, "Bootstrap a cluster", false, false) d.vars.Register(value.NewBool(&d.Cluster.Recover, false), "cluster.recover", "CORE_CLUSTER_RECOVER", nil, "Recover a cluster", false, false) d.vars.Register(value.NewBool(&d.Cluster.Debug, false), "cluster.debug", "CORE_CLUSTER_DEBUG", nil, "Switch to debug mode, not for production", false, false) - d.vars.Register(value.NewClusterAddress(&d.Cluster.Address, "127.0.0.1:8000"), "cluster.address", "CORE_CLUSTER_ADDRESS", nil, "Raft listen address", false, true) - d.vars.Register(value.NewClusterPeerList(&d.Cluster.Peers, []string{""}, ","), "cluster.peers", "CORE_CLUSTER_PEERS", nil, "Raft address of a cores that are part of the cluster", false, true) + d.vars.Register(value.NewClusterAddress(&d.Cluster.Address, "127.0.0.1:8000"), "cluster.address", "CORE_CLUSTER_ADDRESS", nil, "Raft listen address", true, false) + d.vars.Register(value.NewClusterPeerList(&d.Cluster.Peers, []string{""}, ","), "cluster.peers", "CORE_CLUSTER_PEERS", nil, "Raft addresses of cores that are part of the cluster", false, false) + d.vars.Register(value.NewInt64(&d.Cluster.SyncInterval, 5), "cluster.sync_interval", "CORE_CLUSTER_SYNC_INTERVAL", nil, "Interval between aligning the process in the cluster DB with the processes on the nodes", true, false) + d.vars.Register(value.NewInt64(&d.Cluster.NodeRecoverTimeout, 120), "cluster.node_recover_timeout", "CORE_CLUSTER_NODE_RECOVER_TIMEOUT", nil, "Timeout for a node to recover before rebalancing the processes", true, false) } // Validate validates the current state of the Config for completeness and sanity. Errors are diff --git a/config/data.go b/config/data.go index 0dff3ff5..520b5a09 100644 --- a/config/data.go +++ b/config/data.go @@ -173,12 +173,14 @@ type Data struct { MaxMemoryUsage float64 `json:"max_memory_usage"` // percent 0-100 } `json:"resources"` Cluster struct { - Enable bool `json:"enable"` - Bootstrap bool `json:"bootstrap"` - Recover bool `json:"recover"` - Debug bool `json:"debug"` - Address string `json:"address"` // ip:port - Peers []string `json:"peers"` + Enable bool `json:"enable"` + Bootstrap bool `json:"bootstrap"` + Recover bool `json:"recover"` + Debug bool `json:"debug"` + Address string `json:"address"` // ip:port + Peers []string `json:"peers"` + SyncInterval int64 `json:"sync_interval" format:"int64"` // seconds + NodeRecoverTimeout int64 `json:"node_recover_timeout" format:"int64"` // seconds } `json:"cluster"` }