diff --git a/cluster/cluster.go b/cluster/cluster.go index 686097f..8b5aeae 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -15,28 +15,33 @@ import ( "github.com/hdt3213/godis/redis/protocol" "github.com/jolestar/go-commons-pool/v2" "runtime/debug" - "strconv" "strings" ) +type PeerPicker interface { + AddNode(keys ...string) + PickNode(key string) string +} + // Cluster represents a node of godis cluster // it holds part of data and coordinates other nodes to finish transactions type Cluster struct { self string nodes []string - peerPicker *consistenthash.Map + peerPicker PeerPicker peerConnection map[string]*pool.ObjectPool db database.EmbedDB transactions *dict.SimpleDict // id -> Transaction idGenerator *idgenerator.IDGenerator + // use a variable to allow injecting stub for testing + relayImpl func(cluster *Cluster, node string, c redis.Connection, cmdLine CmdLine) redis.Reply } const ( replicas = 4 - lockSize = 64 ) // if only one node involved in a transaction, just execute the command don't apply tcc procedure @@ -53,6 +58,7 @@ func MakeCluster() *Cluster { peerConnection: make(map[string]*pool.ObjectPool), idGenerator: idgenerator.MakeGenerator(config.Properties.Self), + relayImpl: defaultRelayImpl, } contains := make(map[string]struct{}) nodes := make([]string, 0, len(config.Properties.Peers)+1) @@ -144,45 +150,3 @@ func (cluster *Cluster) Exec(c redis.Connection, cmdLine [][]byte) (result redis func (cluster *Cluster) AfterClientClose(c redis.Connection) { cluster.db.AfterClientClose(c) } - -func ping(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply { - return cluster.db.Exec(c, cmdLine) -} - -/*----- utils -------*/ - -func makeArgs(cmd string, args ...string) [][]byte { - result := make([][]byte, len(args)+1) - result[0] = []byte(cmd) - for i, arg := range args { - result[i+1] = []byte(arg) - } - return result -} - -// return node -> writeKeys -func (cluster *Cluster) groupBy(keys []string) map[string][]string { - result := make(map[string][]string) - for _, key := range keys { - peer := cluster.peerPicker.PickNode(key) - group, ok := result[peer] - if !ok { - group = make([]string, 0) - } - group = append(group, key) - result[peer] = group - } - return result -} - -func execSelect(c redis.Connection, args [][]byte) redis.Reply { - dbIndex, err := strconv.Atoi(string(args[1])) - if err != nil { - return protocol.MakeErrReply("ERR invalid DB index") - } - if dbIndex >= config.Properties.Databases { - return protocol.MakeErrReply("ERR DB index is out of range") - } - c.SelectDB(dbIndex) - return protocol.MakeOkReply() -} diff --git a/cluster/com.go b/cluster/com.go index deb329e..652d62f 100644 --- a/cluster/com.go +++ b/cluster/com.go @@ -34,26 +34,31 @@ func (cluster *Cluster) returnPeerClient(peer string, peerClient *client.Client) return connectionFactory.ReturnObject(context.Background(), peerClient) } -// relay relays command to peer -// select db by c.GetDBIndex() -// cannot call Prepare, Commit, execRollback of self node -func (cluster *Cluster) relay(peer string, c redis.Connection, args [][]byte) redis.Reply { - if peer == cluster.self { +var defaultRelayImpl = func(cluster *Cluster, node string, c redis.Connection, cmdLine CmdLine) redis.Reply { + if node == cluster.self { // to self db - return cluster.db.Exec(c, args) + return cluster.db.Exec(c, cmdLine) } - peerClient, err := cluster.getPeerClient(peer) + peerClient, err := cluster.getPeerClient(node) if err != nil { return protocol.MakeErrReply(err.Error()) } defer func() { - _ = cluster.returnPeerClient(peer, peerClient) + _ = cluster.returnPeerClient(node, peerClient) }() peerClient.Send(utils.ToCmdLine("SELECT", strconv.Itoa(c.GetDBIndex()))) - return peerClient.Send(args) + return peerClient.Send(cmdLine) } -// broadcast broadcasts command to all node in cluster +// relay function relays command to peer +// select db by c.GetDBIndex() +// cannot call Prepare, Commit, execRollback of self node +func (cluster *Cluster) relay(peer string, c redis.Connection, args [][]byte) redis.Reply { + // use a variable to allow injecting stub for testing + return cluster.relayImpl(cluster, peer, c, args) +} + +// broadcast function broadcasts command to all node in cluster func (cluster *Cluster) broadcast(c redis.Connection, args [][]byte) map[string]redis.Reply { result := make(map[string]redis.Reply) for _, node := range cluster.nodes { diff --git a/cluster/com_test.go b/cluster/com_test.go index 5bf756f..25ee917 100644 --- a/cluster/com_test.go +++ b/cluster/com_test.go @@ -27,11 +27,11 @@ func TestAuth(t *testing.T) { config.Properties.RequirePass = "" }() conn := &connection.FakeConn{} - ret := testCluster.Exec(conn, toArgs("GET", "a")) + ret := testNodeA.Exec(conn, toArgs("GET", "a")) asserts.AssertErrReply(t, ret, "NOAUTH Authentication required") - ret = testCluster.Exec(conn, toArgs("AUTH", passwd)) + ret = testNodeA.Exec(conn, toArgs("AUTH", passwd)) asserts.AssertStatusReply(t, ret, "OK") - ret = testCluster.Exec(conn, toArgs("GET", "a")) + ret = testNodeA.Exec(conn, toArgs("GET", "a")) asserts.AssertNotError(t, ret) } diff --git a/cluster/del_test.go b/cluster/del_test.go index 6a115ac..fbd0f32 100644 --- a/cluster/del_test.go +++ b/cluster/del_test.go @@ -9,9 +9,9 @@ import ( func TestDel(t *testing.T) { conn := &connection.FakeConn{} allowFastTransaction = false - testCluster.Exec(conn, toArgs("SET", "a", "a")) - ret := Del(testCluster, conn, toArgs("DEL", "a", "b", "c")) + testNodeA.Exec(conn, toArgs("SET", "a", "a")) + ret := Del(testNodeA, conn, toArgs("DEL", "a", "b", "c")) asserts.AssertNotError(t, ret) - ret = testCluster.Exec(conn, toArgs("GET", "a")) + ret = testNodeA.Exec(conn, toArgs("GET", "a")) asserts.AssertNullBulk(t, ret) } diff --git a/cluster/mset_test.go b/cluster/mset_test.go index 34dcfb3..377f575 100644 --- a/cluster/mset_test.go +++ b/cluster/mset_test.go @@ -9,20 +9,20 @@ import ( func TestMSet(t *testing.T) { conn := &connection.FakeConn{} allowFastTransaction = false - ret := MSet(testCluster, conn, toArgs("MSET", "a", "a", "b", "b")) + ret := MSet(testNodeA, conn, toArgs("MSET", "a", "a", "b", "b")) asserts.AssertNotError(t, ret) - ret = testCluster.Exec(conn, toArgs("MGET", "a", "b")) + ret = testNodeA.Exec(conn, toArgs("MGET", "a", "b")) asserts.AssertMultiBulkReply(t, ret, []string{"a", "b"}) } func TestMSetNx(t *testing.T) { conn := &connection.FakeConn{} allowFastTransaction = false - FlushAll(testCluster, conn, toArgs("FLUSHALL")) - ret := MSetNX(testCluster, conn, toArgs("MSETNX", "a", "a", "b", "b")) + FlushAll(testNodeA, conn, toArgs("FLUSHALL")) + ret := MSetNX(testNodeA, conn, toArgs("MSETNX", "a", "a", "b", "b")) asserts.AssertNotError(t, ret) - ret = MSetNX(testCluster, conn, toArgs("MSETNX", "a", "a", "c", "c")) + ret = MSetNX(testNodeA, conn, toArgs("MSETNX", "a", "a", "c", "c")) asserts.AssertNotError(t, ret) - ret = testCluster.Exec(conn, toArgs("MGET", "a", "b", "c")) + ret = testNodeA.Exec(conn, toArgs("MGET", "a", "b", "c")) asserts.AssertMultiBulkReply(t, ret, []string{"a", "b", ""}) } diff --git a/cluster/multi_test.go b/cluster/multi_test.go index 44f15cd..33219b7 100644 --- a/cluster/multi_test.go +++ b/cluster/multi_test.go @@ -10,30 +10,30 @@ import ( func TestMultiExecOnSelf(t *testing.T) { conn := new(connection.FakeConn) - testCluster.db.Exec(conn, utils.ToCmdLine("FLUSHALL")) - result := testCluster.Exec(conn, toArgs("MULTI")) + testNodeA.db.Exec(conn, utils.ToCmdLine("FLUSHALL")) + result := testNodeA.Exec(conn, toArgs("MULTI")) asserts.AssertNotError(t, result) key := utils.RandString(10) value := utils.RandString(10) - testCluster.Exec(conn, utils.ToCmdLine("set", key, value)) + testNodeA.Exec(conn, utils.ToCmdLine("set", key, value)) key2 := utils.RandString(10) - testCluster.Exec(conn, utils.ToCmdLine("rpush", key2, value)) - result = testCluster.Exec(conn, utils.ToCmdLine("exec")) + testNodeA.Exec(conn, utils.ToCmdLine("rpush", key2, value)) + result = testNodeA.Exec(conn, utils.ToCmdLine("exec")) asserts.AssertNotError(t, result) - result = testCluster.Exec(conn, utils.ToCmdLine("get", key)) + result = testNodeA.Exec(conn, utils.ToCmdLine("get", key)) asserts.AssertBulkReply(t, result, value) - result = testCluster.Exec(conn, utils.ToCmdLine("lrange", key2, "0", "-1")) + result = testNodeA.Exec(conn, utils.ToCmdLine("lrange", key2, "0", "-1")) asserts.AssertMultiBulkReply(t, result, []string{value}) } func TestEmptyMulti(t *testing.T) { conn := new(connection.FakeConn) - testCluster.db.Exec(conn, utils.ToCmdLine("FLUSHALL")) - result := testCluster.Exec(conn, toArgs("MULTI")) + testNodeA.db.Exec(conn, utils.ToCmdLine("FLUSHALL")) + result := testNodeA.Exec(conn, toArgs("MULTI")) asserts.AssertNotError(t, result) - result = testCluster.Exec(conn, utils.ToCmdLine("PING")) + result = testNodeA.Exec(conn, utils.ToCmdLine("PING")) asserts.AssertNotError(t, result) - result = testCluster.Exec(conn, utils.ToCmdLine("EXEC")) + result = testNodeA.Exec(conn, utils.ToCmdLine("EXEC")) asserts.AssertNotError(t, result) mbr := result.(*protocol.MultiRawReply) asserts.AssertStatusReply(t, mbr.Replies[0], "PONG") @@ -41,16 +41,16 @@ func TestEmptyMulti(t *testing.T) { func TestMultiExecOnOthers(t *testing.T) { conn := new(connection.FakeConn) - testCluster.db.Exec(conn, utils.ToCmdLine("FLUSHALL")) - result := testCluster.Exec(conn, toArgs("MULTI")) + testNodeA.db.Exec(conn, utils.ToCmdLine("FLUSHALL")) + result := testNodeA.Exec(conn, toArgs("MULTI")) asserts.AssertNotError(t, result) key := utils.RandString(10) value := utils.RandString(10) - testCluster.Exec(conn, utils.ToCmdLine("rpush", key, value)) - testCluster.Exec(conn, utils.ToCmdLine("lrange", key, "0", "-1")) + testNodeA.Exec(conn, utils.ToCmdLine("rpush", key, value)) + testNodeA.Exec(conn, utils.ToCmdLine("lrange", key, "0", "-1")) cmdLines := conn.GetQueuedCmdLine() - rawResp := execMultiOnOtherNode(testCluster, conn, testCluster.self, nil, cmdLines) + rawResp := execMultiOnOtherNode(testNodeA, conn, testNodeA.self, nil, cmdLines) rep := rawResp.(*protocol.MultiRawReply) if len(rep.Replies) != 2 { t.Errorf("expect 2 replies actual %d", len(rep.Replies)) @@ -60,53 +60,53 @@ func TestMultiExecOnOthers(t *testing.T) { func TestWatch(t *testing.T) { conn := new(connection.FakeConn) - testCluster.db.Exec(conn, utils.ToCmdLine("FLUSHALL")) + testNodeA.db.Exec(conn, utils.ToCmdLine("FLUSHALL")) key := utils.RandString(10) value := utils.RandString(10) - testCluster.Exec(conn, utils.ToCmdLine("watch", key)) - testCluster.Exec(conn, utils.ToCmdLine("set", key, value)) - result := testCluster.Exec(conn, toArgs("MULTI")) + testNodeA.Exec(conn, utils.ToCmdLine("watch", key)) + testNodeA.Exec(conn, utils.ToCmdLine("set", key, value)) + result := testNodeA.Exec(conn, toArgs("MULTI")) asserts.AssertNotError(t, result) key2 := utils.RandString(10) value2 := utils.RandString(10) - testCluster.Exec(conn, utils.ToCmdLine("set", key2, value2)) - result = testCluster.Exec(conn, utils.ToCmdLine("exec")) + testNodeA.Exec(conn, utils.ToCmdLine("set", key2, value2)) + result = testNodeA.Exec(conn, utils.ToCmdLine("exec")) asserts.AssertNotError(t, result) - result = testCluster.Exec(conn, utils.ToCmdLine("get", key2)) + result = testNodeA.Exec(conn, utils.ToCmdLine("get", key2)) asserts.AssertNullBulk(t, result) - testCluster.Exec(conn, utils.ToCmdLine("watch", key)) - result = testCluster.Exec(conn, toArgs("MULTI")) + testNodeA.Exec(conn, utils.ToCmdLine("watch", key)) + result = testNodeA.Exec(conn, toArgs("MULTI")) asserts.AssertNotError(t, result) - testCluster.Exec(conn, utils.ToCmdLine("set", key2, value2)) - result = testCluster.Exec(conn, utils.ToCmdLine("exec")) + testNodeA.Exec(conn, utils.ToCmdLine("set", key2, value2)) + result = testNodeA.Exec(conn, utils.ToCmdLine("exec")) asserts.AssertNotError(t, result) - result = testCluster.Exec(conn, utils.ToCmdLine("get", key2)) + result = testNodeA.Exec(conn, utils.ToCmdLine("get", key2)) asserts.AssertBulkReply(t, result, value2) } func TestWatch2(t *testing.T) { conn := new(connection.FakeConn) - testCluster.db.Exec(conn, utils.ToCmdLine("FLUSHALL")) + testNodeA.db.Exec(conn, utils.ToCmdLine("FLUSHALL")) key := utils.RandString(10) value := utils.RandString(10) - testCluster.Exec(conn, utils.ToCmdLine("watch", key)) - testCluster.Exec(conn, utils.ToCmdLine("set", key, value)) - result := testCluster.Exec(conn, toArgs("MULTI")) + testNodeA.Exec(conn, utils.ToCmdLine("watch", key)) + testNodeA.Exec(conn, utils.ToCmdLine("set", key, value)) + result := testNodeA.Exec(conn, toArgs("MULTI")) asserts.AssertNotError(t, result) key2 := utils.RandString(10) value2 := utils.RandString(10) - testCluster.Exec(conn, utils.ToCmdLine("set", key2, value2)) + testNodeA.Exec(conn, utils.ToCmdLine("set", key2, value2)) cmdLines := conn.GetQueuedCmdLine() - execMultiOnOtherNode(testCluster, conn, testCluster.self, conn.GetWatching(), cmdLines) - result = testCluster.Exec(conn, utils.ToCmdLine("get", key2)) + execMultiOnOtherNode(testNodeA, conn, testNodeA.self, conn.GetWatching(), cmdLines) + result = testNodeA.Exec(conn, utils.ToCmdLine("get", key2)) asserts.AssertNullBulk(t, result) - testCluster.Exec(conn, utils.ToCmdLine("watch", key)) - result = testCluster.Exec(conn, toArgs("MULTI")) + testNodeA.Exec(conn, utils.ToCmdLine("watch", key)) + result = testNodeA.Exec(conn, toArgs("MULTI")) asserts.AssertNotError(t, result) - testCluster.Exec(conn, utils.ToCmdLine("set", key2, value2)) - execMultiOnOtherNode(testCluster, conn, testCluster.self, conn.GetWatching(), cmdLines) - result = testCluster.Exec(conn, utils.ToCmdLine("get", key2)) + testNodeA.Exec(conn, utils.ToCmdLine("set", key2, value2)) + execMultiOnOtherNode(testNodeA, conn, testNodeA.self, conn.GetWatching(), cmdLines) + result = testNodeA.Exec(conn, utils.ToCmdLine("get", key2)) asserts.AssertBulkReply(t, result, value2) } diff --git a/cluster/pubsub_test.go b/cluster/pubsub_test.go index 61e2b81..5b51dd7 100644 --- a/cluster/pubsub_test.go +++ b/cluster/pubsub_test.go @@ -12,9 +12,9 @@ func TestPublish(t *testing.T) { channel := utils.RandString(5) msg := utils.RandString(5) conn := &connection.FakeConn{} - Subscribe(testCluster, conn, utils.ToCmdLine("SUBSCRIBE", channel)) + Subscribe(testNodeA, conn, utils.ToCmdLine("SUBSCRIBE", channel)) conn.Clean() // clean subscribe success - Publish(testCluster, conn, utils.ToCmdLine("PUBLISH", channel, msg)) + Publish(testNodeA, conn, utils.ToCmdLine("PUBLISH", channel, msg)) data := conn.Bytes() ret, err := parser.ParseOne(data) if err != nil { @@ -28,19 +28,19 @@ func TestPublish(t *testing.T) { }) // unsubscribe - UnSubscribe(testCluster, conn, utils.ToCmdLine("UNSUBSCRIBE", channel)) + UnSubscribe(testNodeA, conn, utils.ToCmdLine("UNSUBSCRIBE", channel)) conn.Clean() - Publish(testCluster, conn, utils.ToCmdLine("PUBLISH", channel, msg)) + Publish(testNodeA, conn, utils.ToCmdLine("PUBLISH", channel, msg)) data = conn.Bytes() if len(data) > 0 { t.Error("expect no msg") } // unsubscribe all - Subscribe(testCluster, conn, utils.ToCmdLine("SUBSCRIBE", channel)) - UnSubscribe(testCluster, conn, utils.ToCmdLine("UNSUBSCRIBE")) + Subscribe(testNodeA, conn, utils.ToCmdLine("SUBSCRIBE", channel)) + UnSubscribe(testNodeA, conn, utils.ToCmdLine("UNSUBSCRIBE")) conn.Clean() - Publish(testCluster, conn, utils.ToCmdLine("PUBLISH", channel, msg)) + Publish(testNodeA, conn, utils.ToCmdLine("PUBLISH", channel, msg)) data = conn.Bytes() if len(data) > 0 { t.Error("expect no msg") diff --git a/cluster/rename.go b/cluster/rename.go index d0ba48a..74fbec7 100644 --- a/cluster/rename.go +++ b/cluster/rename.go @@ -19,7 +19,6 @@ func Rename(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply { if srcNode == destNode { // do fast return cluster.relay(srcNode, c, args) } - groupMap := map[string][]string{ srcNode: {srcKey}, destNode: {destKey}, diff --git a/cluster/rename_test.go b/cluster/rename_test.go index fa0d94a..ca8b070 100644 --- a/cluster/rename_test.go +++ b/cluster/rename_test.go @@ -1,68 +1,159 @@ package cluster import ( - "fmt" "github.com/hdt3213/godis/lib/utils" "github.com/hdt3213/godis/redis/connection" - "github.com/hdt3213/godis/redis/protocol" "github.com/hdt3213/godis/redis/protocol/asserts" "testing" ) func TestRename(t *testing.T) { conn := new(connection.FakeConn) - testDB := testCluster.db - testDB.Exec(conn, utils.ToCmdLine("FlushALL")) - key := utils.RandString(10) + testNodeA.db.Exec(conn, utils.ToCmdLine("FlushALL")) + + // cross node rename + key := testNodeA.self + utils.RandString(10) value := utils.RandString(10) - newKey := key + utils.RandString(2) - testDB.Exec(conn, utils.ToCmdLine("SET", key, value, "ex", "1000")) - result := Rename(testCluster, conn, utils.ToCmdLine("RENAME", key, newKey)) - if _, ok := result.(*protocol.OkReply); !ok { - t.Error("expect ok") - return - } - //result = testDB.Exec(conn, utils.ToCmdLine("EXISTS", key)) - //asserts.AssertIntReply(t, result, 0) - result = testDB.Exec(conn, utils.ToCmdLine("EXISTS", newKey)) + newKey := testNodeB.self + utils.RandString(10) // route to testNodeB, see mockPicker.PickNode + testNodeA.db.Exec(conn, utils.ToCmdLine("SET", key, value, "ex", "1000")) + result := Rename(testNodeA, conn, utils.ToCmdLine("RENAME", key, newKey)) + asserts.AssertStatusReply(t, result, "OK") + result = testNodeA.db.Exec(conn, utils.ToCmdLine("EXISTS", key)) + asserts.AssertIntReply(t, result, 0) + result = testNodeB.db.Exec(conn, utils.ToCmdLine("EXISTS", newKey)) asserts.AssertIntReply(t, result, 1) - // check ttl - result = testDB.Exec(conn, utils.ToCmdLine("TTL", newKey)) - intResult, ok := result.(*protocol.IntReply) - if !ok { - t.Error(fmt.Sprintf("expected int protocol, actually %s", result.ToBytes())) - return - } - if intResult.Code <= 0 { - t.Errorf("expected ttl more than 0, actual: %d", intResult.Code) - return - } - // test no src key + result = testNodeB.db.Exec(conn, utils.ToCmdLine("TTL", newKey)) + asserts.AssertIntReplyGreaterThan(t, result, 0) + + // same node rename + key = testNodeA.self + utils.RandString(10) + value = utils.RandString(10) + newKey = key + utils.RandString(2) + testNodeA.db.Exec(conn, utils.ToCmdLine("SET", key, value, "ex", "1000")) + result = Rename(testNodeA, conn, utils.ToCmdLine("RENAME", key, newKey)) + asserts.AssertStatusReply(t, result, "OK") + result = testNodeA.db.Exec(conn, utils.ToCmdLine("EXISTS", key)) + asserts.AssertIntReply(t, result, 0) + result = testNodeA.db.Exec(conn, utils.ToCmdLine("EXISTS", newKey)) + asserts.AssertIntReply(t, result, 1) + result = testNodeA.db.Exec(conn, utils.ToCmdLine("TTL", newKey)) + asserts.AssertIntReplyGreaterThan(t, result, 0) + + // test src prepare failed + *simulateATimout = true + key = testNodeA.self + utils.RandString(10) + newKey = testNodeB.self + utils.RandString(10) // route to testNodeB, see mockPicker.PickNode + value = utils.RandString(10) + testNodeA.db.Exec(conn, utils.ToCmdLine("SET", key, value, "ex", "1000")) + result = Rename(testNodeB, conn, utils.ToCmdLine("RENAME", key, newKey)) + asserts.AssertErrReply(t, result, "ERR timeout") + result = testNodeA.db.Exec(conn, utils.ToCmdLine("EXISTS", key)) + asserts.AssertIntReply(t, result, 1) + result = testNodeA.db.Exec(conn, utils.ToCmdLine("TTL", key)) + asserts.AssertIntReplyGreaterThan(t, result, 0) + result = testNodeB.db.Exec(conn, utils.ToCmdLine("EXISTS", newKey)) + asserts.AssertIntReply(t, result, 0) + *simulateATimout = false + + // test dest prepare failed + *simulateBTimout = true + key = testNodeA.self + utils.RandString(10) + newKey = testNodeB.self + utils.RandString(10) // route to testNodeB, see mockPicker.PickNode + value = utils.RandString(10) + testNodeA.db.Exec(conn, utils.ToCmdLine("SET", key, value, "ex", "1000")) + result = Rename(testNodeA, conn, utils.ToCmdLine("RENAME", key, newKey)) + asserts.AssertErrReply(t, result, "ERR timeout") + result = testNodeA.db.Exec(conn, utils.ToCmdLine("EXISTS", key)) + asserts.AssertIntReply(t, result, 1) + result = testNodeA.db.Exec(conn, utils.ToCmdLine("TTL", key)) + asserts.AssertIntReplyGreaterThan(t, result, 0) + result = testNodeB.db.Exec(conn, utils.ToCmdLine("EXISTS", newKey)) + asserts.AssertIntReply(t, result, 0) + *simulateBTimout = false + + result = Rename(testNodeA, conn, utils.ToCmdLine("RENAME", key)) + asserts.AssertErrReply(t, result, "ERR wrong number of arguments for 'rename' command") } func TestRenameNx(t *testing.T) { conn := new(connection.FakeConn) - testDB := testCluster.db - testDB.Exec(conn, utils.ToCmdLine("FlushALL")) - key := utils.RandString(10) + testNodeA.db.Exec(conn, utils.ToCmdLine("FlushALL")) + // cross node rename + key := testNodeA.self + utils.RandString(10) value := utils.RandString(10) - newKey := key + utils.RandString(2) - testCluster.db.Exec(conn, utils.ToCmdLine("SET", key, value, "ex", "1000")) - result := RenameNx(testCluster, conn, utils.ToCmdLine("RENAMENX", key, newKey)) + newKey := testNodeB.self + utils.RandString(10) // route to testNodeB, see mockPicker.PickNode + testNodeA.db.Exec(conn, utils.ToCmdLine("SET", key, value, "ex", "1000")) + result := RenameNx(testNodeA, conn, utils.ToCmdLine("RENAMENX", key, newKey)) asserts.AssertIntReply(t, result, 1) - result = testDB.Exec(conn, utils.ToCmdLine("EXISTS", key)) + result = testNodeA.db.Exec(conn, utils.ToCmdLine("EXISTS", key)) asserts.AssertIntReply(t, result, 0) - result = testDB.Exec(conn, utils.ToCmdLine("EXISTS", newKey)) - + result = testNodeB.db.Exec(conn, utils.ToCmdLine("EXISTS", newKey)) asserts.AssertIntReply(t, result, 1) - result = testDB.Exec(conn, utils.ToCmdLine("TTL", newKey)) - intResult, ok := result.(*protocol.IntReply) - if !ok { - t.Error(fmt.Sprintf("expected int protocol, actually %s", result.ToBytes())) - return - } - if intResult.Code <= 0 { - t.Errorf("expected ttl more than 0, actual: %d", intResult.Code) - return - } + result = testNodeB.db.Exec(conn, utils.ToCmdLine("TTL", newKey)) + asserts.AssertIntReplyGreaterThan(t, result, 0) + + // cross node rename, dest key exist + key = testNodeA.self + utils.RandString(10) + value = utils.RandString(10) + newKey = testNodeB.self + utils.RandString(10) // route to testNodeB, see mockPicker.PickNode + testNodeA.db.Exec(conn, utils.ToCmdLine("SET", key, value, "ex", "1000")) + testNodeB.db.Exec(conn, utils.ToCmdLine("SET", newKey, newKey)) + result = RenameNx(testNodeA, conn, utils.ToCmdLine("RENAMENX", key, newKey)) + asserts.AssertIntReply(t, result, 0) + result = testNodeA.db.Exec(conn, utils.ToCmdLine("EXISTS", key)) + asserts.AssertIntReply(t, result, 1) + result = testNodeA.db.Exec(conn, utils.ToCmdLine("TTL", key)) + asserts.AssertIntReplyGreaterThan(t, result, 0) + result = testNodeB.db.Exec(conn, utils.ToCmdLine("GET", newKey)) + asserts.AssertBulkReply(t, result, newKey) + + // same node rename + key = testNodeA.self + utils.RandString(10) + value = utils.RandString(10) + newKey = key + utils.RandString(2) + testNodeA.db.Exec(conn, utils.ToCmdLine("SET", key, value, "ex", "1000")) + result = RenameNx(testNodeA, conn, utils.ToCmdLine("RENAMENX", key, newKey)) + asserts.AssertIntReply(t, result, 1) + result = testNodeA.db.Exec(conn, utils.ToCmdLine("EXISTS", key)) + asserts.AssertIntReply(t, result, 0) + result = testNodeA.db.Exec(conn, utils.ToCmdLine("EXISTS", newKey)) + asserts.AssertIntReply(t, result, 1) + result = testNodeA.db.Exec(conn, utils.ToCmdLine("TTL", newKey)) + asserts.AssertIntReplyGreaterThan(t, result, 0) + + // test src prepare failed + *simulateATimout = true + key = testNodeA.self + utils.RandString(10) + newKey = testNodeB.self + utils.RandString(10) // route to testNodeB, see mockPicker.PickNode + value = utils.RandString(10) + testNodeA.db.Exec(conn, utils.ToCmdLine("SET", key, value, "ex", "1000")) + result = RenameNx(testNodeB, conn, utils.ToCmdLine("RENAMENX", key, newKey)) + asserts.AssertErrReply(t, result, "ERR timeout") + result = testNodeA.db.Exec(conn, utils.ToCmdLine("EXISTS", key)) + asserts.AssertIntReply(t, result, 1) + result = testNodeA.db.Exec(conn, utils.ToCmdLine("TTL", key)) + asserts.AssertIntReplyGreaterThan(t, result, 0) + result = testNodeB.db.Exec(conn, utils.ToCmdLine("EXISTS", newKey)) + asserts.AssertIntReply(t, result, 0) + *simulateATimout = false + + // test dest prepare failed + *simulateBTimout = true + key = testNodeA.self + utils.RandString(10) + newKey = testNodeB.self + utils.RandString(10) // route to testNodeB, see mockPicker.PickNode + value = utils.RandString(10) + testNodeA.db.Exec(conn, utils.ToCmdLine("SET", key, value, "ex", "1000")) + result = RenameNx(testNodeA, conn, utils.ToCmdLine("RENAMENX", key, newKey)) + asserts.AssertErrReply(t, result, "ERR timeout") + result = testNodeA.db.Exec(conn, utils.ToCmdLine("EXISTS", key)) + asserts.AssertIntReply(t, result, 1) + result = testNodeA.db.Exec(conn, utils.ToCmdLine("TTL", key)) + asserts.AssertIntReplyGreaterThan(t, result, 0) + result = testNodeB.db.Exec(conn, utils.ToCmdLine("EXISTS", newKey)) + asserts.AssertIntReply(t, result, 0) + *simulateBTimout = false + + result = RenameNx(testNodeA, conn, utils.ToCmdLine("RENAMENX", key)) + asserts.AssertErrReply(t, result, "ERR wrong number of arguments for 'renamenx' command") + } diff --git a/cluster/tcc_test.go b/cluster/tcc_test.go index 2a83bdf..f8c2ec8 100644 --- a/cluster/tcc_test.go +++ b/cluster/tcc_test.go @@ -11,37 +11,37 @@ import ( func TestRollback(t *testing.T) { // rollback uncommitted transaction conn := new(connection.FakeConn) - FlushAll(testCluster, conn, toArgs("FLUSHALL")) + FlushAll(testNodeA, conn, toArgs("FLUSHALL")) txID := rand.Int63() txIDStr := strconv.FormatInt(txID, 10) keys := []string{"a", "b"} - groupMap := testCluster.groupBy(keys) + groupMap := testNodeA.groupBy(keys) args := []string{txIDStr, "DEL"} args = append(args, keys...) - testCluster.Exec(conn, toArgs("SET", "a", "a")) - ret := execPrepare(testCluster, conn, makeArgs("Prepare", args...)) + testNodeA.Exec(conn, toArgs("SET", "a", "a")) + ret := execPrepare(testNodeA, conn, makeArgs("Prepare", args...)) asserts.AssertNotError(t, ret) - requestRollback(testCluster, conn, txID, groupMap) - ret = testCluster.Exec(conn, toArgs("GET", "a")) + requestRollback(testNodeA, conn, txID, groupMap) + ret = testNodeA.Exec(conn, toArgs("GET", "a")) asserts.AssertBulkReply(t, ret, "a") // rollback committed transaction - FlushAll(testCluster, conn, toArgs("FLUSHALL")) + FlushAll(testNodeA, conn, toArgs("FLUSHALL")) txID = rand.Int63() txIDStr = strconv.FormatInt(txID, 10) args = []string{txIDStr, "DEL"} args = append(args, keys...) - testCluster.Exec(conn, toArgs("SET", "a", "a")) - ret = execPrepare(testCluster, conn, makeArgs("Prepare", args...)) + testNodeA.Exec(conn, toArgs("SET", "a", "a")) + ret = execPrepare(testNodeA, conn, makeArgs("Prepare", args...)) asserts.AssertNotError(t, ret) - _, err := requestCommit(testCluster, conn, txID, groupMap) + _, err := requestCommit(testNodeA, conn, txID, groupMap) if err != nil { t.Errorf("del failed %v", err) return } - ret = testCluster.Exec(conn, toArgs("GET", "a")) + ret = testNodeA.Exec(conn, toArgs("GET", "a")) asserts.AssertNullBulk(t, ret) - requestRollback(testCluster, conn, txID, groupMap) - ret = testCluster.Exec(conn, toArgs("GET", "a")) + requestRollback(testNodeA, conn, txID, groupMap) + ret = testNodeA.Exec(conn, toArgs("GET", "a")) asserts.AssertBulkReply(t, ret, "a") } diff --git a/cluster/utils.go b/cluster/utils.go new file mode 100644 index 0000000..f9de730 --- /dev/null +++ b/cluster/utils.go @@ -0,0 +1,50 @@ +package cluster + +import ( + "github.com/hdt3213/godis/config" + "github.com/hdt3213/godis/interface/redis" + "github.com/hdt3213/godis/redis/protocol" + "strconv" +) + +func ping(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply { + return cluster.db.Exec(c, cmdLine) +} + +/*----- utils -------*/ + +func makeArgs(cmd string, args ...string) [][]byte { + result := make([][]byte, len(args)+1) + result[0] = []byte(cmd) + for i, arg := range args { + result[i+1] = []byte(arg) + } + return result +} + +// return node -> writeKeys +func (cluster *Cluster) groupBy(keys []string) map[string][]string { + result := make(map[string][]string) + for _, key := range keys { + peer := cluster.peerPicker.PickNode(key) + group, ok := result[peer] + if !ok { + group = make([]string, 0) + } + group = append(group, key) + result[peer] = group + } + return result +} + +func execSelect(c redis.Connection, args [][]byte) redis.Reply { + dbIndex, err := strconv.Atoi(string(args[1])) + if err != nil { + return protocol.MakeErrReply("ERR invalid DB index") + } + if dbIndex >= config.Properties.Databases { + return protocol.MakeErrReply("ERR DB index is out of range") + } + c.SelectDB(dbIndex) + return protocol.MakeOkReply() +} diff --git a/cluster/utils_test.go b/cluster/utils_test.go index 8882eb2..8aa701d 100644 --- a/cluster/utils_test.go +++ b/cluster/utils_test.go @@ -2,10 +2,86 @@ package cluster import ( "github.com/hdt3213/godis/config" + "github.com/hdt3213/godis/interface/redis" + "github.com/hdt3213/godis/redis/protocol" "math/rand" + "strings" ) -var testCluster = MakeTestCluster(nil) +var testNodeA, testNodeB *Cluster +var simulateATimout, simulateBTimout *bool + +type mockPicker struct { + nodes []string +} + +func (picker *mockPicker) AddNode(keys ...string) { + picker.nodes = append(picker.nodes, keys...) +} + +func (picker *mockPicker) PickNode(key string) string { + for _, n := range picker.nodes { + if strings.Contains(key, n) { + return n + } + } + return picker.nodes[0] +} + +func makeMockRelay(peer *Cluster) (*bool, func(cluster *Cluster, node string, c redis.Connection, cmdLine CmdLine) redis.Reply) { + simulateTimeout0 := false + simulateTimeout := &simulateTimeout0 + return simulateTimeout, func(cluster *Cluster, node string, c redis.Connection, cmdLine CmdLine) redis.Reply { + if len(cmdLine) == 0 { + return protocol.MakeErrReply("ERR command required") + } + if node == cluster.self { + // to self db + cmdName := strings.ToLower(string(cmdLine[0])) + if cmdName == "prepare" { + return execPrepare(cluster, c, cmdLine) + } else if cmdName == "commit" { + return execCommit(cluster, c, cmdLine) + } else if cmdName == "rollback" { + return execRollback(cluster, c, cmdLine) + } + return cluster.db.Exec(c, cmdLine) + } + if *simulateTimeout { + return protocol.MakeErrReply("ERR timeout") + } + cmdName := strings.ToLower(string(cmdLine[0])) + if cmdName == "prepare" { + return execPrepare(peer, c, cmdLine) + } else if cmdName == "commit" { + return execCommit(peer, c, cmdLine) + } else if cmdName == "rollback" { + return execRollback(peer, c, cmdLine) + } + return peer.db.Exec(c, cmdLine) + } +} + +func init() { + if config.Properties == nil { + config.Properties = &config.ServerProperties{} + } + addrA := "127.0.0.1:6399" + addrB := "127.0.0.1:7379" + config.Properties.Self = addrA + config.Properties.Peers = []string{addrB} + testNodeA = MakeCluster() + config.Properties.Self = addrB + config.Properties.Peers = []string{addrA} + testNodeB = MakeCluster() + + simulateBTimout, testNodeA.relayImpl = makeMockRelay(testNodeB) + testNodeA.peerPicker = &mockPicker{} + testNodeA.peerPicker.AddNode(addrA, addrB) + simulateATimout, testNodeB.relayImpl = makeMockRelay(testNodeA) + testNodeB.peerPicker = &mockPicker{} + testNodeB.peerPicker.AddNode(addrB, addrA) +} func MakeTestCluster(peers []string) *Cluster { if config.Properties == nil { diff --git a/redis/protocol/asserts/assert.go b/redis/protocol/asserts/assert.go index 2fcc177..bbd6011 100644 --- a/redis/protocol/asserts/assert.go +++ b/redis/protocol/asserts/assert.go @@ -21,6 +21,17 @@ func AssertIntReply(t *testing.T, actual redis.Reply, expected int) { } } +func AssertIntReplyGreaterThan(t *testing.T, actual redis.Reply, expected int) { + intResult, ok := actual.(*protocol.IntReply) + if !ok { + t.Errorf("expected int protocol, actually %s, %s", actual.ToBytes(), printStack()) + return + } + if intResult.Code < int64(expected) { + t.Errorf("expected %d, actually %d, %s", expected, intResult.Code, printStack()) + } +} + // AssertBulkReply checks if the given redis.Reply is the expected string func AssertBulkReply(t *testing.T, actual redis.Reply, expected string) { bulkReply, ok := actual.(*protocol.BulkReply)