tcc rename

This commit is contained in:
hdt3213
2022-04-10 16:29:49 +08:00
parent 37ef7d8a34
commit 374606f6f1
10 changed files with 247 additions and 58 deletions

View File

@@ -160,7 +160,7 @@ func makeArgs(cmd string, args ...string) [][]byte {
return result return result
} }
// return peer -> writeKeys // return node -> writeKeys
func (cluster *Cluster) groupBy(keys []string) map[string][]string { func (cluster *Cluster) groupBy(keys []string) map[string][]string {
result := make(map[string][]string) result := make(map[string][]string)
for _, key := range keys { for _, key := range keys {

View File

@@ -128,16 +128,16 @@ func MSetNX(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply {
txID := cluster.idGenerator.NextID() txID := cluster.idGenerator.NextID()
txIDStr := strconv.FormatInt(txID, 10) txIDStr := strconv.FormatInt(txID, 10)
rollback := false rollback := false
for peer, group := range groupMap { for node, group := range groupMap {
peerArgs := []string{txIDStr, "MSETNX"} nodeArgs := []string{txIDStr, "MSETNX"}
for _, k := range group { for _, k := range group {
peerArgs = append(peerArgs, k, valueMap[k]) nodeArgs = append(nodeArgs, k, valueMap[k])
} }
var resp redis.Reply var resp redis.Reply
if peer == cluster.self { if node == cluster.self {
resp = execPrepare(cluster, c, makeArgs("Prepare", peerArgs...)) resp = execPrepare(cluster, c, makeArgs("Prepare", nodeArgs...))
} else { } else {
resp = cluster.relay(peer, c, makeArgs("Prepare", peerArgs...)) resp = cluster.relay(node, c, makeArgs("Prepare", nodeArgs...))
} }
if protocol.IsErrorReply(resp) { if protocol.IsErrorReply(resp) {
re := resp.(protocol.ErrorReply) re := resp.(protocol.ErrorReply)
@@ -187,5 +187,5 @@ func prepareMSetNx(cluster *Cluster, conn redis.Connection, cmdLine CmdLine) red
} }
func init() { func init() {
prepareFuncMap["msetnx"] = prepareMSetNx registerPrepareFunc("MSetNx", prepareMSetNx)
} }

View File

@@ -2,7 +2,9 @@ package cluster
import ( import (
"github.com/hdt3213/godis/interface/redis" "github.com/hdt3213/godis/interface/redis"
"github.com/hdt3213/godis/lib/utils"
"github.com/hdt3213/godis/redis/protocol" "github.com/hdt3213/godis/redis/protocol"
"strconv"
) )
// Rename renames a key, the origin and the destination must within the same node // Rename renames a key, the origin and the destination must within the same node
@@ -10,16 +12,79 @@ func Rename(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
if len(args) != 3 { if len(args) != 3 {
return protocol.MakeErrReply("ERR wrong number of arguments for 'rename' command") return protocol.MakeErrReply("ERR wrong number of arguments for 'rename' command")
} }
src := string(args[1]) srcKey := string(args[1])
dest := string(args[2]) destKey := string(args[2])
srcPeer := cluster.peerPicker.PickNode(src) srcNode := cluster.peerPicker.PickNode(srcKey)
destPeer := cluster.peerPicker.PickNode(dest) destNode := cluster.peerPicker.PickNode(destKey)
if srcNode == destNode { // do fast
if srcPeer != destPeer { return cluster.relay(srcNode, c, args)
return protocol.MakeErrReply("ERR rename must within one slot in cluster mode")
} }
return cluster.relay(srcPeer, c, args)
groupMap := map[string][]string{
srcNode: {srcKey},
destNode: {destKey},
}
txID := cluster.idGenerator.NextID()
txIDStr := strconv.FormatInt(txID, 10)
// prepare rename from
srcArgs := makeArgs("Prepare", txIDStr, "RenameFrom", srcKey)
var srcPrepareResp redis.Reply
if srcNode == cluster.self {
srcPrepareResp = execPrepare(cluster, c, srcArgs)
} else {
srcPrepareResp = cluster.relay(srcNode, c, srcArgs)
}
if protocol.IsErrorReply(srcPrepareResp) {
// rollback src node
requestRollback(cluster, c, txID, map[string][]string{srcNode: {srcKey}})
return srcPrepareResp
}
srcPrepareMBR, ok := srcPrepareResp.(*protocol.MultiBulkReply)
if !ok || len(srcPrepareMBR.Args) < 2 {
requestRollback(cluster, c, txID, map[string][]string{srcNode: {srcKey}})
return protocol.MakeErrReply("ERR invalid prepare response")
}
// prepare rename to
destArgs := utils.ToCmdLine3("Prepare", []byte(txIDStr),
[]byte("RenameTo"), []byte(destKey), srcPrepareMBR.Args[0], srcPrepareMBR.Args[1])
var destPrepareResp redis.Reply
if destNode == cluster.self {
destPrepareResp = execPrepare(cluster, c, destArgs)
} else {
destPrepareResp = cluster.relay(destNode, c, destArgs)
}
if protocol.IsErrorReply(destPrepareResp) {
// rollback src node
requestRollback(cluster, c, txID, groupMap)
return destPrepareResp
}
_, errReply := requestCommit(cluster, c, txID, groupMap)
if errReply != nil {
requestRollback(cluster, c, txID, groupMap)
return errReply
}
return protocol.MakeOkReply()
}
func prepareRenameFrom(cluster *Cluster, conn redis.Connection, cmdLine CmdLine) redis.Reply {
if len(cmdLine) != 2 {
return protocol.MakeArgNumErrReply("RenameFrom")
}
key := string(cmdLine[1])
existResp := cluster.db.ExecWithLock(conn, utils.ToCmdLine("Exists", key))
if protocol.IsErrorReply(existResp) {
return existResp
}
existIntResp := existResp.(*protocol.IntReply)
if existIntResp.Code == 0 {
return protocol.MakeErrReply("ERR no such key")
}
return cluster.db.ExecWithLock(conn, utils.ToCmdLine2("DumpKey", key))
}
func init() {
registerPrepareFunc("RenameFrom", prepareRenameFrom)
} }
// RenameNx renames a key, only if the new key does not exist. // RenameNx renames a key, only if the new key does not exist.

View File

@@ -37,6 +37,7 @@ func TestRename(t *testing.T) {
t.Errorf("expected ttl more than 0, actual: %d", intResult.Code) t.Errorf("expected ttl more than 0, actual: %d", intResult.Code)
return return
} }
// test no src key
} }
func TestRenameNx(t *testing.T) { func TestRenameNx(t *testing.T) {

View File

@@ -17,6 +17,10 @@ import (
// For example, prepareMSetNX will return error to prevent MSetNx transaction from committing if any related key already exists // For example, prepareMSetNX will return error to prevent MSetNx transaction from committing if any related key already exists
var prepareFuncMap = make(map[string]CmdFunc) var prepareFuncMap = make(map[string]CmdFunc)
func registerPrepareFunc(cmdName string, fn CmdFunc) {
prepareFuncMap[strings.ToLower(cmdName)] = fn
}
// Transaction stores state and data for a try-commit-catch distributed transaction // Transaction stores state and data for a try-commit-catch distributed transaction
type Transaction struct { type Transaction struct {
id string // transaction id id string // transaction id
@@ -195,16 +199,16 @@ func execCommit(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Rep
} }
// requestCommit commands all node to commit transaction as coordinator // requestCommit commands all node to commit transaction as coordinator
func requestCommit(cluster *Cluster, c redis.Connection, txID int64, peers map[string][]string) ([]redis.Reply, protocol.ErrorReply) { func requestCommit(cluster *Cluster, c redis.Connection, txID int64, groupMap map[string][]string) ([]redis.Reply, protocol.ErrorReply) {
var errReply protocol.ErrorReply var errReply protocol.ErrorReply
txIDStr := strconv.FormatInt(txID, 10) txIDStr := strconv.FormatInt(txID, 10)
respList := make([]redis.Reply, 0, len(peers)) respList := make([]redis.Reply, 0, len(groupMap))
for peer := range peers { for node := range groupMap {
var resp redis.Reply var resp redis.Reply
if peer == cluster.self { if node == cluster.self {
resp = execCommit(cluster, c, makeArgs("commit", txIDStr)) resp = execCommit(cluster, c, makeArgs("commit", txIDStr))
} else { } else {
resp = cluster.relay(peer, c, makeArgs("commit", txIDStr)) resp = cluster.relay(node, c, makeArgs("commit", txIDStr))
} }
if protocol.IsErrorReply(resp) { if protocol.IsErrorReply(resp) {
errReply = resp.(protocol.ErrorReply) errReply = resp.(protocol.ErrorReply)
@@ -213,20 +217,21 @@ func requestCommit(cluster *Cluster, c redis.Connection, txID int64, peers map[s
respList = append(respList, resp) respList = append(respList, resp)
} }
if errReply != nil { if errReply != nil {
requestRollback(cluster, c, txID, peers) requestRollback(cluster, c, txID, groupMap)
return nil, errReply return nil, errReply
} }
return respList, nil return respList, nil
} }
// requestRollback requests all node rollback transaction as coordinator // requestRollback requests all node rollback transaction as coordinator
func requestRollback(cluster *Cluster, c redis.Connection, txID int64, peers map[string][]string) { // groupMap: node -> keys
func requestRollback(cluster *Cluster, c redis.Connection, txID int64, groupMap map[string][]string) {
txIDStr := strconv.FormatInt(txID, 10) txIDStr := strconv.FormatInt(txID, 10)
for peer := range peers { for node := range groupMap {
if peer == cluster.self { if node == cluster.self {
execRollback(cluster, c, makeArgs("rollback", txIDStr)) execRollback(cluster, c, makeArgs("rollback", txIDStr))
} else { } else {
cluster.relay(peer, c, makeArgs("rollback", txIDStr)) cluster.relay(node, c, makeArgs("rollback", txIDStr))
} }
} }
} }

View File

@@ -0,0 +1,91 @@
package database
import (
"github.com/hdt3213/godis/aof"
"github.com/hdt3213/godis/interface/redis"
"github.com/hdt3213/godis/redis/parser"
"github.com/hdt3213/godis/redis/protocol"
)
// execExistIn returns existing key in given keys
// example: ExistIn key1 key2 key3..., returns [key1, key2]
// custom command for MSetNX tcc transaction
func execExistIn(db *DB, args [][]byte) redis.Reply {
var result [][]byte
for _, arg := range args {
key := string(arg)
_, exists := db.GetEntity(key)
if exists {
result = append(result, []byte(key))
}
}
if len(result) == 0 {
return protocol.MakeEmptyMultiBulkReply()
}
return protocol.MakeMultiBulkReply(result)
}
// execDumpKey returns redis serialization protocol data of given key (see aof.EntityToCmd)
func execDumpKey(db *DB, args [][]byte) redis.Reply {
key := string(args[0])
entity, ok := db.GetEntity(key)
if !ok {
return protocol.MakeEmptyMultiBulkReply()
}
dumpCmd := aof.EntityToCmd(key, entity)
ttlCmd := toTTLCmd(db, key)
resp := protocol.MakeMultiBulkReply([][]byte{
dumpCmd.ToBytes(),
ttlCmd.ToBytes(),
})
return resp
}
// execRenameFrom exactly the same as execDel, used for cluster.Rename
func execRenameFrom(db *DB, args [][]byte) redis.Reply {
key := string(args[0])
db.Remove(key)
return protocol.MakeOkReply()
}
// execRenameTo accepts result of execDumpKey and load the dumped key
// args format: key dumpCmd ttlCmd
// execRenameTo may be partially successful, do not use it without transaction
func execRenameTo(db *DB, args [][]byte) redis.Reply {
key := args[0]
dumpRawCmd, err := parser.ParseOne(args[1])
if err != nil {
return protocol.MakeErrReply("illegal dump cmd: " + err.Error())
}
dumpCmd, ok := dumpRawCmd.(*protocol.MultiBulkReply)
if !ok {
return protocol.MakeErrReply("dump cmd is not multi bulk reply")
}
dumpCmd.Args[1] = key // change key
ttlRawCmd, err := parser.ParseOne(args[2])
if err != nil {
return protocol.MakeErrReply("illegal ttl cmd: " + err.Error())
}
ttlCmd, ok := ttlRawCmd.(*protocol.MultiBulkReply)
if !ok {
return protocol.MakeErrReply("ttl cmd is not multi bulk reply")
}
ttlCmd.Args[1] = key
db.Remove(string(key))
dumpResult := db.execWithLock(dumpCmd.Args)
if protocol.IsErrorReply(dumpResult) {
return dumpResult
}
ttlResult := db.execWithLock(ttlCmd.Args)
if protocol.IsErrorReply(ttlResult) {
return ttlResult
}
return protocol.MakeOkReply()
}
func init() {
RegisterCommand("DumpKey", execDumpKey, writeAllKeys, undoDel, 2)
RegisterCommand("ExistIn", execExistIn, readAllKeys, nil, -1)
RegisterCommand("RenameFrom", execRenameFrom, readFirstKey, nil, 2)
RegisterCommand("RenameTo", execRenameTo, writeFirstKey, rollbackFirstKey, 4)
}

View File

@@ -0,0 +1,58 @@
package database
import (
"fmt"
"github.com/hdt3213/godis/lib/utils"
"github.com/hdt3213/godis/redis/protocol"
"github.com/hdt3213/godis/redis/protocol/asserts"
"testing"
)
func TestExistIn(t *testing.T) {
testDB.Flush()
key := utils.RandString(10)
value := utils.RandString(10)
key2 := utils.RandString(10)
testDB.Exec(nil, utils.ToCmdLine("set", key, value))
result := testDB.Exec(nil, utils.ToCmdLine("ExistIn", key, key2))
asserts.AssertMultiBulkReply(t, result, []string{key})
key3 := utils.RandString(10)
result = testDB.Exec(nil, utils.ToCmdLine("ExistIn", key2, key3))
asserts.AssertMultiBulkReplySize(t, result, 0)
}
func TestDumpKeyAndRenameTo(t *testing.T) {
testDB.Flush()
key := utils.RandString(10)
value := utils.RandString(10)
newKey := key + utils.RandString(2)
testDB.Exec(nil, utils.ToCmdLine("set", key, value, "ex", "1000"))
result := testDB.Exec(nil, utils.ToCmdLine("DumpKey", key))
if protocol.IsErrorReply(result) {
t.Error("dump key error")
return
}
dumpResult := result.(*protocol.MultiBulkReply)
result = testDB.Exec(nil, utils.ToCmdLine("RenameTo", newKey,
string(dumpResult.Args[0]), string(dumpResult.Args[1])))
asserts.AssertNotError(t, result)
result = testDB.Exec(nil, utils.ToCmdLine("RenameFrom", key))
asserts.AssertNotError(t, result)
result = testDB.Exec(nil, utils.ToCmdLine("exists", key))
asserts.AssertIntReply(t, result, 0)
result = testDB.Exec(nil, utils.ToCmdLine("exists", newKey))
asserts.AssertIntReply(t, result, 1)
// check ttl
result = testDB.Exec(nil, utils.ToCmdLine("ttl", newKey))
intResult, ok := result.(*protocol.IntReply)
if !ok {
t.Error(fmt.Sprintf("expected int protocol, actually %s", result.ToBytes()))
return
}
if intResult.Code <= 0 {
t.Errorf("expected ttl more than 0, actual: %d", intResult.Code)
return
}
}

View File

@@ -49,24 +49,6 @@ func execExists(db *DB, args [][]byte) redis.Reply {
return protocol.MakeIntReply(result) return protocol.MakeIntReply(result)
} }
// execExistIn returns existing key in given keys
// example: ExistIn key1 key2 key3..., returns [key1, key2]
// custom command for MSetNX tcc transaction
func execExistIn(db *DB, args [][]byte) redis.Reply {
var result [][]byte
for _, arg := range args {
key := string(arg)
_, exists := db.GetEntity(key)
if exists {
result = append(result, []byte(key))
}
}
if len(result) == 0 {
return protocol.MakeEmptyMultiBulkReply()
}
return protocol.MakeMultiBulkReply(result)
}
// execFlushDB removes all data in current db // execFlushDB removes all data in current db
func execFlushDB(db *DB, args [][]byte) redis.Reply { func execFlushDB(db *DB, args [][]byte) redis.Reply {
db.Flush() db.Flush()
@@ -336,7 +318,6 @@ func init() {
RegisterCommand("PTTL", execPTTL, readFirstKey, nil, 2) RegisterCommand("PTTL", execPTTL, readFirstKey, nil, 2)
RegisterCommand("Persist", execPersist, writeFirstKey, undoExpire, 2) RegisterCommand("Persist", execPersist, writeFirstKey, undoExpire, 2)
RegisterCommand("Exists", execExists, readAllKeys, nil, -2) RegisterCommand("Exists", execExists, readAllKeys, nil, -2)
RegisterCommand("ExistIn", execExistIn, readAllKeys, nil, -1)
RegisterCommand("Type", execType, readFirstKey, nil, 2) RegisterCommand("Type", execType, readFirstKey, nil, 2)
RegisterCommand("Rename", execRename, prepareRename, undoRename, 3) RegisterCommand("Rename", execRename, prepareRename, undoRename, 3)
RegisterCommand("RenameNx", execRenameNx, prepareRename, undoRename, 3) RegisterCommand("RenameNx", execRenameNx, prepareRename, undoRename, 3)

View File

@@ -22,19 +22,6 @@ func TestExists(t *testing.T) {
asserts.AssertIntReply(t, result, 0) asserts.AssertIntReply(t, result, 0)
} }
func TestExistIn(t *testing.T) {
testDB.Flush()
key := utils.RandString(10)
value := utils.RandString(10)
key2 := utils.RandString(10)
testDB.Exec(nil, utils.ToCmdLine("set", key, value))
result := testDB.Exec(nil, utils.ToCmdLine("ExistIn", key, key2))
asserts.AssertMultiBulkReply(t, result, []string{key})
key3 := utils.RandString(10)
result = testDB.Exec(nil, utils.ToCmdLine("ExistIn", key2, key3))
asserts.AssertMultiBulkReplySize(t, result, 0)
}
func TestType(t *testing.T) { func TestType(t *testing.T) {
testDB.Flush() testDB.Flush()
key := utils.RandString(10) key := utils.RandString(10)

View File

@@ -87,6 +87,7 @@ func (mdb *MultiDB) Exec(c redis.Connection, cmdLine [][]byte) (result redis.Rep
return protocol.MakeErrReply("NOAUTH Authentication required") return protocol.MakeErrReply("NOAUTH Authentication required")
} }
// todo: merge special commands into router
// special commands // special commands
if cmdName == "subscribe" { if cmdName == "subscribe" {
if len(cmdLine) < 2 { if len(cmdLine) < 2 {