mirror of
https://github.com/HDT3213/godis.git
synced 2025-10-06 17:26:52 +08:00
support rename in cluster
This commit is contained in:
@@ -123,7 +123,7 @@ func execMGet(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine) redis.
|
|||||||
for node, ret := range nodeResults {
|
for node, ret := range nodeResults {
|
||||||
nodeCmdLine := cmdLineMap[node]
|
nodeCmdLine := cmdLineMap[node]
|
||||||
result := ret.(*protocol.MultiBulkReply)
|
result := ret.(*protocol.MultiBulkReply)
|
||||||
if len(result.Args) != len(nodeCmdLine) - 1 {
|
if len(result.Args) != len(nodeCmdLine)-1 {
|
||||||
return protocol.MakeErrReply("wrong response from node " + node)
|
return protocol.MakeErrReply("wrong response from node " + node)
|
||||||
}
|
}
|
||||||
for i := 1; i < len(nodeCmdLine); i++ {
|
for i := 1; i < len(nodeCmdLine); i++ {
|
||||||
@@ -143,12 +143,12 @@ func execMGet(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine) redis.
|
|||||||
const someKeysExistsErr = "Some Keys Exists"
|
const someKeysExistsErr = "Some Keys Exists"
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
core.RegisterPreCheck("msetnx", msetNxPrecheck)
|
core.RegisterPrepareFunc("msetnx", msetNxPrecheck)
|
||||||
}
|
}
|
||||||
|
|
||||||
func msetNxPrecheck(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply {
|
func msetNxPrecheck(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply {
|
||||||
var keys []string
|
var keys []string
|
||||||
for i := 1; i < len(cmdLine); i+=2 {
|
for i := 1; i < len(cmdLine); i += 2 {
|
||||||
keys = append(keys, string(cmdLine[i]))
|
keys = append(keys, string(cmdLine[i]))
|
||||||
}
|
}
|
||||||
exists := cluster.LocalExists(keys)
|
exists := cluster.LocalExists(keys)
|
||||||
@@ -202,4 +202,4 @@ func execMSetNx(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine) redi
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return protocol.MakeIntReply(1)
|
return protocol.MakeIntReply(1)
|
||||||
}
|
}
|
||||||
|
88
cluster/commands/rename.go
Normal file
88
cluster/commands/rename.go
Normal file
@@ -0,0 +1,88 @@
|
|||||||
|
package commands
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/hdt3213/godis/cluster/core"
|
||||||
|
"github.com/hdt3213/godis/interface/redis"
|
||||||
|
"github.com/hdt3213/godis/lib/utils"
|
||||||
|
"github.com/hdt3213/godis/redis/protocol"
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
core.RegisterCmd("rename_", execRenameInLocal)
|
||||||
|
core.RegisterCmd("rename", execRename)
|
||||||
|
core.RegisterPrepareFunc("RenameFrom", prepareRenameFrom)
|
||||||
|
}
|
||||||
|
|
||||||
|
func execRenameInLocal(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply {
|
||||||
|
cmdLine[0] = []byte("rename")
|
||||||
|
return cluster.LocalExec(c, cmdLine)
|
||||||
|
}
|
||||||
|
|
||||||
|
func execRename(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply {
|
||||||
|
if len(cmdLine) != 3 {
|
||||||
|
return protocol.MakeArgNumErrReply("rename")
|
||||||
|
}
|
||||||
|
src := string(cmdLine[1])
|
||||||
|
target := string(cmdLine[2])
|
||||||
|
srcSlot := cluster.GetSlot(src)
|
||||||
|
srcNode := cluster.PickNode(srcSlot)
|
||||||
|
targetSlot := cluster.GetSlot(target)
|
||||||
|
targetNode := cluster.PickNode(targetSlot)
|
||||||
|
if srcNode == targetNode {
|
||||||
|
cmdLine[0] = []byte("rename")
|
||||||
|
return cluster.Relay(srcNode, c, cmdLine)
|
||||||
|
}
|
||||||
|
routeMap := RouteMap{
|
||||||
|
srcNode: {src},
|
||||||
|
targetNode: {target},
|
||||||
|
}
|
||||||
|
|
||||||
|
txID := utils.RandString(10)
|
||||||
|
srcPrepareResp := cluster.Relay(srcNode, c, utils.ToCmdLine("Prepare", txID, "RenameFrom", src))
|
||||||
|
if protocol.IsErrorReply(srcPrepareResp) {
|
||||||
|
// rollback src node
|
||||||
|
requestRollback(cluster, c, txID, map[string][]string{srcNode: {src}})
|
||||||
|
return srcPrepareResp
|
||||||
|
}
|
||||||
|
srcPrepareResult, ok := srcPrepareResp.(*protocol.MultiBulkReply)
|
||||||
|
if !ok || len(srcPrepareResult.Args) < 2 {
|
||||||
|
requestRollback(cluster, c, txID, map[string][]string{srcNode: {src}})
|
||||||
|
return protocol.MakeErrReply("ERR invalid prepare response")
|
||||||
|
}
|
||||||
|
// prepare rename to
|
||||||
|
targetPrepareResp := cluster.Relay(targetNode, c, utils.ToCmdLine3("Prepare", []byte(txID),
|
||||||
|
[]byte("RenameTo"), []byte(target), srcPrepareResult.Args[0], srcPrepareResult.Args[1]))
|
||||||
|
if protocol.IsErrorReply(targetPrepareResp) {
|
||||||
|
// rollback src node
|
||||||
|
requestRollback(cluster, c, txID, routeMap)
|
||||||
|
return targetPrepareResp
|
||||||
|
}
|
||||||
|
|
||||||
|
// commit
|
||||||
|
commitCmd := utils.ToCmdLine("commit", txID)
|
||||||
|
for node := range routeMap {
|
||||||
|
reply := cluster.Relay(node, c,commitCmd )
|
||||||
|
if err := protocol.Try2ErrorReply(reply); err != nil {
|
||||||
|
requestRollback(cluster, c, txID, routeMap)
|
||||||
|
return protocol.MakeErrReply("commit failed: " + err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return protocol.MakeOkReply()
|
||||||
|
}
|
||||||
|
|
||||||
|
// prepareRenameFrom is prepare-function for RenameFrom
|
||||||
|
func prepareRenameFrom(cluster *core.Cluster, conn redis.Connection, cmdLine CmdLine) redis.Reply {
|
||||||
|
if len(cmdLine) != 2 {
|
||||||
|
return protocol.MakeArgNumErrReply("RenameFrom")
|
||||||
|
}
|
||||||
|
key := string(cmdLine[1])
|
||||||
|
existResp := cluster.LocalExec(conn, utils.ToCmdLine("Exists", key))
|
||||||
|
if protocol.IsErrorReply(existResp) {
|
||||||
|
return existResp
|
||||||
|
}
|
||||||
|
existIntResp := existResp.(*protocol.IntReply)
|
||||||
|
if existIntResp.Code == 0 {
|
||||||
|
return protocol.MakeErrReply("ERR no such key")
|
||||||
|
}
|
||||||
|
return cluster.LocalExecWithinLock(conn, utils.ToCmdLine2("DumpKey", key))
|
||||||
|
}
|
28
cluster/commands/rename_test.go
Normal file
28
cluster/commands/rename_test.go
Normal file
@@ -0,0 +1,28 @@
|
|||||||
|
package commands
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/hdt3213/godis/cluster/core"
|
||||||
|
"github.com/hdt3213/godis/lib/utils"
|
||||||
|
"github.com/hdt3213/godis/redis/connection"
|
||||||
|
"github.com/hdt3213/godis/redis/protocol/asserts"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestRename(t *testing.T) {
|
||||||
|
id1 := "1"
|
||||||
|
id2 := "2"
|
||||||
|
nodes := core.MakeTestCluster([]string{id1, id2})
|
||||||
|
node1 := nodes[id1]
|
||||||
|
c := connection.NewFakeConn()
|
||||||
|
core.RegisterDefaultCmd("set")
|
||||||
|
core.RegisterDefaultCmd("get")
|
||||||
|
node1.Exec(c, utils.ToCmdLine("set", "1", "1"))
|
||||||
|
|
||||||
|
// 1, 2 will be routed to node1 and node2, see MakeTestCluster
|
||||||
|
res := execRename(node1, c, utils.ToCmdLine("rename", "1", "2"))
|
||||||
|
asserts.AssertStatusReply(t, res, "OK")
|
||||||
|
|
||||||
|
res = node1.Exec(c, utils.ToCmdLine("get", "2"))
|
||||||
|
asserts.AssertBulkReply(t, res, "1")
|
||||||
|
}
|
@@ -60,22 +60,22 @@ func execPrepare(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Re
|
|||||||
cluster.transactions.txs[txId] = tx
|
cluster.transactions.txs[txId] = tx
|
||||||
cluster.transactions.mu.Unlock()
|
cluster.transactions.mu.Unlock()
|
||||||
|
|
||||||
// pre-execute check
|
|
||||||
validator := preChecks[string(realCmdLine[0])]
|
|
||||||
if validator != nil {
|
|
||||||
validateResult := validator(cluster, c, realCmdLine)
|
|
||||||
if protocol.IsErrorReply(validateResult) {
|
|
||||||
return validateResult
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// prepare lock and undo locks
|
// prepare lock and undo locks
|
||||||
tx.writeKeys, tx.readKeys = database.GetRelatedKeys(realCmdLine)
|
tx.writeKeys, tx.readKeys = database.GetRelatedKeys(realCmdLine)
|
||||||
cluster.db.RWLocks(0, tx.writeKeys, tx.readKeys)
|
cluster.db.RWLocks(0, tx.writeKeys, tx.readKeys)
|
||||||
tx.undoLogs = cluster.db.GetUndoLogs(0, realCmdLine)
|
tx.undoLogs = cluster.db.GetUndoLogs(0, realCmdLine)
|
||||||
tx.realCmdLine = realCmdLine
|
tx.realCmdLine = realCmdLine
|
||||||
|
|
||||||
return protocol.MakeOkReply()
|
// execute prepare func
|
||||||
|
prepareFunc := prepareFuncs[strings.ToLower(string(realCmdLine[0]))]
|
||||||
|
var result redis.Reply
|
||||||
|
if prepareFunc != nil {
|
||||||
|
result = prepareFunc(cluster, c, realCmdLine)
|
||||||
|
} else {
|
||||||
|
result = protocol.MakeOkReply()
|
||||||
|
}
|
||||||
|
|
||||||
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
func execCommit(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply {
|
func execCommit(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply {
|
||||||
@@ -128,7 +128,7 @@ func execRollback(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.R
|
|||||||
|
|
||||||
// rollback
|
// rollback
|
||||||
cluster.db.RWLocks(0, tx.writeKeys, tx.readKeys)
|
cluster.db.RWLocks(0, tx.writeKeys, tx.readKeys)
|
||||||
for i := len(tx.undoLogs) - 1; i >= 0; i-- {
|
for i := len(tx.undoLogs) - 1; i >= 0; i-- {
|
||||||
cmdline := tx.undoLogs[i]
|
cmdline := tx.undoLogs[i]
|
||||||
cluster.db.ExecWithLock(c, cmdline)
|
cluster.db.ExecWithLock(c, cmdline)
|
||||||
}
|
}
|
||||||
@@ -142,13 +142,10 @@ func execRollback(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.R
|
|||||||
return protocol.MakeOkReply()
|
return protocol.MakeOkReply()
|
||||||
}
|
}
|
||||||
|
|
||||||
// PreCheckFunc do validation during tcc preparing period
|
var prepareFuncs = make(map[string]CmdFunc)
|
||||||
type PreCheckFunc func(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply
|
|
||||||
|
|
||||||
var preChecks = make(map[string]PreCheckFunc)
|
// RegisterCmd add tcc preparing validator
|
||||||
|
func RegisterPrepareFunc(name string, fn CmdFunc) {
|
||||||
// RegisterCmd add tcc preparing validator
|
|
||||||
func RegisterPreCheck(name string, fn PreCheckFunc) {
|
|
||||||
name = strings.ToLower(name)
|
name = strings.ToLower(name)
|
||||||
preChecks[name] = fn
|
prepareFuncs[name] = fn
|
||||||
}
|
}
|
||||||
|
@@ -39,6 +39,11 @@ func (cluster *Cluster) LocalExec(c redis.Connection, cmdLine [][]byte) redis.Re
|
|||||||
return cluster.db.Exec(c, cmdLine)
|
return cluster.db.Exec(c, cmdLine)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// LocalExec executes command at local node
|
||||||
|
func (cluster *Cluster) LocalExecWithinLock(c redis.Connection, cmdLine [][]byte) redis.Reply {
|
||||||
|
return cluster.db.ExecWithLock(c, cmdLine)
|
||||||
|
}
|
||||||
|
|
||||||
// GetPartitionKey extract hashtag
|
// GetPartitionKey extract hashtag
|
||||||
func GetPartitionKey(key string) string {
|
func GetPartitionKey(key string) string {
|
||||||
beg := strings.Index(key, "{")
|
beg := strings.Index(key, "{")
|
||||||
|
Reference in New Issue
Block a user