mirror of
https://github.com/HDT3213/godis.git
synced 2025-10-08 02:01:07 +08:00
tcc rename nx
This commit is contained in:
@@ -133,12 +133,7 @@ func MSetNX(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply {
|
||||
for _, k := range group {
|
||||
nodeArgs = append(nodeArgs, k, valueMap[k])
|
||||
}
|
||||
var resp redis.Reply
|
||||
if node == cluster.self {
|
||||
resp = execPrepare(cluster, c, makeArgs("Prepare", nodeArgs...))
|
||||
} else {
|
||||
resp = cluster.relay(node, c, makeArgs("Prepare", nodeArgs...))
|
||||
}
|
||||
resp := cluster.relayPrepare(node, c, makeArgs("Prepare", nodeArgs...))
|
||||
if protocol.IsErrorReply(resp) {
|
||||
re := resp.(protocol.ErrorReply)
|
||||
if re.Error() == keyExistsErr {
|
||||
|
@@ -14,7 +14,6 @@ func Rename(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
}
|
||||
srcKey := string(args[1])
|
||||
destKey := string(args[2])
|
||||
|
||||
srcNode := cluster.peerPicker.PickNode(srcKey)
|
||||
destNode := cluster.peerPicker.PickNode(destKey)
|
||||
if srcNode == destNode { // do fast
|
||||
@@ -28,13 +27,7 @@ func Rename(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
txID := cluster.idGenerator.NextID()
|
||||
txIDStr := strconv.FormatInt(txID, 10)
|
||||
// prepare rename from
|
||||
srcArgs := makeArgs("Prepare", txIDStr, "RenameFrom", srcKey)
|
||||
var srcPrepareResp redis.Reply
|
||||
if srcNode == cluster.self {
|
||||
srcPrepareResp = execPrepare(cluster, c, srcArgs)
|
||||
} else {
|
||||
srcPrepareResp = cluster.relay(srcNode, c, srcArgs)
|
||||
}
|
||||
srcPrepareResp := cluster.relayPrepare(srcNode, c, makeArgs("Prepare", txIDStr, "RenameFrom", srcKey))
|
||||
if protocol.IsErrorReply(srcPrepareResp) {
|
||||
// rollback src node
|
||||
requestRollback(cluster, c, txID, map[string][]string{srcNode: {srcKey}})
|
||||
@@ -46,27 +39,21 @@ func Rename(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
return protocol.MakeErrReply("ERR invalid prepare response")
|
||||
}
|
||||
// prepare rename to
|
||||
destArgs := utils.ToCmdLine3("Prepare", []byte(txIDStr),
|
||||
[]byte("RenameTo"), []byte(destKey), srcPrepareMBR.Args[0], srcPrepareMBR.Args[1])
|
||||
var destPrepareResp redis.Reply
|
||||
if destNode == cluster.self {
|
||||
destPrepareResp = execPrepare(cluster, c, destArgs)
|
||||
} else {
|
||||
destPrepareResp = cluster.relay(destNode, c, destArgs)
|
||||
}
|
||||
destPrepareResp := cluster.relayPrepare(destNode, c, utils.ToCmdLine3("Prepare", []byte(txIDStr),
|
||||
[]byte("RenameTo"), []byte(destKey), srcPrepareMBR.Args[0], srcPrepareMBR.Args[1]))
|
||||
if protocol.IsErrorReply(destPrepareResp) {
|
||||
// rollback src node
|
||||
requestRollback(cluster, c, txID, groupMap)
|
||||
return destPrepareResp
|
||||
}
|
||||
_, errReply := requestCommit(cluster, c, txID, groupMap)
|
||||
if errReply != nil {
|
||||
if _, errReply := requestCommit(cluster, c, txID, groupMap); errReply != nil {
|
||||
requestRollback(cluster, c, txID, groupMap)
|
||||
return errReply
|
||||
}
|
||||
return protocol.MakeOkReply()
|
||||
}
|
||||
|
||||
// prepareRenameFrom is prepare-function for RenameFrom, see prepareFuncMap
|
||||
func prepareRenameFrom(cluster *Cluster, conn redis.Connection, cmdLine CmdLine) redis.Reply {
|
||||
if len(cmdLine) != 2 {
|
||||
return protocol.MakeArgNumErrReply("RenameFrom")
|
||||
@@ -83,8 +70,25 @@ func prepareRenameFrom(cluster *Cluster, conn redis.Connection, cmdLine CmdLine)
|
||||
return cluster.db.ExecWithLock(conn, utils.ToCmdLine2("DumpKey", key))
|
||||
}
|
||||
|
||||
func prepareRenameNxTo(cluster *Cluster, conn redis.Connection, cmdLine CmdLine) redis.Reply {
|
||||
if len(cmdLine) != 4 {
|
||||
return protocol.MakeArgNumErrReply("RenameNxTo")
|
||||
}
|
||||
key := string(cmdLine[1])
|
||||
existResp := cluster.db.ExecWithLock(conn, utils.ToCmdLine("Exists", key))
|
||||
if protocol.IsErrorReply(existResp) {
|
||||
return existResp
|
||||
}
|
||||
existIntResp := existResp.(*protocol.IntReply)
|
||||
if existIntResp.Code == 1 {
|
||||
return protocol.MakeErrReply(keyExistsErr)
|
||||
}
|
||||
return protocol.MakeOkReply()
|
||||
}
|
||||
|
||||
func init() {
|
||||
registerPrepareFunc("RenameFrom", prepareRenameFrom)
|
||||
registerPrepareFunc("RenameNxTo", prepareRenameNxTo)
|
||||
}
|
||||
|
||||
// RenameNx renames a key, only if the new key does not exist.
|
||||
@@ -93,14 +97,45 @@ func RenameNx(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||
if len(args) != 3 {
|
||||
return protocol.MakeErrReply("ERR wrong number of arguments for 'renamenx' command")
|
||||
}
|
||||
src := string(args[1])
|
||||
dest := string(args[2])
|
||||
|
||||
srcPeer := cluster.peerPicker.PickNode(src)
|
||||
destPeer := cluster.peerPicker.PickNode(dest)
|
||||
|
||||
if srcPeer != destPeer {
|
||||
return protocol.MakeErrReply("ERR rename must within one slot in cluster mode")
|
||||
srcKey := string(args[1])
|
||||
destKey := string(args[2])
|
||||
srcNode := cluster.peerPicker.PickNode(srcKey)
|
||||
destNode := cluster.peerPicker.PickNode(destKey)
|
||||
if srcNode == destNode {
|
||||
return cluster.relay(srcNode, c, args)
|
||||
}
|
||||
return cluster.relay(srcPeer, c, args)
|
||||
groupMap := map[string][]string{
|
||||
srcNode: {srcKey},
|
||||
destNode: {destKey},
|
||||
}
|
||||
txID := cluster.idGenerator.NextID()
|
||||
txIDStr := strconv.FormatInt(txID, 10)
|
||||
// prepare rename from
|
||||
srcPrepareResp := cluster.relayPrepare(srcNode, c, makeArgs("Prepare", txIDStr, "RenameFrom", srcKey))
|
||||
if protocol.IsErrorReply(srcPrepareResp) {
|
||||
// rollback src node
|
||||
requestRollback(cluster, c, txID, map[string][]string{srcNode: {srcKey}})
|
||||
return srcPrepareResp
|
||||
}
|
||||
srcPrepareMBR, ok := srcPrepareResp.(*protocol.MultiBulkReply)
|
||||
if !ok || len(srcPrepareMBR.Args) < 2 {
|
||||
requestRollback(cluster, c, txID, map[string][]string{srcNode: {srcKey}})
|
||||
return protocol.MakeErrReply("ERR invalid prepare response")
|
||||
}
|
||||
// prepare rename to
|
||||
destPrepareResp := cluster.relayPrepare(destNode, c, utils.ToCmdLine3("Prepare", []byte(txIDStr),
|
||||
[]byte("RenameNxTo"), []byte(destKey), srcPrepareMBR.Args[0], srcPrepareMBR.Args[1]))
|
||||
if protocol.IsErrorReply(destPrepareResp) {
|
||||
// rollback src node
|
||||
requestRollback(cluster, c, txID, groupMap)
|
||||
if re := destPrepareResp.(protocol.ErrorReply); re.Error() == keyExistsErr {
|
||||
return protocol.MakeIntReply(0)
|
||||
}
|
||||
return destPrepareResp
|
||||
}
|
||||
if _, errReply := requestCommit(cluster, c, txID, groupMap); errReply != nil {
|
||||
requestRollback(cluster, c, txID, groupMap)
|
||||
return errReply
|
||||
}
|
||||
return protocol.MakeIntReply(1)
|
||||
}
|
||||
|
@@ -22,8 +22,8 @@ func TestRename(t *testing.T) {
|
||||
t.Error("expect ok")
|
||||
return
|
||||
}
|
||||
result = testDB.Exec(conn, utils.ToCmdLine("EXISTS", key))
|
||||
asserts.AssertIntReply(t, result, 0)
|
||||
//result = testDB.Exec(conn, utils.ToCmdLine("EXISTS", key))
|
||||
//asserts.AssertIntReply(t, result, 0)
|
||||
result = testDB.Exec(conn, utils.ToCmdLine("EXISTS", newKey))
|
||||
asserts.AssertIntReply(t, result, 1)
|
||||
// check ttl
|
||||
|
@@ -13,7 +13,7 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// prepareFuncMap executed after related key locked, and use additional logic to determine whether the transaction can be committed
|
||||
// prepareFunc executed after related key locked, and use additional logic to determine whether the transaction can be committed
|
||||
// For example, prepareMSetNX will return error to prevent MSetNx transaction from committing if any related key already exists
|
||||
var prepareFuncMap = make(map[string]CmdFunc)
|
||||
|
||||
@@ -235,3 +235,11 @@ func requestRollback(cluster *Cluster, c redis.Connection, txID int64, groupMap
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (cluster *Cluster) relayPrepare(node string, c redis.Connection, cmdLine CmdLine) redis.Reply {
|
||||
if node == cluster.self {
|
||||
return execPrepare(cluster, c, cmdLine)
|
||||
} else {
|
||||
return cluster.relay(node, c, cmdLine)
|
||||
}
|
||||
}
|
||||
|
@@ -41,7 +41,7 @@ func execDumpKey(db *DB, args [][]byte) redis.Reply {
|
||||
return resp
|
||||
}
|
||||
|
||||
// execRenameFrom exactly the same as execDel, used for cluster.Rename
|
||||
// execRenameFrom is exactly same as execDel, used for cluster.Rename
|
||||
func execRenameFrom(db *DB, args [][]byte) redis.Reply {
|
||||
key := string(args[0])
|
||||
db.Remove(key)
|
||||
@@ -83,9 +83,16 @@ func execRenameTo(db *DB, args [][]byte) redis.Reply {
|
||||
return protocol.MakeOkReply()
|
||||
}
|
||||
|
||||
// execRenameNxTo is exactly same as execRenameTo, used for cluster.RenameNx, not exists check in cluster.prepareRenameNxTo
|
||||
func execRenameNxTo(db *DB, args [][]byte) redis.Reply {
|
||||
return execRenameTo(db, args)
|
||||
}
|
||||
|
||||
func init() {
|
||||
RegisterCommand("DumpKey", execDumpKey, writeAllKeys, undoDel, 2)
|
||||
RegisterCommand("ExistIn", execExistIn, readAllKeys, nil, -1)
|
||||
RegisterCommand("RenameFrom", execRenameFrom, readFirstKey, nil, 2)
|
||||
RegisterCommand("RenameTo", execRenameTo, writeFirstKey, rollbackFirstKey, 4)
|
||||
RegisterCommand("RenameNxTo", execRenameTo, writeFirstKey, rollbackFirstKey, 4)
|
||||
|
||||
}
|
||||
|
Reference in New Issue
Block a user