support multi commands transaction in cluster mode

This commit is contained in:
hdt3213
2021-06-20 09:07:16 +08:00
parent ef5639f40b
commit 5d05e2e613
14 changed files with 313 additions and 35 deletions

View File

@@ -15,15 +15,18 @@ middleware using golang.
Key Features: Key Features:
- support string, list, hash, set, sorted set - Support string, list, hash, set, sorted set
- ttl - TTL
- publish/subscribe - Publish/Subscribe
- geo - GEO
- aof and aof rewrite - AOF and AOF Rewrite
- Transaction. The `multi` command is Atomic and Isolated. If any errors are encountered during execution, godis will rollback the executed commands - MULTI Commands Transaction is Atomic and Isolated. If any errors are encountered during execution, godis will rollback the executed commands
- server side cluster which is transparent to client. You can connect to any node in the cluster to - Server-side Cluster which is transparent to client. You can connect to any node in the cluster to
access all data in the cluster. access all data in the cluster.
- a concurrent core, so you don't have to worry about your commands blocking the server too much. - `MSET`, `DEL` command is supported and atomically executed in cluster mode
- `Rename`, `RenameNX` command is supported within slot in cluster mode
- MULTI Commands Transaction is supported within slot in cluster mode
- Concurrent Core, so you don't have to worry about your commands blocking the server too much.
If you could read Chinese, you can find more details in [My Blog](https://www.cnblogs.com/Finley/category/1598973.html). If you could read Chinese, you can find more details in [My Blog](https://www.cnblogs.com/Finley/category/1598973.html).

View File

@@ -15,8 +15,11 @@ Godis 是一个用 Go 语言实现的 Redis 服务器。本项目旨在为尝试
- 发布订阅 - 发布订阅
- 地理位置 - 地理位置
- AOF 持久化及AOF重写 - AOF 持久化及AOF重写
- 事务. Multi 命令开启的事务具有`原子性``隔离性`. 若在执行过程中遇到错误, godis 会回滚已执行的命令 - Multi 命令开启的事务具有`原子性``隔离性`. 若在执行过程中遇到错误, godis 会回滚已执行的命令
- 内置集群模式. 集群对客户端是透明的, 您可以像使用单机版 redis 一样使用 godis 集群 - 内置集群模式. 集群对客户端是透明的, 您可以像使用单机版 redis 一样使用 godis 集群
- `MSET`, `DEL` 命令在集群模式下原子性执行
- `Rename`, `RenameNX` 命令在集群模式下支持在同一个 slot 内执行
- Multi 命令开启的事务在集群模式下支持在同一个 slot 内执行
- 并行引擎, 无需担心您的操作会阻塞整个服务器. - 并行引擎, 无需担心您的操作会阻塞整个服务器.
可以在[我的博客](https://www.cnblogs.com/Finley/category/1598973.html)了解更多关于 可以在[我的博客](https://www.cnblogs.com/Finley/category/1598973.html)了解更多关于

View File

@@ -91,25 +91,45 @@ func isAuthenticated(c redis.Connection) bool {
} }
// Exec executes command on cluster // Exec executes command on cluster
func (cluster *Cluster) Exec(c redis.Connection, cmdArgs [][]byte) (result redis.Reply) { func (cluster *Cluster) Exec(c redis.Connection, cmdLine [][]byte) (result redis.Reply) {
defer func() { defer func() {
if err := recover(); err != nil { if err := recover(); err != nil {
logger.Warn(fmt.Sprintf("error occurs: %v\n%s", err, string(debug.Stack()))) logger.Warn(fmt.Sprintf("error occurs: %v\n%s", err, string(debug.Stack())))
result = &reply.UnknownErrReply{} result = &reply.UnknownErrReply{}
} }
}() }()
cmd := strings.ToLower(string(cmdArgs[0])) cmdName := strings.ToLower(string(cmdLine[0]))
if cmd == "auth" { if cmdName == "auth" {
return godis.Auth(cluster.db, c, cmdArgs[1:]) return godis.Auth(cluster.db, c, cmdLine[1:])
} }
if !isAuthenticated(c) { if !isAuthenticated(c) {
return reply.MakeErrReply("NOAUTH Authentication required") return reply.MakeErrReply("NOAUTH Authentication required")
} }
cmdFunc, ok := router[cmd]
if !ok { if cmdName == "multi" {
return reply.MakeErrReply("ERR unknown command '" + cmd + "', or not supported in cluster mode") if len(cmdLine) != 1 {
return reply.MakeArgNumErrReply(cmdName)
}
return godis.StartMulti(cluster.db, c)
} else if cmdName == "discard" {
if len(cmdLine) != 1 {
return reply.MakeArgNumErrReply(cmdName)
}
return godis.DiscardMulti(cluster.db, c)
} else if cmdName == "exec" {
if len(cmdLine) != 1 {
return reply.MakeArgNumErrReply(cmdName)
}
return execMulti(cluster, c, nil)
} }
result = cmdFunc(cluster, c, cmdArgs) if c != nil && c.InMultiState() {
return godis.EnqueueCmd(cluster.db, c, cmdLine)
}
cmdFunc, ok := router[cmdName]
if !ok {
return reply.MakeErrReply("ERR unknown command '" + cmdName + "', or not supported in cluster mode")
}
result = cmdFunc(cluster, c, cmdLine)
return return
} }

95
cluster/multi.go Normal file
View File

@@ -0,0 +1,95 @@
package cluster
import (
"github.com/hdt3213/godis"
"github.com/hdt3213/godis/interface/redis"
"github.com/hdt3213/godis/redis/reply"
)
const relayMulti = "_multi"
var relayMultiBytes = []byte(relayMulti)
// cmdLine == []string{"exec"}
func execMulti(cluster *Cluster, conn redis.Connection, cmdLine CmdLine) redis.Reply {
if !conn.InMultiState() {
return reply.MakeErrReply("ERR EXEC without MULTI")
}
defer conn.SetMultiState(false)
cmdLines := conn.GetQueuedCmdLine()
// analysis related keys
keys := make([]string, 0) // may contains duplicate
for _, cl := range cmdLines {
wKeys, rKeys := cluster.db.GetRelatedKeys(cl)
keys = append(keys, wKeys...)
keys = append(keys, rKeys...)
}
if len(keys) == 0 {
// empty transaction or only `PING`s
return godis.ExecMulti(cluster.db, cmdLines)
}
groupMap := cluster.groupBy(keys)
if len(groupMap) > 1 {
return reply.MakeErrReply("ERR MULTI commands transaction must within one slot in cluster mode")
}
var peer string
// assert len(groupMap) == 1
for p := range groupMap {
peer = p
}
// out parser not support reply.MultiRawReply, so we have to encode it
if peer == cluster.self {
return godis.ExecMulti(cluster.db, cmdLines)
}
return execMultiOnOtherNode(cluster, conn, peer, cmdLines)
}
func execMultiOnOtherNode(cluster *Cluster, conn redis.Connection, peer string, cmdLines []CmdLine) redis.Reply {
defer func() {
conn.ClearQueuedCmds()
conn.SetMultiState(false)
}()
relayCmdLine := [][]byte{ // relay it to executing node
relayMultiBytes,
}
relayCmdLine = append(relayCmdLine, encodeCmdLine(cmdLines)...)
rawRelayResult := cluster.relay(peer, conn, relayCmdLine)
if reply.IsErrorReply(rawRelayResult) {
return rawRelayResult
}
relayResult, ok := rawRelayResult.(*reply.MultiBulkReply)
if !ok {
return reply.MakeErrReply("execute failed")
}
rep, err := parseEncodedMultiRawReply(relayResult.Args)
if err != nil {
return reply.MakeErrReply(err.Error())
}
return rep
}
// execRelayedMulti execute relayed multi commands transaction
// cmdLine format: _multi base64ed-cmdLine
// result format: base64ed-reply list
func execRelayedMulti(cluster *Cluster, conn redis.Connection, cmdLine CmdLine) redis.Reply {
decoded, err := parseEncodedMultiRawReply(cmdLine[1:])
if err != nil {
return reply.MakeErrReply(err.Error())
}
var cmdLines []CmdLine
for _, rep := range decoded.Replies {
mbr, ok := rep.(*reply.MultiBulkReply)
if !ok {
return reply.MakeErrReply("exec failed")
}
cmdLines = append(cmdLines, mbr.Args)
}
rawResult := godis.ExecMulti(cluster.db, cmdLines)
resultMBR, ok := rawResult.(*reply.MultiRawReply)
if !ok {
return reply.MakeErrReply("exec failed")
}
return encodeMultiRawReply(resultMBR)
}

47
cluster/multi_helper.go Normal file
View File

@@ -0,0 +1,47 @@
package cluster
import (
"bytes"
"encoding/base64"
"github.com/hdt3213/godis/redis/parser"
"github.com/hdt3213/godis/redis/reply"
)
func encodeCmdLine(cmdLines []CmdLine) [][]byte {
var result [][]byte
for _, line := range cmdLines {
raw := reply.MakeMultiBulkReply(line).ToBytes()
encoded := make([]byte, base64.StdEncoding.EncodedLen(len(raw)))
base64.StdEncoding.Encode(encoded, raw)
result = append(result, encoded)
}
return result
}
func parseEncodedMultiRawReply(args [][]byte) (*reply.MultiRawReply, error) {
cmdBuf := new(bytes.Buffer)
for _, arg := range args {
dbuf := make([]byte, base64.StdEncoding.DecodedLen(len(arg)))
n, err := base64.StdEncoding.Decode(dbuf, arg)
if err != nil {
continue
}
cmdBuf.Write(dbuf[:n])
}
cmds, err := parser.ParseBytes(cmdBuf.Bytes())
if err != nil {
return nil, reply.MakeErrReply(err.Error())
}
return reply.MakeMultiRawReply(cmds), nil
}
func encodeMultiRawReply(src *reply.MultiRawReply) *reply.MultiBulkReply {
args := make([][]byte, 0, len(src.Replies))
for _, rep := range src.Replies {
raw := rep.ToBytes()
encoded := make([]byte, base64.StdEncoding.EncodedLen(len(raw)))
base64.StdEncoding.Encode(encoded, raw)
args = append(args, encoded)
}
return reply.MakeMultiBulkReply(args)
}

73
cluster/multi_test.go Normal file
View File

@@ -0,0 +1,73 @@
package cluster
import (
"github.com/hdt3213/godis/lib/utils"
"github.com/hdt3213/godis/redis/connection"
"github.com/hdt3213/godis/redis/reply"
"github.com/hdt3213/godis/redis/reply/asserts"
"testing"
)
func TestMultiExecOnSelf(t *testing.T) {
testCluster.db.Flush()
conn := new(connection.FakeConn)
result := testCluster.Exec(conn, toArgs("MULTI"))
asserts.AssertNotError(t, result)
key := utils.RandString(10)
value := utils.RandString(10)
testCluster.Exec(conn, utils.ToCmdLine("set", key, value))
key2 := utils.RandString(10)
testCluster.Exec(conn, utils.ToCmdLine("rpush", key2, value))
result = testCluster.Exec(conn, utils.ToCmdLine("exec"))
asserts.AssertNotError(t, result)
result = testCluster.Exec(conn, utils.ToCmdLine("get", key))
asserts.AssertBulkReply(t, result, value)
result = testCluster.Exec(conn, utils.ToCmdLine("lrange", key2, "0", "-1"))
asserts.AssertMultiBulkReply(t, result, []string{value})
}
func TestEmptyMulti(t *testing.T) {
testCluster.db.Flush()
conn := new(connection.FakeConn)
result := testCluster.Exec(conn, toArgs("MULTI"))
asserts.AssertNotError(t, result)
result = testCluster.Exec(conn, utils.ToCmdLine("PING"))
asserts.AssertNotError(t, result)
result = testCluster.Exec(conn, utils.ToCmdLine("EXEC"))
asserts.AssertNotError(t, result)
mbr := result.(*reply.MultiRawReply)
asserts.AssertStatusReply(t, mbr.Replies[0], "PONG")
}
func TestMultiExecOnOthers(t *testing.T) {
testCluster.db.Flush()
conn := new(connection.FakeConn)
result := testCluster.Exec(conn, toArgs("MULTI"))
asserts.AssertNotError(t, result)
key := utils.RandString(10)
value := utils.RandString(10)
testCluster.Exec(conn, utils.ToCmdLine("rpush", key, value))
testCluster.Exec(conn, utils.ToCmdLine("lrange", key, "0", "-1"))
cmdLines := conn.GetQueuedCmdLine()
relayCmdLine := [][]byte{ // relay it to executing node
relayMultiBytes,
}
relayCmdLine = append(relayCmdLine, encodeCmdLine(cmdLines)...)
rawRelayResult := execRelayedMulti(testCluster, conn, relayCmdLine)
if reply.IsErrorReply(rawRelayResult) {
t.Error()
}
relayResult, ok := rawRelayResult.(*reply.MultiBulkReply)
if !ok {
t.Error()
}
rep, err := parseEncodedMultiRawReply(relayResult.Args)
if err != nil {
t.Error()
}
if len(rep.Replies) != 2 {
t.Errorf("expect 2 replies actual %d", len(rep.Replies))
}
asserts.AssertMultiBulkReply(t, rep.Replies[1], []string{value})
}

View File

@@ -109,7 +109,7 @@ func makeRouter() map[string]CmdFunc {
routerMap["flushdb"] = FlushDB routerMap["flushdb"] = FlushDB
routerMap["flushall"] = FlushAll routerMap["flushall"] = FlushAll
//routerMap["writeKeys"] = Keys routerMap[relayMulti] = execRelayedMulti
return routerMap return routerMap
} }

View File

@@ -36,7 +36,7 @@ func (db *DB) Exec(c redis.Connection, cmdLine [][]byte) (result redis.Reply) {
return result return result
} }
if c != nil && c.InMultiState() { if c != nil && c.InMultiState() {
return enqueueCmd(db, c, cmdLine) return EnqueueCmd(db, c, cmdLine)
} }
// normal commands // normal commands
@@ -60,12 +60,12 @@ func execSpecialCmd(c redis.Connection, cmdLine [][]byte, cmdName string, db *DB
if len(cmdLine) != 1 { if len(cmdLine) != 1 {
return reply.MakeArgNumErrReply(cmdName), true return reply.MakeArgNumErrReply(cmdName), true
} }
return startMulti(db, c), true return StartMulti(db, c), true
} else if cmdName == "discard" { } else if cmdName == "discard" {
if len(cmdLine) != 1 { if len(cmdLine) != 1 {
return reply.MakeArgNumErrReply(cmdName), true return reply.MakeArgNumErrReply(cmdName), true
} }
return discardMulti(db, c), true return DiscardMulti(db, c), true
} else if cmdName == "exec" { } else if cmdName == "exec" {
if len(cmdLine) != 1 { if len(cmdLine) != 1 {
return reply.MakeArgNumErrReply(cmdName), true return reply.MakeArgNumErrReply(cmdName), true

6
geo.go
View File

@@ -82,12 +82,12 @@ func execGeoPos(db *DB, args [][]byte) redis.Reply {
return &reply.NullBulkReply{} return &reply.NullBulkReply{}
} }
positions := make([][]byte, len(args)-1) positions := make([]redis.Reply, len(args)-1)
for i := 0; i < len(args)-1; i++ { for i := 0; i < len(args)-1; i++ {
member := string(args[i+1]) member := string(args[i+1])
elem, exists := sortedSet.Get(member) elem, exists := sortedSet.Get(member)
if !exists { if !exists {
positions[i] = (&reply.EmptyMultiBulkReply{}).ToBytes() positions[i] = (&reply.EmptyMultiBulkReply{})
continue continue
} }
lat, lng := geohash.Decode(uint64(elem.Score)) lat, lng := geohash.Decode(uint64(elem.Score))
@@ -95,7 +95,7 @@ func execGeoPos(db *DB, args [][]byte) redis.Reply {
latStr := strconv.FormatFloat(lat, 'f', -1, 64) latStr := strconv.FormatFloat(lat, 'f', -1, 64)
positions[i] = reply.MakeMultiBulkReply([][]byte{ positions[i] = reply.MakeMultiBulkReply([][]byte{
[]byte(lngStr), []byte(latStr), []byte(lngStr), []byte(latStr),
}).ToBytes() })
} }
return reply.MakeMultiRawReply(positions) return reply.MakeMultiRawReply(positions)
} }

View File

@@ -1,12 +1,19 @@
package godis package godis
import ( import (
"github.com/hdt3213/godis/datastruct/set"
"github.com/hdt3213/godis/interface/redis" "github.com/hdt3213/godis/interface/redis"
"github.com/hdt3213/godis/redis/reply" "github.com/hdt3213/godis/redis/reply"
"strings" "strings"
) )
func startMulti(db *DB, conn redis.Connection) redis.Reply { var forbiddenInMulti = set.Make(
"flushdb",
"flushall",
)
// StartMulti starts multi-command-transaction
func StartMulti(db *DB, conn redis.Connection) redis.Reply {
if conn.InMultiState() { if conn.InMultiState() {
return reply.MakeErrReply("ERR MULTI calls can not be nested") return reply.MakeErrReply("ERR MULTI calls can not be nested")
} }
@@ -14,12 +21,16 @@ func startMulti(db *DB, conn redis.Connection) redis.Reply {
return reply.MakeOkReply() return reply.MakeOkReply()
} }
func enqueueCmd(db *DB, conn redis.Connection, cmdLine [][]byte) redis.Reply { // EnqueueCmd puts command line into `multi` pending queue
func EnqueueCmd(db *DB, conn redis.Connection, cmdLine [][]byte) redis.Reply {
cmdName := strings.ToLower(string(cmdLine[0])) cmdName := strings.ToLower(string(cmdLine[0]))
cmd, ok := cmdTable[cmdName] cmd, ok := cmdTable[cmdName]
if !ok { if !ok {
return reply.MakeErrReply("ERR unknown command '" + cmdName + "'") return reply.MakeErrReply("ERR unknown command '" + cmdName + "'")
} }
if forbiddenInMulti.Has(cmdName) {
return reply.MakeErrReply("ERR command '" + cmdName + "' cannot be used in MULTI")
}
if cmd.prepare == nil { if cmd.prepare == nil {
return reply.MakeErrReply("ERR command '" + cmdName + "' cannot be used in MULTI") return reply.MakeErrReply("ERR command '" + cmdName + "' cannot be used in MULTI")
} }
@@ -37,7 +48,11 @@ func execMulti(db *DB, conn redis.Connection) redis.Reply {
} }
defer conn.SetMultiState(false) defer conn.SetMultiState(false)
cmdLines := conn.GetQueuedCmdLine() cmdLines := conn.GetQueuedCmdLine()
return ExecMulti(db, cmdLines)
}
// ExecMulti executes multi commands transaction Atomically and Isolated
func ExecMulti(db *DB, cmdLines []CmdLine) redis.Reply {
// prepare // prepare
writeKeys := make([]string, 0) // may contains duplicate writeKeys := make([]string, 0) // may contains duplicate
readKeys := make([]string, 0) readKeys := make([]string, 0)
@@ -53,7 +68,7 @@ func execMulti(db *DB, conn redis.Connection) redis.Reply {
defer db.RWUnLocks(writeKeys, readKeys) defer db.RWUnLocks(writeKeys, readKeys)
// execute // execute
results := make([][]byte, 0, len(cmdLines)) results := make([]redis.Reply, 0, len(cmdLines))
aborted := false aborted := false
undoCmdLines := make([][]CmdLine, 0, len(cmdLines)) undoCmdLines := make([][]CmdLine, 0, len(cmdLines))
for _, cmdLine := range cmdLines { for _, cmdLine := range cmdLines {
@@ -65,7 +80,7 @@ func execMulti(db *DB, conn redis.Connection) redis.Reply {
undoCmdLines = undoCmdLines[:len(undoCmdLines)-1] undoCmdLines = undoCmdLines[:len(undoCmdLines)-1]
break break
} }
results = append(results, result.ToBytes()) results = append(results, result)
} }
if !aborted { if !aborted {
return reply.MakeMultiRawReply(results) return reply.MakeMultiRawReply(results)
@@ -84,7 +99,8 @@ func execMulti(db *DB, conn redis.Connection) redis.Reply {
return reply.MakeErrReply("EXECABORT Transaction discarded because of previous errors.") return reply.MakeErrReply("EXECABORT Transaction discarded because of previous errors.")
} }
func discardMulti(db *DB, conn redis.Connection) redis.Reply { // DiscardMulti drops MULTI pending commands
func DiscardMulti(db *DB, conn redis.Connection) redis.Reply {
if !conn.InMultiState() { if !conn.InMultiState() {
return reply.MakeErrReply("ERR DISCARD without MULTI") return reply.MakeErrReply("ERR DISCARD without MULTI")
} }

View File

@@ -26,6 +26,27 @@ func ParseStream(reader io.Reader) <-chan *Payload {
return ch return ch
} }
// ParseBytes reads data from []byte and return all replies
func ParseBytes(data []byte) ([]redis.Reply, error) {
ch := make(chan *Payload)
reader := bytes.NewReader(data)
go parse0(reader, ch)
var results []redis.Reply
for payload := range ch {
if payload == nil {
return nil, errors.New("no reply")
}
if payload.Err != nil {
if payload.Err == io.EOF {
break
}
return nil, payload.Err
}
results = append(results, payload.Data)
}
return results, nil
}
// ParseOne reads data from []byte and return the first payload // ParseOne reads data from []byte and return the first payload
func ParseOne(data []byte) (redis.Reply, error) { func ParseOne(data []byte) (redis.Reply, error) {
ch := make(chan *Payload) ch := make(chan *Payload)

View File

@@ -68,23 +68,23 @@ func (r *MultiBulkReply) ToBytes() []byte {
// MultiRawReply store complex list structure, for example GeoPos command // MultiRawReply store complex list structure, for example GeoPos command
type MultiRawReply struct { type MultiRawReply struct {
Args [][]byte Replies []redis.Reply
} }
// MakeMultiRawReply creates MultiRawReply // MakeMultiRawReply creates MultiRawReply
func MakeMultiRawReply(args [][]byte) *MultiRawReply { func MakeMultiRawReply(replies []redis.Reply) *MultiRawReply {
return &MultiRawReply{ return &MultiRawReply{
Args: args, Replies: replies,
} }
} }
// ToBytes marshal redis.Reply // ToBytes marshal redis.Reply
func (r *MultiRawReply) ToBytes() []byte { func (r *MultiRawReply) ToBytes() []byte {
argLen := len(r.Args) argLen := len(r.Replies)
var buf bytes.Buffer var buf bytes.Buffer
buf.WriteString("*" + strconv.Itoa(argLen) + CRLF) buf.WriteString("*" + strconv.Itoa(argLen) + CRLF)
for _, arg := range r.Args { for _, arg := range r.Replies {
buf.Write(arg) buf.Write(arg.ToBytes())
} }
return buf.Bytes() return buf.Bytes()
} }