From c97f3aae6e32cc908d52df7b27d2a25a1697289c Mon Sep 17 00:00:00 2001 From: hdt3213 Date: Fri, 8 Apr 2022 20:28:13 +0800 Subject: [PATCH] using TCC for MSetNX in cluster --- README.md | 4 +- README_CN.md | 2 +- cluster/cluster.go | 2 +- cluster/mset.go | 118 +++++++++++++++++++++++++++++++-------- cluster/mset_test.go | 4 ++ cluster/tcc.go | 14 ++++- database/keys.go | 21 ++++++- database/keys_test.go | 13 +++++ interface/database/db.go | 4 +- 9 files changed, 150 insertions(+), 32 deletions(-) diff --git a/README.md b/README.md index 231b8fd..3efcbc4 100644 --- a/README.md +++ b/README.md @@ -142,8 +142,8 @@ I suggest focusing on the following directories: - set: a hash set based on map - sortedset: a sorted set implements based on skiplist - database: the core of storage engine - - server.go: a standalone redis server, - - db.go: the basement function of database + - server.go: a standalone redis server, with multiple database + - db.go: data structure and base functions of single database - exec.go: the gateway of database - router.go: the command table - keys.go: handlers for keys commands diff --git a/README_CN.md b/README_CN.md index 76643ff..f7956e5 100644 --- a/README_CN.md +++ b/README_CN.md @@ -132,7 +132,7 @@ MSET (10 keys): 65487.89 requests per second - set: 基于hash表的集合 - sortedset: 基于跳表实现的有序集合 - database: 存储引擎核心 - - db.go: 引擎的基础功能 + - db.go: 单个 database 的数据结构和基本功能 - router.go: 将命令路由给响应的处理函数 - keys.go: del、ttl、expire 等通用命令实现 - string.go: get、set 等字符串命令实现 diff --git a/cluster/cluster.go b/cluster/cluster.go index 11986f6..24fb371 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -76,7 +76,7 @@ func MakeCluster() *Cluster { } // CmdFunc represents the handler of a redis command -type CmdFunc func(cluster *Cluster, c redis.Connection, cmdAndArgs [][]byte) redis.Reply +type CmdFunc func(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply // Close stops current node of cluster func (cluster *Cluster) Close() { diff --git a/cluster/mset.go b/cluster/mset.go index 39ac5c5..9b8491f 100644 --- a/cluster/mset.go +++ b/cluster/mset.go @@ -3,18 +3,21 @@ package cluster import ( "fmt" "github.com/hdt3213/godis/interface/redis" + "github.com/hdt3213/godis/lib/utils" "github.com/hdt3213/godis/redis/reply" "strconv" ) +const keyExistsErr = "key exists" + // MGet atomically get multi key-value from cluster, writeKeys can be distributed on any node -func MGet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { - if len(args) < 2 { +func MGet(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply { + if len(cmdLine) < 2 { return reply.MakeErrReply("ERR wrong number of arguments for 'mget' command") } - keys := make([]string, len(args)-1) - for i := 1; i < len(args); i++ { - keys[i-1] = string(args[i]) + keys := make([]string, len(cmdLine)-1) + for i := 1; i < len(cmdLine); i++ { + keys[i-1] = string(cmdLine[i]) } resultMap := make(map[string][]byte) @@ -39,8 +42,8 @@ func MGet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { } // MSet atomically sets multi key-value in cluster, writeKeys can be distributed on any node -func MSet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { - argCount := len(args) - 1 +func MSet(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply { + argCount := len(cmdLine) - 1 if argCount%2 != 0 || argCount < 1 { return reply.MakeErrReply("ERR wrong number of arguments for 'mset' command") } @@ -49,14 +52,14 @@ func MSet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { keys := make([]string, size) valueMap := make(map[string]string) for i := 0; i < size; i++ { - keys[i] = string(args[2*i+1]) - valueMap[keys[i]] = string(args[2*i+2]) + keys[i] = string(cmdLine[2*i+1]) + valueMap[keys[i]] = string(cmdLine[2*i+2]) } groupMap := cluster.groupBy(keys) if len(groupMap) == 1 && allowFastTransaction { // do fast for peer := range groupMap { - return cluster.relay(peer, c, args) + return cluster.relay(peer, c, cmdLine) } } @@ -97,23 +100,92 @@ func MSet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { } // MSetNX sets multi key-value in database, only if none of the given writeKeys exist and all given writeKeys are on the same node -func MSetNX(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { - argCount := len(args) - 1 +func MSetNX(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply { + argCount := len(cmdLine) - 1 if argCount%2 != 0 || argCount < 1 { - return reply.MakeErrReply("ERR wrong number of arguments for 'mset' command") + return reply.MakeErrReply("ERR wrong number of arguments for 'msetnx' command") } - var peer string + size := argCount / 2 + keys := make([]string, size) + valueMap := make(map[string]string) for i := 0; i < size; i++ { - key := string(args[2*i]) - currentPeer := cluster.peerPicker.PickNode(key) - if peer == "" { - peer = currentPeer - } else { - if peer != currentPeer { - return reply.MakeErrReply("ERR msetnx must within one slot in cluster mode") - } + keys[i] = string(cmdLine[2*i+1]) + valueMap[keys[i]] = string(cmdLine[2*i+2]) + } + + groupMap := cluster.groupBy(keys) + if len(groupMap) == 1 && allowFastTransaction { // do fast + for peer := range groupMap { + return cluster.relay(peer, c, cmdLine) } } - return cluster.relay(peer, c, args) + + // prepare procedure: + // 1. Normal tcc preparation (undo log and lock related keys) + // 2. Peer checks whether any key already exists, If so it will return keyExistsErr. Then coordinator will request rollback over all participated nodes + var errReply redis.Reply + txID := cluster.idGenerator.NextID() + txIDStr := strconv.FormatInt(txID, 10) + rollback := false + for peer, group := range groupMap { + peerArgs := []string{txIDStr, "MSETNX"} + for _, k := range group { + peerArgs = append(peerArgs, k, valueMap[k]) + } + var resp redis.Reply + if peer == cluster.self { + resp = execPrepare(cluster, c, makeArgs("Prepare", peerArgs...)) + } else { + resp = cluster.relay(peer, c, makeArgs("Prepare", peerArgs...)) + } + if reply.IsErrorReply(resp) { + re := resp.(reply.ErrorReply) + if re.Error() == keyExistsErr { + errReply = reply.MakeIntReply(0) + } else { + errReply = resp + } + rollback = true + break + } + } + if rollback { + // rollback + requestRollback(cluster, c, txID, groupMap) + return errReply + } + _, errReply = requestCommit(cluster, c, txID, groupMap) + rollback = errReply != nil + if !rollback { + return reply.MakeIntReply(1) + } + return errReply +} + +func prepareMSetNx(cluster *Cluster, conn redis.Connection, cmdLine CmdLine) redis.Reply { + args := cmdLine[1:] + if len(args)%2 != 0 { + return reply.MakeSyntaxErrReply() + } + size := len(args) / 2 + values := make([][]byte, size) + keys := make([]string, size) + for i := 0; i < size; i++ { + keys[i] = string(args[2*i]) + values[i] = args[2*i+1] + } + re := cluster.db.ExecWithLock(conn, utils.ToCmdLine2("ExistIn", keys...)) + if reply.IsErrorReply(re) { + return re + } + _, ok := re.(*reply.EmptyMultiBulkReply) + if !ok { + return reply.MakeErrReply(keyExistsErr) + } + return reply.MakeOkReply() +} + +func init() { + prepareFuncMap["msetnx"] = prepareMSetNx } diff --git a/cluster/mset_test.go b/cluster/mset_test.go index 95309aa..a14fe50 100644 --- a/cluster/mset_test.go +++ b/cluster/mset_test.go @@ -21,4 +21,8 @@ func TestMSetNx(t *testing.T) { FlushAll(testCluster, conn, toArgs("FLUSHALL")) ret := MSetNX(testCluster, conn, toArgs("MSETNX", "a", "a", "b", "b")) asserts.AssertNotError(t, ret) + ret = MSetNX(testCluster, conn, toArgs("MSETNX", "a", "a", "c", "c")) + asserts.AssertNotError(t, ret) + ret = testCluster.Exec(conn, toArgs("MGET", "a", "b", "c")) + asserts.AssertMultiBulkReply(t, ret, []string{"a", "b", ""}) } diff --git a/cluster/tcc.go b/cluster/tcc.go index c36ee5f..a1a4bf9 100644 --- a/cluster/tcc.go +++ b/cluster/tcc.go @@ -8,10 +8,15 @@ import ( "github.com/hdt3213/godis/lib/timewheel" "github.com/hdt3213/godis/redis/reply" "strconv" + "strings" "sync" "time" ) +// prepareFuncMap executed after related key locked, and use additional logic to determine whether the transaction can be committed +// For example, prepareMSetNX will return error to prevent MSetNx transaction from committing if any related key already exists +var prepareFuncMap = make(map[string]CmdFunc) + // Transaction stores state and data for a try-commit-catch distributed transaction type Transaction struct { id string // transaction id @@ -72,7 +77,7 @@ func (tx *Transaction) unLockKeys() { } } -// t should contains Keys and Id field +// t should contain Keys and ID field func (tx *Transaction) prepare() error { tx.mu.Lock() defer tx.mu.Unlock() @@ -117,15 +122,20 @@ func (tx *Transaction) rollback() error { // cmdLine: Prepare id cmdName args... func execPrepare(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply { if len(cmdLine) < 3 { - return reply.MakeErrReply("ERR wrong number of arguments for 'preparedel' command") + return reply.MakeErrReply("ERR wrong number of arguments for 'prepare' command") } txID := string(cmdLine[1]) + cmdName := strings.ToLower(string(cmdLine[2])) tx := NewTransaction(cluster, c, txID, cmdLine[2:]) cluster.transactions.Put(txID, tx) err := tx.prepare() if err != nil { return reply.MakeErrReply(err.Error()) } + prepareFunc, ok := prepareFuncMap[cmdName] + if ok { + return prepareFunc(cluster, c, cmdLine[2:]) + } return &reply.OkReply{} } diff --git a/database/keys.go b/database/keys.go index 52ce88b..9e98edb 100644 --- a/database/keys.go +++ b/database/keys.go @@ -36,7 +36,7 @@ func undoDel(db *DB, args [][]byte) []CmdLine { return rollbackGivenKeys(db, keys...) } -// execExists checks if a is existed in db +// execExists checks if given key is existed in db func execExists(db *DB, args [][]byte) redis.Reply { result := int64(0) for _, arg := range args { @@ -49,6 +49,24 @@ func execExists(db *DB, args [][]byte) redis.Reply { return reply.MakeIntReply(result) } +// execExistIn returns existing key in given keys +// example: ExistIn key1 key2 key3..., returns [key1, key2] +// custom command for MSetNX tcc transaction +func execExistIn(db *DB, args [][]byte) redis.Reply { + var result [][]byte + for _, arg := range args { + key := string(arg) + _, exists := db.GetEntity(key) + if exists { + result = append(result, []byte(key)) + } + } + if len(result) == 0 { + return reply.MakeEmptyMultiBulkReply() + } + return reply.MakeMultiBulkReply(result) +} + // execFlushDB removes all data in current db func execFlushDB(db *DB, args [][]byte) redis.Reply { db.Flush() @@ -318,6 +336,7 @@ func init() { RegisterCommand("PTTL", execPTTL, readFirstKey, nil, 2) RegisterCommand("Persist", execPersist, writeFirstKey, undoExpire, 2) RegisterCommand("Exists", execExists, readAllKeys, nil, -2) + RegisterCommand("ExistIn", execExistIn, readAllKeys, nil, -1) RegisterCommand("Type", execType, readFirstKey, nil, 2) RegisterCommand("Rename", execRename, prepareRename, undoRename, 3) RegisterCommand("RenameNx", execRenameNx, prepareRename, undoRename, 3) diff --git a/database/keys_test.go b/database/keys_test.go index 5fc0d2c..0906c40 100644 --- a/database/keys_test.go +++ b/database/keys_test.go @@ -22,6 +22,19 @@ func TestExists(t *testing.T) { asserts.AssertIntReply(t, result, 0) } +func TestExistIn(t *testing.T) { + testDB.Flush() + key := utils.RandString(10) + value := utils.RandString(10) + key2 := utils.RandString(10) + testDB.Exec(nil, utils.ToCmdLine("set", key, value)) + result := testDB.Exec(nil, utils.ToCmdLine("ExistIn", key, key2)) + asserts.AssertMultiBulkReply(t, result, []string{key}) + key3 := utils.RandString(10) + result = testDB.Exec(nil, utils.ToCmdLine("ExistIn", key2, key3)) + asserts.AssertMultiBulkReplySize(t, result, 0) +} + func TestType(t *testing.T) { testDB.Flush() key := utils.RandString(10) diff --git a/interface/database/db.go b/interface/database/db.go index f39c9a7..98d866d 100644 --- a/interface/database/db.go +++ b/interface/database/db.go @@ -10,7 +10,7 @@ type CmdLine = [][]byte // DB is the interface for redis style storage engine type DB interface { - Exec(client redis.Connection, args [][]byte) redis.Reply + Exec(client redis.Connection, cmdLine [][]byte) redis.Reply AfterClientClose(c redis.Connection) Close() } @@ -18,7 +18,7 @@ type DB interface { // EmbedDB is the embedding storage engine exposing more methods for complex application type EmbedDB interface { DB - ExecWithLock(conn redis.Connection, args [][]byte) redis.Reply + ExecWithLock(conn redis.Connection, cmdLine [][]byte) redis.Reply ExecMulti(conn redis.Connection, watching map[string]uint32, cmdLines []CmdLine) redis.Reply GetUndoLogs(dbIndex int, cmdLine [][]byte) []CmdLine ForEach(dbIndex int, cb func(key string, data *DataEntity, expiration *time.Time) bool)