package core

import (
	"fmt"
	"sync"
	"time"

	"github.com/hdt3213/godis/aof"
	"github.com/hdt3213/godis/cluster/raft"
	"github.com/hdt3213/godis/datastruct/set"
	"github.com/hdt3213/godis/interface/database"
	"github.com/hdt3213/godis/interface/redis"
	"github.com/hdt3213/godis/lib/logger"
	"github.com/hdt3213/godis/lib/utils"
	"github.com/hdt3213/godis/redis/connection"
	"github.com/hdt3213/godis/redis/protocol"
)
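
// This file implements online migration of hash slots between cluster nodes.
// The flow implemented below is:
//  1. The raft leader sends startMigrationCommand to the importing (target) node.
//  2. The target node streams exportCommand to the source node, which dumps a
//     snapshot of the migrating slots while recording concurrently written keys
//     as dirty keys.
//  3. The target node then streams migrationDoneCommand to the source node, which
//     re-sends the dirty keys under lock, asks the raft leader to change the slot
//     routing, and finally drops the migrated slots.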

const exportCommand = "cluster.migration.export"
const migrationDoneCommand = "cluster.migration.done"
const startMigrationCommand = "cluster.migration.start"

func init() {
	RegisterCmd(exportCommand, execExport)
	RegisterCmd(migrationDoneCommand, execFinishExport)
	RegisterCmd(startMigrationCommand, execStartMigration)
}

// slotsManager manages the slots hosted on the current node
type slotsManager struct {
	mu            *sync.RWMutex
	slots         map[uint32]*slotStatus // slots hosted on the current node
	importingTask *raft.MigratingTask
}

const (
	slotStateHosting = iota
	slotStateImporting
	slotStateExporting
)
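
// Note: slotStateImporting is declared but never assigned in this file; slots
// are created in slotStateHosting and only the hosting/exporting transition is
// used here (see the todo in doImports about restricting writes during import).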

// slotStatus tracks the migration state of a single slot and its keys
type slotStatus struct {
	mu    *sync.RWMutex
	state int
	keys  *set.Set // keys currently stored in this slot

	exportSnapshot *set.Set // copy of the slot's keys taken when exporting starts, to avoid concurrent modification
	dirtyKeys      *set.Set // keys modified after exporting started; they must be re-sent during the finishing phase
}

func newSlotsManager() *slotsManager {
	return &slotsManager{
		mu:    &sync.RWMutex{},
		slots: map[uint32]*slotStatus{},
	}
}
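
// getSlot returns the status of the given slot, lazily creating it in
// hosting state on first access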
func (ssm *slotsManager) getSlot(index uint32) *slotStatus {
	ssm.mu.RLock()
	slot := ssm.slots[index]
	ssm.mu.RUnlock()
	if slot != nil {
		return slot
	}
	ssm.mu.Lock()
	defer ssm.mu.Unlock()
	// double-check after acquiring the write lock: another goroutine may have created it
	slot = ssm.slots[index]
	if slot != nil {
		return slot
	}
	slot = &slotStatus{
		state: slotStateHosting,
		keys:  set.Make(),
		mu:    &sync.RWMutex{},
	}
	ssm.slots[index] = slot
	return slot
}
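
// startExporting moves the slot from hosting to exporting state, snapshots its
// keys for the bulk transfer, and starts recording dirty keys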
func (sm *slotStatus) startExporting() protocol.ErrorReply {
	sm.mu.Lock()
	defer sm.mu.Unlock()
	if sm.state != slotStateHosting {
		return protocol.MakeErrReply("ERR slot is not in hosting state")
	}
	sm.state = slotStateExporting
	sm.dirtyKeys = set.Make()
	sm.exportSnapshot = sm.keys.ShallowCopy()
	return nil
}
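
// finishExportingWithinLock resets the slot to hosting state and releases the
// snapshot and dirty-key sets; the caller must hold sm.mu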
func (sm *slotStatus) finishExportingWithinLock() {
	sm.state = slotStateHosting
	sm.dirtyKeys = nil
	sm.exportSnapshot = nil
}
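
// dropSlot deletes all keys in a migrated slot and removes the slot from the
// slotsManager; the source node calls it after a migration completes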
func (cluster *Cluster) dropSlot(index uint32) {
	cluster.slotsManager.mu.RLock()
	slot := cluster.slotsManager.slots[index]
	cluster.slotsManager.mu.RUnlock()
	if slot == nil {
		return
	}
	slot.mu.Lock()
	defer slot.mu.Unlock()
	c := connection.NewFakeConn()
	slot.keys.ForEach(func(key string) bool {
		cluster.LocalExec(c, utils.ToCmdLine("del", key))
		return true
	})
	cluster.slotsManager.mu.Lock()
	delete(cluster.slotsManager.slots, index)
	cluster.slotsManager.mu.Unlock()
}
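
// injectInsertCallback registers a key-inserted callback that keeps the
// per-slot key index up to date and records writes made during an export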
func (cluster *Cluster) injectInsertCallback() {
	cb := func(dbIndex int, key string, entity *database.DataEntity) {
		slotIndex := cluster.GetSlot(key)
		slotManager := cluster.slotsManager.getSlot(slotIndex)
		slotManager.mu.Lock()
		defer slotManager.mu.Unlock()
		slotManager.keys.Add(key)
		if slotManager.state == slotStateExporting {
			slotManager.dirtyKeys.Add(key)
		}
	}
	cluster.db.SetKeyInsertedCallback(cb)
}
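
// injectDeleteCallback registers a key-deleted callback that removes keys from
// the per-slot index and records deletions made during an export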
func (cluster *Cluster) injectDeleteCallback() {
	cb := func(dbIndex int, key string, entity *database.DataEntity) {
		slotIndex := cluster.GetSlot(key)
		slotManager := cluster.slotsManager.getSlot(slotIndex)
		slotManager.mu.Lock()
		defer slotManager.mu.Unlock()
		slotManager.keys.Remove(key)
		if slotManager.state == slotStateExporting {
			// the key may have been deleted after it was migrated, so the
			// deletion is also a dirty change that must be replayed
			slotManager.dirtyKeys.Add(key)
		}
	}
	cluster.db.SetKeyDeletedCallback(cb)
}
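
// dumpDataThroughConnection serializes each key in keyset as RESP commands
// (plus an expire command when the key has a TTL) and writes them to c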
func (cluster *Cluster) dumpDataThroughConnection(c redis.Connection, keyset *set.Set) {
	keyset.ForEach(func(key string) bool {
		entity, ok := cluster.db.GetEntity(0, key)
		if ok {
			ret := aof.EntityToCmd(key, entity)
			// todo: handle error and close connection
			_, _ = c.Write(ret.ToBytes())
			expire := cluster.db.GetExpiration(0, key)
			if expire != nil {
				ret = aof.MakeExpireCmd(key, *expire)
				_, _ = c.Write(ret.ToBytes())
			}
		}
		return true
	})
}

// execExport dumps the data of the slots in a migrating task to the caller
// command line: cluster.migration.export taskId
func execExport(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply {
	if len(cmdLine) != 2 {
		return protocol.MakeArgNumErrReply(exportCommand)
	}

	// wait for the migrating task to be replicated to this node through raft
	var task *raft.MigratingTask
	taskId := string(cmdLine[1])
	for i := 0; i < 50; i++ {
		task = cluster.raftNode.FSM.GetMigratingTask(taskId)
		if task != nil {
			break
		}
		time.Sleep(time.Millisecond * 100)
	}
	if task == nil {
		return protocol.MakeErrReply("ERR get migrating task timeout")
	}
	cluster.slotsManager.mu.Lock()
	if cluster.slotsManager.importingTask != nil {
		cluster.slotsManager.mu.Unlock()
		return protocol.MakeErrReply("ERR another migrating task in progress")
	}
	cluster.slotsManager.importingTask = task
	cluster.slotsManager.mu.Unlock()

	for _, slotId := range task.Slots {
		slotManager := cluster.slotsManager.getSlot(slotId)
		errReply := slotManager.startExporting()
		if errReply != nil {
			return errReply
		}
		cluster.dumpDataThroughConnection(c, slotManager.exportSnapshot)
		logger.Info("finish dump slot ", slotId)
	}
	// send an OK reply to tell the requesting node the dump has finished
	_, _ = c.Write(protocol.MakeOkReply().ToBytes())
	return &protocol.NoReply{}
}

// execFinishExport handles migrationDoneCommand from the importing node: it
// re-sends the dirty keys to connection `c`, proposes the route change to the
// raft leader, then returns an OK reply
// command line: cluster.migration.done taskId
func execFinishExport(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply {
	if len(cmdLine) != 2 {
		return protocol.MakeArgNumErrReply(migrationDoneCommand)
	}
	// get MigratingTask from raft
	var task *raft.MigratingTask
	taskId := string(cmdLine[1])
	logger.Info("finishing migration task: " + taskId)
	for i := 0; i < 50; i++ {
		task = cluster.raftNode.FSM.GetMigratingTask(taskId)
		if task != nil {
			break
		}
		time.Sleep(time.Millisecond * 100)
	}
	if task == nil {
		return protocol.MakeErrReply("ERR get migrating task timeout")
	}
	logger.Infof("finishing migration task %s, got task info", taskId)

	// transfer dirty keys while holding the slot locks; the locks are released
	// once the migration is done
	var lockedSlots []uint32
	defer func() {
		for i := len(lockedSlots) - 1; i >= 0; i-- {
			slotId := lockedSlots[i]
			slotManager := cluster.slotsManager.getSlot(slotId)
			slotManager.mu.Unlock()
		}
	}()
	for _, slotId := range task.Slots {
		slotManager := cluster.slotsManager.getSlot(slotId)
		slotManager.mu.Lock()
		lockedSlots = append(lockedSlots, slotId)
		cluster.dumpDataThroughConnection(c, slotManager.dirtyKeys)
		slotManager.finishExportingWithinLock()
	}
	logger.Infof("finishing migration task %s, dirty keys transported", taskId)

	// propose finishing the migration to the raft leader
	leaderConn, err := cluster.BorrowLeaderClient()
	if err != nil {
		return protocol.MakeErrReply(err.Error())
	}
	defer cluster.connections.ReturnPeerClient(leaderConn)
	reply := leaderConn.Send(utils.ToCmdLine(migrationChangeRouteCommand, taskId))
	switch reply := reply.(type) {
	case *protocol.StatusReply, *protocol.OkReply:
		// route changed successfully, fall through to clean up the migrated slots
	case *protocol.StandardErrReply:
		logger.Infof("migration done command failed: %v", reply.Error())
		return reply
	default:
		logger.Infof("finish migration request got unknown response: %s", string(reply.ToBytes()))
		return protocol.MakeErrReply("ERR unknown response for route change command")
	}
	logger.Infof("finishing migration task %s, route changed", taskId)

	// clean the migrated slots in the background
	go func() {
		defer func() {
			if e := recover(); e != nil {
				logger.Errorf("panic: %v", e)
			}
		}()
		for _, index := range task.Slots {
			cluster.dropSlot(index)
		}
	}()
	_, _ = c.Write(protocol.MakeOkReply().ToBytes())
	return &protocol.NoReply{}
}

// execStartMigration handles startMigrationCommand from the leader and starts
// the import job in the background
// command line: cluster.migration.start taskId
func execStartMigration(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply {
	if len(cmdLine) != 2 {
		return protocol.MakeArgNumErrReply(startMigrationCommand)
	}
	taskId := string(cmdLine[1])

	// wait for the migrating task to be replicated to this node through raft
	var task *raft.MigratingTask
	for i := 0; i < 50; i++ {
		task = cluster.raftNode.FSM.GetMigratingTask(taskId)
		if task != nil {
			break
		}
		time.Sleep(time.Millisecond * 100)
	}
	if task == nil {
		return protocol.MakeErrReply("ERR get migrating task timeout")
	}

	cluster.slotsManager.mu.Lock()
	cluster.slotsManager.importingTask = task
	cluster.slotsManager.mu.Unlock()
	logger.Infof("received importing task %s, %d slots to import", task.ID, len(task.Slots))
	go func() {
		defer func() {
			if e := recover(); e != nil {
				logger.Errorf("panic: %v", e)
			}
		}()
		if err := cluster.doImports(task); err != nil {
			logger.Errorf("import task %s failed: %v", task.ID, err)
		}
	}()
	return protocol.MakeOkReply()
}
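
// doImports runs on the importing node: it streams a snapshot of the migrating
// slots from srcNode, replays it locally, then asks srcNode to finish the
// export and replays the dirty keys it sends back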
func (cluster *Cluster) doImports(task *raft.MigratingTask) error {
	// STEP1: ask srcNode to export the slots of this task
	cmdLine := utils.ToCmdLine(exportCommand, task.ID)
	stream, err := cluster.connections.NewStream(task.SrcNode, cmdLine)
	if err != nil {
		return err
	}
	defer stream.Close()

	fakeConn := connection.NewFakeConn()

	// todo: slots in importing state should only accept writes from srcNode
	// STEP2: replay the snapshot commands until srcNode signals the dump is done
recvLoop:
	for proto := range stream.Stream() {
		if proto.Err != nil {
			return fmt.Errorf("export error: %v", proto.Err)
		}
		switch reply := proto.Data.(type) {
		case *protocol.MultiBulkReply:
			_ = cluster.db.Exec(fakeConn, reply.Args)
		case *protocol.StatusReply, *protocol.OkReply:
			if protocol.IsOKReply(reply) {
				logger.Info("importing task received OK reply, phase 1 done")
				break recvLoop
			} else {
				msg := fmt.Sprintf("migrate error: %s", string(reply.ToBytes()))
				logger.Error(msg)
				return protocol.MakeErrReply(msg)
			}
		case protocol.ErrorReply:
			// todo: return slot to former host node
			msg := fmt.Sprintf("migrate error: %s", reply.Error())
			logger.Error(msg)
			return protocol.MakeErrReply(msg)
		}
	}

	// STEP3: tell srcNode to enter the finishing procedure
	stream2, err := cluster.connections.NewStream(task.SrcNode, utils.ToCmdLine(migrationDoneCommand, task.ID))
	if err != nil {
		return err
	}
	defer stream2.Close()
	// receive the dirty data recorded during the export
recvLoop2:
	for proto := range stream2.Stream() {
		if proto.Err != nil {
			return fmt.Errorf("export error: %v", proto.Err)
		}
		switch reply := proto.Data.(type) {
		case *protocol.MultiBulkReply:
			_ = cluster.db.Exec(fakeConn, reply.Args)
		case *protocol.StatusReply, *protocol.OkReply:
			if protocol.IsOKReply(reply) {
				logger.Info("importing task received OK reply, phase 2 done")
				break recvLoop2
			} else {
				msg := fmt.Sprintf("migrate error: %s", string(reply.ToBytes()))
				logger.Error(msg)
				return protocol.MakeErrReply(msg)
			}
		case protocol.ErrorReply:
			// todo: return slot to former host node
			msg := fmt.Sprintf("migrate error: %s", reply.Error())
			logger.Error(msg)
			return protocol.MakeErrReply(msg)
		}
	}

	return nil
}