// redis-go/cluster/core/node_manager.go

package core
import (
"errors"
"fmt"
"math"
"sync"
"time"
"github.com/hdt3213/godis/cluster/raft"
"github.com/hdt3213/godis/interface/redis"
"github.com/hdt3213/godis/lib/logger"
"github.com/hdt3213/godis/lib/utils"
"github.com/hdt3213/godis/redis/protocol"
)
/*
**Rebalance Procedure**
1. Invoke `triggerMigrationTask` on the cluster leader to start a migration task
2. The leader proposes EventStartMigrate to the raft group, then sends startMigrationCommand to the target node (at `triggerMigrationTask`)
3. The target node runs `doImports` after receiving startMigrationCommand
4. The target node sends exportCommand to the source node
5. The source node gets the migrating task from raft (at `execExport`)
6. The source node registers the task with the slotManager to start recording dirty keys during the migration (at `injectInsertCallback`)
7. The source node dumps the old data to the target node
8. The target node sends migrationDoneCommand to the source node (at `doImports`)
9. The source node runs `execFinishExport` and locks the slots to stop writes
10. The source node sends the dirty keys to the target node
11. The source node sends migrationChangeRouteCommand to the leader
12. The leader proposes EventFinishMigrate to raft and waits until both the source node and the target node have received this entry (at `execMigrationChangeRoute`)
13. The source node finishes exporting, unlocks the slots and cleans up the migrated data
14. The target node finishes importing, unlocks the slots and starts serving
*/
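// For reference, the unit of work that flows through the steps above is a raft.MigratingTask.
// A minimal sketch of what the leader proposes in step 2 (addresses and slot numbers are
// illustrative, not taken from a real cluster):
//
//	task := &raft.MigratingTask{
//		ID:         utils.RandString(20),    // unique task id
//		SrcNode:    "10.0.0.1:6399",         // node currently owning the slots
//		TargetNode: "10.0.0.2:6399",         // node that will import them
//		Slots:      []uint32{100, 101, 102}, // slots to move
//	}
//	_, err := cluster.raftNode.Propose(&raft.LogEntry{
//		Event:         raft.EventStartMigrate,
//		MigratingTask: task,
//	})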
const joinClusterCommand = "cluster.join"
const migrationChangeRouteCommand = "cluster.migration.changeroute"
func init() {
RegisterCmd(joinClusterCommand, execJoin)
RegisterCmd(migrationChangeRouteCommand, execMigrationChangeRoute)
}
// execJoin handles the cluster.join command as raft leader
// format: cluster.join redisAddress(advertised) raftAddress [masterId]
func execJoin(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply {
if len(cmdLine) < 3 {
return protocol.MakeArgNumErrReply(joinClusterCommand)
}
state := cluster.raftNode.State()
if state != raft.Leader {
// I am not leader, forward request to leader
leaderConn, err := cluster.BorrowLeaderClient()
if err != nil {
return protocol.MakeErrReply(err.Error())
}
defer cluster.connections.ReturnPeerClient(leaderConn)
return leaderConn.Send(cmdLine)
}
// this node is the leader
redisAddr := string(cmdLine[1])
raftAddr := string(cmdLine[2])
err := cluster.raftNode.AddToRaft(redisAddr, raftAddr)
if err != nil {
return protocol.MakeErrReply(err.Error())
}
master := ""
if len(cmdLine) == 4 {
master = string(cmdLine[3])
}
_, err = cluster.raftNode.Propose(&raft.LogEntry{
Event: raft.EventJoin,
JoinTask: &raft.JoinTask{
NodeId: redisAddr,
Master: master,
},
})
if err != nil {
// todo: remove the node from raft
return protocol.MakeErrReply(err.Error())
}
// join succeeded; the cluster can then be rebalanced to move slots onto the new node
return protocol.MakeOkReply()
}
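// A usage sketch of the join flow (addresses are illustrative): a freshly started node asks
// any existing member to add it to the cluster by sending
//
//	cluster.join 10.0.0.3:6399 10.0.0.3:16399
//
// or, to join as a replica of an existing master,
//
//	cluster.join 10.0.0.3:6399 10.0.0.3:16399 10.0.0.1:6399
//
// The member that receives the command forwards it to the raft leader if necessary, as
// implemented above.

// rebalanceManager serializes rebalance runs: doRebalance holds its mutex for the whole
// planning and triggering pass, so overlapping rebalances are not started concurrently.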
type rebalanceManager struct {
mu *sync.Mutex
}
func newRebalanceManager() *rebalanceManager {
return &rebalanceManager{
mu: &sync.Mutex{},
}
}
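// doRebalance builds a migration plan and triggers each resulting task. Since
// triggerMigrationTask proposes raft entries, it is expected to run on the leader; the
// rebalanceManager mutex ensures only one rebalance pass runs at a time.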
func (cluster *Cluster) doRebalance() {
cluster.rebalanceManger.mu.Lock()
defer cluster.rebalanceManger.mu.Unlock()
pendingTasks, err := cluster.makeRebalancePlan()
if err != nil {
logger.Errorf("makeRebalancePlan err: %v", err)
return
}
if len(pendingTasks) == 0 {
return
}
logger.Infof("rebalance plan generated, contains %d tasks", len(pendingTasks))
for _, task := range pendingTasks {
err := cluster.triggerMigrationTask(task)
if err != nil {
logger.Errorf("triggerMigrationTask err: %v", err)
} else {
logger.Infof("triggerMigrationTask %s success", task.ID)
}
}
}
// triggerMigrationTask starts a migration task
// only the leader can do this
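// it proposes EventStartMigrate, then notifies the target node via startMigrationCommand,
// which makes the target run `doImports` (step 3 of the procedure above)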
func (cluster *Cluster) triggerMigrationTask(task *raft.MigratingTask) error {
// propose migration
_, err := cluster.raftNode.Propose(&raft.LogEntry{
Event: raft.EventStartMigrate,
MigratingTask: task,
})
if err != nil {
return fmt.Errorf("propose EventStartMigrate %s failed: %v", task.ID, err)
}
logger.Infof("propose EventStartMigrate %s success", task.ID)
cmdLine := utils.ToCmdLine(startMigrationCommand, task.ID)
targetNodeConn, err := cluster.connections.BorrowPeerClient(task.TargetNode)
if err != nil {
return err
}
defer cluster.connections.ReturnPeerClient(targetNodeConn)
reply := targetNodeConn.Send(cmdLine)
if protocol.IsOKReply(reply) {
return nil
}
return fmt.Errorf("start migration %s on target node %s failed", task.ID, task.TargetNode)
}
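// makeRebalancePlan compares each master's slot count with the cluster average
// (avgSlot = ceil(SlotCount / number of masters)) and pairs overloaded masters with
// underloaded ones to produce migration tasks.
//
// A worked example with made-up numbers (the real SlotCount is much larger): if SlotCount
// were 16 and masters A, B, C held 16, 0 and 0 slots, then avgSlot = ceil(16/3) = 6,
// A would be exporting (16 > 6+1) and B and C importing (0 < 6-1). A offers its
// 16-6 = 10 surplus slots: the first task moves 6 of them to B, the second moves the
// remaining 4 to C, after which the exporter is drained and the loop stops, leaving C
// slightly below the average.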
func (cluster *Cluster) makeRebalancePlan() ([]*raft.MigratingTask, error) {
var migratings []*raft.MigratingTask
cluster.raftNode.FSM.WithReadLock(func(fsm *raft.FSM) {
avgSlot := int(math.Ceil(float64(SlotCount) / float64(len(fsm.MasterSlaves))))
var exportingNodes []string
var importingNodes []string
for _, ms := range fsm.MasterSlaves {
nodeId := ms.MasterId
nodeSlots := fsm.Node2Slot[nodeId]
if len(nodeSlots) > avgSlot+1 {
exportingNodes = append(exportingNodes, nodeId)
}
if len(nodeSlots) < avgSlot-1 {
importingNodes = append(importingNodes, nodeId)
}
}
importIndex := 0
exportIndex := 0
var exportSlots []uint32
for importIndex < len(importingNodes) && exportIndex < len(exportingNodes) {
exportNode := exportingNodes[exportIndex]
if len(exportSlots) == 0 {
exportNodeSlots := fsm.Node2Slot[exportNode]
exportCount := len(exportNodeSlots) - avgSlot
exportSlots = exportNodeSlots[0:exportCount]
}
importNode := importingNodes[importIndex]
importNodeCurrentSlots := fsm.Node2Slot[importNode]
requirements := avgSlot - len(importNodeCurrentSlots)
task := &raft.MigratingTask{
ID: utils.RandString(20),
SrcNode: exportNode,
TargetNode: importNode,
}
if requirements <= len(exportSlots) {
// exportSlots can provide enough slots; this importing node is done
task.Slots = exportSlots[0:requirements]
exportSlots = exportSlots[requirements:]
importIndex++
} else {
// exportSlots cannot provide enough slots; this exporting node is drained
task.Slots = exportSlots
exportSlots = nil
exportIndex++
}
migratings = append(migratings, task)
}
})
return migratings, nil
}
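// waitCommitted polls the peer's committed raft index (via getCommittedIndexCommand) until
// it reaches logIndex, retrying every 100ms for up to 50 attempts (roughly 5 seconds)
// before reporting a timeout.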
func (cluster *Cluster) waitCommitted(peer string, logIndex uint64) error {
srcNodeConn, err := cluster.connections.BorrowPeerClient(peer)
if err != nil {
return err
}
defer cluster.connections.ReturnPeerClient(srcNodeConn)
var peerIndex uint64
for i := 0; i < 50; i++ {
reply := srcNodeConn.Send(utils.ToCmdLine(getCommittedIndexCommand))
switch reply := reply.(type) {
case *protocol.IntReply:
peerIndex = uint64(reply.Code)
if peerIndex >= logIndex {
return nil
}
case *protocol.StandardErrReply:
logger.Infof("get committed index failed: %v", reply.Error())
default:
logger.Infof("get committed index unknown responseL %+v", reply.ToBytes())
}
time.Sleep(time.Millisecond * 100)
}
return errors.New("wait committed timeout")
}
// execMigrationChangeRoute should be executed on the leader
// it proposes EventFinishMigrate through raft to change the route to the new node
// it does not return until the proposal has been accepted by the majority and by the two related nodes
// format: cluster.migration.changeroute taskId
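// in step 11 of the procedure at the top of this file, the source node sends this command
// to the leader; an example invocation (the task id is illustrative):
//
//	cluster.migration.changeroute 9Ke4pQ1xZ7rNb2fLcYd0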
func execMigrationChangeRoute(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply {
if len(cmdLine) != 2 {
return protocol.MakeArgNumErrReply(migrationChangeRouteCommand)
}
state := cluster.raftNode.State()
if state != raft.Leader {
// I am not leader, forward request to leader
leaderConn, err := cluster.BorrowLeaderClient()
if err != nil {
return protocol.MakeErrReply(err.Error())
}
defer cluster.connections.ReturnPeerClient(leaderConn)
return leaderConn.Send(cmdLine)
}
taskId := string(cmdLine[1])
logger.Infof("change route for migration %s", taskId)
task := cluster.raftNode.FSM.GetMigratingTask(taskId)
if task == nil {
return protocol.MakeErrReply("ERR task not found")
}
logger.Infof("change route for migration %s, got task info", taskId)
// propose
logIndex, err := cluster.raftNode.Propose(&raft.LogEntry{
Event: raft.EventFinishMigrate,
MigratingTask: task,
})
if err != nil {
return protocol.MakeErrReply("ERR " + err.Error())
}
logger.Infof("change route for migration %s, raft proposed", taskId)
// confirm that the two related nodes have committed this log entry
err = cluster.waitCommitted(task.SrcNode, logIndex)
if err != nil {
return protocol.MakeErrReply("ERR " + err.Error())
}
logger.Infof("change route for migration %s, confirm source node finished", taskId)
err = cluster.waitCommitted(task.TargetNode, logIndex)
if err != nil {
return protocol.MakeErrReply("ERR " + err.Error())
}
logger.Infof("change route for migration %s, confirm target node finished", taskId)
return protocol.MakeOkReply()
}