Add auto-removal of nodes if unresponsive

Ingo Oppermann
2025-01-22 14:07:46 +01:00
parent 2a787c4f4f
commit aadd734c1d
9 changed files with 311 additions and 19 deletions

View File

@@ -545,6 +545,7 @@ func (a *api) start(ctx context.Context) error {
SyncInterval: time.Duration(cfg.Cluster.SyncInterval) * time.Second,
NodeRecoverTimeout: time.Duration(cfg.Cluster.NodeRecoverTimeout) * time.Second,
EmergencyLeaderTimeout: time.Duration(cfg.Cluster.EmergencyLeaderTimeout) * time.Second,
RecoverTimeout: time.Duration(cfg.Cluster.RecoverTimeout) * time.Second,
CoreConfig: cfg.Clone(),
CoreSkills: a.ffmpeg.Skills(),
IPLimiter: a.sessionsLimiter,

View File

@@ -55,6 +55,7 @@ type Cluster interface {
Leave(origin, id string) error // gracefully remove a node from the cluster
TransferLeadership(origin, id string) error // transfer leadership to another node
Snapshot(origin string) (io.ReadCloser, error)
IsRaftLeader() bool
HasRaftLeader() bool
ProcessAdd(origin string, config *app.Config) error
@@ -108,6 +109,7 @@ type Config struct {
SyncInterval time.Duration // Interval between aligning the process in the cluster DB with the processes on the nodes
NodeRecoverTimeout time.Duration // Timeout for a node to recover before rebalancing the processes
EmergencyLeaderTimeout time.Duration // Timeout for establishing the emergency leadership after lost contact to raft leader
RecoverTimeout time.Duration // Timeout for recovering the cluster if there's no raft leader
CoreConfig *config.Config
CoreSkills skills.Skills
@@ -127,7 +129,7 @@ type cluster struct {
logger log.Logger
raft raft.Raft
raft raft.RaftRecoverer
raftRemoveGracePeriod time.Duration
raftAddress string
raftNotifyCh chan bool
@@ -136,7 +138,8 @@ type cluster struct {
store store.Store
cancelLeaderShip context.CancelFunc
cancelLeaderShip context.CancelFunc
cancelFollowerShip context.CancelFunc
shutdown bool
shutdownCh chan struct{}
@@ -146,6 +149,7 @@ type cluster struct {
syncInterval time.Duration
nodeRecoverTimeout time.Duration
emergencyLeaderTimeout time.Duration
recoverTimeout time.Duration
forwarder *forwarder.Forwarder
api API
@@ -158,10 +162,12 @@ type cluster struct {
hostnames []string
stateLock sync.RWMutex
isRaftLeader bool
hasRaftLeader bool
isLeader bool
leaderLock sync.Mutex
isRaftLeader bool
hasRaftLeader bool
isLeader bool
isEmergencyLeader bool
lastLeaderChange time.Time
leaderLock sync.Mutex
isTLSRequired bool
clusterKVS ClusterKVS
@@ -196,6 +202,7 @@ func New(config Config) (Cluster, error) {
syncInterval: config.SyncInterval,
nodeRecoverTimeout: config.NodeRecoverTimeout,
emergencyLeaderTimeout: config.EmergencyLeaderTimeout,
recoverTimeout: config.RecoverTimeout,
config: config.CoreConfig,
skills: config.CoreSkills,
@@ -324,7 +331,7 @@ func New(config Config) (Cluster, error) {
c.raftLeaderObservationCh = make(chan string, 16)
c.raftEmergencyNotifyCh = make(chan bool, 16)
raft, err := raft.New(raft.Config{
raft, err := raft.NewRecoverer(raft.Config{
ID: config.NodeID,
Path: config.Path,
Address: config.Address,
@@ -357,11 +364,25 @@ func New(config Config) (Cluster, error) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
timer := time.NewTimer(c.nodeRecoverTimeout)
defer timer.Stop()
for {
select {
case <-c.shutdownCh:
return
case <-timer.C:
c.logger.Warn().WithFields(log.Fields{
"peer": peerAddress,
"timeout": c.nodeRecoverTimeout,
}).Log("Giving up joining cluster via peer")
return
case <-ticker.C:
if c.HasRaftLeader() {
c.logger.Warn().WithField("peer", peerAddress).Log("Stop joining cluster via peer, already joined")
return
}
err := c.Join("", c.nodeID, c.raftAddress, peerAddress)
if err != nil {
c.logger.Warn().WithError(err).Log("Join cluster")
@@ -750,11 +771,11 @@ func (c *cluster) Leave(origin, id string) error {
return err
}
numPeers := len(servers)
numServers := len(servers)
if id == c.nodeID {
// We're going to remove ourselves
if numPeers <= 1 {
if numServers <= 1 {
// Don't do so if we're the only server in the cluster
c.logger.Debug().Log("We're the leader without any peers, not doing anything")
return nil
@@ -1015,6 +1036,7 @@ func (c *cluster) trackLeaderChanges() {
}
c.forwarder.SetLeader(leaderAddress)
c.leaderLock.Lock()
c.lastLeaderChange = time.Now()
if len(leaderAddress) == 0 {
c.hasRaftLeader = false
} else {

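For illustration only (not part of this commit; tryJoin and haveLeader are made-up stand-ins for c.Join and c.HasRaftLeader): the join loop added above is a retry-with-deadline pattern, retrying once per second, stopping early once a raft leader is visible, and giving up after the node recover timeout. A minimal standalone sketch:

package main

import (
	"errors"
	"fmt"
	"time"
)

// Retry joining on a one-second ticker, stop early once a leader is known,
// and give up entirely once the deadline timer fires.
func joinWithDeadline(tryJoin func() error, haveLeader func() bool, timeout time.Duration) error {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	timer := time.NewTimer(timeout)
	defer timer.Stop()

	for {
		select {
		case <-timer.C:
			return errors.New("giving up joining cluster via peer")
		case <-ticker.C:
			if haveLeader() {
				return nil // already joined, a raft leader is visible
			}
			if err := tryJoin(); err != nil {
				continue // keep retrying until the deadline fires
			}
		}
	}
}

func main() {
	attempts := 0
	err := joinWithDeadline(
		func() error { attempts++; return errors.New("peer unreachable") },
		func() bool { return attempts >= 3 }, // pretend a leader becomes visible after a few attempts
		10*time.Second,
	)
	fmt.Println(err) // <nil>
}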
View File

@@ -1,7 +1,22 @@
package cluster
import (
"context"
"time"
"github.com/datarhei/core/v16/cluster/raft"
)
// followerLoop is run by every follower node in the cluster.
func (c *cluster) followerLoop(stopCh chan struct{}) {
establishedFollower := false
if !establishedFollower {
c.establishFollowership(context.TODO())
establishedFollower = true
defer c.revokeFollowership()
}
for {
select {
case <-stopCh:
@@ -11,3 +26,83 @@ func (c *cluster) followerLoop(stopCh chan struct{}) {
}
}
}
func (c *cluster) establishFollowership(ctx context.Context) {
c.logger.Info().Log("Establishing followership")
ctx, cancel := context.WithCancel(ctx)
c.cancelFollowerShip = cancel
if c.recoverTimeout > 0 {
go c.recoverCluster(ctx, c.syncInterval)
}
}
func (c *cluster) revokeFollowership() {
c.logger.Info().Log("Revoking followership")
if c.cancelFollowerShip != nil {
c.cancelFollowerShip()
c.cancelFollowerShip = nil
}
}
func (c *cluster) recoverCluster(ctx context.Context, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
c.leaderLock.Lock()
hasLeader := c.hasRaftLeader
lastLeaderChange := c.lastLeaderChange
c.leaderLock.Unlock()
uptime := c.raft.Stats().Uptime
if uptime < 2*c.recoverTimeout {
continue
}
if !hasLeader && c.recoverTimeout > 0 && time.Since(lastLeaderChange) > c.recoverTimeout {
peers := []raft.Peer{}
// find living peers and recover
servers, err := c.raft.Servers()
if err != nil {
break
}
nodes := c.manager.NodeList()
for _, node := range nodes {
if _, err := node.Status(); err != nil {
continue
}
id := node.About().ID
for _, server := range servers {
if server.ID == id && server.ID != c.nodeID {
peers = append(peers, raft.Peer{
ID: id,
Address: server.Address,
})
}
}
}
c.logger.Warn().WithField("peers", peers).Log("Recovering raft")
// recover raft with new set of peers
err = c.raft.Recover(peers, 2*interval)
if err != nil {
c.logger.Error().WithError(err).Log("Recovering raft failed, shutting down")
c.Shutdown()
return
}
}
}
}
}

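A minimal standalone sketch (simplified; the real recoverCluster reads these values under leaderLock and from c.raft.Stats()) of the trigger condition used above: recovery only fires when the feature is enabled, the raft instance has been up long enough to rule out a normal startup election, and the cluster has been leaderless for longer than the recover timeout.

package main

import (
	"fmt"
	"time"
)

// shouldRecover mirrors the guards in recoverCluster: a disabled timeout or a
// freshly started raft instance never triggers recovery; otherwise recovery
// fires once no leader has been seen for longer than recoverTimeout.
func shouldRecover(hasLeader bool, lastLeaderChange time.Time, uptime, recoverTimeout time.Duration, now time.Time) bool {
	if recoverTimeout <= 0 {
		return false // feature disabled (recover_timeout_sec defaults to 0)
	}
	if uptime < 2*recoverTimeout {
		return false // too soon after start to declare the cluster unrecoverable
	}
	return !hasLeader && now.Sub(lastLeaderChange) > recoverTimeout
}

func main() {
	now := time.Now()
	fmt.Println(shouldRecover(false, now.Add(-3*time.Minute), 10*time.Minute, time.Minute, now))  // true: leaderless for too long
	fmt.Println(shouldRecover(true, now.Add(-3*time.Minute), 10*time.Minute, time.Minute, now))   // false: a leader exists
	fmt.Println(shouldRecover(false, now.Add(-30*time.Second), 10*time.Minute, time.Minute, now)) // false: not leaderless long enough
}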
View File

@@ -111,6 +111,7 @@ func (c *cluster) monitorLeadership() {
c.leaderLock.Lock()
c.isRaftLeader = false
c.isLeader = false
c.isEmergencyLeader = false
c.leaderLock.Unlock()
} else if notification == NOTIFY_LEADER {
if weAreLeaderCh != nil {
@@ -144,6 +145,7 @@ func (c *cluster) monitorLeadership() {
c.leaderLock.Lock()
c.isRaftLeader = true
c.isLeader = true
c.isEmergencyLeader = false
c.leaderLock.Unlock()
} else if notification == NOTIFY_EMERGENCY {
if weAreEmergencyLeaderCh != nil {
@@ -177,6 +179,7 @@ func (c *cluster) monitorLeadership() {
c.leaderLock.Lock()
c.isRaftLeader = false
c.isLeader = true
c.isEmergencyLeader = true
c.leaderLock.Unlock()
}
case <-c.shutdownCh:
@@ -318,11 +321,38 @@ func (c *cluster) establishLeadership(ctx context.Context, emergency bool) error
if !emergency {
go c.clearLocks(ctx, time.Minute)
go c.clearDeadNodes(ctx, c.syncInterval, c.nodeRecoverTimeout)
}
return nil
}
func (c *cluster) clearDeadNodes(ctx context.Context, interval, nodeRecoverTimeout time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
nodes := c.manager.NodeList()
for _, node := range nodes {
about := node.About()
if time.Since(about.SpawnedAt) > nodeRecoverTimeout && time.Since(about.LastContact) > nodeRecoverTimeout {
c.logger.Warn().WithFields(log.Fields{
"id": about.ID,
"after": nodeRecoverTimeout,
"lastContact": about.LastContact,
"spawnedAt": about.SpawnedAt,
}).Log("Removing peer from cluster")
c.raft.RemoveServer(about.ID)
}
}
}
}
}
func (c *cluster) revokeLeadership() {
c.logger.Info().Log("Revoking leadership")

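A minimal standalone sketch (the about struct here is a simplified stand-in for node.About, not the repo's type) of the removal criterion used by clearDeadNodes above: a node is only evicted once it is older than nodeRecoverTimeout and has also been silent for longer than nodeRecoverTimeout, so freshly joined nodes are not removed before they ever had a chance to report in.

package main

import (
	"fmt"
	"time"
)

type about struct {
	ID          string
	SpawnedAt   time.Time
	LastContact time.Time
}

// shouldRemove applies the same two checks as clearDeadNodes: old enough to
// judge, and unresponsive for longer than the node recover timeout.
func shouldRemove(a about, nodeRecoverTimeout time.Duration, now time.Time) bool {
	return now.Sub(a.SpawnedAt) > nodeRecoverTimeout && now.Sub(a.LastContact) > nodeRecoverTimeout
}

func main() {
	now := time.Now()
	timeout := 120 * time.Second

	fresh := about{ID: "new", SpawnedAt: now.Add(-30 * time.Second), LastContact: now.Add(-30 * time.Second)}
	dead := about{ID: "gone", SpawnedAt: now.Add(-10 * time.Minute), LastContact: now.Add(-5 * time.Minute)}

	fmt.Println(shouldRemove(fresh, timeout, now)) // false: too young to judge
	fmt.Println(shouldRemove(dead, timeout, now))  // true: old and unresponsive, would be removed via raft.RemoveServer
}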
View File

@@ -16,10 +16,11 @@ import (
)
type Node struct {
id string
address string
ips []string
version string
id string
address string
ips []string
version string
spawnedAt time.Time
node client.APIClient
nodeAbout About
@@ -57,9 +58,10 @@ func New(config Config) *Node {
tr.IdleConnTimeout = 30 * time.Second
n := &Node{
id: config.ID,
address: config.Address,
version: "0.0.0",
id: config.ID,
address: config.Address,
version: "0.0.0",
spawnedAt: time.Now(),
node: client.APIClient{
Address: config.Address,
Client: &http.Client{
@@ -136,6 +138,7 @@ type About struct {
Error error
Core CoreAbout
Resources Resources
SpawnedAt time.Time
}
type ResourcesGPU struct {
@@ -167,9 +170,10 @@ func (n *Node) About() About {
defer n.lock.RUnlock()
a := About{
ID: n.id,
Version: n.version,
Address: n.address,
ID: n.id,
Version: n.version,
Address: n.address,
SpawnedAt: n.spawnedAt,
}
a.Name = n.coreAbout.Name

View File

@@ -85,6 +85,7 @@ type Server struct {
}
type Stats struct {
Uptime time.Duration
Address string
State string
LastContact time.Duration
@@ -210,6 +211,7 @@ func (r *raft) Servers() ([]Server, error) {
func (r *raft) Stats() Stats {
stats := Stats{
Uptime: time.Since(r.raftStart),
Address: r.raftAddress,
}

cluster/raft/recovery.go (new file, 136 lines added)
View File

@@ -0,0 +1,136 @@
package raft
import (
"io"
"sync"
"time"
)
type RaftRecoverer interface {
Raft
Recover([]Peer, time.Duration) error
}
type raftRecoverer struct {
raft Raft
config Config
lock sync.RWMutex
}
func NewRecoverer(config Config) (RaftRecoverer, error) {
r := &raftRecoverer{
config: config,
}
raft, err := New(config)
if err != nil {
return nil, err
}
r.raft = raft
return r, nil
}
func (r *raftRecoverer) Recover(peers []Peer, cooldown time.Duration) error {
r.lock.Lock()
defer r.lock.Unlock()
r.raft.Shutdown()
if r.config.Logger != nil {
r.config.Logger.Warn().WithField("duration", cooldown).Log("Cooling down")
}
time.Sleep(cooldown)
r.config.Peers = peers
raft, err := New(r.config)
if err != nil {
return err
}
r.raft = raft
return nil
}
func (r *raftRecoverer) Shutdown() {
r.lock.RLock()
defer r.lock.RUnlock()
r.raft.Shutdown()
}
func (r *raftRecoverer) IsLeader() bool {
r.lock.RLock()
defer r.lock.RUnlock()
return r.raft.IsLeader()
}
func (r *raftRecoverer) Leader() (string, string) {
r.lock.RLock()
defer r.lock.RUnlock()
return r.raft.Leader()
}
func (r *raftRecoverer) Servers() ([]Server, error) {
r.lock.RLock()
defer r.lock.RUnlock()
return r.raft.Servers()
}
func (r *raftRecoverer) Stats() Stats {
r.lock.RLock()
defer r.lock.RUnlock()
return r.raft.Stats()
}
func (r *raftRecoverer) Apply(data []byte) error {
r.lock.RLock()
defer r.lock.RUnlock()
return r.raft.Apply(data)
}
func (r *raftRecoverer) Barrier(timeout time.Duration) error {
r.lock.RLock()
defer r.lock.RUnlock()
return r.raft.Barrier(timeout)
}
func (r *raftRecoverer) AddServer(id, address string) error {
r.lock.RLock()
defer r.lock.RUnlock()
return r.raft.AddServer(id, address)
}
func (r *raftRecoverer) RemoveServer(id string) error {
r.lock.RLock()
defer r.lock.RUnlock()
return r.raft.RemoveServer(id)
}
func (r *raftRecoverer) LeadershipTransfer(id string) error {
r.lock.RLock()
defer r.lock.RUnlock()
return r.raft.LeadershipTransfer(id)
}
func (r *raftRecoverer) Snapshot() (io.ReadCloser, error) {
r.lock.RLock()
defer r.lock.RUnlock()
return r.raft.Snapshot()
}

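A minimal standalone sketch (types are simplified stand-ins, not the repo's raft.Raft interface) of the wrapper pattern recovery.go uses: every delegated call holds a read lock, while Recover takes the write lock so it can shut down the old instance, wait out the cooldown, and swap in a freshly built one without any caller observing a half-replaced raft.

package main

import (
	"fmt"
	"sync"
	"time"
)

type engine struct{ generation int }

func (e *engine) IsLeader() bool { return false }
func (e *engine) Shutdown()      {}

type recoverer struct {
	lock   sync.RWMutex
	engine *engine
}

// Delegated calls take the read lock, mirroring raftRecoverer's methods.
func (r *recoverer) IsLeader() bool {
	r.lock.RLock()
	defer r.lock.RUnlock()
	return r.engine.IsLeader()
}

// Recover takes the write lock, shuts the old instance down, cools down, and
// swaps in a new instance, mirroring raftRecoverer.Recover.
func (r *recoverer) Recover(cooldown time.Duration, build func() *engine) {
	r.lock.Lock()
	defer r.lock.Unlock()
	r.engine.Shutdown()
	time.Sleep(cooldown)
	r.engine = build()
}

func main() {
	r := &recoverer{engine: &engine{generation: 1}}
	r.Recover(10*time.Millisecond, func() *engine { return &engine{generation: 2} })
	fmt.Println(r.engine.generation, r.IsLeader()) // 2 false
}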
View File

@@ -342,6 +342,7 @@ func (d *Config) init() {
d.vars.Register(value.NewInt64(&d.Cluster.SyncInterval, 5), "cluster.sync_interval_sec", "CORE_CLUSTER_SYNC_INTERVAL_SEC", nil, "Interval between aligning the process in the cluster DB with the processes on the nodes", true, false)
d.vars.Register(value.NewInt64(&d.Cluster.NodeRecoverTimeout, 120), "cluster.node_recover_timeout_sec", "CORE_CLUSTER_NODE_RECOVER_TIMEOUT_SEC", nil, "Timeout for a node to recover before rebalancing the processes", true, false)
d.vars.Register(value.NewInt64(&d.Cluster.EmergencyLeaderTimeout, 10), "cluster.emergency_leader_timeout_sec", "CORE_CLUSTER_EMERGENCY_LEADER_TIMEOUT_SEC", nil, "Timeout for establishing the emergency leadership after lost contact to raft leader", true, false)
d.vars.Register(value.NewInt64(&d.Cluster.RecoverTimeout, 0), "cluster.recover_timeout_sec", "CORE_CLUSTER_RECOVER_TIMEOUT_SEC", nil, "Timeout for recovering the cluster if no raft leader can be elected", false, false)
d.vars.Register(value.NewBool(&d.Cluster.Debug.DisableFFmpegCheck, false), "cluster.debug.disable_ffmpeg_check", "CORE_CLUSTER_DEBUG_DISABLE_FFMPEG_CHECK", nil, "Disable checking for identical FFmpeg versions on all nodes", false, false)
}

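For reference (the variable name below is hypothetical): the new recover_timeout_sec value is a plain number of seconds and is converted to a time.Duration where the cluster is constructed, as in the api hunk at the top of this commit; the default of 0 yields a zero duration, which keeps auto-recovery switched off.

package main

import (
	"fmt"
	"time"
)

func main() {
	var recoverTimeoutSec int64 = 30 // e.g. CORE_CLUSTER_RECOVER_TIMEOUT_SEC=30
	recoverTimeout := time.Duration(recoverTimeoutSec) * time.Second
	fmt.Println(recoverTimeout, recoverTimeout > 0) // 30s true; with 0 the feature stays disabled
}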
View File

@@ -199,6 +199,7 @@ type Data struct {
SyncInterval int64 `json:"sync_interval_sec" format:"int64"` // seconds
NodeRecoverTimeout int64 `json:"node_recover_timeout_sec" format:"int64"` // seconds
EmergencyLeaderTimeout int64 `json:"emergency_leader_timeout_sec" format:"int64"` // seconds
RecoverTimeout int64 `json:"recover_timeout_sec" format:"int64"` // seconds
Debug struct {
DisableFFmpegCheck bool `json:"disable_ffmpeg_check"`
} `json:"debug"`