renamenx in cluster

This commit is contained in:
finley
2025-03-23 21:51:07 +08:00
parent ef4e09972c
commit afd06103cc
4 changed files with 110 additions and 5 deletions

View File

@@ -9,8 +9,12 @@ import (
func init() { func init() {
core.RegisterCmd("rename_", execRenameInLocal) core.RegisterCmd("rename_", execRenameInLocal)
core.RegisterCmd("renamenx_", execRenameNxInLocal)
core.RegisterCmd("rename", execRename) core.RegisterCmd("rename", execRename)
core.RegisterCmd("renamenx", execRenameNx)
core.RegisterPrepareFunc("RenameFrom", prepareRenameFrom) core.RegisterPrepareFunc("RenameFrom", prepareRenameFrom)
core.RegisterPrepareFunc("RenameNxTo", prepareRenameNxTo)
} }
func execRenameInLocal(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply { func execRenameInLocal(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply {
@@ -18,6 +22,11 @@ func execRenameInLocal(cluster *core.Cluster, c redis.Connection, cmdLine CmdLin
return cluster.LocalExec(c, cmdLine) return cluster.LocalExec(c, cmdLine)
} }
func execRenameNxInLocal(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply {
cmdLine[0] = []byte("renamenx")
return cluster.LocalExec(c, cmdLine)
}
func execRename(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply { func execRename(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply {
if len(cmdLine) != 3 { if len(cmdLine) != 3 {
return protocol.MakeArgNumErrReply("rename") return protocol.MakeArgNumErrReply("rename")
@@ -61,7 +70,7 @@ func execRename(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine) redi
// commit // commit
commitCmd := utils.ToCmdLine("commit", txID) commitCmd := utils.ToCmdLine("commit", txID)
for node := range routeMap { for node := range routeMap {
reply := cluster.Relay(node, c,commitCmd ) reply := cluster.Relay(node, c, commitCmd)
if err := protocol.Try2ErrorReply(reply); err != nil { if err := protocol.Try2ErrorReply(reply); err != nil {
requestRollback(cluster, c, txID, routeMap) requestRollback(cluster, c, txID, routeMap)
return protocol.MakeErrReply("commit failed: " + err.Error()) return protocol.MakeErrReply("commit failed: " + err.Error())
@@ -70,6 +79,61 @@ func execRename(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine) redi
return protocol.MakeOkReply() return protocol.MakeOkReply()
} }
func execRenameNx(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply {
if len(cmdLine) != 3 {
return protocol.MakeArgNumErrReply("rename")
}
src := string(cmdLine[1])
target := string(cmdLine[2])
srcSlot := cluster.GetSlot(src)
srcNode := cluster.PickNode(srcSlot)
targetSlot := cluster.GetSlot(target)
targetNode := cluster.PickNode(targetSlot)
if srcNode == targetNode {
cmdLine[0] = []byte("rename")
return cluster.Relay(srcNode, c, cmdLine)
}
routeMap := RouteMap{
srcNode: {src},
targetNode: {target},
}
txID := utils.RandString(10)
srcPrepareResp := cluster.Relay(srcNode, c, utils.ToCmdLine("Prepare", txID, "RenameFrom", src))
if protocol.IsErrorReply(srcPrepareResp) {
// rollback src node
requestRollback(cluster, c, txID, map[string][]string{srcNode: {src}})
return srcPrepareResp
}
srcPrepareResult, ok := srcPrepareResp.(*protocol.MultiBulkReply)
if !ok || len(srcPrepareResult.Args) < 2 {
requestRollback(cluster, c, txID, map[string][]string{srcNode: {src}})
return protocol.MakeErrReply("ERR invalid prepare response")
}
// prepare rename to
targetPrepareResp := cluster.Relay(targetNode, c, utils.ToCmdLine3("Prepare", []byte(txID),
[]byte("RenameNxTo"), []byte(target), srcPrepareResult.Args[0], srcPrepareResult.Args[1]))
if res, ok := targetPrepareResp.(protocol.ErrorReply); ok {
// rollback src node
requestRollback(cluster, c, txID, routeMap)
if res.Error() == keyExistsErr {
return protocol.MakeIntReply(0)
}
return res
}
// commit
commitCmd := utils.ToCmdLine("commit", txID)
for node := range routeMap {
reply := cluster.Relay(node, c, commitCmd)
if err := protocol.Try2ErrorReply(reply); err != nil {
requestRollback(cluster, c, txID, routeMap)
return protocol.MakeErrReply("commit failed: " + err.Error())
}
}
return protocol.MakeIntReply(1)
}
// prepareRenameFrom is prepare-function for RenameFrom // prepareRenameFrom is prepare-function for RenameFrom
func prepareRenameFrom(cluster *core.Cluster, conn redis.Connection, cmdLine CmdLine) redis.Reply { func prepareRenameFrom(cluster *core.Cluster, conn redis.Connection, cmdLine CmdLine) redis.Reply {
if len(cmdLine) != 2 { if len(cmdLine) != 2 {
@@ -86,3 +150,17 @@ func prepareRenameFrom(cluster *core.Cluster, conn redis.Connection, cmdLine Cmd
} }
return cluster.LocalExecWithinLock(conn, utils.ToCmdLine2("DumpKey", key)) return cluster.LocalExecWithinLock(conn, utils.ToCmdLine2("DumpKey", key))
} }
const keyExistsErr = "key exists"
func prepareRenameNxTo(cluster *core.Cluster, conn redis.Connection, cmdLine CmdLine) redis.Reply {
if len(cmdLine) != 4 {
return protocol.MakeArgNumErrReply("RenameNxTo")
}
key := string(cmdLine[1])
exists := cluster.LocalExists([]string{key})
if len(exists) > 0 {
return protocol.MakeErrReply(keyExistsErr)
}
return protocol.MakeOkReply()
}

View File

@@ -26,3 +26,25 @@ func TestRename(t *testing.T) {
res = node1.Exec(c, utils.ToCmdLine("get", "2")) res = node1.Exec(c, utils.ToCmdLine("get", "2"))
asserts.AssertBulkReply(t, res, "1") asserts.AssertBulkReply(t, res, "1")
} }
func TestRenameNx(t *testing.T) {
id1 := "1"
id2 := "2"
nodes := core.MakeTestCluster([]string{id1, id2})
node1 := nodes[id1]
c := connection.NewFakeConn()
core.RegisterDefaultCmd("set")
core.RegisterDefaultCmd("get")
node1.Exec(c, utils.ToCmdLine("set", "1", "1"))
// 1, 2 will be routed to node1 and node2, see MakeTestCluster
res := execRenameNx(node1, c, utils.ToCmdLine("rename", "1", "2"))
asserts.AssertIntReply(t, res, 1)
res = node1.Exec(c, utils.ToCmdLine("get", "2"))
asserts.AssertBulkReply(t, res, "1")
node1.Exec(c, utils.ToCmdLine("set", "3", "3"))
res = execRenameNx(node1, c, utils.ToCmdLine("rename", "2", "3"))
asserts.AssertIntReply(t, res, 0)
}

View File

@@ -74,7 +74,10 @@ func execPrepare(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Re
} else { } else {
result = protocol.MakeOkReply() result = protocol.MakeOkReply()
} }
if protocol.IsErrorReply(result) {
// prepare for rollback
cluster.db.RWUnLocks(0, tx.writeKeys, tx.readKeys)
}
return result return result
} }
@@ -145,6 +148,8 @@ func execRollback(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.R
var prepareFuncs = make(map[string]CmdFunc) var prepareFuncs = make(map[string]CmdFunc)
// RegisterCmd add tcc preparing validator // RegisterCmd add tcc preparing validator
// prepareFunc will be executed within lock
// If prepareFunc returns an error reply, the transaction should be rolledback-
func RegisterPrepareFunc(name string, fn CmdFunc) { func RegisterPrepareFunc(name string, fn CmdFunc) {
name = strings.ToLower(name) name = strings.ToLower(name)
prepareFuncs[name] = fn prepareFuncs[name] = fn