diff --git a/cluster/commands/mset.go b/cluster/commands/mset.go index 49a7bf4..d3b228b 100644 --- a/cluster/commands/mset.go +++ b/cluster/commands/mset.go @@ -123,7 +123,7 @@ func execMGet(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine) redis. for node, ret := range nodeResults { nodeCmdLine := cmdLineMap[node] result := ret.(*protocol.MultiBulkReply) - if len(result.Args) != len(nodeCmdLine) - 1 { + if len(result.Args) != len(nodeCmdLine)-1 { return protocol.MakeErrReply("wrong response from node " + node) } for i := 1; i < len(nodeCmdLine); i++ { @@ -143,12 +143,12 @@ func execMGet(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine) redis. const someKeysExistsErr = "Some Keys Exists" func init() { - core.RegisterPreCheck("msetnx", msetNxPrecheck) + core.RegisterPrepareFunc("msetnx", msetNxPrecheck) } func msetNxPrecheck(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply { var keys []string - for i := 1; i < len(cmdLine); i+=2 { + for i := 1; i < len(cmdLine); i += 2 { keys = append(keys, string(cmdLine[i])) } exists := cluster.LocalExists(keys) @@ -202,4 +202,4 @@ func execMSetNx(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine) redi return err } return protocol.MakeIntReply(1) -} \ No newline at end of file +} diff --git a/cluster/commands/rename.go b/cluster/commands/rename.go new file mode 100644 index 0000000..fa2391a --- /dev/null +++ b/cluster/commands/rename.go @@ -0,0 +1,88 @@ +package commands + +import ( + "github.com/hdt3213/godis/cluster/core" + "github.com/hdt3213/godis/interface/redis" + "github.com/hdt3213/godis/lib/utils" + "github.com/hdt3213/godis/redis/protocol" +) + +func init() { + core.RegisterCmd("rename_", execRenameInLocal) + core.RegisterCmd("rename", execRename) + core.RegisterPrepareFunc("RenameFrom", prepareRenameFrom) +} + +func execRenameInLocal(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply { + cmdLine[0] = []byte("rename") + return cluster.LocalExec(c, cmdLine) +} + +func execRename(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply { + if len(cmdLine) != 3 { + return protocol.MakeArgNumErrReply("rename") + } + src := string(cmdLine[1]) + target := string(cmdLine[2]) + srcSlot := cluster.GetSlot(src) + srcNode := cluster.PickNode(srcSlot) + targetSlot := cluster.GetSlot(target) + targetNode := cluster.PickNode(targetSlot) + if srcNode == targetNode { + cmdLine[0] = []byte("rename") + return cluster.Relay(srcNode, c, cmdLine) + } + routeMap := RouteMap{ + srcNode: {src}, + targetNode: {target}, + } + + txID := utils.RandString(10) + srcPrepareResp := cluster.Relay(srcNode, c, utils.ToCmdLine("Prepare", txID, "RenameFrom", src)) + if protocol.IsErrorReply(srcPrepareResp) { + // rollback src node + requestRollback(cluster, c, txID, map[string][]string{srcNode: {src}}) + return srcPrepareResp + } + srcPrepareResult, ok := srcPrepareResp.(*protocol.MultiBulkReply) + if !ok || len(srcPrepareResult.Args) < 2 { + requestRollback(cluster, c, txID, map[string][]string{srcNode: {src}}) + return protocol.MakeErrReply("ERR invalid prepare response") + } + // prepare rename to + targetPrepareResp := cluster.Relay(targetNode, c, utils.ToCmdLine3("Prepare", []byte(txID), + []byte("RenameTo"), []byte(target), srcPrepareResult.Args[0], srcPrepareResult.Args[1])) + if protocol.IsErrorReply(targetPrepareResp) { + // rollback src node + requestRollback(cluster, c, txID, routeMap) + return targetPrepareResp + } + + // commit + commitCmd := utils.ToCmdLine("commit", txID) + for node := range routeMap { + reply := cluster.Relay(node, c,commitCmd ) + if err := protocol.Try2ErrorReply(reply); err != nil { + requestRollback(cluster, c, txID, routeMap) + return protocol.MakeErrReply("commit failed: " + err.Error()) + } + } + return protocol.MakeOkReply() +} + +// prepareRenameFrom is prepare-function for RenameFrom +func prepareRenameFrom(cluster *core.Cluster, conn redis.Connection, cmdLine CmdLine) redis.Reply { + if len(cmdLine) != 2 { + return protocol.MakeArgNumErrReply("RenameFrom") + } + key := string(cmdLine[1]) + existResp := cluster.LocalExec(conn, utils.ToCmdLine("Exists", key)) + if protocol.IsErrorReply(existResp) { + return existResp + } + existIntResp := existResp.(*protocol.IntReply) + if existIntResp.Code == 0 { + return protocol.MakeErrReply("ERR no such key") + } + return cluster.LocalExecWithinLock(conn, utils.ToCmdLine2("DumpKey", key)) +} \ No newline at end of file diff --git a/cluster/commands/rename_test.go b/cluster/commands/rename_test.go new file mode 100644 index 0000000..957cc30 --- /dev/null +++ b/cluster/commands/rename_test.go @@ -0,0 +1,28 @@ +package commands + +import ( + "testing" + + "github.com/hdt3213/godis/cluster/core" + "github.com/hdt3213/godis/lib/utils" + "github.com/hdt3213/godis/redis/connection" + "github.com/hdt3213/godis/redis/protocol/asserts" +) + +func TestRename(t *testing.T) { + id1 := "1" + id2 := "2" + nodes := core.MakeTestCluster([]string{id1, id2}) + node1 := nodes[id1] + c := connection.NewFakeConn() + core.RegisterDefaultCmd("set") + core.RegisterDefaultCmd("get") + node1.Exec(c, utils.ToCmdLine("set", "1", "1")) + + // 1, 2 will be routed to node1 and node2, see MakeTestCluster + res := execRename(node1, c, utils.ToCmdLine("rename", "1", "2")) + asserts.AssertStatusReply(t, res, "OK") + + res = node1.Exec(c, utils.ToCmdLine("get", "2")) + asserts.AssertBulkReply(t, res, "1") +} diff --git a/cluster/core/tcc.go b/cluster/core/tcc.go index e6f2cda..6ff1364 100644 --- a/cluster/core/tcc.go +++ b/cluster/core/tcc.go @@ -60,22 +60,22 @@ func execPrepare(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Re cluster.transactions.txs[txId] = tx cluster.transactions.mu.Unlock() - // pre-execute check - validator := preChecks[string(realCmdLine[0])] - if validator != nil { - validateResult := validator(cluster, c, realCmdLine) - if protocol.IsErrorReply(validateResult) { - return validateResult - } - } - // prepare lock and undo locks tx.writeKeys, tx.readKeys = database.GetRelatedKeys(realCmdLine) cluster.db.RWLocks(0, tx.writeKeys, tx.readKeys) tx.undoLogs = cluster.db.GetUndoLogs(0, realCmdLine) tx.realCmdLine = realCmdLine - return protocol.MakeOkReply() + // execute prepare func + prepareFunc := prepareFuncs[strings.ToLower(string(realCmdLine[0]))] + var result redis.Reply + if prepareFunc != nil { + result = prepareFunc(cluster, c, realCmdLine) + } else { + result = protocol.MakeOkReply() + } + + return result } func execCommit(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply { @@ -128,7 +128,7 @@ func execRollback(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.R // rollback cluster.db.RWLocks(0, tx.writeKeys, tx.readKeys) - for i := len(tx.undoLogs) - 1; i >= 0; i-- { + for i := len(tx.undoLogs) - 1; i >= 0; i-- { cmdline := tx.undoLogs[i] cluster.db.ExecWithLock(c, cmdline) } @@ -142,13 +142,10 @@ func execRollback(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.R return protocol.MakeOkReply() } -// PreCheckFunc do validation during tcc preparing period -type PreCheckFunc func(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply +var prepareFuncs = make(map[string]CmdFunc) -var preChecks = make(map[string]PreCheckFunc) - -// RegisterCmd add tcc preparing validator -func RegisterPreCheck(name string, fn PreCheckFunc) { +// RegisterCmd add tcc preparing validator +func RegisterPrepareFunc(name string, fn CmdFunc) { name = strings.ToLower(name) - preChecks[name] = fn -} \ No newline at end of file + prepareFuncs[name] = fn +} diff --git a/cluster/core/utils.go b/cluster/core/utils.go index f9ce454..9b9816e 100644 --- a/cluster/core/utils.go +++ b/cluster/core/utils.go @@ -39,6 +39,11 @@ func (cluster *Cluster) LocalExec(c redis.Connection, cmdLine [][]byte) redis.Re return cluster.db.Exec(c, cmdLine) } +// LocalExec executes command at local node +func (cluster *Cluster) LocalExecWithinLock(c redis.Connection, cmdLine [][]byte) redis.Reply { + return cluster.db.ExecWithLock(c, cmdLine) +} + // GetPartitionKey extract hashtag func GetPartitionKey(key string) string { beg := strings.Index(key, "{")