diff --git a/cluster/mset.go b/cluster/mset.go index 2a216ef..62d987b 100644 --- a/cluster/mset.go +++ b/cluster/mset.go @@ -133,12 +133,7 @@ func MSetNX(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply { for _, k := range group { nodeArgs = append(nodeArgs, k, valueMap[k]) } - var resp redis.Reply - if node == cluster.self { - resp = execPrepare(cluster, c, makeArgs("Prepare", nodeArgs...)) - } else { - resp = cluster.relay(node, c, makeArgs("Prepare", nodeArgs...)) - } + resp := cluster.relayPrepare(node, c, makeArgs("Prepare", nodeArgs...)) if protocol.IsErrorReply(resp) { re := resp.(protocol.ErrorReply) if re.Error() == keyExistsErr { diff --git a/cluster/rename.go b/cluster/rename.go index c168445..d0ba48a 100644 --- a/cluster/rename.go +++ b/cluster/rename.go @@ -14,7 +14,6 @@ func Rename(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { } srcKey := string(args[1]) destKey := string(args[2]) - srcNode := cluster.peerPicker.PickNode(srcKey) destNode := cluster.peerPicker.PickNode(destKey) if srcNode == destNode { // do fast @@ -28,13 +27,7 @@ func Rename(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { txID := cluster.idGenerator.NextID() txIDStr := strconv.FormatInt(txID, 10) // prepare rename from - srcArgs := makeArgs("Prepare", txIDStr, "RenameFrom", srcKey) - var srcPrepareResp redis.Reply - if srcNode == cluster.self { - srcPrepareResp = execPrepare(cluster, c, srcArgs) - } else { - srcPrepareResp = cluster.relay(srcNode, c, srcArgs) - } + srcPrepareResp := cluster.relayPrepare(srcNode, c, makeArgs("Prepare", txIDStr, "RenameFrom", srcKey)) if protocol.IsErrorReply(srcPrepareResp) { // rollback src node requestRollback(cluster, c, txID, map[string][]string{srcNode: {srcKey}}) @@ -46,27 +39,21 @@ func Rename(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { return protocol.MakeErrReply("ERR invalid prepare response") } // prepare rename to - destArgs := utils.ToCmdLine3("Prepare", []byte(txIDStr), - []byte("RenameTo"), []byte(destKey), srcPrepareMBR.Args[0], srcPrepareMBR.Args[1]) - var destPrepareResp redis.Reply - if destNode == cluster.self { - destPrepareResp = execPrepare(cluster, c, destArgs) - } else { - destPrepareResp = cluster.relay(destNode, c, destArgs) - } + destPrepareResp := cluster.relayPrepare(destNode, c, utils.ToCmdLine3("Prepare", []byte(txIDStr), + []byte("RenameTo"), []byte(destKey), srcPrepareMBR.Args[0], srcPrepareMBR.Args[1])) if protocol.IsErrorReply(destPrepareResp) { // rollback src node requestRollback(cluster, c, txID, groupMap) return destPrepareResp } - _, errReply := requestCommit(cluster, c, txID, groupMap) - if errReply != nil { + if _, errReply := requestCommit(cluster, c, txID, groupMap); errReply != nil { requestRollback(cluster, c, txID, groupMap) return errReply } return protocol.MakeOkReply() } +// prepareRenameFrom is prepare-function for RenameFrom, see prepareFuncMap func prepareRenameFrom(cluster *Cluster, conn redis.Connection, cmdLine CmdLine) redis.Reply { if len(cmdLine) != 2 { return protocol.MakeArgNumErrReply("RenameFrom") @@ -83,8 +70,25 @@ func prepareRenameFrom(cluster *Cluster, conn redis.Connection, cmdLine CmdLine) return cluster.db.ExecWithLock(conn, utils.ToCmdLine2("DumpKey", key)) } +func prepareRenameNxTo(cluster *Cluster, conn redis.Connection, cmdLine CmdLine) redis.Reply { + if len(cmdLine) != 4 { + return protocol.MakeArgNumErrReply("RenameNxTo") + } + key := string(cmdLine[1]) + existResp := cluster.db.ExecWithLock(conn, utils.ToCmdLine("Exists", key)) + if protocol.IsErrorReply(existResp) { + return existResp + } + existIntResp := existResp.(*protocol.IntReply) + if existIntResp.Code == 1 { + return protocol.MakeErrReply(keyExistsErr) + } + return protocol.MakeOkReply() +} + func init() { registerPrepareFunc("RenameFrom", prepareRenameFrom) + registerPrepareFunc("RenameNxTo", prepareRenameNxTo) } // RenameNx renames a key, only if the new key does not exist. @@ -93,14 +97,45 @@ func RenameNx(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { if len(args) != 3 { return protocol.MakeErrReply("ERR wrong number of arguments for 'renamenx' command") } - src := string(args[1]) - dest := string(args[2]) - - srcPeer := cluster.peerPicker.PickNode(src) - destPeer := cluster.peerPicker.PickNode(dest) - - if srcPeer != destPeer { - return protocol.MakeErrReply("ERR rename must within one slot in cluster mode") + srcKey := string(args[1]) + destKey := string(args[2]) + srcNode := cluster.peerPicker.PickNode(srcKey) + destNode := cluster.peerPicker.PickNode(destKey) + if srcNode == destNode { + return cluster.relay(srcNode, c, args) } - return cluster.relay(srcPeer, c, args) + groupMap := map[string][]string{ + srcNode: {srcKey}, + destNode: {destKey}, + } + txID := cluster.idGenerator.NextID() + txIDStr := strconv.FormatInt(txID, 10) + // prepare rename from + srcPrepareResp := cluster.relayPrepare(srcNode, c, makeArgs("Prepare", txIDStr, "RenameFrom", srcKey)) + if protocol.IsErrorReply(srcPrepareResp) { + // rollback src node + requestRollback(cluster, c, txID, map[string][]string{srcNode: {srcKey}}) + return srcPrepareResp + } + srcPrepareMBR, ok := srcPrepareResp.(*protocol.MultiBulkReply) + if !ok || len(srcPrepareMBR.Args) < 2 { + requestRollback(cluster, c, txID, map[string][]string{srcNode: {srcKey}}) + return protocol.MakeErrReply("ERR invalid prepare response") + } + // prepare rename to + destPrepareResp := cluster.relayPrepare(destNode, c, utils.ToCmdLine3("Prepare", []byte(txIDStr), + []byte("RenameNxTo"), []byte(destKey), srcPrepareMBR.Args[0], srcPrepareMBR.Args[1])) + if protocol.IsErrorReply(destPrepareResp) { + // rollback src node + requestRollback(cluster, c, txID, groupMap) + if re := destPrepareResp.(protocol.ErrorReply); re.Error() == keyExistsErr { + return protocol.MakeIntReply(0) + } + return destPrepareResp + } + if _, errReply := requestCommit(cluster, c, txID, groupMap); errReply != nil { + requestRollback(cluster, c, txID, groupMap) + return errReply + } + return protocol.MakeIntReply(1) } diff --git a/cluster/rename_test.go b/cluster/rename_test.go index d679686..fa0d94a 100644 --- a/cluster/rename_test.go +++ b/cluster/rename_test.go @@ -22,8 +22,8 @@ func TestRename(t *testing.T) { t.Error("expect ok") return } - result = testDB.Exec(conn, utils.ToCmdLine("EXISTS", key)) - asserts.AssertIntReply(t, result, 0) + //result = testDB.Exec(conn, utils.ToCmdLine("EXISTS", key)) + //asserts.AssertIntReply(t, result, 0) result = testDB.Exec(conn, utils.ToCmdLine("EXISTS", newKey)) asserts.AssertIntReply(t, result, 1) // check ttl diff --git a/cluster/tcc.go b/cluster/tcc.go index d0d6215..33baa9b 100644 --- a/cluster/tcc.go +++ b/cluster/tcc.go @@ -13,7 +13,7 @@ import ( "time" ) -// prepareFuncMap executed after related key locked, and use additional logic to determine whether the transaction can be committed +// prepareFunc executed after related key locked, and use additional logic to determine whether the transaction can be committed // For example, prepareMSetNX will return error to prevent MSetNx transaction from committing if any related key already exists var prepareFuncMap = make(map[string]CmdFunc) @@ -235,3 +235,11 @@ func requestRollback(cluster *Cluster, c redis.Connection, txID int64, groupMap } } } + +func (cluster *Cluster) relayPrepare(node string, c redis.Connection, cmdLine CmdLine) redis.Reply { + if node == cluster.self { + return execPrepare(cluster, c, cmdLine) + } else { + return cluster.relay(node, c, cmdLine) + } +} diff --git a/database/cluster_helper.go b/database/cluster_helper.go index b9cf451..d5204d9 100644 --- a/database/cluster_helper.go +++ b/database/cluster_helper.go @@ -41,7 +41,7 @@ func execDumpKey(db *DB, args [][]byte) redis.Reply { return resp } -// execRenameFrom exactly the same as execDel, used for cluster.Rename +// execRenameFrom is exactly same as execDel, used for cluster.Rename func execRenameFrom(db *DB, args [][]byte) redis.Reply { key := string(args[0]) db.Remove(key) @@ -83,9 +83,16 @@ func execRenameTo(db *DB, args [][]byte) redis.Reply { return protocol.MakeOkReply() } +// execRenameNxTo is exactly same as execRenameTo, used for cluster.RenameNx, not exists check in cluster.prepareRenameNxTo +func execRenameNxTo(db *DB, args [][]byte) redis.Reply { + return execRenameTo(db, args) +} + func init() { RegisterCommand("DumpKey", execDumpKey, writeAllKeys, undoDel, 2) RegisterCommand("ExistIn", execExistIn, readAllKeys, nil, -1) RegisterCommand("RenameFrom", execRenameFrom, readFirstKey, nil, 2) RegisterCommand("RenameTo", execRenameTo, writeFirstKey, rollbackFirstKey, 4) + RegisterCommand("RenameNxTo", execRenameTo, writeFirstKey, rollbackFirstKey, 4) + }