diff --git a/cluster/core/core.go b/cluster/core/core.go
index d1f792c..dbf3627 100644
--- a/cluster/core/core.go
+++ b/cluster/core/core.go
@@ -113,6 +113,29 @@ func NewCluster(cfg *Config) (*Cluster, error) {
 	if err != nil {
 		return nil, err
 	}
+
+	cluster := &Cluster{
+		raftNode:        raftNode,
+		db:              db,
+		connections:     connections,
+		config:          cfg,
+		rebalanceManger: newRebalanceManager(),
+		slotsManager:    newSlotsManager(),
+		transactions:    newTransactionManager(),
+		replicaManager:  newReplicaManager(),
+		closeChan:       make(chan struct{}),
+	}
+	cluster.pickNodeImpl = func(slotID uint32) string {
+		return defaultPickNodeImpl(cluster, slotID)
+	}
+	cluster.getSlotImpl = func(key string) uint32 {
+		return defaultGetSlotImpl(cluster, key)
+	}
+	cluster.injectInsertCallback()
+	cluster.injectDeleteCallback()
+	cluster.registerOnFailover()
+
+	// setup raft state
 	hasState, err := raftNode.HasExistingState()
 	if err != nil {
 		return nil, err
@@ -140,34 +163,23 @@ func NewCluster(cfg *Config) (*Cluster, error) {
 				return nil, err
 			}
 		}
+	} else {
+		masterAddr := cluster.raftNode.FSM.GetMaster(cluster.SelfID())
+		if masterAddr != "" {
+			err := cluster.SlaveOf(masterAddr)
+			if err != nil {
+				return nil, err
+			}
+		}
 	}
 
-	cluster := &Cluster{
-		raftNode:        raftNode,
-		db:              db,
-		connections:     connections,
-		config:          cfg,
-		rebalanceManger: newRebalanceManager(),
-		slotsManager:    newSlotsManager(),
-		transactions:    newTransactionManager(),
-		replicaManager:  newReplicaManager(),
-		closeChan:       make(chan struct{}),
-	}
-	cluster.pickNodeImpl = func(slotID uint32) string {
-		return defaultPickNodeImpl(cluster, slotID)
-	}
-	cluster.getSlotImpl = func(key string) uint32 {
-		return defaultGetSlotImpl(cluster, key)
-	}
-	cluster.injectInsertCallback()
-	cluster.injectDeleteCallback()
-	cluster.registerOnFailover()
+	go cluster.clusterCron()
 	return cluster, nil
 }
 
 // AfterClientClose does some clean after client close connection
 func (cluster *Cluster) AfterClientClose(c redis.Connection) {
-
+	cluster.db.AfterClientClose(c)
 }
 
 func (cluster *Cluster) Close() {
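Reviewer note on the constructor change above: building the `Cluster` value before the raft-state check lets the restart path call methods such as `cluster.SlaveOf`, and routing goes through the `pickNodeImpl`/`getSlotImpl` closures instead of direct calls to the defaults. One payoff of that indirection is testability; a minimal sketch of stubbing the routing (the `stubRouting` helper is hypothetical, not part of this PR):

```go
// stubRouting is a hypothetical test helper: it overrides the routing
// closures so that every key is owned by a single node.
func stubRouting(cluster *Cluster, nodeAddr string) {
	cluster.pickNodeImpl = func(slotID uint32) string {
		return nodeAddr // route every slot to the given node
	}
	cluster.getSlotImpl = func(key string) uint32 {
		return 0 // collapse all keys into slot 0
	}
}
```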
diff --git a/cluster/core/replica_manager.go b/cluster/core/replica_manager.go
index bcf428b..1ce5cec 100644
--- a/cluster/core/replica_manager.go
+++ b/cluster/core/replica_manager.go
@@ -13,6 +13,14 @@ import (
 	"github.com/hdt3213/godis/redis/protocol"
 )
 
+/*
+1. Masters and slaves all join the raft group; failover does not change raft membership.
+2. The timer job `doFailoverCheck` finds timed-out masters, then calls `triggerFailover`.
+3. The raft leader sends `slaveof no one` to the new master.
+4. The raft leader proposes `EventFinishFailover` to change the route.
+Other slaves and the old master receive this message from raft and become slaves of the new master (see cluster.registerOnFailover).
+*/
+
 const heartbeatCommand = "cluster.heartbeat"
 
 func init() {
@@ -93,19 +101,7 @@ func (cluster *Cluster) doFailoverCheck() {
 func (cluster *Cluster) triggerFailover(failed *raft.MasterSlave) error {
 	newMaster := failed.Slaves[0]
 	id := utils.RandString(20)
-	// propose change
-	_, err := cluster.raftNode.Propose(&raft.LogEntry{
-		Event: raft.EventStartFailover,
-		FailoverTask: &raft.FailoverTask{
-			ID:          id,
-			OldMasterId: failed.MasterId,
-			NewMasterId: newMaster,
-		},
-	})
-	if err != nil {
-		return err
-	}
-	logger.Infof("proposed start failover id=%s, oldMaster=%s, newMaster=%s", id, failed.MasterId, newMaster)
+	logger.Infof("start failover id=%s, oldMaster=%s, newMaster=%s", id, failed.MasterId, newMaster)
 	// send slave of to new master
 	conn, err := cluster.connections.BorrowPeerClient(newMaster)
 	if err != nil {
diff --git a/cluster/core/utils.go b/cluster/core/utils.go
index 9b9816e..dc74006 100644
--- a/cluster/core/utils.go
+++ b/cluster/core/utils.go
@@ -1,10 +1,14 @@
 package core
 
 import (
+	"errors"
 	"hash/crc32"
+	"net"
 	"strings"
 
 	"github.com/hdt3213/godis/interface/redis"
+	"github.com/hdt3213/godis/lib/utils"
+	"github.com/hdt3213/godis/redis/connection"
 	"github.com/hdt3213/godis/redis/protocol"
 )
 
@@ -44,6 +48,20 @@ func (cluster *Cluster) LocalExecWithinLock(c redis.Connection, cmdLine [][]byte
 	return cluster.db.ExecWithLock(c, cmdLine)
 }
 
+// SlaveOf makes this node a replica of the given master address ("host:port")
+func (cluster *Cluster) SlaveOf(master string) error {
+	host, port, err := net.SplitHostPort(master)
+	if err != nil {
+		return errors.New("invalid master address")
+	}
+	c := connection.NewFakeConn()
+	reply := cluster.db.Exec(c, utils.ToCmdLine("slaveof", host, port))
+	if err := protocol.Try2ErrorReply(reply); err != nil {
+		return err
+	}
+	return nil
+}
+
 // GetPartitionKey extract hashtag
 func GetPartitionKey(key string) string {
 	beg := strings.Index(key, "{")
diff --git a/cluster/raft/fsm.go b/cluster/raft/fsm.go
index f0f61f5..180b0bf 100644
--- a/cluster/raft/fsm.go
+++ b/cluster/raft/fsm.go
@@ -67,7 +67,6 @@ const (
 	EventStartMigrate = iota + 1
 	EventFinishMigrate
 	EventSeedStart
-	EventStartFailover
 	EventFinishFailover
 	EventJoin
 )
@@ -109,9 +108,6 @@ func (fsm *FSM) Apply(log *raft.Log) interface{} {
 		}
 		fsm.Node2Slot[entry.InitTask.Leader] = slots
 		fsm.addNode(entry.InitTask.Leader, "")
-	} else if entry.Event == EventStartFailover {
-		task := entry.FailoverTask
-		fsm.Failovers[task.ID] = task
 	} else if entry.Event == EventFinishFailover {
 		task := entry.FailoverTask
 		// change route
diff --git a/cluster/raft/fsm_utils.go b/cluster/raft/fsm_utils.go
index 54514f8..6bf6c93 100644
--- a/cluster/raft/fsm_utils.go
+++ b/cluster/raft/fsm_utils.go
@@ -79,8 +79,9 @@ func (fsm *FSM) failover(oldMasterId, newMasterId string) {
 	}
 }
 
-// getMaster returns "" if id points to a master node
-func (fsm *FSM) getMaster(id string) string {
+// GetMaster returns the master's redis service address.
+// It returns an empty string ("") if id points to a master node.
+func (fsm *FSM) GetMaster(id string) string {
 	master := ""
 	fsm.WithReadLock(func(fsm *FSM) {
 		master = fsm.SlaveMasters[id]
@@ -111,4 +112,4 @@ func (fsm *FSM) addNode(id, masterId string) error {
 		fsm.SlaveMasters[id] = masterId
 	}
 	return nil
-}
\ No newline at end of file
+}
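With `EventStartFailover` removed, the leader-side flow in `triggerFailover` reduces to steps 3 and 4 of the overview comment: promote the chosen replica over a peer connection, then propose `EventFinishFailover`. A sketch of that sequence, assuming the signatures visible in this diff (`BorrowPeerClient`, `Propose`, `Try2ErrorReply`); the `ReturnPeerClient` and `Send` calls are assumptions about the connection factory:

```go
// Sketch of the leader-side failover sequence after this change; not the
// verbatim body of triggerFailover.
func (cluster *Cluster) triggerFailoverSketch(failed *raft.MasterSlave) error {
	newMaster := failed.Slaves[0]
	// step 3: tell the chosen replica to stop replicating ("slaveof no one")
	conn, err := cluster.connections.BorrowPeerClient(newMaster)
	if err != nil {
		return err
	}
	defer cluster.connections.ReturnPeerClient(conn)
	reply := conn.Send(utils.ToCmdLine("slaveof", "no", "one"))
	if err := protocol.Try2ErrorReply(reply); err != nil {
		return err
	}
	// step 4: propose the route change; followers react in registerOnFailover
	_, err = cluster.raftNode.Propose(&raft.LogEntry{
		Event: raft.EventFinishFailover,
		FailoverTask: &raft.FailoverTask{
			ID:          utils.RandString(20),
			OldMasterId: failed.MasterId,
			NewMasterId: newMaster,
		},
	})
	return err
}
```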
diff --git a/cluster/raft/raft.go b/cluster/raft/raft.go
index c6bc22a..63e0e9c 100644
--- a/cluster/raft/raft.go
+++ b/cluster/raft/raft.go
@@ -191,6 +191,7 @@ func (node *Node) Propose(event *LogEntry) (uint64, error) {
 	return future.Index(), nil
 }
 
+// setupWatch registers the watch callback, which is invoked within the lock after every change of the FSM.
 func (node *Node) setupWatch() {
 	node.watcher.watch = func(f *FSM) {
 		newMaster := f.SlaveMasters[node.Self()]
@@ -206,6 +207,6 @@ func (node *Node) setupWatch() {
 // SetOnFailover sets onFailover callback
 // After a failover, onFailover will receive the new master
 func (node *Node) SetOnFailover(fn func(newMaster string)) {
-	node.watcher.currentMaster = node.FSM.getMaster(node.Self())
+	node.watcher.currentMaster = node.FSM.GetMaster(node.Self())
 	node.watcher.onFailover = fn
 }
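The follower side referenced by the overview comment (`cluster.registerOnFailover`) is not shown in this diff; below is a sketch of its assumed shape, wiring `SetOnFailover` to the new `SlaveOf` helper (the function name and error handling are assumptions):

```go
// Assumed shape of cluster.registerOnFailover (not shown in this diff):
// when raft reports a new master for this node, start replicating from it.
func (cluster *Cluster) registerOnFailoverSketch() {
	cluster.raftNode.SetOnFailover(func(newMaster string) {
		if newMaster == "" {
			return // this node is (or became) a master itself
		}
		if err := cluster.SlaveOf(newMaster); err != nil {
			logger.Errorf("slaveof %s failed: %v", newMaster, err)
		}
	})
}
```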