diff --git a/cluster/core/core.go b/cluster/core/core.go
index dbf3627..9589796 100644
--- a/cluster/core/core.go
+++ b/cluster/core/core.go
@@ -1,11 +1,9 @@
 package core
 
 import (
-	"sync"
 
 	"github.com/hdt3213/godis/cluster/raft"
 	dbimpl "github.com/hdt3213/godis/database"
-	"github.com/hdt3213/godis/datastruct/set"
 	"github.com/hdt3213/godis/interface/database"
 	"github.com/hdt3213/godis/interface/redis"
 	"github.com/hdt3213/godis/lib/logger"
@@ -49,57 +47,6 @@ func (c *Cluster) SelfID() string {
 	return c.raftNode.Cfg.ID()
 }
 
-// slotsManager manages the slots hosted on the current node
-type slotsManager struct {
-	mu            *sync.RWMutex
-	slots         map[uint32]*slotStatus // slots hosted on the current node
-	importingTask *raft.MigratingTask
-}
-
-const (
-	slotStateHosting = iota
-	slotStateImporting
-	slotStateExporting
-)
-
-type slotStatus struct {
-	mu    *sync.RWMutex
-	state int
-	keys  *set.Set // keys stored in this slot
-
-	exportSnapshot *set.Set // snapshot of the slot's keys taken when the transfer starts, to avoid concurrent modification
-	dirtyKeys      *set.Set // keys modified after the transfer started; they must be re-sent when the transfer finishes
-}
-
-func newSlotsManager() *slotsManager {
-	return &slotsManager{
-		mu:    &sync.RWMutex{},
-		slots: map[uint32]*slotStatus{},
-	}
-}
-
-func (ssm *slotsManager) getSlot(index uint32) *slotStatus {
-	ssm.mu.RLock()
-	slot := ssm.slots[index]
-	ssm.mu.RUnlock()
-	if slot != nil {
-		return slot
-	}
-	ssm.mu.Lock()
-	defer ssm.mu.Unlock()
-	// check-lock-check
-	slot = ssm.slots[index]
-	if slot != nil {
-		return slot
-	}
-	slot = &slotStatus{
-		state: slotStateHosting,
-		keys:  set.Make(),
-		mu:    &sync.RWMutex{},
-	}
-	ssm.slots[index] = slot
-	return slot
-}
 
 func NewCluster(cfg *Config) (*Cluster, error) {
 	var connections ConnectionFactory
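
// Reviewer note (illustrative, not part of the diff): getSlot, moved verbatim from
// core.go into migration.go below, uses the check-lock-check idiom called out in its
// body: an optimistic lookup under the read lock, then the write lock with a second
// lookup before creating the entry, so concurrent callers never build two slotStatus
// values for the same index. A minimal standalone sketch of that idiom follows; the
// names (lazyMap, Get) are hypothetical and only illustrate the pattern.
package main

import (
	"fmt"
	"sync"
)

type lazyMap struct {
	mu sync.RWMutex
	m  map[uint32]*int
}

// Get returns the value stored for index, creating it at most once.
func (l *lazyMap) Get(index uint32) *int {
	l.mu.RLock()
	v := l.m[index] // fast path: shared read lock only
	l.mu.RUnlock()
	if v != nil {
		return v
	}
	l.mu.Lock()
	defer l.mu.Unlock()
	if v = l.m[index]; v != nil { // re-check: another goroutine may have created it
		return v
	}
	v = new(int)
	l.m[index] = v
	return v
}

func main() {
	l := &lazyMap{m: map[uint32]*int{}}
	fmt.Println(l.Get(7) == l.Get(7)) // true: both calls observe the same entry
}
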
diff --git a/cluster/core/migration.go b/cluster/core/migration.go
index 0ffec8c..8d9f2e2 100644
--- a/cluster/core/migration.go
+++ b/cluster/core/migration.go
@@ -2,7 +2,7 @@ package core
 
 import (
 	"fmt"
-	"strconv"
+	"sync"
 	"time"
 
 	"github.com/hdt3213/godis/aof"
@@ -26,6 +26,58 @@ func init() {
 	RegisterCmd(startMigrationCommand, execStartMigration)
 }
 
+// slotsManager manages the slots hosted on the current node
+type slotsManager struct {
+	mu            *sync.RWMutex
+	slots         map[uint32]*slotStatus // slots hosted on the current node
+	importingTask *raft.MigratingTask
+}
+
+const (
+	slotStateHosting = iota
+	slotStateImporting
+	slotStateExporting
+)
+
+type slotStatus struct {
+	mu    *sync.RWMutex
+	state int
+	keys  *set.Set // keys stored in this slot
+
+	exportSnapshot *set.Set // snapshot of the slot's keys taken when the transfer starts, to avoid concurrent modification
+	dirtyKeys      *set.Set // keys modified after the transfer started; they must be re-sent when the transfer finishes
+}
+
+func newSlotsManager() *slotsManager {
+	return &slotsManager{
+		mu:    &sync.RWMutex{},
+		slots: map[uint32]*slotStatus{},
+	}
+}
+
+func (ssm *slotsManager) getSlot(index uint32) *slotStatus {
+	ssm.mu.RLock()
+	slot := ssm.slots[index]
+	ssm.mu.RUnlock()
+	if slot != nil {
+		return slot
+	}
+	ssm.mu.Lock()
+	defer ssm.mu.Unlock()
+	// check-lock-check
+	slot = ssm.slots[index]
+	if slot != nil {
+		return slot
+	}
+	slot = &slotStatus{
+		state: slotStateHosting,
+		keys:  set.Make(),
+		mu:    &sync.RWMutex{},
+	}
+	ssm.slots[index] = slot
+	return slot
+}
+
 func (sm *slotStatus) startExporting() protocol.ErrorReply {
 	sm.mu.Lock()
 	defer sm.mu.Unlock()
@@ -44,6 +96,25 @@ func (sm *slotStatus) finishExportingWithinLock() {
 	sm.exportSnapshot = nil
 }
 
+func (cluster *Cluster) dropSlot(index uint32) {
+	cluster.slotsManager.mu.RLock()
+	slot := cluster.slotsManager.slots[index]
+	cluster.slotsManager.mu.RUnlock()
+	if slot == nil {
+		return
+	}
+	slot.mu.Lock()
+	defer slot.mu.Unlock()
+	c := connection.NewFakeConn()
+	slot.keys.ForEach(func(key string) bool {
+		cluster.LocalExec(c, utils.ToCmdLine("del", key))
+		return true
+	})
+	cluster.slotsManager.mu.Lock()
+	delete(cluster.slotsManager.slots, index)
+	cluster.slotsManager.mu.Unlock()
+}
+
 func (cluster *Cluster) injectInsertCallback() {
 	cb := func(dbIndex int, key string, entity *database.DataEntity) {
 		slotIndex := cluster.GetSlot(key)
@@ -187,32 +258,40 @@ func execFinishExport(cluster *Cluster, c redis.Connection, cmdLine CmdLine) red
 	}
 	logger.Infof("finishing migration task %s, route changed", taskId)
 
+	// clean migrated slots
+	go func() {
+		defer func() {
+			if e := recover(); e != nil {
+				logger.Errorf("panic %v", e)
+			}
+		}()
+		for _, index := range task.Slots {
+			cluster.dropSlot(index)
+		}
+	}()
 	c.Write(protocol.MakeOkReply().ToBytes())
 	return &protocol.NoReply{}
 }
 
 // execStartMigration receives startMigrationCommand from the leader and starts the migration job in the background
-// command line: startMigrationCommand taskId srcNode slotId1 [slotId2]...
+// command line: startMigrationCommand taskId
 func execStartMigration(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply {
-	if len(cmdLine) < 4 {
+	if len(cmdLine) != 2 {
 		return protocol.MakeArgNumErrReply(startMigrationCommand)
 	}
 	taskId := string(cmdLine[1])
-	srcNode := string(cmdLine[2])
-	var slotIds []uint32
-	for _, slotIdStr := range cmdLine[3:] {
-		slotId, err := strconv.Atoi(string(slotIdStr))
-		if err != nil {
-			return protocol.MakeErrReply("illegal slot id: " + string(slotIdStr))
+
+	var task *raft.MigratingTask
+	for i := 0; i < 50; i++ {
+		task = cluster.raftNode.FSM.GetMigratingTask(taskId)
+		if task == nil {
+			time.Sleep(time.Millisecond * 100)
 		}
-		slotIds = append(slotIds, uint32(slotId))
 	}
-	task := &raft.MigratingTask{
-		ID:         taskId,
-		SrcNode:    srcNode,
-		TargetNode: cluster.SelfID(),
-		Slots:      slotIds,
+	if task == nil {
+		return protocol.MakeErrReply("ERR get migrating task timeout")
 	}
+
 	cluster.slotsManager.mu.Lock()
 	cluster.slotsManager.importingTask = task
 	cluster.slotsManager.mu.Unlock()
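
// Reviewer note (illustrative, not part of the diff): with the new wire format the
// target node receives only the task ID and resolves the rest of the MigratingTask
// from its local raft FSM, polling until the EventStartMigrate entry has been applied
// (50 attempts x 100ms, roughly a 5 second budget, as in execStartMigration above).
// Below is a sketch of that waiting pattern as a hypothetical helper in package core
// (imports errors and time); the helper name is invented, and GetMigratingTask is
// assumed to behave as the diff shows, returning nil until the entry is applied.
func waitForMigratingTask(cluster *Cluster, taskId string, timeout time.Duration) (*raft.MigratingTask, error) {
	deadline := time.Now().Add(timeout)
	for {
		if task := cluster.raftNode.FSM.GetMigratingTask(taskId); task != nil {
			return task, nil // the raft log entry has reached this node
		}
		if time.Now().After(deadline) {
			return nil, errors.New("get migrating task timeout")
		}
		time.Sleep(100 * time.Millisecond) // give the local FSM time to catch up
	}
}
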
diff --git a/cluster/core/node_manager.go b/cluster/core/node_manager.go
index 414710d..c669ffa 100644
--- a/cluster/core/node_manager.go
+++ b/cluster/core/node_manager.go
@@ -4,7 +4,6 @@ import (
 	"errors"
 	"fmt"
 	"math"
-	"strconv"
 	"sync"
 	"time"
 
@@ -15,6 +14,29 @@ import (
 	"github.com/hdt3213/godis/redis/protocol"
 )
 
+/*
+
+**Rebalance Procedure**
+1. Invoke `triggerMigrationTask` on the cluster Leader to start a migration task.
+2. The Leader proposes EventStartMigrate to the raft group, then sends startMigrationCommand to the Target Node. (at `triggerMigrationTask`)
+
+3. The Target Node runs `doImports` after receiving startMigrationCommand.
+4. The Target Node sends exportCommand to the Source Node.
+
+5. The Source Node gets the migrating task from raft. (at `execExport`)
+6. The Source Node puts the task into its slotsManager to start recording dirty keys during the migration. (at `injectInsertCallback`)
+7. The Source Node dumps the existing data to the Target Node.
+
+8. The Target Node sends migrationDoneCommand to the Source Node. (at `doImports`)
+9. The Source Node runs `execFinishExport`, locking the slots to stop writes.
+10. The Source Node sends the dirty keys to the Target Node.
+
+11. The Source Node sends migrationChangeRouteCommand to the Leader.
+12. The Leader proposes EventFinishMigrate to raft and waits until both the Source Node and the Target Node have received this entry. (at `execMigrationChangeRoute`)
+13. The Source Node finishes exporting: it unlocks the slots and cleans up the migrated data.
+14. The Target Node finishes importing: it unlocks the slots and starts serving them.
+*/
+
 const joinClusterCommand = "cluster.join"
 
 const migrationChangeRouteCommand = "cluster.migration.changeroute"
@@ -113,11 +135,7 @@ func (cluster *Cluster) triggerMigrationTask(task *raft.MigratingTask) error {
 	}
 	logger.Infof("propose EventStartMigrate %s success", task.ID)
 
-	cmdLine := utils.ToCmdLine(startMigrationCommand, task.ID, task.SrcNode)
-	for _, slotId := range task.Slots {
-		slotIdStr := strconv.Itoa(int(slotId))
-		cmdLine = append(cmdLine, []byte(slotIdStr))
-	}
+	cmdLine := utils.ToCmdLine(startMigrationCommand, task.ID)
 	targetNodeConn, err := cluster.connections.BorrowPeerClient(task.TargetNode)
 	if err != nil {
 		return err
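
// Reviewer note (illustrative, not part of the diff): the command the leader sends to
// the target node shrinks from
//   startMigrationCommand taskId srcNode slotId1 [slotId2]...
// to
//   startMigrationCommand taskId
// because both ends can now read the task from the replicated raft state instead of
// re-parsing it off the wire. For reference, this is the shape of raft.MigratingTask
// implied by the code removed in migration.go above; the real declaration lives in
// cluster/raft and may carry additional fields.
type MigratingTask struct {
	ID         string   // task id proposed with EventStartMigrate
	SrcNode    string   // node currently hosting the slots
	TargetNode string   // node the slots are migrating to
	Slots      []uint32 // ids of the slots being migrated
}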