Mirror of https://github.com/HDT3213/godis.git, synced 2025-11-03 11:02:17 +08:00
optimize migration; clean migrated data
@@ -2,7 +2,7 @@ package core
 
 import (
 	"fmt"
-	"strconv"
 	"sync"
+	"time"
 
 	"github.com/hdt3213/godis/aof"
@@ -26,6 +26,58 @@ func init() {
 	RegisterCmd(startMigrationCommand, execStartMigration)
 }
 
+// slotsManager is responsible for managing the slots hosted on the current node
+type slotsManager struct {
+	mu            *sync.RWMutex
+	slots         map[uint32]*slotStatus // slots currently hosted on this node
+	importingTask *raft.MigratingTask
+}
+
+const (
+	slotStateHosting = iota
+	slotStateImporting
+	slotStateExporting
+)
+
+type slotStatus struct {
+	mu    *sync.RWMutex
+	state int
+	keys  *set.Set // keys stored in this slot
+
+	exportSnapshot *set.Set // snapshot of the slot's keys taken when the transfer starts, to avoid concurrent modification
+	dirtyKeys      *set.Set // keys modified after the transfer started; they must be re-sent during the finishing stage
+}
+
+func newSlotsManager() *slotsManager {
+	return &slotsManager{
+		mu:    &sync.RWMutex{},
+		slots: map[uint32]*slotStatus{},
+	}
+}
+
+func (ssm *slotsManager) getSlot(index uint32) *slotStatus {
+	ssm.mu.RLock()
+	slot := ssm.slots[index]
+	ssm.mu.RUnlock()
+	if slot != nil {
+		return slot
+	}
+	ssm.mu.Lock()
+	defer ssm.mu.Unlock()
+	// check-lock-check
+	slot = ssm.slots[index]
+	if slot != nil {
+		return slot
+	}
+	slot = &slotStatus{
+		state: slotStateHosting,
+		keys:  set.Make(),
+		mu:    &sync.RWMutex{},
+	}
+	ssm.slots[index] = slot
+	return slot
+}
+
 func (sm *slotStatus) startExporting() protocol.ErrorReply {
 	sm.mu.Lock()
 	defer sm.mu.Unlock()
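The exportSnapshot / dirtyKeys pair above implements a copy-then-catch-up scheme: exportSnapshot freezes the key set that gets streamed to the target, while any key written after that point is remembered in dirtyKeys and re-sent during the finishing stage. A minimal sketch of how a write path could feed this bookkeeping, assuming set.Set exposes an Add method (recordWrite itself is hypothetical and not part of this commit):

// Hypothetical sketch: record a written key in the slot's bookkeeping.
// While the slot is exporting, the key is also marked dirty so the
// finishing stage can retransmit it to the target node.
func (sm *slotStatus) recordWrite(key string) {
	sm.mu.Lock()
	defer sm.mu.Unlock()
	sm.keys.Add(key) // assumes *set.Set has an Add method
	if sm.state == slotStateExporting {
		sm.dirtyKeys.Add(key)
	}
}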
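getSlot uses the check-lock-check idiom: a read-locked fast path, then the write lock and a second lookup before creating the entry, so two goroutines racing on the same slot index cannot both insert a slotStatus. The same idiom in a generic, self-contained form (illustrative only, not taken from the repository):

// Generic check-lock-check (double-checked locking) over a lazily filled map.
type lazyMap[V any] struct {
	mu sync.RWMutex
	m  map[uint32]*V
}

func (lm *lazyMap[V]) getOrCreate(index uint32, create func() *V) *V {
	lm.mu.RLock()
	v := lm.m[index]
	lm.mu.RUnlock()
	if v != nil {
		return v // fast path: entry already exists
	}
	lm.mu.Lock()
	defer lm.mu.Unlock()
	if v = lm.m[index]; v != nil {
		return v // another goroutine created it between the two locks
	}
	v = create()
	lm.m[index] = v
	return v
}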
@@ -44,6 +96,25 @@ func (sm *slotStatus) finishExportingWithinLock() {
 	sm.exportSnapshot = nil
 }
 
+func (cluster *Cluster) dropSlot(index uint32) {
+	cluster.slotsManager.mu.RLock()
+	slot := cluster.slotsManager.slots[index]
+	cluster.slotsManager.mu.RUnlock()
+	if slot == nil {
+		return
+	}
+	slot.mu.Lock()
+	defer slot.mu.Unlock()
+	c := connection.NewFakeConn()
+	slot.keys.ForEach(func(key string) bool {
+		cluster.LocalExec(c, utils.ToCmdLine("del", key))
+		return true
+	})
+	cluster.slotsManager.mu.Lock()
+	delete(cluster.slotsManager.slots, index)
+	cluster.slotsManager.mu.Unlock()
+}
+
 func (cluster *Cluster) injectInsertCallback() {
 	cb := func(dbIndex int, key string, entity *database.DataEntity) {
 		slotIndex := cluster.GetSlot(key)
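dropSlot deletes the migrated keys by sending DEL through LocalExec on a fake connection, so the cleanup is handled like an ordinary client command rather than by reaching into the storage engine directly. The same FakeConn + LocalExec pattern can drive any per-slot maintenance command; a hypothetical variant that expires a slot's keys instead of deleting them (expireSlot is illustrative and not part of the repository):

// Hypothetical sketch: expire every key of a slot, reusing the
// FakeConn + LocalExec pattern that dropSlot uses for DEL.
func (cluster *Cluster) expireSlot(index uint32, seconds int) {
	slot := cluster.slotsManager.getSlot(index)
	slot.mu.RLock()
	defer slot.mu.RUnlock()
	c := connection.NewFakeConn()
	slot.keys.ForEach(func(key string) bool {
		cluster.LocalExec(c, utils.ToCmdLine("expire", key, fmt.Sprintf("%d", seconds)))
		return true // keep iterating
	})
}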
@@ -187,32 +258,40 @@ func execFinishExport(cluster *Cluster, c redis.Connection, cmdLine CmdLine) red
 	}
 	logger.Infof("finishing migration task %s, route changed", taskId)
+
+	// clean migrated slots
+	go func() {
+		defer func() {
+			if e := recover(); e != nil {
+				logger.Errorf("panic %v", e)
+			}
+		}()
+		for _, index := range task.Slots {
+			cluster.dropSlot(index)
+		}
+	}()
 	c.Write(protocol.MakeOkReply().ToBytes())
 	return &protocol.NoReply{}
 }
 
 // execStartMigration receives startMigrationCommand from leader and start migration job at background
-// command line: startMigrationCommand taskId srcNode slotId1 [slotId2]...
+// command line: startMigrationCommand taskId
 func execStartMigration(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply {
-	if len(cmdLine) < 4 {
+	if len(cmdLine) != 2 {
 		return protocol.MakeArgNumErrReply(startMigrationCommand)
 	}
 	taskId := string(cmdLine[1])
-	srcNode := string(cmdLine[2])
-	var slotIds []uint32
-	for _, slotIdStr := range cmdLine[3:] {
-		slotId, err := strconv.Atoi(string(slotIdStr))
-		if err != nil {
-			return protocol.MakeErrReply("illegal slot id: " + string(slotIdStr))
-		}
-		slotIds = append(slotIds, uint32(slotId))
-	}
-	task := &raft.MigratingTask{
-		ID:         taskId,
-		SrcNode:    srcNode,
-		TargetNode: cluster.SelfID(),
-		Slots:      slotIds,
-	}
+	var task *raft.MigratingTask
+	for i := 0; i < 50; i++ {
+		task = cluster.raftNode.FSM.GetMigratingTask(taskId)
+		if task == nil {
+			time.Sleep(time.Millisecond * 100)
+		}
+	}
+	if task == nil {
+		return protocol.MakeErrReply("ERR get migrating task timeout")
+	}
+
 	cluster.slotsManager.mu.Lock()
 	cluster.slotsManager.importingTask = task
 	cluster.slotsManager.mu.Unlock()
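execFinishExport now replies OK right away and drops the migrated slots in a background goroutine; the deferred recover turns a panic during cleanup into a log line instead of crashing the node. The same pattern as a hypothetical helper (goSafely is not part of the repository):

// Hypothetical helper capturing the pattern above: run a job in the
// background and log a panic instead of letting it kill the process.
func goSafely(job func()) {
	go func() {
		defer func() {
			if e := recover(); e != nil {
				logger.Errorf("panic %v", e)
			}
		}()
		job()
	}()
}

With such a helper, the cleanup above could be written as goSafely(func() { for _, index := range task.Slots { cluster.dropSlot(index) } }).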
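In the new flow, startMigrationCommand carries only the task id, so the MigratingTask itself must already have been committed to the raft FSM before the target node handles the command; execStartMigration therefore polls GetMigratingTask for up to about five seconds (50 attempts, 100 ms apart) before reporting a timeout. A generic form of that wait loop, assuming the FSM simply returns nil until the corresponding log entry has been applied (pollUntil is illustrative, not part of the repository):

// Hypothetical generic polling helper: call get until it returns a value
// or the attempt budget is exhausted, sleeping between attempts.
func pollUntil[T any](attempts int, interval time.Duration, get func() *T) *T {
	for i := 0; i < attempts; i++ {
		if v := get(); v != nil {
			return v
		}
		time.Sleep(interval)
	}
	return nil
}

The wait loop above would then read: task = pollUntil(50, 100*time.Millisecond, func() *raft.MigratingTask { return cluster.raftNode.FSM.GetMigratingTask(taskId) }).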