diff --git a/cluster/cluster.go b/cluster/cluster.go index 8fb1db0..686097f 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -160,7 +160,7 @@ func makeArgs(cmd string, args ...string) [][]byte { return result } -// return peer -> writeKeys +// return node -> writeKeys func (cluster *Cluster) groupBy(keys []string) map[string][]string { result := make(map[string][]string) for _, key := range keys { diff --git a/cluster/mset.go b/cluster/mset.go index 6aea5f6..2a216ef 100644 --- a/cluster/mset.go +++ b/cluster/mset.go @@ -128,16 +128,16 @@ func MSetNX(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply { txID := cluster.idGenerator.NextID() txIDStr := strconv.FormatInt(txID, 10) rollback := false - for peer, group := range groupMap { - peerArgs := []string{txIDStr, "MSETNX"} + for node, group := range groupMap { + nodeArgs := []string{txIDStr, "MSETNX"} for _, k := range group { - peerArgs = append(peerArgs, k, valueMap[k]) + nodeArgs = append(nodeArgs, k, valueMap[k]) } var resp redis.Reply - if peer == cluster.self { - resp = execPrepare(cluster, c, makeArgs("Prepare", peerArgs...)) + if node == cluster.self { + resp = execPrepare(cluster, c, makeArgs("Prepare", nodeArgs...)) } else { - resp = cluster.relay(peer, c, makeArgs("Prepare", peerArgs...)) + resp = cluster.relay(node, c, makeArgs("Prepare", nodeArgs...)) } if protocol.IsErrorReply(resp) { re := resp.(protocol.ErrorReply) @@ -187,5 +187,5 @@ func prepareMSetNx(cluster *Cluster, conn redis.Connection, cmdLine CmdLine) red } func init() { - prepareFuncMap["msetnx"] = prepareMSetNx + registerPrepareFunc("MSetNx", prepareMSetNx) } diff --git a/cluster/rename.go b/cluster/rename.go index 3ef8895..c168445 100644 --- a/cluster/rename.go +++ b/cluster/rename.go @@ -2,7 +2,9 @@ package cluster import ( "github.com/hdt3213/godis/interface/redis" + "github.com/hdt3213/godis/lib/utils" "github.com/hdt3213/godis/redis/protocol" + "strconv" ) // Rename renames a key, the origin and the destination must within the same node @@ -10,16 +12,79 @@ func Rename(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { if len(args) != 3 { return protocol.MakeErrReply("ERR wrong number of arguments for 'rename' command") } - src := string(args[1]) - dest := string(args[2]) + srcKey := string(args[1]) + destKey := string(args[2]) - srcPeer := cluster.peerPicker.PickNode(src) - destPeer := cluster.peerPicker.PickNode(dest) - - if srcPeer != destPeer { - return protocol.MakeErrReply("ERR rename must within one slot in cluster mode") + srcNode := cluster.peerPicker.PickNode(srcKey) + destNode := cluster.peerPicker.PickNode(destKey) + if srcNode == destNode { // do fast + return cluster.relay(srcNode, c, args) } - return cluster.relay(srcPeer, c, args) + + groupMap := map[string][]string{ + srcNode: {srcKey}, + destNode: {destKey}, + } + txID := cluster.idGenerator.NextID() + txIDStr := strconv.FormatInt(txID, 10) + // prepare rename from + srcArgs := makeArgs("Prepare", txIDStr, "RenameFrom", srcKey) + var srcPrepareResp redis.Reply + if srcNode == cluster.self { + srcPrepareResp = execPrepare(cluster, c, srcArgs) + } else { + srcPrepareResp = cluster.relay(srcNode, c, srcArgs) + } + if protocol.IsErrorReply(srcPrepareResp) { + // rollback src node + requestRollback(cluster, c, txID, map[string][]string{srcNode: {srcKey}}) + return srcPrepareResp + } + srcPrepareMBR, ok := srcPrepareResp.(*protocol.MultiBulkReply) + if !ok || len(srcPrepareMBR.Args) < 2 { + requestRollback(cluster, c, txID, map[string][]string{srcNode: {srcKey}}) + return protocol.MakeErrReply("ERR invalid prepare response") + } + // prepare rename to + destArgs := utils.ToCmdLine3("Prepare", []byte(txIDStr), + []byte("RenameTo"), []byte(destKey), srcPrepareMBR.Args[0], srcPrepareMBR.Args[1]) + var destPrepareResp redis.Reply + if destNode == cluster.self { + destPrepareResp = execPrepare(cluster, c, destArgs) + } else { + destPrepareResp = cluster.relay(destNode, c, destArgs) + } + if protocol.IsErrorReply(destPrepareResp) { + // rollback src node + requestRollback(cluster, c, txID, groupMap) + return destPrepareResp + } + _, errReply := requestCommit(cluster, c, txID, groupMap) + if errReply != nil { + requestRollback(cluster, c, txID, groupMap) + return errReply + } + return protocol.MakeOkReply() +} + +func prepareRenameFrom(cluster *Cluster, conn redis.Connection, cmdLine CmdLine) redis.Reply { + if len(cmdLine) != 2 { + return protocol.MakeArgNumErrReply("RenameFrom") + } + key := string(cmdLine[1]) + existResp := cluster.db.ExecWithLock(conn, utils.ToCmdLine("Exists", key)) + if protocol.IsErrorReply(existResp) { + return existResp + } + existIntResp := existResp.(*protocol.IntReply) + if existIntResp.Code == 0 { + return protocol.MakeErrReply("ERR no such key") + } + return cluster.db.ExecWithLock(conn, utils.ToCmdLine2("DumpKey", key)) +} + +func init() { + registerPrepareFunc("RenameFrom", prepareRenameFrom) } // RenameNx renames a key, only if the new key does not exist. diff --git a/cluster/rename_test.go b/cluster/rename_test.go index 20853a4..d679686 100644 --- a/cluster/rename_test.go +++ b/cluster/rename_test.go @@ -37,6 +37,7 @@ func TestRename(t *testing.T) { t.Errorf("expected ttl more than 0, actual: %d", intResult.Code) return } + // test no src key } func TestRenameNx(t *testing.T) { diff --git a/cluster/tcc.go b/cluster/tcc.go index 8a9ef8d..d0d6215 100644 --- a/cluster/tcc.go +++ b/cluster/tcc.go @@ -17,6 +17,10 @@ import ( // For example, prepareMSetNX will return error to prevent MSetNx transaction from committing if any related key already exists var prepareFuncMap = make(map[string]CmdFunc) +func registerPrepareFunc(cmdName string, fn CmdFunc) { + prepareFuncMap[strings.ToLower(cmdName)] = fn +} + // Transaction stores state and data for a try-commit-catch distributed transaction type Transaction struct { id string // transaction id @@ -195,16 +199,16 @@ func execCommit(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Rep } // requestCommit commands all node to commit transaction as coordinator -func requestCommit(cluster *Cluster, c redis.Connection, txID int64, peers map[string][]string) ([]redis.Reply, protocol.ErrorReply) { +func requestCommit(cluster *Cluster, c redis.Connection, txID int64, groupMap map[string][]string) ([]redis.Reply, protocol.ErrorReply) { var errReply protocol.ErrorReply txIDStr := strconv.FormatInt(txID, 10) - respList := make([]redis.Reply, 0, len(peers)) - for peer := range peers { + respList := make([]redis.Reply, 0, len(groupMap)) + for node := range groupMap { var resp redis.Reply - if peer == cluster.self { + if node == cluster.self { resp = execCommit(cluster, c, makeArgs("commit", txIDStr)) } else { - resp = cluster.relay(peer, c, makeArgs("commit", txIDStr)) + resp = cluster.relay(node, c, makeArgs("commit", txIDStr)) } if protocol.IsErrorReply(resp) { errReply = resp.(protocol.ErrorReply) @@ -213,20 +217,21 @@ func requestCommit(cluster *Cluster, c redis.Connection, txID int64, peers map[s respList = append(respList, resp) } if errReply != nil { - requestRollback(cluster, c, txID, peers) + requestRollback(cluster, c, txID, groupMap) return nil, errReply } return respList, nil } // requestRollback requests all node rollback transaction as coordinator -func requestRollback(cluster *Cluster, c redis.Connection, txID int64, peers map[string][]string) { +// groupMap: node -> keys +func requestRollback(cluster *Cluster, c redis.Connection, txID int64, groupMap map[string][]string) { txIDStr := strconv.FormatInt(txID, 10) - for peer := range peers { - if peer == cluster.self { + for node := range groupMap { + if node == cluster.self { execRollback(cluster, c, makeArgs("rollback", txIDStr)) } else { - cluster.relay(peer, c, makeArgs("rollback", txIDStr)) + cluster.relay(node, c, makeArgs("rollback", txIDStr)) } } } diff --git a/database/cluster_helper.go b/database/cluster_helper.go new file mode 100644 index 0000000..b9cf451 --- /dev/null +++ b/database/cluster_helper.go @@ -0,0 +1,91 @@ +package database + +import ( + "github.com/hdt3213/godis/aof" + "github.com/hdt3213/godis/interface/redis" + "github.com/hdt3213/godis/redis/parser" + "github.com/hdt3213/godis/redis/protocol" +) + +// execExistIn returns existing key in given keys +// example: ExistIn key1 key2 key3..., returns [key1, key2] +// custom command for MSetNX tcc transaction +func execExistIn(db *DB, args [][]byte) redis.Reply { + var result [][]byte + for _, arg := range args { + key := string(arg) + _, exists := db.GetEntity(key) + if exists { + result = append(result, []byte(key)) + } + } + if len(result) == 0 { + return protocol.MakeEmptyMultiBulkReply() + } + return protocol.MakeMultiBulkReply(result) +} + +// execDumpKey returns redis serialization protocol data of given key (see aof.EntityToCmd) +func execDumpKey(db *DB, args [][]byte) redis.Reply { + key := string(args[0]) + entity, ok := db.GetEntity(key) + if !ok { + return protocol.MakeEmptyMultiBulkReply() + } + dumpCmd := aof.EntityToCmd(key, entity) + ttlCmd := toTTLCmd(db, key) + resp := protocol.MakeMultiBulkReply([][]byte{ + dumpCmd.ToBytes(), + ttlCmd.ToBytes(), + }) + return resp +} + +// execRenameFrom exactly the same as execDel, used for cluster.Rename +func execRenameFrom(db *DB, args [][]byte) redis.Reply { + key := string(args[0]) + db.Remove(key) + return protocol.MakeOkReply() +} + +// execRenameTo accepts result of execDumpKey and load the dumped key +// args format: key dumpCmd ttlCmd +// execRenameTo may be partially successful, do not use it without transaction +func execRenameTo(db *DB, args [][]byte) redis.Reply { + key := args[0] + dumpRawCmd, err := parser.ParseOne(args[1]) + if err != nil { + return protocol.MakeErrReply("illegal dump cmd: " + err.Error()) + } + dumpCmd, ok := dumpRawCmd.(*protocol.MultiBulkReply) + if !ok { + return protocol.MakeErrReply("dump cmd is not multi bulk reply") + } + dumpCmd.Args[1] = key // change key + ttlRawCmd, err := parser.ParseOne(args[2]) + if err != nil { + return protocol.MakeErrReply("illegal ttl cmd: " + err.Error()) + } + ttlCmd, ok := ttlRawCmd.(*protocol.MultiBulkReply) + if !ok { + return protocol.MakeErrReply("ttl cmd is not multi bulk reply") + } + ttlCmd.Args[1] = key + db.Remove(string(key)) + dumpResult := db.execWithLock(dumpCmd.Args) + if protocol.IsErrorReply(dumpResult) { + return dumpResult + } + ttlResult := db.execWithLock(ttlCmd.Args) + if protocol.IsErrorReply(ttlResult) { + return ttlResult + } + return protocol.MakeOkReply() +} + +func init() { + RegisterCommand("DumpKey", execDumpKey, writeAllKeys, undoDel, 2) + RegisterCommand("ExistIn", execExistIn, readAllKeys, nil, -1) + RegisterCommand("RenameFrom", execRenameFrom, readFirstKey, nil, 2) + RegisterCommand("RenameTo", execRenameTo, writeFirstKey, rollbackFirstKey, 4) +} diff --git a/database/cluster_helper_test.go b/database/cluster_helper_test.go new file mode 100644 index 0000000..df49f9b --- /dev/null +++ b/database/cluster_helper_test.go @@ -0,0 +1,58 @@ +package database + +import ( + "fmt" + "github.com/hdt3213/godis/lib/utils" + "github.com/hdt3213/godis/redis/protocol" + "github.com/hdt3213/godis/redis/protocol/asserts" + "testing" +) + +func TestExistIn(t *testing.T) { + testDB.Flush() + key := utils.RandString(10) + value := utils.RandString(10) + key2 := utils.RandString(10) + testDB.Exec(nil, utils.ToCmdLine("set", key, value)) + result := testDB.Exec(nil, utils.ToCmdLine("ExistIn", key, key2)) + asserts.AssertMultiBulkReply(t, result, []string{key}) + key3 := utils.RandString(10) + result = testDB.Exec(nil, utils.ToCmdLine("ExistIn", key2, key3)) + asserts.AssertMultiBulkReplySize(t, result, 0) +} + +func TestDumpKeyAndRenameTo(t *testing.T) { + testDB.Flush() + key := utils.RandString(10) + value := utils.RandString(10) + newKey := key + utils.RandString(2) + testDB.Exec(nil, utils.ToCmdLine("set", key, value, "ex", "1000")) + + result := testDB.Exec(nil, utils.ToCmdLine("DumpKey", key)) + if protocol.IsErrorReply(result) { + t.Error("dump key error") + return + } + dumpResult := result.(*protocol.MultiBulkReply) + result = testDB.Exec(nil, utils.ToCmdLine("RenameTo", newKey, + string(dumpResult.Args[0]), string(dumpResult.Args[1]))) + asserts.AssertNotError(t, result) + result = testDB.Exec(nil, utils.ToCmdLine("RenameFrom", key)) + asserts.AssertNotError(t, result) + + result = testDB.Exec(nil, utils.ToCmdLine("exists", key)) + asserts.AssertIntReply(t, result, 0) + result = testDB.Exec(nil, utils.ToCmdLine("exists", newKey)) + asserts.AssertIntReply(t, result, 1) + // check ttl + result = testDB.Exec(nil, utils.ToCmdLine("ttl", newKey)) + intResult, ok := result.(*protocol.IntReply) + if !ok { + t.Error(fmt.Sprintf("expected int protocol, actually %s", result.ToBytes())) + return + } + if intResult.Code <= 0 { + t.Errorf("expected ttl more than 0, actual: %d", intResult.Code) + return + } +} diff --git a/database/keys.go b/database/keys.go index 77f3b13..d739224 100644 --- a/database/keys.go +++ b/database/keys.go @@ -49,24 +49,6 @@ func execExists(db *DB, args [][]byte) redis.Reply { return protocol.MakeIntReply(result) } -// execExistIn returns existing key in given keys -// example: ExistIn key1 key2 key3..., returns [key1, key2] -// custom command for MSetNX tcc transaction -func execExistIn(db *DB, args [][]byte) redis.Reply { - var result [][]byte - for _, arg := range args { - key := string(arg) - _, exists := db.GetEntity(key) - if exists { - result = append(result, []byte(key)) - } - } - if len(result) == 0 { - return protocol.MakeEmptyMultiBulkReply() - } - return protocol.MakeMultiBulkReply(result) -} - // execFlushDB removes all data in current db func execFlushDB(db *DB, args [][]byte) redis.Reply { db.Flush() @@ -336,7 +318,6 @@ func init() { RegisterCommand("PTTL", execPTTL, readFirstKey, nil, 2) RegisterCommand("Persist", execPersist, writeFirstKey, undoExpire, 2) RegisterCommand("Exists", execExists, readAllKeys, nil, -2) - RegisterCommand("ExistIn", execExistIn, readAllKeys, nil, -1) RegisterCommand("Type", execType, readFirstKey, nil, 2) RegisterCommand("Rename", execRename, prepareRename, undoRename, 3) RegisterCommand("RenameNx", execRenameNx, prepareRename, undoRename, 3) diff --git a/database/keys_test.go b/database/keys_test.go index 24872f1..9f2539d 100644 --- a/database/keys_test.go +++ b/database/keys_test.go @@ -22,19 +22,6 @@ func TestExists(t *testing.T) { asserts.AssertIntReply(t, result, 0) } -func TestExistIn(t *testing.T) { - testDB.Flush() - key := utils.RandString(10) - value := utils.RandString(10) - key2 := utils.RandString(10) - testDB.Exec(nil, utils.ToCmdLine("set", key, value)) - result := testDB.Exec(nil, utils.ToCmdLine("ExistIn", key, key2)) - asserts.AssertMultiBulkReply(t, result, []string{key}) - key3 := utils.RandString(10) - result = testDB.Exec(nil, utils.ToCmdLine("ExistIn", key2, key3)) - asserts.AssertMultiBulkReplySize(t, result, 0) -} - func TestType(t *testing.T) { testDB.Flush() key := utils.RandString(10) diff --git a/database/server.go b/database/server.go index b872fa3..cf17be8 100644 --- a/database/server.go +++ b/database/server.go @@ -87,6 +87,7 @@ func (mdb *MultiDB) Exec(c redis.Connection, cmdLine [][]byte) (result redis.Rep return protocol.MakeErrReply("NOAUTH Authentication required") } + // todo: merge special commands into router // special commands if cmdName == "subscribe" { if len(cmdLine) < 2 {