diff --git a/cluster/commands/rename.go b/cluster/commands/rename.go index fa2391a..a0a10fa 100644 --- a/cluster/commands/rename.go +++ b/cluster/commands/rename.go @@ -9,8 +9,12 @@ import ( func init() { core.RegisterCmd("rename_", execRenameInLocal) + core.RegisterCmd("renamenx_", execRenameNxInLocal) core.RegisterCmd("rename", execRename) + core.RegisterCmd("renamenx", execRenameNx) core.RegisterPrepareFunc("RenameFrom", prepareRenameFrom) + core.RegisterPrepareFunc("RenameNxTo", prepareRenameNxTo) + } func execRenameInLocal(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply { @@ -18,6 +22,11 @@ func execRenameInLocal(cluster *core.Cluster, c redis.Connection, cmdLine CmdLin return cluster.LocalExec(c, cmdLine) } +func execRenameNxInLocal(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply { + cmdLine[0] = []byte("renamenx") + return cluster.LocalExec(c, cmdLine) +} + func execRename(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply { if len(cmdLine) != 3 { return protocol.MakeArgNumErrReply("rename") @@ -33,7 +42,7 @@ func execRename(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine) redi return cluster.Relay(srcNode, c, cmdLine) } routeMap := RouteMap{ - srcNode: {src}, + srcNode: {src}, targetNode: {target}, } @@ -58,10 +67,10 @@ func execRename(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine) redi return targetPrepareResp } - // commit + // commit commitCmd := utils.ToCmdLine("commit", txID) for node := range routeMap { - reply := cluster.Relay(node, c,commitCmd ) + reply := cluster.Relay(node, c, commitCmd) if err := protocol.Try2ErrorReply(reply); err != nil { requestRollback(cluster, c, txID, routeMap) return protocol.MakeErrReply("commit failed: " + err.Error()) @@ -70,6 +79,61 @@ func execRename(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine) redi return protocol.MakeOkReply() } +func execRenameNx(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply { + if len(cmdLine) != 3 { + return protocol.MakeArgNumErrReply("rename") + } + src := string(cmdLine[1]) + target := string(cmdLine[2]) + srcSlot := cluster.GetSlot(src) + srcNode := cluster.PickNode(srcSlot) + targetSlot := cluster.GetSlot(target) + targetNode := cluster.PickNode(targetSlot) + if srcNode == targetNode { + cmdLine[0] = []byte("rename") + return cluster.Relay(srcNode, c, cmdLine) + } + routeMap := RouteMap{ + srcNode: {src}, + targetNode: {target}, + } + + txID := utils.RandString(10) + srcPrepareResp := cluster.Relay(srcNode, c, utils.ToCmdLine("Prepare", txID, "RenameFrom", src)) + if protocol.IsErrorReply(srcPrepareResp) { + // rollback src node + requestRollback(cluster, c, txID, map[string][]string{srcNode: {src}}) + return srcPrepareResp + } + srcPrepareResult, ok := srcPrepareResp.(*protocol.MultiBulkReply) + if !ok || len(srcPrepareResult.Args) < 2 { + requestRollback(cluster, c, txID, map[string][]string{srcNode: {src}}) + return protocol.MakeErrReply("ERR invalid prepare response") + } + // prepare rename to + targetPrepareResp := cluster.Relay(targetNode, c, utils.ToCmdLine3("Prepare", []byte(txID), + []byte("RenameNxTo"), []byte(target), srcPrepareResult.Args[0], srcPrepareResult.Args[1])) + if res, ok := targetPrepareResp.(protocol.ErrorReply); ok { + // rollback src node + requestRollback(cluster, c, txID, routeMap) + if res.Error() == keyExistsErr { + return protocol.MakeIntReply(0) + } + return res + } + + // commit + commitCmd := utils.ToCmdLine("commit", txID) + for node := range routeMap { + reply := cluster.Relay(node, c, commitCmd) + if err := protocol.Try2ErrorReply(reply); err != nil { + requestRollback(cluster, c, txID, routeMap) + return protocol.MakeErrReply("commit failed: " + err.Error()) + } + } + return protocol.MakeIntReply(1) +} + // prepareRenameFrom is prepare-function for RenameFrom func prepareRenameFrom(cluster *core.Cluster, conn redis.Connection, cmdLine CmdLine) redis.Reply { if len(cmdLine) != 2 { @@ -85,4 +149,18 @@ func prepareRenameFrom(cluster *core.Cluster, conn redis.Connection, cmdLine Cmd return protocol.MakeErrReply("ERR no such key") } return cluster.LocalExecWithinLock(conn, utils.ToCmdLine2("DumpKey", key)) -} \ No newline at end of file +} + +const keyExistsErr = "key exists" + +func prepareRenameNxTo(cluster *core.Cluster, conn redis.Connection, cmdLine CmdLine) redis.Reply { + if len(cmdLine) != 4 { + return protocol.MakeArgNumErrReply("RenameNxTo") + } + key := string(cmdLine[1]) + exists := cluster.LocalExists([]string{key}) + if len(exists) > 0 { + return protocol.MakeErrReply(keyExistsErr) + } + return protocol.MakeOkReply() +} diff --git a/cluster/commands/rename_test.go b/cluster/commands/rename_test.go index 957cc30..2882aa9 100644 --- a/cluster/commands/rename_test.go +++ b/cluster/commands/rename_test.go @@ -26,3 +26,25 @@ func TestRename(t *testing.T) { res = node1.Exec(c, utils.ToCmdLine("get", "2")) asserts.AssertBulkReply(t, res, "1") } + +func TestRenameNx(t *testing.T) { + id1 := "1" + id2 := "2" + nodes := core.MakeTestCluster([]string{id1, id2}) + node1 := nodes[id1] + c := connection.NewFakeConn() + core.RegisterDefaultCmd("set") + core.RegisterDefaultCmd("get") + node1.Exec(c, utils.ToCmdLine("set", "1", "1")) + + // 1, 2 will be routed to node1 and node2, see MakeTestCluster + res := execRenameNx(node1, c, utils.ToCmdLine("rename", "1", "2")) + asserts.AssertIntReply(t, res, 1) + + res = node1.Exec(c, utils.ToCmdLine("get", "2")) + asserts.AssertBulkReply(t, res, "1") + + node1.Exec(c, utils.ToCmdLine("set", "3", "3")) + res = execRenameNx(node1, c, utils.ToCmdLine("rename", "2", "3")) + asserts.AssertIntReply(t, res, 0) +} \ No newline at end of file diff --git a/cluster/commands/tcccore.go b/cluster/commands/tcc_utils.go similarity index 100% rename from cluster/commands/tcccore.go rename to cluster/commands/tcc_utils.go diff --git a/cluster/core/tcc.go b/cluster/core/tcc.go index 6ff1364..2eab52b 100644 --- a/cluster/core/tcc.go +++ b/cluster/core/tcc.go @@ -74,7 +74,10 @@ func execPrepare(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Re } else { result = protocol.MakeOkReply() } - + if protocol.IsErrorReply(result) { + // prepare for rollback + cluster.db.RWUnLocks(0, tx.writeKeys, tx.readKeys) + } return result } @@ -145,6 +148,8 @@ func execRollback(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.R var prepareFuncs = make(map[string]CmdFunc) // RegisterCmd add tcc preparing validator +// prepareFunc will be executed within lock +// If prepareFunc returns an error reply, the transaction should be rolledback- func RegisterPrepareFunc(name string, fn CmdFunc) { name = strings.ToLower(name) prepareFuncs[name] = fn