support mget in cluster

This commit is contained in:
finley
2025-03-13 23:09:27 +08:00
parent 360de97479
commit 5285717a2d
4 changed files with 152 additions and 64 deletions

View File

@@ -1,2 +0,0 @@
package commands

View File

@@ -10,22 +10,8 @@ import (
func init() {
core.RegisterCmd("mset_", execMSetInLocal)
core.RegisterCmd("mset", execMSet)
}
type CmdLine = [][]byte
// node -> keys on the node
type RouteMap map[string][]string
func getRouteMap(cluster *core.Cluster, keys []string) RouteMap {
m := make(RouteMap)
for _, key := range keys {
slot := cluster.GetSlot(key)
node := cluster.PickNode(slot)
m[node] = append(m[node], key)
}
return m
core.RegisterCmd("mget_", execMGetInLocal)
core.RegisterCmd("mget", execMGet)
}
// execMSetInLocal executes msets in local node
@@ -37,13 +23,26 @@ func execMSetInLocal(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine)
return cluster.LocalExec(c, cmdLine)
}
// execMSetInLocal executes msets in local node
func execMGetInLocal(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply {
if len(cmdLine) < 2 {
return protocol.MakeArgNumErrReply("mget")
}
cmdLine[0] = []byte("mget")
return cluster.LocalExec(c, cmdLine)
}
func execMSet(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply {
if len(cmdLine) < 3 || len(cmdLine)%2 != 1 {
return protocol.MakeArgNumErrReply("mset")
}
var keys []string
keyValues := make(map[string][]byte)
for i := 1; i < len(cmdLine); i += 2 {
keys = append(keys, string(cmdLine[i]))
key := string(cmdLine[i])
value := cmdLine[i+1]
keyValues[key] = value
keys = append(keys, key)
}
routeMap := getRouteMap(cluster, keys)
if len(routeMap) == 1 {
@@ -53,56 +52,77 @@ func execMSet(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine) redis.
return cluster.Relay(node, c, cmdLine)
}
}
return execMSetSlow(cluster, c, cmdLine, routeMap)
}
func requestRollback(cluster *core.Cluster, c redis.Connection, txId string, routeMap RouteMap) {
rollbackCmd := utils.ToCmdLine("rollback", txId)
for node := range routeMap {
cluster.Relay(node, c, rollbackCmd)
}
}
// execMSetSlow execute mset through tcc
func execMSetSlow(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine, routeMap RouteMap) redis.Reply {
txId := utils.RandString(6)
keyValues := make(map[string][]byte)
for i := 1; i < len(cmdLine); i += 2 {
key := string(cmdLine[i])
value := cmdLine[i+1]
keyValues[key] = value
}
// make prepare requests
nodePrepareCmdMap := make(map[string]CmdLine)
// tcc
cmdLineMap := make(map[string]CmdLine)
for node, keys := range routeMap {
prepareCmd := utils.ToCmdLine("prepare", txId, "mset")
nodeCmdLine := utils.ToCmdLine("mset")
for _, key := range keys {
value := keyValues[key]
prepareCmd = append(prepareCmd, []byte(key), value)
val := keyValues[key]
nodeCmdLine = append(nodeCmdLine, []byte(key), val)
}
nodePrepareCmdMap[node] = prepareCmd
cmdLineMap[node] = nodeCmdLine
}
// send prepare request
for node, prepareCmd := range nodePrepareCmdMap {
reply := cluster.Relay(node, c, prepareCmd)
if protocol.IsErrorReply(reply) {
requestRollback(cluster, c, txId, routeMap)
return protocol.MakeErrReply("prepare failed")
}
tx := &TccTx{
rawCmdLine: cmdLine,
routeMap: routeMap,
cmdLines: cmdLineMap,
}
// send commit request
commiteCmd := utils.ToCmdLine("commit", txId)
for node := range nodePrepareCmdMap {
reply := cluster.Relay(node, c, commiteCmd)
if protocol.IsErrorReply(reply) {
requestRollback(cluster, c, txId, routeMap)
return protocol.MakeErrReply("commit failed")
}
_, err := doTcc(cluster, c, tx)
if err != nil {
return err
}
return protocol.MakeOkReply()
}
func execMGet(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply {
if len(cmdLine) < 2 {
return protocol.MakeArgNumErrReply("mget")
}
keys := make([]string, 0, len(cmdLine)-1)
for i := 1; i < len(cmdLine); i++ {
keys = append(keys, string(cmdLine[i]))
}
routeMap := getRouteMap(cluster, keys)
if len(routeMap) == 1 {
// only one node, do it fast
for node := range routeMap {
cmdLine[0] = []byte("mget_")
return cluster.Relay(node, c, cmdLine)
}
}
// tcc
cmdLineMap := make(map[string]CmdLine)
for node, keys := range routeMap {
cmdLineMap[node] = utils.ToCmdLine2("mget", keys...)
}
tx := &TccTx{
rawCmdLine: cmdLine,
routeMap: routeMap,
cmdLines: cmdLineMap,
}
nodeResults, err := doTcc(cluster, c, tx)
if err != nil {
return err
}
keyValues := make(map[string][]byte, len(cmdLine)-1)
for node, ret := range nodeResults {
nodeCmdLine := cmdLineMap[node]
result := ret.(*protocol.MultiBulkReply)
if len(result.Args) != len(nodeCmdLine) - 1 {
return protocol.MakeErrReply("wrong response from node " + node)
}
for i := 1; i < len(nodeCmdLine); i++ {
key := string(nodeCmdLine[i])
value := result.Args[i-1]
keyValues[key] = value
}
}
result := make([][]byte, 0, len(cmdLine)-1)
for i := 1; i < len(cmdLine); i++ {
value := keyValues[string(cmdLine[i])]
result = append(result, value)
}
return protocol.MakeMultiBulkReply(result)
}

View File

@@ -6,6 +6,7 @@ import (
"github.com/hdt3213/godis/cluster/core"
"github.com/hdt3213/godis/lib/utils"
"github.com/hdt3213/godis/redis/connection"
"github.com/hdt3213/godis/redis/protocol/asserts"
)
func TestMset(t *testing.T) {
@@ -16,5 +17,7 @@ func TestMset(t *testing.T) {
c := connection.NewFakeConn()
// 1, 2 will be routed to node1 and node2, see MakeTestCluster
res := execMSet(node1, c, utils.ToCmdLine("mset", "1", "1", "2", "2"))
println(res)
asserts.AssertStatusReply(t, res, "OK")
res2 := execMGet(node1, c, utils.ToCmdLine("mget", "1", "2"))
asserts.AssertMultiBulkReply(t, res2, []string{"1", "2"})
}

View File

@@ -0,0 +1,67 @@
package commands
import (
"github.com/hdt3213/godis/cluster/core"
"github.com/hdt3213/godis/interface/redis"
"github.com/hdt3213/godis/lib/utils"
"github.com/hdt3213/godis/redis/protocol"
)
type CmdLine = [][]byte
// node -> keys on the node
type RouteMap map[string][]string
func getRouteMap(cluster *core.Cluster, keys []string) RouteMap {
m := make(RouteMap)
for _, key := range keys {
slot := cluster.GetSlot(key)
node := cluster.PickNode(slot)
m[node] = append(m[node], key)
}
return m
}
type TccTx struct {
rawCmdLine CmdLine
routeMap RouteMap
cmdLines map[string]CmdLine // node -> CmdLine
}
// execute tcc
// returns node->result map
func doTcc(cluster *core.Cluster, c redis.Connection, tx *TccTx) (map[string]redis.Reply, protocol.ErrorReply) {
txId := utils.RandString(6)
// send prepare request
for node, cmdLine := range tx.cmdLines {
prepareCmd := utils.ToCmdLine("prepare", txId)
prepareCmd = append(prepareCmd, cmdLine...)
reply := cluster.Relay(node, c, prepareCmd)
if err := protocol.Try2ErrorReply(reply); err != nil {
requestRollback(cluster, c, txId, tx.routeMap)
return nil, protocol.MakeErrReply("prepare failed: " + err.Error())
}
}
// send commit request
commiteCmd := utils.ToCmdLine("commit", txId)
result := make(map[string]redis.Reply)
for node := range tx.routeMap {
reply := cluster.Relay(node, c, commiteCmd)
if err := protocol.Try2ErrorReply(reply); err != nil {
requestRollback(cluster, c, txId, tx.routeMap)
return nil, protocol.MakeErrReply("commit failed: " + err.Error())
}
result[node] = reply
}
return result, nil
}
func requestRollback(cluster *core.Cluster, c redis.Connection, txId string, routeMap RouteMap) {
rollbackCmd := utils.ToCmdLine("rollback", txId)
for node := range routeMap {
cluster.Relay(node, c, rollbackCmd)
}
}