From 5285717a2d8b78cd146ef262a206719982dfc74e Mon Sep 17 00:00:00 2001 From: finley Date: Thu, 13 Mar 2025 23:09:27 +0800 Subject: [PATCH] support mget in cluster --- cluster/commands/mget.go | 2 - cluster/commands/mset.go | 142 +++++++++++++++++++--------------- cluster/commands/mset_test.go | 5 +- cluster/commands/tcccore.go | 67 ++++++++++++++++ 4 files changed, 152 insertions(+), 64 deletions(-) delete mode 100644 cluster/commands/mget.go create mode 100644 cluster/commands/tcccore.go diff --git a/cluster/commands/mget.go b/cluster/commands/mget.go deleted file mode 100644 index 1f99e38..0000000 --- a/cluster/commands/mget.go +++ /dev/null @@ -1,2 +0,0 @@ -package commands - diff --git a/cluster/commands/mset.go b/cluster/commands/mset.go index e1fabc3..a4f381b 100644 --- a/cluster/commands/mset.go +++ b/cluster/commands/mset.go @@ -10,22 +10,8 @@ import ( func init() { core.RegisterCmd("mset_", execMSetInLocal) core.RegisterCmd("mset", execMSet) - -} - -type CmdLine = [][]byte - -// node -> keys on the node -type RouteMap map[string][]string - -func getRouteMap(cluster *core.Cluster, keys []string) RouteMap { - m := make(RouteMap) - for _, key := range keys { - slot := cluster.GetSlot(key) - node := cluster.PickNode(slot) - m[node] = append(m[node], key) - } - return m + core.RegisterCmd("mget_", execMGetInLocal) + core.RegisterCmd("mget", execMGet) } // execMSetInLocal executes msets in local node @@ -37,13 +23,26 @@ func execMSetInLocal(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine) return cluster.LocalExec(c, cmdLine) } +// execMSetInLocal executes msets in local node +func execMGetInLocal(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply { + if len(cmdLine) < 2 { + return protocol.MakeArgNumErrReply("mget") + } + cmdLine[0] = []byte("mget") + return cluster.LocalExec(c, cmdLine) +} + func execMSet(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply { if len(cmdLine) < 3 || len(cmdLine)%2 != 1 { return protocol.MakeArgNumErrReply("mset") } var keys []string + keyValues := make(map[string][]byte) for i := 1; i < len(cmdLine); i += 2 { - keys = append(keys, string(cmdLine[i])) + key := string(cmdLine[i]) + value := cmdLine[i+1] + keyValues[key] = value + keys = append(keys, key) } routeMap := getRouteMap(cluster, keys) if len(routeMap) == 1 { @@ -53,56 +52,77 @@ func execMSet(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine) redis. return cluster.Relay(node, c, cmdLine) } } - return execMSetSlow(cluster, c, cmdLine, routeMap) -} -func requestRollback(cluster *core.Cluster, c redis.Connection, txId string, routeMap RouteMap) { - rollbackCmd := utils.ToCmdLine("rollback", txId) - for node := range routeMap { - cluster.Relay(node, c, rollbackCmd) - } -} - -// execMSetSlow execute mset through tcc -func execMSetSlow(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine, routeMap RouteMap) redis.Reply { - txId := utils.RandString(6) - - keyValues := make(map[string][]byte) - for i := 1; i < len(cmdLine); i += 2 { - key := string(cmdLine[i]) - value := cmdLine[i+1] - keyValues[key] = value - } - - // make prepare requests - nodePrepareCmdMap := make(map[string]CmdLine) + // tcc + cmdLineMap := make(map[string]CmdLine) for node, keys := range routeMap { - prepareCmd := utils.ToCmdLine("prepare", txId, "mset") + nodeCmdLine := utils.ToCmdLine("mset") for _, key := range keys { - value := keyValues[key] - prepareCmd = append(prepareCmd, []byte(key), value) + val := keyValues[key] + nodeCmdLine = append(nodeCmdLine, []byte(key), val) } - nodePrepareCmdMap[node] = prepareCmd + cmdLineMap[node] = nodeCmdLine } - - // send prepare request - for node, prepareCmd := range nodePrepareCmdMap { - reply := cluster.Relay(node, c, prepareCmd) - if protocol.IsErrorReply(reply) { - requestRollback(cluster, c, txId, routeMap) - return protocol.MakeErrReply("prepare failed") - } + tx := &TccTx{ + rawCmdLine: cmdLine, + routeMap: routeMap, + cmdLines: cmdLineMap, } - - // send commit request - commiteCmd := utils.ToCmdLine("commit", txId) - for node := range nodePrepareCmdMap { - reply := cluster.Relay(node, c, commiteCmd) - if protocol.IsErrorReply(reply) { - requestRollback(cluster, c, txId, routeMap) - return protocol.MakeErrReply("commit failed") - } + _, err := doTcc(cluster, c, tx) + if err != nil { + return err } - return protocol.MakeOkReply() } + +func execMGet(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply { + if len(cmdLine) < 2 { + return protocol.MakeArgNumErrReply("mget") + } + keys := make([]string, 0, len(cmdLine)-1) + for i := 1; i < len(cmdLine); i++ { + keys = append(keys, string(cmdLine[i])) + } + routeMap := getRouteMap(cluster, keys) + if len(routeMap) == 1 { + // only one node, do it fast + for node := range routeMap { + cmdLine[0] = []byte("mget_") + return cluster.Relay(node, c, cmdLine) + } + } + + // tcc + cmdLineMap := make(map[string]CmdLine) + for node, keys := range routeMap { + cmdLineMap[node] = utils.ToCmdLine2("mget", keys...) + } + tx := &TccTx{ + rawCmdLine: cmdLine, + routeMap: routeMap, + cmdLines: cmdLineMap, + } + nodeResults, err := doTcc(cluster, c, tx) + if err != nil { + return err + } + keyValues := make(map[string][]byte, len(cmdLine)-1) + for node, ret := range nodeResults { + nodeCmdLine := cmdLineMap[node] + result := ret.(*protocol.MultiBulkReply) + if len(result.Args) != len(nodeCmdLine) - 1 { + return protocol.MakeErrReply("wrong response from node " + node) + } + for i := 1; i < len(nodeCmdLine); i++ { + key := string(nodeCmdLine[i]) + value := result.Args[i-1] + keyValues[key] = value + } + } + result := make([][]byte, 0, len(cmdLine)-1) + for i := 1; i < len(cmdLine); i++ { + value := keyValues[string(cmdLine[i])] + result = append(result, value) + } + return protocol.MakeMultiBulkReply(result) +} diff --git a/cluster/commands/mset_test.go b/cluster/commands/mset_test.go index 50c2b16..8713584 100644 --- a/cluster/commands/mset_test.go +++ b/cluster/commands/mset_test.go @@ -6,6 +6,7 @@ import ( "github.com/hdt3213/godis/cluster/core" "github.com/hdt3213/godis/lib/utils" "github.com/hdt3213/godis/redis/connection" + "github.com/hdt3213/godis/redis/protocol/asserts" ) func TestMset(t *testing.T) { @@ -16,5 +17,7 @@ func TestMset(t *testing.T) { c := connection.NewFakeConn() // 1, 2 will be routed to node1 and node2, see MakeTestCluster res := execMSet(node1, c, utils.ToCmdLine("mset", "1", "1", "2", "2")) - println(res) + asserts.AssertStatusReply(t, res, "OK") + res2 := execMGet(node1, c, utils.ToCmdLine("mget", "1", "2")) + asserts.AssertMultiBulkReply(t, res2, []string{"1", "2"}) } \ No newline at end of file diff --git a/cluster/commands/tcccore.go b/cluster/commands/tcccore.go new file mode 100644 index 0000000..63ecbc4 --- /dev/null +++ b/cluster/commands/tcccore.go @@ -0,0 +1,67 @@ +package commands + +import ( + "github.com/hdt3213/godis/cluster/core" + "github.com/hdt3213/godis/interface/redis" + "github.com/hdt3213/godis/lib/utils" + "github.com/hdt3213/godis/redis/protocol" +) + +type CmdLine = [][]byte + +// node -> keys on the node +type RouteMap map[string][]string + +func getRouteMap(cluster *core.Cluster, keys []string) RouteMap { + m := make(RouteMap) + for _, key := range keys { + slot := cluster.GetSlot(key) + node := cluster.PickNode(slot) + m[node] = append(m[node], key) + } + return m +} + +type TccTx struct { + rawCmdLine CmdLine + routeMap RouteMap + cmdLines map[string]CmdLine // node -> CmdLine +} + +// execute tcc +// returns node->result map +func doTcc(cluster *core.Cluster, c redis.Connection, tx *TccTx) (map[string]redis.Reply, protocol.ErrorReply) { + txId := utils.RandString(6) + + // send prepare request + for node, cmdLine := range tx.cmdLines { + prepareCmd := utils.ToCmdLine("prepare", txId) + prepareCmd = append(prepareCmd, cmdLine...) + reply := cluster.Relay(node, c, prepareCmd) + if err := protocol.Try2ErrorReply(reply); err != nil { + requestRollback(cluster, c, txId, tx.routeMap) + return nil, protocol.MakeErrReply("prepare failed: " + err.Error()) + } + } + + // send commit request + commiteCmd := utils.ToCmdLine("commit", txId) + result := make(map[string]redis.Reply) + for node := range tx.routeMap { + reply := cluster.Relay(node, c, commiteCmd) + if err := protocol.Try2ErrorReply(reply); err != nil { + requestRollback(cluster, c, txId, tx.routeMap) + return nil, protocol.MakeErrReply("commit failed: " + err.Error()) + } + result[node] = reply + } + + return result, nil +} + +func requestRollback(cluster *core.Cluster, c redis.Connection, txId string, routeMap RouteMap) { + rollbackCmd := utils.ToCmdLine("rollback", txId) + for node := range routeMap { + cluster.Relay(node, c, rollbackCmd) + } +} \ No newline at end of file