From f4a2c92fc1827fc7ad667a20121cf6178097fddb Mon Sep 17 00:00:00 2001 From: finley Date: Sat, 19 Apr 2025 22:11:58 +0800 Subject: [PATCH] Support failover in cluster (experimental) --- aof/aof.go | 3 + cluster/cluster.go | 1 + cluster/core/core.go | 16 +- cluster/core/core_test.go | 104 +++++++++++++ cluster/core/cron.go | 36 +++++ cluster/core/{leader.go => node_manager.go} | 37 +++-- cluster/core/replica_manager.go | 155 ++++++++++++++++++++ cluster/raft/fsm.go | 133 +++++++++-------- cluster/raft/fsm_utils.go | 114 ++++++++++++++ cluster/raft/raft.go | 95 +++++++----- cluster/raft/utils.go | 42 ++++++ config/config.go | 1 + database/persistence.go | 4 +- database/replication_master.go | 4 +- database/replication_master_test.go | 6 +- database/replication_slave.go | 5 + database/replication_slave_test.go | 109 ++++++++++++-- database/server.go | 2 +- lib/pool/pool.go | 3 +- redis/connection/conn.go | 5 +- 20 files changed, 739 insertions(+), 136 deletions(-) create mode 100644 cluster/core/cron.go rename cluster/core/{leader.go => node_manager.go} (90%) create mode 100644 cluster/core/replica_manager.go create mode 100644 cluster/raft/fsm_utils.go create mode 100644 cluster/raft/utils.go diff --git a/aof/aof.go b/aof/aof.go index 3c1bef0..911eb83 100644 --- a/aof/aof.go +++ b/aof/aof.go @@ -259,6 +259,9 @@ func (persister *Persister) Fsync() { // Close gracefully stops aof persistence procedure func (persister *Persister) Close() { + if persister == nil { + return + } if persister.aofFile != nil { close(persister.aofChan) <-persister.aofFinished // wait for aof finished diff --git a/cluster/cluster.go b/cluster/cluster.go index b4edadc..728137b 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -30,6 +30,7 @@ func MakeCluster() *Cluster { }, StartAsSeed: config.Properties.ClusterAsSeed, JoinAddress: config.Properties.ClusterSeed, + Master: config.Properties.MasterInCluster, }) if err != nil { logger.Error(err.Error()) diff --git a/cluster/core/core.go b/cluster/core/core.go index 39cf3ca..3928a9c 100644 --- a/cluster/core/core.go +++ b/cluster/core/core.go @@ -22,6 +22,9 @@ type Cluster struct { slotsManager *slotsManager rebalanceManger *rebalanceManager transactions *TransactionManager + replicaManager *replicaManager + + closeChan chan struct{} // allow inject route implementation getSlotImpl func(key string) uint32 @@ -33,7 +36,9 @@ type Config struct { raft.RaftConfig StartAsSeed bool JoinAddress string + Master string connectionStub ConnectionFactory // for test + noCron bool // for test } func (c *Cluster) SelfID() string { @@ -123,7 +128,11 @@ func NewCluster(cfg *Config) (*Cluster, error) { if err != nil { return nil, err } - result := conn.Send(utils.ToCmdLine(joinClusterCommand, cfg.RedisAdvertiseAddr, cfg.RaftAdvertiseAddr)) + joinCmdLine := utils.ToCmdLine(joinClusterCommand, cfg.RedisAdvertiseAddr, cfg.RaftAdvertiseAddr) + if cfg.Master != "" { + joinCmdLine = append(joinCmdLine, []byte(cfg.Master)) + } + result := conn.Send(joinCmdLine) if err := protocol.Try2ErrorReply(result); err != nil { return nil, err } @@ -137,6 +146,8 @@ func NewCluster(cfg *Config) (*Cluster, error) { rebalanceManger: newRebalanceManager(), slotsManager: newSlotsManager(), transactions: newTransactionManager(), + replicaManager: newReplicaManager(), + closeChan: make(chan struct{}), } cluster.pickNodeImpl = func(slotID uint32) string { return defaultPickNodeImpl(cluster, slotID) @@ -146,6 +157,8 @@ func NewCluster(cfg *Config) (*Cluster, error) { } cluster.injectInsertCallback() 
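Usage note on the new Config.Master field introduced above: a node that wants to start as a replica passes its master's node id (the master's redis advertise address), and NewCluster forwards it as the optional third argument of the join command. A minimal sketch, mirroring the follower setup in the new TestFailover; all addresses and the data directory are placeholder values:

// Illustrative only, not part of the patch: start a node as a replica of an existing master.
replicaCfg := &Config{
	RaftConfig: raft.RaftConfig{
		RedisAdvertiseAddr: "127.0.0.1:6499",
		RaftListenAddr:     "127.0.0.1:26667",
		RaftAdvertiseAddr:  "127.0.0.1:26667",
		Dir:                "/tmp/godis-node1",
	},
	StartAsSeed: false,
	JoinAddress: "127.0.0.1:6399", // any reachable cluster node
	Master:      "127.0.0.1:6399", // node id of the intended master; leave empty to join as a master
}
replica, err := NewCluster(replicaCfg)
if err != nil {
	logger.Error(err.Error())
	return
}
defer replica.Close()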
cluster.injectDeleteCallback() + cluster.registerOnFailover() + go cluster.clusterCron() return cluster, nil } @@ -155,6 +168,7 @@ func (cluster *Cluster) AfterClientClose(c redis.Connection) { } func (cluster *Cluster) Close() { + close(cluster.closeChan) cluster.db.Close() err := cluster.raftNode.Close() if err != nil { diff --git a/cluster/core/core_test.go b/cluster/core/core_test.go index 2a9bfbb..6ebce8e 100644 --- a/cluster/core/core_test.go +++ b/cluster/core/core_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/hdt3213/godis/cluster/raft" + "github.com/hdt3213/godis/interface/redis" "github.com/hdt3213/godis/lib/utils" "github.com/hdt3213/godis/redis/connection" "github.com/hdt3213/godis/redis/protocol" @@ -33,6 +34,7 @@ func TestClusterBootstrap(t *testing.T) { }, StartAsSeed: true, connectionStub: connections, + noCron: true, } leader, err := NewCluster(leaderCfg) if err != nil { @@ -72,6 +74,7 @@ func TestClusterBootstrap(t *testing.T) { StartAsSeed: false, JoinAddress: leaderCfg.RedisAdvertiseAddr, connectionStub: connections, + noCron: true, } follower, err := NewCluster(followerCfg) if err != nil { @@ -132,3 +135,104 @@ func TestClusterBootstrap(t *testing.T) { } } } + +func TestFailover(t *testing.T) { + // start leader + leaderDir := "test/0" + os.RemoveAll(leaderDir) + os.MkdirAll(leaderDir, 0777) + defer func() { + os.RemoveAll(leaderDir) + }() + RegisterCmd("slaveof", func(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply { + return protocol.MakeOkReply() + }) + + // connection stub + connections := NewInMemConnectionFactory() + leaderCfg := &Config{ + RaftConfig: raft.RaftConfig{ + RedisAdvertiseAddr: "127.0.0.1:6399", + RaftListenAddr: "127.0.0.1:26666", + RaftAdvertiseAddr: "127.0.0.1:26666", + Dir: leaderDir, + }, + StartAsSeed: true, + connectionStub: connections, + noCron: true, + } + leader, err := NewCluster(leaderCfg) + if err != nil { + t.Error(err) + return + } + connections.nodes[leaderCfg.RedisAdvertiseAddr] = leader + + // start follower + followerDir := "test/1" + os.RemoveAll(followerDir) + os.MkdirAll(followerDir, 0777) + defer func() { + os.RemoveAll(followerDir) + }() + followerCfg := &Config{ + RaftConfig: raft.RaftConfig{ + RedisAdvertiseAddr: "127.0.0.1:6499", + RaftListenAddr: "127.0.0.1:26667", + RaftAdvertiseAddr: "127.0.0.1:26667", + Dir: followerDir, + }, + StartAsSeed: false, + JoinAddress: leaderCfg.RedisAdvertiseAddr, + connectionStub: connections, + noCron: true, + Master: leader.SelfID(), + } + follower, err := NewCluster(followerCfg) + if err != nil { + t.Error(err) + return + } + connections.nodes[followerCfg.RedisAdvertiseAddr] = follower + + _ = follower.SelfID() + // check nodes + joined := false + for i := 0; i < 10; i++ { + nodes, err := leader.raftNode.GetNodes() + if err != nil { + t.Log(err) + continue + } + if len(nodes) == 2 { + t.Log("join success") + joined = true + break + } + time.Sleep(time.Second) + } + if !joined { + t.Error("join failed") + return + } + + // rebalance + leader.replicaManager.masterHeartbeats[leader.SelfID()] = time.Now().Add(-time.Hour) + leader.doFailoverCheck() + time.Sleep(2 * time.Second) + for i := 0; i < 1000; i++ { + success := false + leader.raftNode.FSM.WithReadLock(func(fsm *raft.FSM) { + ms := fsm.MasterSlaves[follower.SelfID()] + if ms != nil && len(ms.Slaves) > 0 { + success = true + } + }) + if success { + t.Log("rebalance success") + break + } else { + time.Sleep(time.Second) + } + } +} diff --git a/cluster/core/cron.go b/cluster/core/cron.go new file mode 100644 
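Both wait loops in TestFailover above (waiting for the join to be committed and for the failover to change the route) repeat the same poll-until-true pattern; a small helper like the hypothetical waitFor below could factor that out. It is only a sketch and is not part of the patch:

// Hypothetical test helper, not introduced by this patch: polls cond once per
// second until it returns true or the timeout expires.
func waitFor(t *testing.T, timeout time.Duration, cond func() bool) bool {
	t.Helper()
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if cond() {
			return true
		}
		time.Sleep(time.Second)
	}
	return false
}

With it, the join check in the test could read:

if !waitFor(t, 10*time.Second, func() bool {
	nodes, err := leader.raftNode.GetNodes()
	return err == nil && len(nodes) == 2
}) {
	t.Error("join failed")
}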
index 0000000..5a5d2fe --- /dev/null +++ b/cluster/core/cron.go @@ -0,0 +1,36 @@ +package core + +import ( + "sync/atomic" + "time" + + "github.com/hdt3213/godis/cluster/raft" +) + +func (cluster *Cluster) clusterCron() { + if cluster.config.noCron { + return + } + ticker := time.NewTicker(time.Second) + var running int32 + for { + select { + case <-ticker.C: + if cluster.raftNode.State() == raft.Leader { + if atomic.CompareAndSwapInt32(&running, 0, 1) { + // Disable parallelism + go func() { + cluster.doFailoverCheck() + cluster.doRebalance() + atomic.StoreInt32(&running, 0) + }() + } + } else { + cluster.sendHearbeat() + } + case <-cluster.closeChan: + ticker.Stop() + return + } + } +} diff --git a/cluster/core/leader.go b/cluster/core/node_manager.go similarity index 90% rename from cluster/core/leader.go rename to cluster/core/node_manager.go index 931a7cd..1531802 100644 --- a/cluster/core/leader.go +++ b/cluster/core/node_manager.go @@ -23,10 +23,10 @@ func init() { RegisterCmd(migrationChangeRouteCommand, execMigrationChangeRoute) } -// execJoin handles cluster-join command -// format: cluster-join redisAddress (advertised)raftAddress +// execJoin handles cluster-join command as raft leader +// format: cluster-join redisAddress(advertised), raftAddress, masterId func execJoin(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply { - if len(cmdLine) != 3 { + if len(cmdLine) < 3 { return protocol.MakeArgNumErrReply(joinClusterCommand) } state := cluster.raftNode.State() @@ -42,10 +42,26 @@ func execJoin(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply // self node is leader redisAddr := string(cmdLine[1]) raftAddr := string(cmdLine[2]) - err := cluster.raftNode.HandleJoin(redisAddr, raftAddr) + err := cluster.raftNode.AddToRaft(redisAddr, raftAddr) if err != nil { return protocol.MakeErrReply(err.Error()) } + master := "" + if len(cmdLine) == 4 { + master = string(cmdLine[3]) + } + _, err = cluster.raftNode.Propose(&raft.LogEntry{ + Event: raft.EventJoin, + JoinTask: &raft.JoinTask{ + NodeId: redisAddr, + Master: master, + }, + }) + if err != nil { + // todo: remove the node from raft + return protocol.MakeErrReply(err.Error()) + } + // join sucees, rebalance node return protocol.MakeOkReply() } @@ -114,17 +130,14 @@ func (cluster *Cluster) triggerMigrationTask(task *raft.MigratingTask) error { } func (cluster *Cluster) makeRebalancePlan() ([]*raft.MigratingTask, error) { - nodes, err := cluster.raftNode.GetNodes() - if err != nil { - return nil, err - } - avgSlot := int(math.Ceil(float64(SlotCount) / float64(len(nodes)))) + var migratings []*raft.MigratingTask cluster.raftNode.FSM.WithReadLock(func(fsm *raft.FSM) { + avgSlot := int(math.Ceil(float64(SlotCount) / float64(len(fsm.MasterSlaves)))) var exportingNodes []string var importingNodes []string - for _, node := range nodes { - nodeId := string(node.ID) + for _, ms := range fsm.MasterSlaves { + nodeId := ms.MasterId nodeSlots := fsm.Node2Slot[nodeId] if len(nodeSlots) > avgSlot+1 { exportingNodes = append(exportingNodes, nodeId) @@ -200,7 +213,7 @@ func (cluster *Cluster) waitCommitted(peer string, logIndex uint64) error { // format: cluster.migration.changeroute taskid func execMigrationChangeRoute(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply { if len(cmdLine) != 2 { - return protocol.MakeArgNumErrReply(joinClusterCommand) + return protocol.MakeArgNumErrReply(migrationChangeRouteCommand) } state := cluster.raftNode.State() if state != raft.Leader { diff --git 
a/cluster/core/replica_manager.go b/cluster/core/replica_manager.go new file mode 100644 index 0000000..3ba5ea2 --- /dev/null +++ b/cluster/core/replica_manager.go @@ -0,0 +1,155 @@ +package core + +import ( + "net" + "sync" + "time" + + "github.com/hdt3213/godis/cluster/raft" + "github.com/hdt3213/godis/interface/redis" + "github.com/hdt3213/godis/lib/logger" + "github.com/hdt3213/godis/lib/utils" + "github.com/hdt3213/godis/redis/connection" + "github.com/hdt3213/godis/redis/protocol" +) + +const heartbeatCommand = "cluster.heartbeat" + +func init() { + RegisterCmd(heartbeatCommand, execHeartbeat) +} + +const ( + statusNormal = iota + statusFailing // failover in progress +) + +type replicaManager struct { + mu sync.RWMutex + masterHeartbeats map[string]time.Time // id -> lastHeartbeatTime +} + +func newReplicaManager() *replicaManager { + return &replicaManager{ + masterHeartbeats: make(map[string]time.Time), + } +} + +// execHeartbeat receives heartbeat from follower as raft leader +// cmdLine: cluster.heartbeat nodeId +func execHeartbeat(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply { + if len(cmdLine) != 2 { + return protocol.MakeArgNumErrReply(heartbeatCommand) + } + id := string(cmdLine[1]) + cluster.replicaManager.mu.Lock() + cluster.replicaManager.masterHeartbeats[id] = time.Now() + cluster.replicaManager.mu.Unlock() + + return protocol.MakeOkReply() +} + +func (cluster *Cluster) sendHearbeat() { + leaderConn, err := cluster.BorrowLeaderClient() + if err != nil { + logger.Error(err) + } + defer cluster.connections.ReturnPeerClient(leaderConn) + reply := leaderConn.Send(utils.ToCmdLine(heartbeatCommand, cluster.SelfID())) + if err := protocol.Try2ErrorReply(reply); err != nil { + logger.Error(err) + } +} + +const followerTimeout = 10 * time.Second + +func (cluster *Cluster) doFailoverCheck() { + // find timeout masters + var timeoutMasters []*raft.MasterSlave + ddl := time.Now().Add(-followerTimeout) + cluster.replicaManager.mu.RLock() + for masterId, lastTime := range cluster.replicaManager.masterHeartbeats { + if lastTime.Second() == 0 { + // do not set new joined node as timeout + cluster.replicaManager.masterHeartbeats[masterId] = time.Now() + } + if lastTime.Before(ddl) { + slaves := cluster.raftNode.GetSlaves(masterId) + if slaves != nil && len(slaves.Slaves) > 0 { + timeoutMasters = append(timeoutMasters, slaves) + } + + } + } + cluster.replicaManager.mu.RUnlock() + + // trigger failover + for _, failed := range timeoutMasters { + cluster.triggerFailover(failed) + } +} + +func (cluster *Cluster) triggerFailover(failed *raft.MasterSlave) error { + newMaster := failed.Slaves[0] + id := utils.RandString(20) + // propose change + _, err := cluster.raftNode.Propose(&raft.LogEntry{ + Event: raft.EventStartFailover, + FailoverTask: &raft.FailoverTask{ + ID: id, + OldMasterId: failed.MasterId, + NewMasterId: newMaster, + }, + }) + if err != nil { + return err + } + logger.Infof("proposed start failover id=%s, oldMaster=%s, newMaster=%s", id, failed.MasterId, newMaster) + // send slave of to new master + conn, err := cluster.connections.BorrowPeerClient(newMaster) + if err != nil { + return err + } + defer cluster.connections.ReturnPeerClient(conn) + + reply := conn.Send(utils.ToCmdLine("slaveof", "no", "one")) + if err := protocol.Try2ErrorReply(reply); err != nil { + return err + } + + // new master is ready to receive commands, change route + _, err = cluster.raftNode.Propose(&raft.LogEntry{ + Event: raft.EventFinishFailover, + FailoverTask: 
&raft.FailoverTask{ + ID: id, + OldMasterId: failed.MasterId, + NewMasterId: newMaster, + }, + }) + if err != nil { + return err + } + logger.Infof("proposed finish failover id=%s, oldMaster=%s, newMaster=%s", id, failed.MasterId, newMaster) + // other slaves will listen to raft to change master + return nil +} + +func (cluster *Cluster) registerOnFailover() { + cluster.raftNode.SetOnFailover(func(newMaster string) { + if newMaster != "" && newMaster != cluster.SelfID() { + // old master failed and other node become the new master + // this node may be the old master + ip, port, err := net.SplitHostPort(newMaster) + if err != nil { + logger.Errorf("illegal new master: %s", newMaster) + return + } + c := connection.NewFakeConn() + ret := cluster.db.Exec(c, utils.ToCmdLine("slaveof", ip, port)) + if err := protocol.Try2ErrorReply(ret); err != nil { + logger.Errorf("slave of failed: %v", err) + return + } + } + }) +} diff --git a/cluster/raft/fsm.go b/cluster/raft/fsm.go index 2c5cf3c..f0f61f5 100644 --- a/cluster/raft/fsm.go +++ b/cluster/raft/fsm.go @@ -3,7 +3,6 @@ package raft import ( "encoding/json" "io" - "sort" "sync" "github.com/hashicorp/raft" @@ -18,13 +17,22 @@ import ( // If the target node crashes during migrating, the migration will be canceled. // All related commands will be routed to the source node type FSM struct { - mu sync.RWMutex - Node2Slot map[string][]uint32 // nodeID -> slotIDs, slotIDs is in ascending order and distinct - Slot2Node map[uint32]string // slotID -> nodeID - Migratings map[string]*MigratingTask // taskId -> task + mu sync.RWMutex + Node2Slot map[string][]uint32 // nodeID -> slotIDs, slotIDs is in ascending order and distinct + Slot2Node map[uint32]string // slotID -> nodeID + Migratings map[string]*MigratingTask // taskId -> task + MasterSlaves map[string]*MasterSlave // masterId -> MasterSlave + SlaveMasters map[string]string // slaveId -> masterId + Failovers map[string]*FailoverTask // taskId -> task + changed func(*FSM) // called while fsm changed, within readlock } -// MigratingTask +type MasterSlave struct { + MasterId string + Slaves []string +} + +// MigratingTask is a running migrating task // It is immutable type MigratingTask struct { ID string @@ -35,17 +43,33 @@ type MigratingTask struct { Slots []uint32 } -// InitTask +// InitTask assigns all slots to seed node while starting a new cluster +// It is designed to init the FSM for a new cluster type InitTask struct { Leader string SlotCount int } +// FailoverTask represents a failover or joinery/quitting of slaves +type FailoverTask struct { + ID string + OldMasterId string + NewMasterId string +} + +type JoinTask struct { + NodeId string + Master string +} + // implements FSM.Apply after you created a new raft event const ( EventStartMigrate = iota + 1 EventFinishMigrate EventSeedStart + EventStartFailover + EventFinishFailover + EventJoin ) // LogEntry is an entry in raft log, stores a change of cluster @@ -53,6 +77,8 @@ type LogEntry struct { Event int MigratingTask *MigratingTask `json:"MigratingTask,omitempty"` InitTask *InitTask + FailoverTask *FailoverTask + JoinTask *JoinTask } // Apply is called once a log entry is committed by a majority of the cluster. 
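A short usage note on the replica bookkeeping the FSM gains in this file: any component can take a consistent view of the topology and of in-flight failovers through FSM.WithReadLock, exactly as the new TestFailover does. A minimal sketch using only fields introduced in this patch:

// Illustrative read of the replica topology; not part of the patch.
cluster.raftNode.FSM.WithReadLock(func(fsm *raft.FSM) {
	for masterId, ms := range fsm.MasterSlaves {
		logger.Infof("master %s has %d slave(s)", masterId, len(ms.Slaves))
	}
	for id, task := range fsm.Failovers {
		logger.Infof("failover %s in progress: %s -> %s", id, task.OldMasterId, task.NewMasterId)
	}
})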
@@ -82,52 +108,33 @@ func (fsm *FSM) Apply(log *raft.Log) interface{} { slots[i] = uint32(i) } fsm.Node2Slot[entry.InitTask.Leader] = slots + fsm.addNode(entry.InitTask.Leader, "") + } else if entry.Event == EventStartFailover { + task := entry.FailoverTask + fsm.Failovers[task.ID] = task + } else if entry.Event == EventFinishFailover { + task := entry.FailoverTask + // change route + fsm.failover(task.OldMasterId, task.NewMasterId) + slots := fsm.Node2Slot[task.OldMasterId] + fsm.addSlots(task.NewMasterId, slots) + fsm.removeSlots(task.OldMasterId, slots) + delete(fsm.Failovers, task.ID) + } else if entry.Event == EventJoin { + task := entry.JoinTask + fsm.addNode(task.NodeId, task.Master) + } + if fsm.changed != nil { + fsm.changed(fsm) } - return nil } -func (fsm *FSM) addSlots(nodeID string, slots []uint32) { - for _, slotId := range slots { - /// update node2Slot - index := sort.Search(len(fsm.Node2Slot[nodeID]), func(i int) bool { - return fsm.Node2Slot[nodeID][i] >= slotId - }) - if !(index < len(fsm.Node2Slot[nodeID]) && fsm.Node2Slot[nodeID][index] == slotId) { - // not found in node's slots, insert - fsm.Node2Slot[nodeID] = append(fsm.Node2Slot[nodeID][:index], - append([]uint32{slotId}, fsm.Node2Slot[nodeID][index:]...)...) - } - /// update slot2Node - fsm.Slot2Node[slotId] = nodeID - } -} - -func (fsm *FSM) removeSlots(nodeID string, slots []uint32) { - for _, slotId := range slots { - /// update node2slot - index := sort.Search(len(fsm.Node2Slot[nodeID]), func(i int) bool { return fsm.Node2Slot[nodeID][i] >= slotId }) - // found slot remove - for index < len(fsm.Node2Slot[nodeID]) && fsm.Node2Slot[nodeID][index] == slotId { - fsm.Node2Slot[nodeID] = append(fsm.Node2Slot[nodeID][:index], fsm.Node2Slot[nodeID][index+1:]...) - } - // update slot2node - if fsm.Slot2Node[slotId] == nodeID { - delete(fsm.Slot2Node, slotId) - } - } -} - -func (fsm *FSM) GetMigratingTask(taskId string) *MigratingTask { - fsm.mu.RLock() - defer fsm.mu.RUnlock() - return fsm.Migratings[taskId] -} - // FSMSnapshot stores necessary data to restore FSM type FSMSnapshot struct { - Slot2Node map[uint32]string // slotID -> nodeID - Migratings map[string]*MigratingTask + Slot2Node map[uint32]string // slotID -> nodeID + Migratings map[string]*MigratingTask + MasterSlaves map[string]*MasterSlave } func (snapshot *FSMSnapshot) Persist(sink raft.SnapshotSink) error { @@ -161,9 +168,14 @@ func (fsm *FSM) Snapshot() (raft.FSMSnapshot, error) { for k, v := range fsm.Migratings { migratings[k] = v } + masterSlaves := make(map[string]*MasterSlave) + for k, v := range fsm.MasterSlaves { + masterSlaves[k] = v + } return &FSMSnapshot{ - Slot2Node: slot2Node, - Migratings: migratings, + Slot2Node: slot2Node, + Migratings: migratings, + MasterSlaves: masterSlaves, }, nil } @@ -181,23 +193,18 @@ func (fsm *FSM) Restore(src io.ReadCloser) error { } fsm.Slot2Node = snapshot.Slot2Node fsm.Migratings = snapshot.Migratings + fsm.MasterSlaves = snapshot.MasterSlaves fsm.Node2Slot = make(map[string][]uint32) for slot, node := range snapshot.Slot2Node { fsm.Node2Slot[node] = append(fsm.Node2Slot[node], slot) } + for master, slaves := range snapshot.MasterSlaves { + for _, slave := range slaves.Slaves { + fsm.SlaveMasters[slave] = master + } + } + if fsm.changed != nil { + fsm.changed(fsm) + } return nil } - -// PickNode returns node hosting slot, ignore migrating -func (fsm *FSM) PickNode(slot uint32) string { - fsm.mu.RLock() - defer fsm.mu.RUnlock() - return fsm.Slot2Node[slot] -} - -// WithReadLock allow invoker do something 
complicated with read lock -func (fsm *FSM) WithReadLock(fn func(fsm *FSM)) { - fsm.mu.RLock() - defer fsm.mu.RUnlock() - fn(fsm) -} diff --git a/cluster/raft/fsm_utils.go b/cluster/raft/fsm_utils.go new file mode 100644 index 0000000..54514f8 --- /dev/null +++ b/cluster/raft/fsm_utils.go @@ -0,0 +1,114 @@ +package raft + +import ( + "errors" + "sort" +) + +// PickNode returns node hosting slot, ignore migrating +func (fsm *FSM) PickNode(slot uint32) string { + fsm.mu.RLock() + defer fsm.mu.RUnlock() + return fsm.Slot2Node[slot] +} + +// WithReadLock allow invoker do something complicated with read lock +func (fsm *FSM) WithReadLock(fn func(fsm *FSM)) { + fsm.mu.RLock() + defer fsm.mu.RUnlock() + fn(fsm) +} + +func (fsm *FSM) GetMigratingTask(taskId string) *MigratingTask { + fsm.mu.RLock() + defer fsm.mu.RUnlock() + return fsm.Migratings[taskId] +} + +func (fsm *FSM) addSlots(nodeID string, slots []uint32) { + for _, slotId := range slots { + /// update node2Slot + index := sort.Search(len(fsm.Node2Slot[nodeID]), func(i int) bool { + return fsm.Node2Slot[nodeID][i] >= slotId + }) + if !(index < len(fsm.Node2Slot[nodeID]) && fsm.Node2Slot[nodeID][index] == slotId) { + // not found in node's slots, insert + fsm.Node2Slot[nodeID] = append(fsm.Node2Slot[nodeID][:index], + append([]uint32{slotId}, fsm.Node2Slot[nodeID][index:]...)...) + } + /// update slot2Node + fsm.Slot2Node[slotId] = nodeID + } +} + +func (fsm *FSM) removeSlots(nodeID string, slots []uint32) { + for _, slotId := range slots { + /// update node2slot + index := sort.Search(len(fsm.Node2Slot[nodeID]), func(i int) bool { return fsm.Node2Slot[nodeID][i] >= slotId }) + // found slot remove + for index < len(fsm.Node2Slot[nodeID]) && fsm.Node2Slot[nodeID][index] == slotId { + fsm.Node2Slot[nodeID] = append(fsm.Node2Slot[nodeID][:index], fsm.Node2Slot[nodeID][index+1:]...) 
+ } + // update slot2node + if fsm.Slot2Node[slotId] == nodeID { + delete(fsm.Slot2Node, slotId) + } + } +} + +func (fsm *FSM) failover(oldMasterId, newMasterId string) { + oldSlaves := fsm.MasterSlaves[oldMasterId].Slaves + newSlaves := make([]string, 0, len(oldSlaves)) + // change other slaves + for _, slave := range oldSlaves { + if slave != newMasterId { + fsm.SlaveMasters[slave] = newMasterId + newSlaves = append(newSlaves, slave) + } + } + // change old master + delete(fsm.MasterSlaves, oldMasterId) + fsm.SlaveMasters[oldMasterId] = newMasterId + newSlaves = append(newSlaves, oldMasterId) + + // change new master + delete(fsm.SlaveMasters, newMasterId) + fsm.MasterSlaves[newMasterId] = &MasterSlave{ + MasterId: newMasterId, + Slaves: newSlaves, + } +} + +// getMaster returns "" if id points to a master node +func (fsm *FSM) getMaster(id string) string { + master := "" + fsm.WithReadLock(func(fsm *FSM) { + master = fsm.SlaveMasters[id] + }) + return master +} + +func (fsm *FSM) addNode(id, masterId string) error { + if masterId == "" { + fsm.MasterSlaves[id] = &MasterSlave{ + MasterId: id, + } + } else { + master := fsm.MasterSlaves[masterId] + if master == nil { + return errors.New("master not found") + } + exists := false + for _, slave := range master.Slaves { + if slave == id { + exists = true + break + } + } + if !exists { + master.Slaves = append(master.Slaves, id) + } + fsm.SlaveMasters[id] = masterId + } + return nil +} \ No newline at end of file diff --git a/cluster/raft/raft.go b/cluster/raft/raft.go index 5dc4925..c6bc22a 100644 --- a/cluster/raft/raft.go +++ b/cluster/raft/raft.go @@ -7,7 +7,6 @@ import ( "net" "os" "path/filepath" - "strconv" "time" "github.com/hashicorp/raft" @@ -22,6 +21,13 @@ type Node struct { stableStore raft.StableStore snapshotStore raft.SnapshotStore transport raft.Transport + watcher watcher +} + +type watcher struct { + watch func(*FSM) + currentMaster string + onFailover func(newMaster string) } type RaftConfig struct { @@ -74,9 +80,12 @@ func StartNode(cfg *RaftConfig) (*Node, error) { } storage := &FSM{ - Node2Slot: make(map[string][]uint32), - Slot2Node: make(map[uint32]string), - Migratings: make(map[string]*MigratingTask), + Node2Slot: make(map[string][]uint32), + Slot2Node: make(map[uint32]string), + Migratings: make(map[string]*MigratingTask), + MasterSlaves: make(map[string]*MasterSlave), + SlaveMasters: make(map[string]string), + Failovers: make(map[string]*FailoverTask), } logStore := boltDB @@ -85,8 +94,7 @@ func StartNode(cfg *RaftConfig) (*Node, error) { if err != nil { return nil, err } - - return &Node{ + node := &Node{ Cfg: cfg, inner: inner, FSM: storage, @@ -94,7 +102,9 @@ func StartNode(cfg *RaftConfig) (*Node, error) { stableStore: stableStore, snapshotStore: snapshotStore, transport: transport, - }, nil + } + node.setupWatch() + return node, nil } func (node *Node) HasExistingState() (bool, error) { @@ -130,37 +140,13 @@ func (node *Node) BootstrapCluster(slotCount int) error { return err } -func (node *Node) Shutdown() error { +func (node *Node) Close() error { future := node.inner.Shutdown() - return future.Error() + return fmt.Errorf("raft shutdown %v", future.Error()) } -func (node *Node) State() raft.RaftState { - return node.inner.State() -} - -func (node *Node) CommittedIndex() (uint64, error) { - stats := node.inner.Stats() - committedIndex0 := stats["commit_index"] - return strconv.ParseUint(committedIndex0, 10, 64) -} - -func (node *Node) GetLeaderRedisAddress() string { - // redis advertise address used as 
leader id - _, id := node.inner.LeaderWithID() - return string(id) -} - -func (node *Node) GetNodes() ([]raft.Server, error) { - configFuture := node.inner.GetConfiguration() - if err := configFuture.Error(); err != nil { - return nil, fmt.Errorf("failed to get raft configuration: %v", err) - } - return configFuture.Configuration().Servers, nil -} - -// HandleJoin handles join request, node must be leader -func (node *Node) HandleJoin(redisAddr, raftAddr string) error { +// AddToRaft handles join request, node must be leader +func (node *Node) AddToRaft(redisAddr, raftAddr string) error { configFuture := node.inner.GetConfiguration() if err := configFuture.Error(); err != nil { return fmt.Errorf("failed to get raft configuration: %v", err) @@ -175,6 +161,23 @@ func (node *Node) HandleJoin(redisAddr, raftAddr string) error { return future.Error() } +func (node *Node) HandleEvict(redisAddr string) error { + configFuture := node.inner.GetConfiguration() + if err := configFuture.Error(); err != nil { + return fmt.Errorf("failed to get raft configuration: %v", err) + } + id := raft.ServerID(redisAddr) + for _, srv := range configFuture.Configuration().Servers { + if srv.ID == id { + err := node.inner.RemoveServer(srv.ID, 0, 0).Error() + if err != nil { + return err + } + } + } + return nil +} + func (node *Node) Propose(event *LogEntry) (uint64, error) { bin, err := json.Marshal(event) if err != nil { @@ -188,7 +191,21 @@ func (node *Node) Propose(event *LogEntry) (uint64, error) { return future.Index(), nil } -func (node *Node) Close() error { - future := node.inner.Shutdown() - return fmt.Errorf("raft shutdown %v", future.Error()) -} \ No newline at end of file +func (node *Node) setupWatch() { + node.watcher.watch = func(f *FSM) { + newMaster := f.SlaveMasters[node.Self()] + if newMaster != node.watcher.currentMaster { + node.watcher.currentMaster = newMaster + if node.watcher.onFailover != nil { + node.watcher.onFailover(newMaster) + } + } + } +} + +// SetOnFailover sets onFailover callback +// After a failover, onFailover will receive the new master +func (node *Node) SetOnFailover(fn func(newMaster string)) { + node.watcher.currentMaster = node.FSM.getMaster(node.Self()) + node.watcher.onFailover = fn +} diff --git a/cluster/raft/utils.go b/cluster/raft/utils.go new file mode 100644 index 0000000..a47792c --- /dev/null +++ b/cluster/raft/utils.go @@ -0,0 +1,42 @@ +package raft + +import ( + "fmt" + "strconv" + + "github.com/hashicorp/raft" +) + +func (node *Node) Self() string { + return node.Cfg.ID() +} + +func (node *Node) State() raft.RaftState { + return node.inner.State() +} + +func (node *Node) CommittedIndex() (uint64, error) { + stats := node.inner.Stats() + committedIndex0 := stats["commit_index"] + return strconv.ParseUint(committedIndex0, 10, 64) +} + +func (node *Node) GetLeaderRedisAddress() string { + // redis advertise address used as leader id + _, id := node.inner.LeaderWithID() + return string(id) +} + +func (node *Node) GetNodes() ([]raft.Server, error) { + configFuture := node.inner.GetConfiguration() + if err := configFuture.Error(); err != nil { + return nil, fmt.Errorf("failed to get raft configuration: %v", err) + } + return configFuture.Configuration().Servers, nil +} + +func (node *Node) GetSlaves(id string) *MasterSlave { + node.FSM.mu.RLock() + defer node.FSM.mu.RUnlock() + return node.FSM.MasterSlaves[id] +} diff --git a/config/config.go b/config/config.go index 9f02e77..6e458ec 100644 --- a/config/config.go +++ b/config/config.go @@ -46,6 +46,7 @@ type 
ServerProperties struct { ClusterSeed string `cfg:"cluster-seed"` RaftListenAddr string `cfg:"raft-listen-address"` RaftAdvertiseAddr string `cfg:"raft-advertise-address"` + MasterInCluster string `cfg:"master-in-cluster"` // config file path CfPath string `cfg:"cf,omitempty"` } diff --git a/database/persistence.go b/database/persistence.go index ba02036..6b5b5c6 100644 --- a/database/persistence.go +++ b/database/persistence.go @@ -105,8 +105,8 @@ func (server *Server) AddAof(dbIndex int, cmdLine CmdLine) { } } -func (server *Server) bindPersister(aofHandler *aof.Persister) { - server.persister = aofHandler +func (server *Server) bindPersister(persister *aof.Persister) { + server.persister = persister // bind SaveCmdLine for _, db := range server.dbSet { singleDB := db.Load().(*DB) diff --git a/database/replication_master.go b/database/replication_master.go index 801091a..d024ec0 100644 --- a/database/replication_master.go +++ b/database/replication_master.go @@ -311,7 +311,7 @@ func (server *Server) execPSync(c redis.Connection, args [][]byte) redis.Reply { if err == nil { return } - if err != nil && err != cannotPartialSync { + if err != cannotPartialSync { server.removeSlave(slave) logger.Errorf("masterTryPartialSyncWithSlave error: %v", err) return @@ -422,7 +422,7 @@ func (listener *replAofListener) Callback(cmdLines []CmdLine) { } } -func (server *Server) initMaster() { +func (server *Server) initMasterStatus() { server.masterStatus = &masterStatus{ mu: sync.RWMutex{}, replId: utils.RandHexString(40), diff --git a/database/replication_master_test.go b/database/replication_master_test.go index e8436ef..bd27f9f 100644 --- a/database/replication_master_test.go +++ b/database/replication_master_test.go @@ -11,13 +11,14 @@ import ( "testing" "time" + "github.com/hdt3213/godis/aof" "github.com/hdt3213/godis/config" - rdb "github.com/hdt3213/rdb/parser" "github.com/hdt3213/godis/lib/utils" "github.com/hdt3213/godis/redis/connection" "github.com/hdt3213/godis/redis/parser" "github.com/hdt3213/godis/redis/protocol" "github.com/hdt3213/godis/redis/protocol/asserts" + rdb "github.com/hdt3213/rdb/parser" ) func mockServer() *Server { @@ -31,7 +32,7 @@ func mockServer() *Server { server.dbSet[i] = holder } server.slaveStatus = initReplSlaveStatus() - server.initMaster() + server.initMasterStatus() return server } @@ -212,6 +213,7 @@ func TestReplicationMasterRewriteRDB(t *testing.T) { Databases: 16, AppendOnly: true, AppendFilename: aofFilename, + AppendFsync: aof.FsyncAlways, } master := mockServer() aofHandler, err := NewPersister(master, config.Properties.AppendFilename, true, config.Properties.AppendFsync) diff --git a/database/replication_slave.go b/database/replication_slave.go index d5bf735..a9e627a 100644 --- a/database/replication_slave.go +++ b/database/replication_slave.go @@ -272,6 +272,11 @@ func (server *Server) psyncHandshake() (bool, error) { if err != nil { return false, errors.New("send failed " + err.Error()) } + return server.parsePsyncHandshake() +} + +func (server *Server) parsePsyncHandshake() (bool, error) { + var err error psyncPayload := <-server.slaveStatus.masterChan if psyncPayload.Err != nil { return false, errors.New("read response failed: " + psyncPayload.Err.Error()) diff --git a/database/replication_slave_test.go b/database/replication_slave_test.go index 2bf07f7..15cb544 100644 --- a/database/replication_slave_test.go +++ b/database/replication_slave_test.go @@ -2,18 +2,21 @@ package database import ( "bytes" - "github.com/hdt3213/godis/aof" - 
"github.com/hdt3213/godis/config" - "github.com/hdt3213/godis/lib/utils" - "github.com/hdt3213/godis/redis/client" - "github.com/hdt3213/godis/redis/connection" - "github.com/hdt3213/godis/redis/protocol" - "github.com/hdt3213/godis/redis/protocol/asserts" + "context" "io/ioutil" "os" "path" "testing" "time" + + "github.com/hdt3213/godis/aof" + "github.com/hdt3213/godis/config" + "github.com/hdt3213/godis/lib/utils" + "github.com/hdt3213/godis/redis/client" + "github.com/hdt3213/godis/redis/connection" + "github.com/hdt3213/godis/redis/parser" + "github.com/hdt3213/godis/redis/protocol" + "github.com/hdt3213/godis/redis/protocol/asserts" ) func TestReplicationSlaveSide(t *testing.T) { @@ -151,12 +154,15 @@ func TestReplicationSlaveSide(t *testing.T) { } // check slave aof file - aofLoader := MakeAuxiliaryServer() - aofHandler2, err := NewPersister(aofLoader, config.Properties.AppendFilename, true, aof.FsyncNo) - aofLoader.bindPersister(aofHandler2) - ret = aofLoader.Exec(conn, utils.ToCmdLine("get", "zz")) + aofCheckServer := MakeAuxiliaryServer() + aofHandler2, err := NewPersister(aofCheckServer, config.Properties.AppendFilename, true, aof.FsyncNo) + if err != nil { + t.Error("create persister failed") + } + aofCheckServer.bindPersister(aofHandler2) + ret = aofCheckServer.Exec(conn, utils.ToCmdLine("get", "zz")) asserts.AssertNullBulk(t, ret) - ret = aofLoader.Exec(conn, utils.ToCmdLine("get", "1")) + ret = aofCheckServer.Exec(conn, utils.ToCmdLine("get", "1")) asserts.AssertBulkReply(t, ret, "4") err = server.slaveStatus.close() @@ -164,3 +170,82 @@ func TestReplicationSlaveSide(t *testing.T) { t.Error("cannot close") } } + +func TestReplicationFailover(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "godis") + if err != nil { + t.Error(err) + return + } + aofFilename := path.Join(tmpDir, "a.aof") + defer func() { + _ = os.Remove(aofFilename) + }() + config.Properties = &config.ServerProperties{ + Databases: 16, + AppendOnly: true, + AppendFilename: aofFilename, + } + conn := connection.NewFakeConn() + server := mockServer() + aofHandler, err := NewPersister(server, aofFilename, true, aof.FsyncAlways) + if err != nil { + t.Error(err) + return + } + server.bindPersister(aofHandler) + + masterCli, err := client.MakeClient("127.0.0.1:6379") + if err != nil { + t.Error(err) + return + } + masterCli.Start() + + // sync with master + ret := masterCli.Send(utils.ToCmdLine("set", "1", "1")) + asserts.AssertStatusReply(t, ret, "OK") + ret = server.Exec(conn, utils.ToCmdLine("SLAVEOF", "127.0.0.1", "6379")) + asserts.AssertStatusReply(t, ret, "OK") + success := false + for i := 0; i < 30; i++ { + // wait for sync + time.Sleep(time.Second) + ret = server.Exec(conn, utils.ToCmdLine("GET", "1")) + bulkRet, ok := ret.(*protocol.BulkReply) + if ok { + if bytes.Equal(bulkRet.Arg, []byte("1")) { + success = true + break + } + } + } + if !success { + t.Error("sync failed") + return + } + + t.Log("slave of no one") + ret = server.Exec(conn, utils.ToCmdLine("SLAVEOF", "no", "one")) + asserts.AssertStatusReply(t, ret, "OK") + server.Exec(conn, utils.ToCmdLine("set", "2", "2")) + + replConn := connection.NewFakeConn() + server.Exec(replConn, utils.ToCmdLine("psync", "?", "-1")) + masterChan := parser.ParseStream(replConn) + serverB := mockServer() + serverB.slaveStatus.masterChan = masterChan + serverB.slaveStatus.configVersion = 0 + serverB.parsePsyncHandshake() + serverB.loadMasterRDB(0) + server.masterCron() + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + go 
serverB.receiveAOF(ctx, 0) + + time.Sleep(3 * time.Second) + ret = serverB.Exec(conn, utils.ToCmdLine("get", "1")) + asserts.AssertBulkReply(t, ret, "1") + ret = serverB.Exec(conn, utils.ToCmdLine("get", "2")) + asserts.AssertBulkReply(t, ret, "2") +} \ No newline at end of file diff --git a/database/server.go b/database/server.go index 2653c85..9e5ca70 100644 --- a/database/server.go +++ b/database/server.go @@ -85,7 +85,7 @@ func NewStandaloneServer() *Server { } } server.slaveStatus = initReplSlaveStatus() - server.initMaster() + server.initMasterStatus() server.startReplCron() server.role = masterRole // The initialization process does not require atomicity return server diff --git a/lib/pool/pool.go b/lib/pool/pool.go index 52a8f42..11b15ec 100644 --- a/lib/pool/pool.go +++ b/lib/pool/pool.go @@ -42,6 +42,7 @@ func New(factory func() (interface{}, error), finalizer func(x interface{}), cfg // getOnNoIdle try to create a new connection or waiting for connection being returned // invoker should have pool.mu func (pool *Pool) getOnNoIdle() (interface{}, error) { + pool.mu.Lock() if pool.activeCount >= pool.MaxActive { // waiting for connection being returned req := make(chan interface{}, 1) @@ -74,10 +75,10 @@ func (pool *Pool) Get() (interface{}, error) { pool.mu.Unlock() return nil, ErrClosed } + pool.mu.Unlock() select { case item := <-pool.idles: - pool.mu.Unlock() return item, nil default: // no pooled item, create one diff --git a/redis/connection/conn.go b/redis/connection/conn.go index 8e1755d..9a6ac20 100644 --- a/redis/connection/conn.go +++ b/redis/connection/conn.go @@ -57,7 +57,9 @@ func (c *Connection) RemoteAddr() string { // Close disconnect with the client func (c *Connection) Close() error { c.sendingData.WaitWithTimeout(10 * time.Second) - _ = c.conn.Close() + if c.conn != nil { // may be a fake conn for tests + _ = c.conn.Close() + } c.subs = nil c.password = "" c.queue = nil @@ -219,6 +221,7 @@ func (c *Connection) IsSlave() bool { return c.flags&flagSlave > 0 } +// SetMaster marks c as a connection with master func (c *Connection) SetMaster() { c.flags |= flagMaster }
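Two usage notes on the pieces above. First, the raft-side watcher: registerOnFailover in cluster/core/replica_manager.go is the only consumer of Node.SetOnFailover in this patch, but the callback is generic. It fires whenever the committed FSM changes this node's master, receiving the new master's node id, or an empty string when this node is (or has just become) a master itself. A hedged sketch of wiring it up:

// Illustrative only: react to a committed failover that affects this node.
node.SetOnFailover(func(newMaster string) {
	if newMaster == "" {
		logger.Info("this node is now a master")
		return
	}
	logger.Infof("this node should replicate from %s", newMaster)
	// e.g. issue "slaveof <host> <port>" against the local database,
	// as cluster.registerOnFailover does in this patch
})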
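Second, the lib/pool change: Get now drops pool.mu before blocking on the idle channel, and getOnNoIdle acquires the mutex itself, so its old "invoker should have pool.mu" contract no longer applies. A rough sketch of the resulting lock discipline, assuming getOnNoIdle releases pool.mu on every path and that the default branch of the select still falls through to it; the closed-check condition is a placeholder, not the real field name:

// Sketch of the intended locking order after this patch (assumptions noted above).
func (pool *Pool) Get() (interface{}, error) {
	pool.mu.Lock()
	if pool.closed { // placeholder for the actual closed check
		pool.mu.Unlock()
		return nil, ErrClosed
	}
	pool.mu.Unlock() // never hold the mutex while waiting on pool.idles

	select {
	case item := <-pool.idles:
		return item, nil
	default:
		return pool.getOnNoIdle() // takes and releases pool.mu internally now
	}
}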