diff --git a/cluster/commands/mget.go b/cluster/commands/mget.go new file mode 100644 index 0000000..1f99e38 --- /dev/null +++ b/cluster/commands/mget.go @@ -0,0 +1,2 @@ +package commands + diff --git a/cluster/commands/mset.go b/cluster/commands/mset.go index 5884b55..e1fabc3 100644 --- a/cluster/commands/mset.go +++ b/cluster/commands/mset.go @@ -8,13 +8,28 @@ import ( ) func init() { - core.RegisterCmd("mset_", execMSet_) + core.RegisterCmd("mset_", execMSetInLocal) + core.RegisterCmd("mset", execMSet) + } type CmdLine = [][]byte -// execMSet_ executes msets in local node -func execMSet_(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply { +// node -> keys on the node +type RouteMap map[string][]string + +func getRouteMap(cluster *core.Cluster, keys []string) RouteMap { + m := make(RouteMap) + for _, key := range keys { + slot := cluster.GetSlot(key) + node := cluster.PickNode(slot) + m[node] = append(m[node], key) + } + return m +} + +// execMSetInLocal executes msets in local node +func execMSetInLocal(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply { if len(cmdLine) < 3 { return protocol.MakeArgNumErrReply("mset") } @@ -22,15 +37,34 @@ func execMSet_(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine) redis return cluster.LocalExec(c, cmdLine) } -func requestRollback(cluster *core.Cluster, c redis.Connection, txId string, routeMap map[string][]string) { +func execMSet(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply { + if len(cmdLine) < 3 || len(cmdLine)%2 != 1 { + return protocol.MakeArgNumErrReply("mset") + } + var keys []string + for i := 1; i < len(cmdLine); i += 2 { + keys = append(keys, string(cmdLine[i])) + } + routeMap := getRouteMap(cluster, keys) + if len(routeMap) == 1 { + // only one node, do it fast + for node := range routeMap { + cmdLine[0] = []byte("mset_") + return cluster.Relay(node, c, cmdLine) + } + } + return execMSetSlow(cluster, c, cmdLine, routeMap) +} + +func requestRollback(cluster *core.Cluster, c redis.Connection, txId string, routeMap RouteMap) { rollbackCmd := utils.ToCmdLine("rollback", txId) - for node := range routeMap { + for node := range routeMap { cluster.Relay(node, c, rollbackCmd) } } // execMSetSlow execute mset through tcc -func execMSetSlow(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine, routeMap map[string][]string) redis.Reply { +func execMSetSlow(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine, routeMap RouteMap) redis.Reply { txId := utils.RandString(6) keyValues := make(map[string][]byte) @@ -62,7 +96,7 @@ func execMSetSlow(cluster *core.Cluster, c redis.Connection, cmdLine CmdLine, ro // send commit request commiteCmd := utils.ToCmdLine("commit", txId) - for node := range nodePrepareCmdMap { + for node := range nodePrepareCmdMap { reply := cluster.Relay(node, c, commiteCmd) if protocol.IsErrorReply(reply) { requestRollback(cluster, c, txId, routeMap) diff --git a/cluster/commands/mset_test.go b/cluster/commands/mset_test.go new file mode 100644 index 0000000..50c2b16 --- /dev/null +++ b/cluster/commands/mset_test.go @@ -0,0 +1,20 @@ +package commands + +import ( + "testing" + + "github.com/hdt3213/godis/cluster/core" + "github.com/hdt3213/godis/lib/utils" + "github.com/hdt3213/godis/redis/connection" +) + +func TestMset(t *testing.T) { + id1 := "1" + id2 := "2" + nodes := core.MakeTestCluster([]string{id1, id2}) + node1 := nodes[id1] + c := connection.NewFakeConn() + // 1, 2 will be routed to node1 and node2, see MakeTestCluster + res := execMSet(node1, c, utils.ToCmdLine("mset", "1", "1", "2", "2")) + println(res) +} \ No newline at end of file diff --git a/cluster/core/command.go b/cluster/core/command.go index b78bde7..0a0d96d 100644 --- a/cluster/core/command.go +++ b/cluster/core/command.go @@ -62,7 +62,7 @@ func RegisterDefaultCmd(name string) { // relay command to responsible peer, and return its protocol to client func DefaultFunc(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { key := string(args[1]) - slotId := GetSlot(key) + slotId := cluster.GetSlot(key) peer := cluster.PickNode(slotId) if peer == cluster.SelfID() { // to self db diff --git a/cluster/core/core.go b/cluster/core/core.go index 84ce223..39cf3ca 100644 --- a/cluster/core/core.go +++ b/cluster/core/core.go @@ -22,6 +22,11 @@ type Cluster struct { slotsManager *slotsManager rebalanceManger *rebalanceManager transactions *TransactionManager + + // allow inject route implementation + getSlotImpl func(key string) uint32 + pickNodeImpl func(slotID uint32) string + id_ string // for tests only } type Config struct { @@ -32,6 +37,9 @@ type Config struct { } func (c *Cluster) SelfID() string { + if c.raftNode == nil { + return c.id_ + } return c.raftNode.Cfg.ID() } @@ -130,6 +138,12 @@ func NewCluster(cfg *Config) (*Cluster, error) { slotsManager: newSlotsManager(), transactions: newTransactionManager(), } + cluster.pickNodeImpl = func(slotID uint32) string { + return defaultPickNodeImpl(cluster, slotID) + } + cluster.getSlotImpl = func(key string) uint32 { + return defaultGetSlotImpl(cluster, key) + } cluster.injectInsertCallback() cluster.injectDeleteCallback() return cluster, nil diff --git a/cluster/core/migration.go b/cluster/core/migration.go index 27bb3c9..0ffec8c 100644 --- a/cluster/core/migration.go +++ b/cluster/core/migration.go @@ -46,7 +46,7 @@ func (sm *slotStatus) finishExportingWithinLock() { func (cluster *Cluster) injectInsertCallback() { cb := func(dbIndex int, key string, entity *database.DataEntity) { - slotIndex := GetSlot(key) + slotIndex := cluster.GetSlot(key) slotManager := cluster.slotsManager.getSlot(slotIndex) slotManager.mu.Lock() defer slotManager.mu.Unlock() @@ -60,7 +60,7 @@ func (cluster *Cluster) injectInsertCallback() { func (cluster *Cluster) injectDeleteCallback() { cb := func(dbIndex int, key string, entity *database.DataEntity) { - slotIndex := GetSlot(key) + slotIndex := cluster.GetSlot(key) slotManager := cluster.slotsManager.getSlot(slotIndex) slotManager.mu.Lock() defer slotManager.mu.Unlock() diff --git a/cluster/core/tcc.go b/cluster/core/tcc.go index 6162de9..4bf901e 100644 --- a/cluster/core/tcc.go +++ b/cluster/core/tcc.go @@ -79,7 +79,7 @@ func execCommit(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Rep } cluster.transactions.mu.Unlock() - resp := cluster.db.Exec(c, tx.realCmdLine) + resp := cluster.db.ExecWithLock(c, tx.realCmdLine) // unlock regardless of result cluster.db.RWUnLocks(0, tx.writeKeys, tx.readKeys) @@ -88,7 +88,8 @@ func execCommit(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Rep // do not delete transaction, waiting rollback return resp } - // todo delete transaction after deadline + + // todo: delete transaction after deadline // cluster.transactions.mu.Lock() // delete(cluster.transactions.txs, txId) // cluster.transactions.mu.Unlock() @@ -115,7 +116,7 @@ func execRollback(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.R cluster.db.RWLocks(0, tx.writeKeys, tx.readKeys) for i := len(tx.undoLogs) - 1; i >= 0; i-- { cmdline := tx.undoLogs[i] - cluster.db.Exec(c, cmdline) + cluster.db.ExecWithLock(c, cmdline) } cluster.db.RWUnLocks(0, tx.writeKeys, tx.readKeys) diff --git a/cluster/core/tests.go b/cluster/core/tests.go new file mode 100644 index 0000000..4847fd1 --- /dev/null +++ b/cluster/core/tests.go @@ -0,0 +1,43 @@ +package core + +import ( + "strconv" + + dbimpl "github.com/hdt3213/godis/database" +) + +// MakeTestCluster creates a cluster for test, which communications are done through local function calls. +func MakeTestCluster(ids []string) map[string]*Cluster { + nodes := make(map[string]*Cluster) + connections := NewInMemConnectionFactory() + connections.nodes = nodes + for _, id := range ids { + db := dbimpl.NewStandaloneServer() + cluster := &Cluster{ + db: db, + config: &Config{}, + connections: connections, + rebalanceManger: newRebalanceManager(), + slotsManager: newSlotsManager(), + transactions: newTransactionManager(), + id_: id, + } + cluster.pickNodeImpl = func(slotID uint32) string { + // skip raft for test + index := int(slotID) % len(ids) + return ids[index] + } + cluster.getSlotImpl = func(key string) uint32 { + // backdoor for test + i, err := strconv.Atoi(key) + if err == nil && i < SlotCount { + return uint32(i) + } + return defaultGetSlotImpl(cluster, key) + } + cluster.injectInsertCallback() + cluster.injectDeleteCallback() + nodes[id] = cluster + } + return nodes +} diff --git a/cluster/core/utils.go b/cluster/core/utils.go index 01e55d6..b02eb30 100644 --- a/cluster/core/utils.go +++ b/cluster/core/utils.go @@ -52,15 +52,23 @@ func GetPartitionKey(key string) string { return key[beg+1 : end] } -func GetSlot(key string) uint32 { +func defaultGetSlotImpl(cluster *Cluster, key string) uint32 { partitionKey := GetPartitionKey(key) return crc32.ChecksumIEEE([]byte(partitionKey)) % uint32(SlotCount) } +func (cluster *Cluster) GetSlot(key string) uint32 { + return cluster.getSlotImpl(key) +} + +func defaultPickNodeImpl(cluster *Cluster, slotID uint32) string { + return cluster.raftNode.FSM.PickNode(slotID) +} + // pickNode returns the node id hosting the given slot. // If the slot is migrating, return the node which is exporting the slot func (cluster *Cluster) PickNode(slotID uint32) string { - return cluster.raftNode.FSM.PickNode(slotID) + return cluster.pickNodeImpl(slotID) } // format: raft.committedindex