diff --git a/README.md b/README.md index 5bc8046..50df8e6 100644 --- a/README.md +++ b/README.md @@ -15,15 +15,18 @@ middleware using golang. Key Features: -- support string, list, hash, set, sorted set -- ttl -- publish/subscribe -- geo -- aof and aof rewrite -- Transaction. The `multi` command is Atomic and Isolated. If any errors are encountered during execution, godis will rollback the executed commands -- server side cluster which is transparent to client. You can connect to any node in the cluster to +- Support string, list, hash, set, sorted set +- TTL +- Publish/Subscribe +- GEO +- AOF and AOF Rewrite +- MULTI Commands Transaction is Atomic and Isolated. If any errors are encountered during execution, godis will rollback the executed commands +- Server-side Cluster which is transparent to client. You can connect to any node in the cluster to access all data in the cluster. -- a concurrent core, so you don't have to worry about your commands blocking the server too much. + - `MSET`, `DEL` command is supported and atomically executed in cluster mode + - `Rename`, `RenameNX` command is supported within slot in cluster mode + - MULTI Commands Transaction is supported within slot in cluster mode +- Concurrent Core, so you don't have to worry about your commands blocking the server too much. If you could read Chinese, you can find more details in [My Blog](https://www.cnblogs.com/Finley/category/1598973.html). diff --git a/README_CN.md b/README_CN.md index 37b4f7e..7c83203 100644 --- a/README_CN.md +++ b/README_CN.md @@ -15,8 +15,11 @@ Godis 是一个用 Go 语言实现的 Redis 服务器。本项目旨在为尝试 - 发布订阅 - 地理位置 - AOF 持久化及AOF重写 -- 事务. Multi 命令开启的事务具有`原子性`和`隔离性`. 若在执行过程中遇到错误, godis 会回滚已执行的命令 +- Multi 命令开启的事务具有`原子性`和`隔离性`. 若在执行过程中遇到错误, godis 会回滚已执行的命令 - 内置集群模式. 集群对客户端是透明的, 您可以像使用单机版 redis 一样使用 godis 集群 + - `MSET`, `DEL` 命令在集群模式下原子性执行 + - `Rename`, `RenameNX` 命令在集群模式下支持在同一个 slot 内执行 + - Multi 命令开启的事务在集群模式下支持在同一个 slot 内执行 - 并行引擎, 无需担心您的操作会阻塞整个服务器. 可以在[我的博客](https://www.cnblogs.com/Finley/category/1598973.html)了解更多关于 diff --git a/cluster/cluster.go b/cluster/cluster.go index f0e89e3..83ed7bf 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -91,25 +91,45 @@ func isAuthenticated(c redis.Connection) bool { } // Exec executes command on cluster -func (cluster *Cluster) Exec(c redis.Connection, cmdArgs [][]byte) (result redis.Reply) { +func (cluster *Cluster) Exec(c redis.Connection, cmdLine [][]byte) (result redis.Reply) { defer func() { if err := recover(); err != nil { logger.Warn(fmt.Sprintf("error occurs: %v\n%s", err, string(debug.Stack()))) result = &reply.UnknownErrReply{} } }() - cmd := strings.ToLower(string(cmdArgs[0])) - if cmd == "auth" { - return godis.Auth(cluster.db, c, cmdArgs[1:]) + cmdName := strings.ToLower(string(cmdLine[0])) + if cmdName == "auth" { + return godis.Auth(cluster.db, c, cmdLine[1:]) } if !isAuthenticated(c) { return reply.MakeErrReply("NOAUTH Authentication required") } - cmdFunc, ok := router[cmd] - if !ok { - return reply.MakeErrReply("ERR unknown command '" + cmd + "', or not supported in cluster mode") + + if cmdName == "multi" { + if len(cmdLine) != 1 { + return reply.MakeArgNumErrReply(cmdName) + } + return godis.StartMulti(cluster.db, c) + } else if cmdName == "discard" { + if len(cmdLine) != 1 { + return reply.MakeArgNumErrReply(cmdName) + } + return godis.DiscardMulti(cluster.db, c) + } else if cmdName == "exec" { + if len(cmdLine) != 1 { + return reply.MakeArgNumErrReply(cmdName) + } + return execMulti(cluster, c, nil) } - result = cmdFunc(cluster, c, cmdArgs) + if c != nil && c.InMultiState() { + return godis.EnqueueCmd(cluster.db, c, cmdLine) + } + cmdFunc, ok := router[cmdName] + if !ok { + return reply.MakeErrReply("ERR unknown command '" + cmdName + "', or not supported in cluster mode") + } + result = cmdFunc(cluster, c, cmdLine) return } diff --git a/cluster/multi.go b/cluster/multi.go new file mode 100644 index 0000000..c9e8ce8 --- /dev/null +++ b/cluster/multi.go @@ -0,0 +1,95 @@ +package cluster + +import ( + "github.com/hdt3213/godis" + "github.com/hdt3213/godis/interface/redis" + "github.com/hdt3213/godis/redis/reply" +) + +const relayMulti = "_multi" + +var relayMultiBytes = []byte(relayMulti) + +// cmdLine == []string{"exec"} +func execMulti(cluster *Cluster, conn redis.Connection, cmdLine CmdLine) redis.Reply { + if !conn.InMultiState() { + return reply.MakeErrReply("ERR EXEC without MULTI") + } + defer conn.SetMultiState(false) + cmdLines := conn.GetQueuedCmdLine() + + // analysis related keys + keys := make([]string, 0) // may contains duplicate + for _, cl := range cmdLines { + wKeys, rKeys := cluster.db.GetRelatedKeys(cl) + keys = append(keys, wKeys...) + keys = append(keys, rKeys...) + } + if len(keys) == 0 { + // empty transaction or only `PING`s + return godis.ExecMulti(cluster.db, cmdLines) + } + groupMap := cluster.groupBy(keys) + if len(groupMap) > 1 { + return reply.MakeErrReply("ERR MULTI commands transaction must within one slot in cluster mode") + } + var peer string + // assert len(groupMap) == 1 + for p := range groupMap { + peer = p + } + + // out parser not support reply.MultiRawReply, so we have to encode it + if peer == cluster.self { + return godis.ExecMulti(cluster.db, cmdLines) + } + return execMultiOnOtherNode(cluster, conn, peer, cmdLines) +} + +func execMultiOnOtherNode(cluster *Cluster, conn redis.Connection, peer string, cmdLines []CmdLine) redis.Reply { + defer func() { + conn.ClearQueuedCmds() + conn.SetMultiState(false) + }() + relayCmdLine := [][]byte{ // relay it to executing node + relayMultiBytes, + } + relayCmdLine = append(relayCmdLine, encodeCmdLine(cmdLines)...) + rawRelayResult := cluster.relay(peer, conn, relayCmdLine) + if reply.IsErrorReply(rawRelayResult) { + return rawRelayResult + } + relayResult, ok := rawRelayResult.(*reply.MultiBulkReply) + if !ok { + return reply.MakeErrReply("execute failed") + } + rep, err := parseEncodedMultiRawReply(relayResult.Args) + if err != nil { + return reply.MakeErrReply(err.Error()) + } + return rep +} + +// execRelayedMulti execute relayed multi commands transaction +// cmdLine format: _multi base64ed-cmdLine +// result format: base64ed-reply list +func execRelayedMulti(cluster *Cluster, conn redis.Connection, cmdLine CmdLine) redis.Reply { + decoded, err := parseEncodedMultiRawReply(cmdLine[1:]) + if err != nil { + return reply.MakeErrReply(err.Error()) + } + var cmdLines []CmdLine + for _, rep := range decoded.Replies { + mbr, ok := rep.(*reply.MultiBulkReply) + if !ok { + return reply.MakeErrReply("exec failed") + } + cmdLines = append(cmdLines, mbr.Args) + } + rawResult := godis.ExecMulti(cluster.db, cmdLines) + resultMBR, ok := rawResult.(*reply.MultiRawReply) + if !ok { + return reply.MakeErrReply("exec failed") + } + return encodeMultiRawReply(resultMBR) +} diff --git a/cluster/multi_helper.go b/cluster/multi_helper.go new file mode 100644 index 0000000..10a7367 --- /dev/null +++ b/cluster/multi_helper.go @@ -0,0 +1,47 @@ +package cluster + +import ( + "bytes" + "encoding/base64" + "github.com/hdt3213/godis/redis/parser" + "github.com/hdt3213/godis/redis/reply" +) + +func encodeCmdLine(cmdLines []CmdLine) [][]byte { + var result [][]byte + for _, line := range cmdLines { + raw := reply.MakeMultiBulkReply(line).ToBytes() + encoded := make([]byte, base64.StdEncoding.EncodedLen(len(raw))) + base64.StdEncoding.Encode(encoded, raw) + result = append(result, encoded) + } + return result +} + +func parseEncodedMultiRawReply(args [][]byte) (*reply.MultiRawReply, error) { + cmdBuf := new(bytes.Buffer) + for _, arg := range args { + dbuf := make([]byte, base64.StdEncoding.DecodedLen(len(arg))) + n, err := base64.StdEncoding.Decode(dbuf, arg) + if err != nil { + continue + } + cmdBuf.Write(dbuf[:n]) + } + cmds, err := parser.ParseBytes(cmdBuf.Bytes()) + if err != nil { + return nil, reply.MakeErrReply(err.Error()) + } + return reply.MakeMultiRawReply(cmds), nil +} + +func encodeMultiRawReply(src *reply.MultiRawReply) *reply.MultiBulkReply { + args := make([][]byte, 0, len(src.Replies)) + for _, rep := range src.Replies { + raw := rep.ToBytes() + encoded := make([]byte, base64.StdEncoding.EncodedLen(len(raw))) + base64.StdEncoding.Encode(encoded, raw) + args = append(args, encoded) + } + return reply.MakeMultiBulkReply(args) +} diff --git a/cluster/multi_test.go b/cluster/multi_test.go new file mode 100644 index 0000000..92155b3 --- /dev/null +++ b/cluster/multi_test.go @@ -0,0 +1,73 @@ +package cluster + +import ( + "github.com/hdt3213/godis/lib/utils" + "github.com/hdt3213/godis/redis/connection" + "github.com/hdt3213/godis/redis/reply" + "github.com/hdt3213/godis/redis/reply/asserts" + "testing" +) + +func TestMultiExecOnSelf(t *testing.T) { + testCluster.db.Flush() + conn := new(connection.FakeConn) + result := testCluster.Exec(conn, toArgs("MULTI")) + asserts.AssertNotError(t, result) + key := utils.RandString(10) + value := utils.RandString(10) + testCluster.Exec(conn, utils.ToCmdLine("set", key, value)) + key2 := utils.RandString(10) + testCluster.Exec(conn, utils.ToCmdLine("rpush", key2, value)) + result = testCluster.Exec(conn, utils.ToCmdLine("exec")) + asserts.AssertNotError(t, result) + result = testCluster.Exec(conn, utils.ToCmdLine("get", key)) + asserts.AssertBulkReply(t, result, value) + result = testCluster.Exec(conn, utils.ToCmdLine("lrange", key2, "0", "-1")) + asserts.AssertMultiBulkReply(t, result, []string{value}) +} + +func TestEmptyMulti(t *testing.T) { + testCluster.db.Flush() + conn := new(connection.FakeConn) + result := testCluster.Exec(conn, toArgs("MULTI")) + asserts.AssertNotError(t, result) + result = testCluster.Exec(conn, utils.ToCmdLine("PING")) + asserts.AssertNotError(t, result) + result = testCluster.Exec(conn, utils.ToCmdLine("EXEC")) + asserts.AssertNotError(t, result) + mbr := result.(*reply.MultiRawReply) + asserts.AssertStatusReply(t, mbr.Replies[0], "PONG") +} + +func TestMultiExecOnOthers(t *testing.T) { + testCluster.db.Flush() + conn := new(connection.FakeConn) + result := testCluster.Exec(conn, toArgs("MULTI")) + asserts.AssertNotError(t, result) + key := utils.RandString(10) + value := utils.RandString(10) + testCluster.Exec(conn, utils.ToCmdLine("rpush", key, value)) + testCluster.Exec(conn, utils.ToCmdLine("lrange", key, "0", "-1")) + + cmdLines := conn.GetQueuedCmdLine() + relayCmdLine := [][]byte{ // relay it to executing node + relayMultiBytes, + } + relayCmdLine = append(relayCmdLine, encodeCmdLine(cmdLines)...) + rawRelayResult := execRelayedMulti(testCluster, conn, relayCmdLine) + if reply.IsErrorReply(rawRelayResult) { + t.Error() + } + relayResult, ok := rawRelayResult.(*reply.MultiBulkReply) + if !ok { + t.Error() + } + rep, err := parseEncodedMultiRawReply(relayResult.Args) + if err != nil { + t.Error() + } + if len(rep.Replies) != 2 { + t.Errorf("expect 2 replies actual %d", len(rep.Replies)) + } + asserts.AssertMultiBulkReply(t, rep.Replies[1], []string{value}) +} diff --git a/cluster/router.go b/cluster/router.go index 249c6cf..918a812 100644 --- a/cluster/router.go +++ b/cluster/router.go @@ -109,7 +109,7 @@ func makeRouter() map[string]CmdFunc { routerMap["flushdb"] = FlushDB routerMap["flushall"] = FlushAll - //routerMap["writeKeys"] = Keys + routerMap[relayMulti] = execRelayedMulti return routerMap } diff --git a/cluster/transaction.go b/cluster/tcc.go similarity index 100% rename from cluster/transaction.go rename to cluster/tcc.go diff --git a/cluster/transaction_test.go b/cluster/tcc_test.go similarity index 100% rename from cluster/transaction_test.go rename to cluster/tcc_test.go diff --git a/exec.go b/exec.go index 7cc1d15..0298e49 100644 --- a/exec.go +++ b/exec.go @@ -36,7 +36,7 @@ func (db *DB) Exec(c redis.Connection, cmdLine [][]byte) (result redis.Reply) { return result } if c != nil && c.InMultiState() { - return enqueueCmd(db, c, cmdLine) + return EnqueueCmd(db, c, cmdLine) } // normal commands @@ -60,12 +60,12 @@ func execSpecialCmd(c redis.Connection, cmdLine [][]byte, cmdName string, db *DB if len(cmdLine) != 1 { return reply.MakeArgNumErrReply(cmdName), true } - return startMulti(db, c), true + return StartMulti(db, c), true } else if cmdName == "discard" { if len(cmdLine) != 1 { return reply.MakeArgNumErrReply(cmdName), true } - return discardMulti(db, c), true + return DiscardMulti(db, c), true } else if cmdName == "exec" { if len(cmdLine) != 1 { return reply.MakeArgNumErrReply(cmdName), true diff --git a/geo.go b/geo.go index 62a8f68..bf1192d 100644 --- a/geo.go +++ b/geo.go @@ -82,12 +82,12 @@ func execGeoPos(db *DB, args [][]byte) redis.Reply { return &reply.NullBulkReply{} } - positions := make([][]byte, len(args)-1) + positions := make([]redis.Reply, len(args)-1) for i := 0; i < len(args)-1; i++ { member := string(args[i+1]) elem, exists := sortedSet.Get(member) if !exists { - positions[i] = (&reply.EmptyMultiBulkReply{}).ToBytes() + positions[i] = (&reply.EmptyMultiBulkReply{}) continue } lat, lng := geohash.Decode(uint64(elem.Score)) @@ -95,7 +95,7 @@ func execGeoPos(db *DB, args [][]byte) redis.Reply { latStr := strconv.FormatFloat(lat, 'f', -1, 64) positions[i] = reply.MakeMultiBulkReply([][]byte{ []byte(lngStr), []byte(latStr), - }).ToBytes() + }) } return reply.MakeMultiRawReply(positions) } diff --git a/multi.go b/multi.go index de47791..1fb0716 100644 --- a/multi.go +++ b/multi.go @@ -1,12 +1,19 @@ package godis import ( + "github.com/hdt3213/godis/datastruct/set" "github.com/hdt3213/godis/interface/redis" "github.com/hdt3213/godis/redis/reply" "strings" ) -func startMulti(db *DB, conn redis.Connection) redis.Reply { +var forbiddenInMulti = set.Make( + "flushdb", + "flushall", +) + +// StartMulti starts multi-command-transaction +func StartMulti(db *DB, conn redis.Connection) redis.Reply { if conn.InMultiState() { return reply.MakeErrReply("ERR MULTI calls can not be nested") } @@ -14,12 +21,16 @@ func startMulti(db *DB, conn redis.Connection) redis.Reply { return reply.MakeOkReply() } -func enqueueCmd(db *DB, conn redis.Connection, cmdLine [][]byte) redis.Reply { +// EnqueueCmd puts command line into `multi` pending queue +func EnqueueCmd(db *DB, conn redis.Connection, cmdLine [][]byte) redis.Reply { cmdName := strings.ToLower(string(cmdLine[0])) cmd, ok := cmdTable[cmdName] if !ok { return reply.MakeErrReply("ERR unknown command '" + cmdName + "'") } + if forbiddenInMulti.Has(cmdName) { + return reply.MakeErrReply("ERR command '" + cmdName + "' cannot be used in MULTI") + } if cmd.prepare == nil { return reply.MakeErrReply("ERR command '" + cmdName + "' cannot be used in MULTI") } @@ -37,7 +48,11 @@ func execMulti(db *DB, conn redis.Connection) redis.Reply { } defer conn.SetMultiState(false) cmdLines := conn.GetQueuedCmdLine() + return ExecMulti(db, cmdLines) +} +// ExecMulti executes multi commands transaction Atomically and Isolated +func ExecMulti(db *DB, cmdLines []CmdLine) redis.Reply { // prepare writeKeys := make([]string, 0) // may contains duplicate readKeys := make([]string, 0) @@ -53,7 +68,7 @@ func execMulti(db *DB, conn redis.Connection) redis.Reply { defer db.RWUnLocks(writeKeys, readKeys) // execute - results := make([][]byte, 0, len(cmdLines)) + results := make([]redis.Reply, 0, len(cmdLines)) aborted := false undoCmdLines := make([][]CmdLine, 0, len(cmdLines)) for _, cmdLine := range cmdLines { @@ -65,7 +80,7 @@ func execMulti(db *DB, conn redis.Connection) redis.Reply { undoCmdLines = undoCmdLines[:len(undoCmdLines)-1] break } - results = append(results, result.ToBytes()) + results = append(results, result) } if !aborted { return reply.MakeMultiRawReply(results) @@ -84,7 +99,8 @@ func execMulti(db *DB, conn redis.Connection) redis.Reply { return reply.MakeErrReply("EXECABORT Transaction discarded because of previous errors.") } -func discardMulti(db *DB, conn redis.Connection) redis.Reply { +// DiscardMulti drops MULTI pending commands +func DiscardMulti(db *DB, conn redis.Connection) redis.Reply { if !conn.InMultiState() { return reply.MakeErrReply("ERR DISCARD without MULTI") } diff --git a/redis/parser/parser.go b/redis/parser/parser.go index a4691f5..a268a76 100644 --- a/redis/parser/parser.go +++ b/redis/parser/parser.go @@ -26,6 +26,27 @@ func ParseStream(reader io.Reader) <-chan *Payload { return ch } +// ParseBytes reads data from []byte and return all replies +func ParseBytes(data []byte) ([]redis.Reply, error) { + ch := make(chan *Payload) + reader := bytes.NewReader(data) + go parse0(reader, ch) + var results []redis.Reply + for payload := range ch { + if payload == nil { + return nil, errors.New("no reply") + } + if payload.Err != nil { + if payload.Err == io.EOF { + break + } + return nil, payload.Err + } + results = append(results, payload.Data) + } + return results, nil +} + // ParseOne reads data from []byte and return the first payload func ParseOne(data []byte) (redis.Reply, error) { ch := make(chan *Payload) diff --git a/redis/reply/reply.go b/redis/reply/reply.go index c9c09da..d8c9500 100644 --- a/redis/reply/reply.go +++ b/redis/reply/reply.go @@ -68,23 +68,23 @@ func (r *MultiBulkReply) ToBytes() []byte { // MultiRawReply store complex list structure, for example GeoPos command type MultiRawReply struct { - Args [][]byte + Replies []redis.Reply } // MakeMultiRawReply creates MultiRawReply -func MakeMultiRawReply(args [][]byte) *MultiRawReply { +func MakeMultiRawReply(replies []redis.Reply) *MultiRawReply { return &MultiRawReply{ - Args: args, + Replies: replies, } } // ToBytes marshal redis.Reply func (r *MultiRawReply) ToBytes() []byte { - argLen := len(r.Args) + argLen := len(r.Replies) var buf bytes.Buffer buf.WriteString("*" + strconv.Itoa(argLen) + CRLF) - for _, arg := range r.Args { - buf.Write(arg) + for _, arg := range r.Replies { + buf.Write(arg.ToBytes()) } return buf.Bytes() }