diff --git a/app/api/api.go b/app/api/api.go
index 31fc46a0..fc7362d5 100644
--- a/app/api/api.go
+++ b/app/api/api.go
@@ -545,6 +545,7 @@ func (a *api) start(ctx context.Context) error {
 		SyncInterval:           time.Duration(cfg.Cluster.SyncInterval) * time.Second,
 		NodeRecoverTimeout:     time.Duration(cfg.Cluster.NodeRecoverTimeout) * time.Second,
 		EmergencyLeaderTimeout: time.Duration(cfg.Cluster.EmergencyLeaderTimeout) * time.Second,
+		RecoverTimeout:         time.Duration(cfg.Cluster.RecoverTimeout) * time.Second,
 		CoreConfig:             cfg.Clone(),
 		CoreSkills:             a.ffmpeg.Skills(),
 		IPLimiter:              a.sessionsLimiter,
diff --git a/cluster/cluster.go b/cluster/cluster.go
index 60e41235..ccb9e65c 100644
--- a/cluster/cluster.go
+++ b/cluster/cluster.go
@@ -55,6 +55,7 @@ type Cluster interface {
 	Leave(origin, id string) error              // gracefully remove a node from the cluster
 	TransferLeadership(origin, id string) error // transfer leadership to another node
 	Snapshot(origin string) (io.ReadCloser, error)
+	IsRaftLeader() bool
 	HasRaftLeader() bool
 
 	ProcessAdd(origin string, config *app.Config) error
@@ -108,6 +109,7 @@ type Config struct {
 	SyncInterval           time.Duration // Interval between aligning the process in the cluster DB with the processes on the nodes
 	NodeRecoverTimeout     time.Duration // Timeout for a node to recover before rebalancing the processes
 	EmergencyLeaderTimeout time.Duration // Timeout for establishing the emergency leadership after lost contact to raft leader
+	RecoverTimeout         time.Duration // Timeout for recovering the cluster if there's no raft leader
 
 	CoreConfig *config.Config
 	CoreSkills skills.Skills
@@ -127,7 +129,7 @@ type cluster struct {
 	logger log.Logger
 
-	raft                  raft.Raft
+	raft                  raft.RaftRecoverer
 	raftRemoveGracePeriod time.Duration
 	raftAddress           string
 	raftNotifyCh          chan bool
@@ -136,7 +138,8 @@ type cluster struct {
 	store store.Store
 
-	cancelLeaderShip context.CancelFunc
+	cancelLeaderShip   context.CancelFunc
+	cancelFollowerShip context.CancelFunc
 
 	shutdown   bool
 	shutdownCh chan struct{}
@@ -146,6 +149,7 @@ type cluster struct {
 	syncInterval           time.Duration
 	nodeRecoverTimeout     time.Duration
 	emergencyLeaderTimeout time.Duration
+	recoverTimeout         time.Duration
 
 	forwarder *forwarder.Forwarder
 	api       API
@@ -158,10 +162,12 @@ type cluster struct {
 	hostnames []string
 	stateLock sync.RWMutex
 
-	isRaftLeader  bool
-	hasRaftLeader bool
-	isLeader      bool
-	leaderLock    sync.Mutex
+	isRaftLeader      bool
+	hasRaftLeader     bool
+	isLeader          bool
+	isEmergencyLeader bool
+	lastLeaderChange  time.Time
+	leaderLock        sync.Mutex
 
 	isTLSRequired bool
 	clusterKVS    ClusterKVS
@@ -196,6 +202,7 @@ func New(config Config) (Cluster, error) {
 		syncInterval:           config.SyncInterval,
 		nodeRecoverTimeout:     config.NodeRecoverTimeout,
 		emergencyLeaderTimeout: config.EmergencyLeaderTimeout,
+		recoverTimeout:         config.RecoverTimeout,
 
 		config: config.CoreConfig,
 		skills: config.CoreSkills,
@@ -324,7 +331,7 @@ func New(config Config) (Cluster, error) {
 	c.raftLeaderObservationCh = make(chan string, 16)
 	c.raftEmergencyNotifyCh = make(chan bool, 16)
 
-	raft, err := raft.New(raft.Config{
+	raft, err := raft.NewRecoverer(raft.Config{
 		ID:      config.NodeID,
 		Path:    config.Path,
 		Address: config.Address,
@@ -357,11 +364,25 @@ func New(config Config) (Cluster, error) {
 			ticker := time.NewTicker(time.Second)
 			defer ticker.Stop()
 
+			timer := time.NewTimer(c.nodeRecoverTimeout)
+			defer timer.Stop()
+
 			for {
 				select {
 				case <-c.shutdownCh:
 					return
+				case <-timer.C:
+					c.logger.Warn().WithFields(log.Fields{
+						"peer":    peerAddress,
+						"timeout": c.nodeRecoverTimeout,
+					}).Log("Giving up joining cluster via peer")
+					return
 				case <-ticker.C:
+					if c.HasRaftLeader() {
+						c.logger.Warn().WithField("peer", peerAddress).Log("Stop joining cluster via peer, already joined")
+						return
+					}
+
 					err := c.Join("", c.nodeID, c.raftAddress, peerAddress)
 					if err != nil {
 						c.logger.Warn().WithError(err).Log("Join cluster")
@@ -750,11 +771,11 @@ func (c *cluster) Leave(origin, id string) error {
 		return err
 	}
 
-	numPeers := len(servers)
+	numServers := len(servers)
 
 	if id == c.nodeID {
 		// We're going to remove ourselves
-		if numPeers <= 1 {
+		if numServers <= 1 {
 			// Don't do so if we're the only server in the cluster
 			c.logger.Debug().Log("We're the leader without any peers, not doing anything")
 			return nil
@@ -1015,6 +1036,7 @@ func (c *cluster) trackLeaderChanges() {
 			}
 			c.forwarder.SetLeader(leaderAddress)
 			c.leaderLock.Lock()
+			c.lastLeaderChange = time.Now()
 			if len(leaderAddress) == 0 {
 				c.hasRaftLeader = false
 			} else {
diff --git a/cluster/follower.go b/cluster/follower.go
index afeeb664..2698b746 100644
--- a/cluster/follower.go
+++ b/cluster/follower.go
@@ -1,7 +1,22 @@
 package cluster
 
+import (
+	"context"
+	"time"
+
+	"github.com/datarhei/core/v16/cluster/raft"
+)
+
 // followerLoop is run by every follower node in the cluster.
 func (c *cluster) followerLoop(stopCh chan struct{}) {
+	establishedFollower := false
+
+	if !establishedFollower {
+		c.establishFollowership(context.TODO())
+		establishedFollower = true
+		defer c.revokeFollowership()
+	}
+
 	for {
 		select {
 		case <-stopCh:
@@ -11,3 +26,83 @@ func (c *cluster) followerLoop(stopCh chan struct{}) {
 		}
 	}
 }
+
+func (c *cluster) establishFollowership(ctx context.Context) {
+	c.logger.Info().Log("Establishing followership")
+
+	ctx, cancel := context.WithCancel(ctx)
+	c.cancelFollowerShip = cancel
+
+	if c.recoverTimeout > 0 {
+		go c.recoverCluster(ctx, c.syncInterval)
+	}
+}
+
+func (c *cluster) revokeFollowership() {
+	c.logger.Info().Log("Revoking followership")
+
+	if c.cancelFollowerShip != nil {
+		c.cancelFollowerShip()
+		c.cancelFollowerShip = nil
+	}
+}
+
+func (c *cluster) recoverCluster(ctx context.Context, interval time.Duration) {
+	ticker := time.NewTicker(interval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			c.leaderLock.Lock()
+			hasLeader := c.hasRaftLeader
+			lastLeaderChange := c.lastLeaderChange
+			c.leaderLock.Unlock()
+
+			uptime := c.raft.Stats().Uptime
+			if uptime < 2*c.recoverTimeout {
+				continue
+			}
+
+			if !hasLeader && c.recoverTimeout > 0 && time.Since(lastLeaderChange) > c.recoverTimeout {
+				peers := []raft.Peer{}
+
+				// find living peers and recover
+				servers, err := c.raft.Servers()
+				if err != nil {
+					break
+				}
+
+				nodes := c.manager.NodeList()
+				for _, node := range nodes {
+					if _, err := node.Status(); err != nil {
+						continue
+					}
+
+					id := node.About().ID
+
+					for _, server := range servers {
+						if server.ID == id && server.ID != c.nodeID {
+							peers = append(peers, raft.Peer{
+								ID:      id,
+								Address: server.Address,
+							})
+						}
+					}
+				}
+
+				c.logger.Warn().WithField("peers", peers).Log("Recovering raft")
+
+				// recover raft with new set of peers
+				err = c.raft.Recover(peers, 2*interval)
+				if err != nil {
+					c.logger.Error().WithError(err).Log("Recovering raft failed, shutting down")
+					c.Shutdown()
+					return
+				}
+			}
+		}
+	}
+}
diff --git a/cluster/leader.go b/cluster/leader.go
index 1f4ed594..14fb4be3 100644
--- a/cluster/leader.go
+++ b/cluster/leader.go
@@ -111,6 +111,7 @@ func (c *cluster) monitorLeadership() {
 				c.leaderLock.Lock()
 				c.isRaftLeader = false
 				c.isLeader = false
+				c.isEmergencyLeader = false
 				c.leaderLock.Unlock()
 			} else if notification == NOTIFY_LEADER {
 				if weAreLeaderCh != nil {
@@ -144,6 +145,7 @@ func (c *cluster) monitorLeadership() {
 				c.leaderLock.Lock()
 				c.isRaftLeader = true
 				c.isLeader = true
+				c.isEmergencyLeader = false
 				c.leaderLock.Unlock()
 			} else if notification == NOTIFY_EMERGENCY {
 				if weAreEmergencyLeaderCh != nil {
@@ -177,6 +179,7 @@ func (c *cluster) monitorLeadership() {
 				c.leaderLock.Lock()
 				c.isRaftLeader = false
 				c.isLeader = true
+				c.isEmergencyLeader = true
 				c.leaderLock.Unlock()
 			}
 		case <-c.shutdownCh:
@@ -318,11 +321,38 @@ func (c *cluster) establishLeadership(ctx context.Context, emergency bool) error
 	if !emergency {
 		go c.clearLocks(ctx, time.Minute)
+		go c.clearDeadNodes(ctx, c.syncInterval, c.nodeRecoverTimeout)
 	}
 
 	return nil
 }
 
+func (c *cluster) clearDeadNodes(ctx context.Context, interval, nodeRecoverTimeout time.Duration) {
+	ticker := time.NewTicker(interval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			nodes := c.manager.NodeList()
+			for _, node := range nodes {
+				about := node.About()
+				if time.Since(about.SpawnedAt) > nodeRecoverTimeout && time.Since(about.LastContact) > nodeRecoverTimeout {
+					c.logger.Warn().WithFields(log.Fields{
+						"id":          about.ID,
+						"after":       nodeRecoverTimeout,
+						"lastContact": about.LastContact,
+						"spawnedAt":   about.SpawnedAt,
+					}).Log("Removing peer from cluster")
+					c.raft.RemoveServer(about.ID)
+				}
+			}
+		}
+	}
+}
+
 func (c *cluster) revokeLeadership() {
 	c.logger.Info().Log("Revoking leadership")
diff --git a/cluster/node/node.go b/cluster/node/node.go
index c1daf191..a455d6d1 100644
--- a/cluster/node/node.go
+++ b/cluster/node/node.go
@@ -16,10 +16,11 @@ import (
 )
 
 type Node struct {
-	id      string
-	address string
-	ips     []string
-	version string
+	id        string
+	address   string
+	ips       []string
+	version   string
+	spawnedAt time.Time
 
 	node      client.APIClient
 	nodeAbout About
@@ -57,9 +58,10 @@ func New(config Config) *Node {
 	tr.IdleConnTimeout = 30 * time.Second
 
 	n := &Node{
-		id:      config.ID,
-		address: config.Address,
-		version: "0.0.0",
+		id:        config.ID,
+		address:   config.Address,
+		version:   "0.0.0",
+		spawnedAt: time.Now(),
 		node: client.APIClient{
 			Address: config.Address,
 			Client: &http.Client{
@@ -136,6 +138,7 @@ type About struct {
 	Error     error
 	Core      CoreAbout
 	Resources Resources
+	SpawnedAt time.Time
 }
 
 type ResourcesGPU struct {
@@ -167,9 +170,10 @@ func (n *Node) About() About {
 	defer n.lock.RUnlock()
 
 	a := About{
-		ID:      n.id,
-		Version: n.version,
-		Address: n.address,
+		ID:        n.id,
+		Version:   n.version,
+		Address:   n.address,
+		SpawnedAt: n.spawnedAt,
 	}
 
 	a.Name = n.coreAbout.Name
diff --git a/cluster/raft/raft.go b/cluster/raft/raft.go
index 347a7012..ac31729e 100644
--- a/cluster/raft/raft.go
+++ b/cluster/raft/raft.go
@@ -85,6 +85,7 @@ type Server struct {
 }
 
 type Stats struct {
+	Uptime      time.Duration
 	Address     string
 	State       string
 	LastContact time.Duration
@@ -210,6 +211,7 @@ func (r *raft) Servers() ([]Server, error) {
 
 func (r *raft) Stats() Stats {
 	stats := Stats{
+		Uptime:  time.Since(r.raftStart),
 		Address: r.raftAddress,
 	}
diff --git a/cluster/raft/recovery.go b/cluster/raft/recovery.go
new file mode 100644
index 00000000..8754184a
--- /dev/null
+++ b/cluster/raft/recovery.go
@@ -0,0 +1,136 @@
+package raft
+
+import (
+	"io"
+	"sync"
+	"time"
+)
+
+type RaftRecoverer interface {
+	Raft
+
+	Recover([]Peer, time.Duration) error
+}
+
+type raftRecoverer struct {
+	raft   Raft
+	config Config
+
+	lock sync.RWMutex
+}
+
+func NewRecoverer(config Config) (RaftRecoverer, error) {
+	r := &raftRecoverer{
+		config: config,
+	}
+
+	raft, err := New(config)
+	if err != nil {
+		return nil, err
+	}
+
+	r.raft = raft
+
+	return r, nil
+}
+
+func (r *raftRecoverer) Recover(peers []Peer, cooldown time.Duration) error {
+	r.lock.Lock()
+	defer r.lock.Unlock()
+
+	r.raft.Shutdown()
+
+	if r.config.Logger != nil {
+		r.config.Logger.Warn().WithField("duration", cooldown).Log("Cooling down")
+	}
+
+	time.Sleep(cooldown)
+
+	r.config.Peers = peers
+
+	raft, err := New(r.config)
+	if err != nil {
+		return err
+	}
+
+	r.raft = raft
+
+	return nil
+}
+
+func (r *raftRecoverer) Shutdown() {
+	r.lock.RLock()
+	defer r.lock.RUnlock()
+
+	r.raft.Shutdown()
+}
+
+func (r *raftRecoverer) IsLeader() bool {
+	r.lock.RLock()
+	defer r.lock.RUnlock()
+
+	return r.raft.IsLeader()
+}
+
+func (r *raftRecoverer) Leader() (string, string) {
+	r.lock.RLock()
+	defer r.lock.RUnlock()
+
+	return r.raft.Leader()
+}
+
+func (r *raftRecoverer) Servers() ([]Server, error) {
+	r.lock.RLock()
+	defer r.lock.RUnlock()
+
+	return r.raft.Servers()
+}
+
+func (r *raftRecoverer) Stats() Stats {
+	r.lock.RLock()
+	defer r.lock.RUnlock()
+
+	return r.raft.Stats()
+}
+
+func (r *raftRecoverer) Apply(data []byte) error {
+	r.lock.RLock()
+	defer r.lock.RUnlock()
+
+	return r.raft.Apply(data)
+}
+
+func (r *raftRecoverer) Barrier(timeout time.Duration) error {
+	r.lock.RLock()
+	defer r.lock.RUnlock()
+
+	return r.raft.Barrier(timeout)
+}
+
+func (r *raftRecoverer) AddServer(id, address string) error {
+	r.lock.RLock()
+	defer r.lock.RUnlock()
+
+	return r.raft.AddServer(id, address)
+}
+
+func (r *raftRecoverer) RemoveServer(id string) error {
+	r.lock.RLock()
+	defer r.lock.RUnlock()
+
+	return r.raft.RemoveServer(id)
+}
+
+func (r *raftRecoverer) LeadershipTransfer(id string) error {
+	r.lock.RLock()
+	defer r.lock.RUnlock()
+
+	return r.raft.LeadershipTransfer(id)
+}
+
+func (r *raftRecoverer) Snapshot() (io.ReadCloser, error) {
+	r.lock.RLock()
+	defer r.lock.RUnlock()
+
+	return r.raft.Snapshot()
+}
diff --git a/config/config.go b/config/config.go
index 4c0cfd61..ac559052 100644
--- a/config/config.go
+++ b/config/config.go
@@ -342,6 +342,7 @@ func (d *Config) init() {
 	d.vars.Register(value.NewInt64(&d.Cluster.SyncInterval, 5), "cluster.sync_interval_sec", "CORE_CLUSTER_SYNC_INTERVAL_SEC", nil, "Interval between aligning the process in the cluster DB with the processes on the nodes", true, false)
 	d.vars.Register(value.NewInt64(&d.Cluster.NodeRecoverTimeout, 120), "cluster.node_recover_timeout_sec", "CORE_CLUSTER_NODE_RECOVER_TIMEOUT_SEC", nil, "Timeout for a node to recover before rebalancing the processes", true, false)
 	d.vars.Register(value.NewInt64(&d.Cluster.EmergencyLeaderTimeout, 10), "cluster.emergency_leader_timeout_sec", "CORE_CLUSTER_EMERGENCY_LEADER_TIMEOUT_SEC", nil, "Timeout for establishing the emergency leadership after lost contact to raft leader", true, false)
+	d.vars.Register(value.NewInt64(&d.Cluster.RecoverTimeout, 0), "cluster.recover_timeout_sec", "CORE_CLUSTER_RECOVER_TIMEOUT_SEC", nil, "Timeout for recovering the cluster if no raft leader can be elected", false, false)
 
 	d.vars.Register(value.NewBool(&d.Cluster.Debug.DisableFFmpegCheck, false), "cluster.debug.disable_ffmpeg_check", "CORE_CLUSTER_DEBUG_DISABLE_FFMPEG_CHECK", nil, "Disable checking for identical FFmpeg versions on all nodes", false, false)
 }
diff --git a/config/data.go b/config/data.go
index be8b3ebb..7596575f 100644
--- a/config/data.go
+++ b/config/data.go
@@ -199,6 +199,7 @@ type Data struct {
 	SyncInterval           int64 `json:"sync_interval_sec" format:"int64"`             // seconds
 	NodeRecoverTimeout     int64 `json:"node_recover_timeout_sec" format:"int64"`      // seconds
 	EmergencyLeaderTimeout int64 `json:"emergency_leader_timeout_sec" format:"int64"`  // seconds
+	RecoverTimeout         int64 `json:"recover_timeout_sec" format:"int64"`           // seconds
 	Debug                  struct {
 		DisableFFmpegCheck bool `json:"disable_ffmpeg_check"`
 	} `json:"debug"`
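
Note for reviewers (sketch, not part of the patch): the follower-side recovery added in cluster/follower.go only fires when three conditions hold at a tick of the sync interval: the new cluster.recover_timeout_sec (CORE_CLUSTER_RECOVER_TIMEOUT_SEC, default 0, i.e. disabled) is non-zero, the local raft instance has been up for at least twice that timeout, and no raft leader has been observed for longer than the timeout. The self-contained Go sketch below only restates that guard; the helper name shouldRecover and the example values are illustrative and do not appear in the diff.

package main

import (
	"fmt"
	"time"
)

// shouldRecover mirrors the guard in (*cluster).recoverCluster: recovery is
// attempted only when the feature is enabled, the local raft instance has
// been running for at least 2*recoverTimeout, and the cluster has had no
// raft leader for longer than recoverTimeout.
func shouldRecover(recoverTimeout, uptime time.Duration, hasLeader bool, lastLeaderChange time.Time) bool {
	if recoverTimeout <= 0 {
		// cluster.recover_timeout_sec defaults to 0, which disables recovery.
		return false
	}
	if uptime < 2*recoverTimeout {
		// Give a freshly (re)started raft instance time to elect a leader.
		return false
	}
	return !hasLeader && time.Since(lastLeaderChange) > recoverTimeout
}

func main() {
	// Example values only: a node that has been up for 10 minutes and lost
	// its leader 3 minutes ago, with a 2-minute recover timeout.
	fmt.Println(shouldRecover(2*time.Minute, 10*time.Minute, false, time.Now().Add(-3*time.Minute)))
}

When the guard holds, recoverCluster gathers the still-reachable peers and calls Recover(peers, 2*interval); the cooldown of twice the sync interval lets a failed election round settle before the raft instance is rebuilt with the surviving peer set.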