Mirror of https://github.com/HDT3213/godis.git, synced 2025-10-07 01:32:56 +08:00
Commit: recover master slave relationship after restart
@@ -113,6 +113,29 @@ func NewCluster(cfg *Config) (*Cluster, error) {
 	if err != nil {
 		return nil, err
 	}
+
+	cluster := &Cluster{
+		raftNode:        raftNode,
+		db:              db,
+		connections:     connections,
+		config:          cfg,
+		rebalanceManger: newRebalanceManager(),
+		slotsManager:    newSlotsManager(),
+		transactions:    newTransactionManager(),
+		replicaManager:  newReplicaManager(),
+		closeChan:       make(chan struct{}),
+	}
+	cluster.pickNodeImpl = func(slotID uint32) string {
+		return defaultPickNodeImpl(cluster, slotID)
+	}
+	cluster.getSlotImpl = func(key string) uint32 {
+		return defaultGetSlotImpl(cluster, key)
+	}
+	cluster.injectInsertCallback()
+	cluster.injectDeleteCallback()
+	cluster.registerOnFailover()
+
+	// setup
 	hasState, err := raftNode.HasExistingState()
 	if err != nil {
 		return nil, err
@@ -140,34 +163,23 @@ func NewCluster(cfg *Config) (*Cluster, error) {
 			return nil, err
 		}
+	} else {
+		masterAddr := cluster.raftNode.FSM.GetMaster(cluster.SelfID())
+		if masterAddr != "" {
+			err := cluster.SlaveOf(masterAddr)
+			if err != nil {
+				panic(err)
+			}
+		}
 	}
-	cluster := &Cluster{
-		raftNode:        raftNode,
-		db:              db,
-		connections:     connections,
-		config:          cfg,
-		rebalanceManger: newRebalanceManager(),
-		slotsManager:    newSlotsManager(),
-		transactions:    newTransactionManager(),
-		replicaManager:  newReplicaManager(),
-		closeChan:       make(chan struct{}),
-	}
-	cluster.pickNodeImpl = func(slotID uint32) string {
-		return defaultPickNodeImpl(cluster, slotID)
-	}
-	cluster.getSlotImpl = func(key string) uint32 {
-		return defaultGetSlotImpl(cluster, key)
-	}
-	cluster.injectInsertCallback()
-	cluster.injectDeleteCallback()
-	cluster.registerOnFailover()
 	go cluster.clusterCron()
 	return cluster, nil
 }
 
 // AfterClientClose does some cleanup after a client closes its connection
 func (cluster *Cluster) AfterClientClose(c redis.Connection) {
 	cluster.db.AfterClientClose(c)
 }
 
 func (cluster *Cluster) Close() {
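
Taken together, the two hunks above move the Cluster construction ahead of the raft state check and add the restart branch that gives the commit its name: a node that already has raft state asks the FSM for its former master and re-attaches before serving traffic. A condensed sketch of just the recovery path, using only the fields and helpers visible in the diff:

    // Condensed from the diff: restart-recovery flow inside NewCluster.
    hasState, err := raftNode.HasExistingState()
    if err != nil {
        return nil, err
    }
    if !hasState {
        // first boot: bootstrap or join the raft group (elided here)
    } else if masterAddr := cluster.raftNode.FSM.GetMaster(cluster.SelfID()); masterAddr != "" {
        // restarted slave: re-attach to the master recorded in the raft FSM
        if err := cluster.SlaveOf(masterAddr); err != nil {
            panic(err) // a node that cannot restore replication should fail fast
        }
    }

Panicking here mirrors the diff: a slave that cannot reach its recorded master would otherwise come up serving stale data, so aborting startup is the safer choice.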
@@ -13,6 +13,14 @@ import (
 	"github.com/hdt3213/godis/redis/protocol"
 )
 
+/*
+1. The master and slave are both added to the raft group, and failover does not involve changes of raft members.
+2. The timer job `doFailoverCheck` finds timed-out masters, then calls `triggerFailover`.
+3. The raft leader sends `slaveof no one` to the new master.
+4. Raft proposes `EventFinishFailover` to change the route.
+   Other slaves and the old master receive this message from raft and become slaves of the new master (see cluster.registerOnFailover).
+*/
+
 const heartbeatCommand = "cluster.heartbeat"
 
 func init() {
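
The comment above documents the whole failover protocol. The timer job from step 2 is referenced but not shown in this hunk; the following is a hypothetical sketch of how such a check loop could drive `triggerFailover`. The one-second interval and the `findTimeoutMasters` helper are assumptions for illustration, not the repository's code; `closeChan` and `triggerFailover` are taken from the diff.

    // Hypothetical check loop for step 2 of the protocol.
    func (cluster *Cluster) failoverCheckLoop() {
        ticker := time.NewTicker(time.Second) // interval is an assumption
        defer ticker.Stop()
        for {
            select {
            case <-cluster.closeChan:
                return
            case <-ticker.C:
                // findTimeoutMasters is a hypothetical helper that collects
                // masters whose heartbeats have timed out
                for _, failed := range cluster.findTimeoutMasters() {
                    if err := cluster.triggerFailover(failed); err != nil {
                        logger.Errorf("failover of %s failed: %v", failed.MasterId, err)
                    }
                }
            }
        }
    }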
@@ -93,19 +101,7 @@ func (cluster *Cluster) doFailoverCheck() {
 func (cluster *Cluster) triggerFailover(failed *raft.MasterSlave) error {
 	newMaster := failed.Slaves[0]
 	id := utils.RandString(20)
-	// propose change
-	_, err := cluster.raftNode.Propose(&raft.LogEntry{
-		Event: raft.EventStartFailover,
-		FailoverTask: &raft.FailoverTask{
-			ID:          id,
-			OldMasterId: failed.MasterId,
-			NewMasterId: newMaster,
-		},
-	})
-	if err != nil {
-		return err
-	}
-	logger.Infof("proposed start failover id=%s, oldMaster=%s, newMaster=%s", id, failed.MasterId, newMaster)
+	logger.Infof("start failover id=%s, oldMaster=%s, newMaster=%s", id, failed.MasterId, newMaster)
 	// send `slaveof no one` to the new master
 	conn, err := cluster.connections.BorrowPeerClient(newMaster)
 	if err != nil {
@@ -1,10 +1,14 @@
 package core
 
 import (
+	"errors"
 	"hash/crc32"
+	"net"
 	"strings"
 
 	"github.com/hdt3213/godis/interface/redis"
+	"github.com/hdt3213/godis/lib/utils"
+	"github.com/hdt3213/godis/redis/connection"
 	"github.com/hdt3213/godis/redis/protocol"
 )
 
@@ -44,6 +48,19 @@ func (cluster *Cluster) LocalExecWithinLock(c redis.Connection, cmdLine [][]byte
 	return cluster.db.ExecWithLock(c, cmdLine)
 }
 
+func (cluster *Cluster) SlaveOf(master string) error {
+	host, port, err := net.SplitHostPort(master)
+	if err != nil {
+		return errors.New("invalid master address")
+	}
+	c := connection.NewFakeConn()
+	reply := cluster.db.Exec(c, utils.ToCmdLine("slaveof", host, port))
+	if err := protocol.Try2ErrorReply(reply); err != nil {
+		return err
+	}
+	return nil
+}
+
 // GetPartitionKey extracts the hashtag
 func GetPartitionKey(key string) string {
 	beg := strings.Index(key, "{")
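
SlaveOf deliberately reuses the standalone `slaveof` command instead of duplicating replication logic: the address is split with net.SplitHostPort, and the command is executed against the local db through a fake connection. A minimal usage sketch, with an illustrative address:

    // Re-point this node's replication, as the restart branch in NewCluster does.
    // "10.0.0.5:6399" is an illustrative address, not taken from the repository.
    if err := cluster.SlaveOf("10.0.0.5:6399"); err != nil {
        logger.Errorf("cannot follow master: %v", err)
    }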
@@ -67,7 +67,6 @@ const (
 	EventStartMigrate = iota + 1
 	EventFinishMigrate
 	EventSeedStart
-	EventStartFailover
 	EventFinishFailover
 	EventJoin
 )
@@ -109,9 +108,6 @@ func (fsm *FSM) Apply(log *raft.Log) interface{} {
 		}
 		fsm.Node2Slot[entry.InitTask.Leader] = slots
 		fsm.addNode(entry.InitTask.Leader, "")
-	} else if entry.Event == EventStartFailover {
-		task := entry.FailoverTask
-		fsm.Failovers[task.ID] = task
 	} else if entry.Event == EventFinishFailover {
 		task := entry.FailoverTask
 		// change route
@@ -79,8 +79,9 @@ func (fsm *FSM) failover(oldMasterId, newMasterId string) {
 	}
 }
 
-// getMaster returns "" if id points to a master node
-func (fsm *FSM) getMaster(id string) string {
+// GetMaster returns the master's redis service address.
+// It returns an empty string ("") if id points to a master node.
+func (fsm *FSM) GetMaster(id string) string {
 	master := ""
 	fsm.WithReadLock(func(fsm *FSM) {
 		master = fsm.SlaveMasters[id]
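
Renaming getMaster to GetMaster exports it, which is what allows NewCluster's restart branch (and cluster code generally) to query the FSM from outside the raft package. The lookup runs under WithReadLock; the same discipline applies to any other read of FSM state, for example (SlaveMasters is the slave-to-master map shown in the diff; selfID is an illustrative variable):

    // Sketch: list the slaves following a given master, reading the FSM
    // under the same WithReadLock helper GetMaster uses internally.
    var slaves []string
    fsm.WithReadLock(func(fsm *FSM) {
        for slave, master := range fsm.SlaveMasters {
            if master == selfID {
                slaves = append(slaves, slave)
            }
        }
    })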
@@ -191,6 +191,7 @@ func (node *Node) Propose(event *LogEntry) (uint64, error) {
 	return future.Index(), nil
 }
 
+// setupWatch registers the watch callback, which runs after any change of the FSM, within its lock.
 func (node *Node) setupWatch() {
 	node.watcher.watch = func(f *FSM) {
 		newMaster := f.SlaveMasters[node.Self()]
@@ -206,6 +207,6 @@ func (node *Node) setupWatch() {
 // SetOnFailover sets onFailover callback
 // After a failover, onFailover will receive the new master
 func (node *Node) SetOnFailover(fn func(newMaster string)) {
-	node.watcher.currentMaster = node.FSM.getMaster(node.Self())
+	node.watcher.currentMaster = node.FSM.GetMaster(node.Self())
 	node.watcher.onFailover = fn
 }
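
SetOnFailover now seeds the watcher's currentMaster through the exported GetMaster, so the callback fires only on a real change of master. The cluster-side registerOnFailover, which the failover comment references but this diff does not show, presumably wires that callback back into SlaveOf; a hedged sketch of such wiring:

    // Hypothetical sketch of cluster.registerOnFailover (not shown in the diff):
    // when raft reports a new master for this node, re-point replication at it.
    func (cluster *Cluster) registerOnFailover() {
        cluster.raftNode.SetOnFailover(func(newMaster string) {
            if newMaster == "" {
                return // this node is itself a master; nothing to follow
            }
            if err := cluster.SlaveOf(newMaster); err != nil {
                logger.Errorf("failed to follow new master %s: %v", newMaster, err)
            }
        })
    }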