cluster test suite

This commit is contained in:
hdt3213
2022-04-13 20:48:42 +08:00
parent 8d08f1d8c2
commit 3605f09def
13 changed files with 371 additions and 175 deletions

View File

@@ -15,28 +15,33 @@ import (
"github.com/hdt3213/godis/redis/protocol" "github.com/hdt3213/godis/redis/protocol"
"github.com/jolestar/go-commons-pool/v2" "github.com/jolestar/go-commons-pool/v2"
"runtime/debug" "runtime/debug"
"strconv"
"strings" "strings"
) )
type PeerPicker interface {
AddNode(keys ...string)
PickNode(key string) string
}
// Cluster represents a node of godis cluster // Cluster represents a node of godis cluster
// it holds part of data and coordinates other nodes to finish transactions // it holds part of data and coordinates other nodes to finish transactions
type Cluster struct { type Cluster struct {
self string self string
nodes []string nodes []string
peerPicker *consistenthash.Map peerPicker PeerPicker
peerConnection map[string]*pool.ObjectPool peerConnection map[string]*pool.ObjectPool
db database.EmbedDB db database.EmbedDB
transactions *dict.SimpleDict // id -> Transaction transactions *dict.SimpleDict // id -> Transaction
idGenerator *idgenerator.IDGenerator idGenerator *idgenerator.IDGenerator
// use a variable to allow injecting stub for testing
relayImpl func(cluster *Cluster, node string, c redis.Connection, cmdLine CmdLine) redis.Reply
} }
const ( const (
replicas = 4 replicas = 4
lockSize = 64
) )
// if only one node involved in a transaction, just execute the command don't apply tcc procedure // if only one node involved in a transaction, just execute the command don't apply tcc procedure
@@ -53,6 +58,7 @@ func MakeCluster() *Cluster {
peerConnection: make(map[string]*pool.ObjectPool), peerConnection: make(map[string]*pool.ObjectPool),
idGenerator: idgenerator.MakeGenerator(config.Properties.Self), idGenerator: idgenerator.MakeGenerator(config.Properties.Self),
relayImpl: defaultRelayImpl,
} }
contains := make(map[string]struct{}) contains := make(map[string]struct{})
nodes := make([]string, 0, len(config.Properties.Peers)+1) nodes := make([]string, 0, len(config.Properties.Peers)+1)
@@ -144,45 +150,3 @@ func (cluster *Cluster) Exec(c redis.Connection, cmdLine [][]byte) (result redis
func (cluster *Cluster) AfterClientClose(c redis.Connection) { func (cluster *Cluster) AfterClientClose(c redis.Connection) {
cluster.db.AfterClientClose(c) cluster.db.AfterClientClose(c)
} }
func ping(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply {
return cluster.db.Exec(c, cmdLine)
}
/*----- utils -------*/
func makeArgs(cmd string, args ...string) [][]byte {
result := make([][]byte, len(args)+1)
result[0] = []byte(cmd)
for i, arg := range args {
result[i+1] = []byte(arg)
}
return result
}
// return node -> writeKeys
func (cluster *Cluster) groupBy(keys []string) map[string][]string {
result := make(map[string][]string)
for _, key := range keys {
peer := cluster.peerPicker.PickNode(key)
group, ok := result[peer]
if !ok {
group = make([]string, 0)
}
group = append(group, key)
result[peer] = group
}
return result
}
func execSelect(c redis.Connection, args [][]byte) redis.Reply {
dbIndex, err := strconv.Atoi(string(args[1]))
if err != nil {
return protocol.MakeErrReply("ERR invalid DB index")
}
if dbIndex >= config.Properties.Databases {
return protocol.MakeErrReply("ERR DB index is out of range")
}
c.SelectDB(dbIndex)
return protocol.MakeOkReply()
}

View File

@@ -34,26 +34,31 @@ func (cluster *Cluster) returnPeerClient(peer string, peerClient *client.Client)
return connectionFactory.ReturnObject(context.Background(), peerClient) return connectionFactory.ReturnObject(context.Background(), peerClient)
} }
// relay relays command to peer var defaultRelayImpl = func(cluster *Cluster, node string, c redis.Connection, cmdLine CmdLine) redis.Reply {
// select db by c.GetDBIndex() if node == cluster.self {
// cannot call Prepare, Commit, execRollback of self node
func (cluster *Cluster) relay(peer string, c redis.Connection, args [][]byte) redis.Reply {
if peer == cluster.self {
// to self db // to self db
return cluster.db.Exec(c, args) return cluster.db.Exec(c, cmdLine)
} }
peerClient, err := cluster.getPeerClient(peer) peerClient, err := cluster.getPeerClient(node)
if err != nil { if err != nil {
return protocol.MakeErrReply(err.Error()) return protocol.MakeErrReply(err.Error())
} }
defer func() { defer func() {
_ = cluster.returnPeerClient(peer, peerClient) _ = cluster.returnPeerClient(node, peerClient)
}() }()
peerClient.Send(utils.ToCmdLine("SELECT", strconv.Itoa(c.GetDBIndex()))) peerClient.Send(utils.ToCmdLine("SELECT", strconv.Itoa(c.GetDBIndex())))
return peerClient.Send(args) return peerClient.Send(cmdLine)
} }
// broadcast broadcasts command to all node in cluster // relay function relays command to peer
// select db by c.GetDBIndex()
// cannot call Prepare, Commit, execRollback of self node
func (cluster *Cluster) relay(peer string, c redis.Connection, args [][]byte) redis.Reply {
// use a variable to allow injecting stub for testing
return cluster.relayImpl(cluster, peer, c, args)
}
// broadcast function broadcasts command to all node in cluster
func (cluster *Cluster) broadcast(c redis.Connection, args [][]byte) map[string]redis.Reply { func (cluster *Cluster) broadcast(c redis.Connection, args [][]byte) map[string]redis.Reply {
result := make(map[string]redis.Reply) result := make(map[string]redis.Reply)
for _, node := range cluster.nodes { for _, node := range cluster.nodes {

View File

@@ -27,11 +27,11 @@ func TestAuth(t *testing.T) {
config.Properties.RequirePass = "" config.Properties.RequirePass = ""
}() }()
conn := &connection.FakeConn{} conn := &connection.FakeConn{}
ret := testCluster.Exec(conn, toArgs("GET", "a")) ret := testNodeA.Exec(conn, toArgs("GET", "a"))
asserts.AssertErrReply(t, ret, "NOAUTH Authentication required") asserts.AssertErrReply(t, ret, "NOAUTH Authentication required")
ret = testCluster.Exec(conn, toArgs("AUTH", passwd)) ret = testNodeA.Exec(conn, toArgs("AUTH", passwd))
asserts.AssertStatusReply(t, ret, "OK") asserts.AssertStatusReply(t, ret, "OK")
ret = testCluster.Exec(conn, toArgs("GET", "a")) ret = testNodeA.Exec(conn, toArgs("GET", "a"))
asserts.AssertNotError(t, ret) asserts.AssertNotError(t, ret)
} }

View File

@@ -9,9 +9,9 @@ import (
func TestDel(t *testing.T) { func TestDel(t *testing.T) {
conn := &connection.FakeConn{} conn := &connection.FakeConn{}
allowFastTransaction = false allowFastTransaction = false
testCluster.Exec(conn, toArgs("SET", "a", "a")) testNodeA.Exec(conn, toArgs("SET", "a", "a"))
ret := Del(testCluster, conn, toArgs("DEL", "a", "b", "c")) ret := Del(testNodeA, conn, toArgs("DEL", "a", "b", "c"))
asserts.AssertNotError(t, ret) asserts.AssertNotError(t, ret)
ret = testCluster.Exec(conn, toArgs("GET", "a")) ret = testNodeA.Exec(conn, toArgs("GET", "a"))
asserts.AssertNullBulk(t, ret) asserts.AssertNullBulk(t, ret)
} }

View File

@@ -9,20 +9,20 @@ import (
func TestMSet(t *testing.T) { func TestMSet(t *testing.T) {
conn := &connection.FakeConn{} conn := &connection.FakeConn{}
allowFastTransaction = false allowFastTransaction = false
ret := MSet(testCluster, conn, toArgs("MSET", "a", "a", "b", "b")) ret := MSet(testNodeA, conn, toArgs("MSET", "a", "a", "b", "b"))
asserts.AssertNotError(t, ret) asserts.AssertNotError(t, ret)
ret = testCluster.Exec(conn, toArgs("MGET", "a", "b")) ret = testNodeA.Exec(conn, toArgs("MGET", "a", "b"))
asserts.AssertMultiBulkReply(t, ret, []string{"a", "b"}) asserts.AssertMultiBulkReply(t, ret, []string{"a", "b"})
} }
func TestMSetNx(t *testing.T) { func TestMSetNx(t *testing.T) {
conn := &connection.FakeConn{} conn := &connection.FakeConn{}
allowFastTransaction = false allowFastTransaction = false
FlushAll(testCluster, conn, toArgs("FLUSHALL")) FlushAll(testNodeA, conn, toArgs("FLUSHALL"))
ret := MSetNX(testCluster, conn, toArgs("MSETNX", "a", "a", "b", "b")) ret := MSetNX(testNodeA, conn, toArgs("MSETNX", "a", "a", "b", "b"))
asserts.AssertNotError(t, ret) asserts.AssertNotError(t, ret)
ret = MSetNX(testCluster, conn, toArgs("MSETNX", "a", "a", "c", "c")) ret = MSetNX(testNodeA, conn, toArgs("MSETNX", "a", "a", "c", "c"))
asserts.AssertNotError(t, ret) asserts.AssertNotError(t, ret)
ret = testCluster.Exec(conn, toArgs("MGET", "a", "b", "c")) ret = testNodeA.Exec(conn, toArgs("MGET", "a", "b", "c"))
asserts.AssertMultiBulkReply(t, ret, []string{"a", "b", ""}) asserts.AssertMultiBulkReply(t, ret, []string{"a", "b", ""})
} }

View File

@@ -10,30 +10,30 @@ import (
func TestMultiExecOnSelf(t *testing.T) { func TestMultiExecOnSelf(t *testing.T) {
conn := new(connection.FakeConn) conn := new(connection.FakeConn)
testCluster.db.Exec(conn, utils.ToCmdLine("FLUSHALL")) testNodeA.db.Exec(conn, utils.ToCmdLine("FLUSHALL"))
result := testCluster.Exec(conn, toArgs("MULTI")) result := testNodeA.Exec(conn, toArgs("MULTI"))
asserts.AssertNotError(t, result) asserts.AssertNotError(t, result)
key := utils.RandString(10) key := utils.RandString(10)
value := utils.RandString(10) value := utils.RandString(10)
testCluster.Exec(conn, utils.ToCmdLine("set", key, value)) testNodeA.Exec(conn, utils.ToCmdLine("set", key, value))
key2 := utils.RandString(10) key2 := utils.RandString(10)
testCluster.Exec(conn, utils.ToCmdLine("rpush", key2, value)) testNodeA.Exec(conn, utils.ToCmdLine("rpush", key2, value))
result = testCluster.Exec(conn, utils.ToCmdLine("exec")) result = testNodeA.Exec(conn, utils.ToCmdLine("exec"))
asserts.AssertNotError(t, result) asserts.AssertNotError(t, result)
result = testCluster.Exec(conn, utils.ToCmdLine("get", key)) result = testNodeA.Exec(conn, utils.ToCmdLine("get", key))
asserts.AssertBulkReply(t, result, value) asserts.AssertBulkReply(t, result, value)
result = testCluster.Exec(conn, utils.ToCmdLine("lrange", key2, "0", "-1")) result = testNodeA.Exec(conn, utils.ToCmdLine("lrange", key2, "0", "-1"))
asserts.AssertMultiBulkReply(t, result, []string{value}) asserts.AssertMultiBulkReply(t, result, []string{value})
} }
func TestEmptyMulti(t *testing.T) { func TestEmptyMulti(t *testing.T) {
conn := new(connection.FakeConn) conn := new(connection.FakeConn)
testCluster.db.Exec(conn, utils.ToCmdLine("FLUSHALL")) testNodeA.db.Exec(conn, utils.ToCmdLine("FLUSHALL"))
result := testCluster.Exec(conn, toArgs("MULTI")) result := testNodeA.Exec(conn, toArgs("MULTI"))
asserts.AssertNotError(t, result) asserts.AssertNotError(t, result)
result = testCluster.Exec(conn, utils.ToCmdLine("PING")) result = testNodeA.Exec(conn, utils.ToCmdLine("PING"))
asserts.AssertNotError(t, result) asserts.AssertNotError(t, result)
result = testCluster.Exec(conn, utils.ToCmdLine("EXEC")) result = testNodeA.Exec(conn, utils.ToCmdLine("EXEC"))
asserts.AssertNotError(t, result) asserts.AssertNotError(t, result)
mbr := result.(*protocol.MultiRawReply) mbr := result.(*protocol.MultiRawReply)
asserts.AssertStatusReply(t, mbr.Replies[0], "PONG") asserts.AssertStatusReply(t, mbr.Replies[0], "PONG")
@@ -41,16 +41,16 @@ func TestEmptyMulti(t *testing.T) {
func TestMultiExecOnOthers(t *testing.T) { func TestMultiExecOnOthers(t *testing.T) {
conn := new(connection.FakeConn) conn := new(connection.FakeConn)
testCluster.db.Exec(conn, utils.ToCmdLine("FLUSHALL")) testNodeA.db.Exec(conn, utils.ToCmdLine("FLUSHALL"))
result := testCluster.Exec(conn, toArgs("MULTI")) result := testNodeA.Exec(conn, toArgs("MULTI"))
asserts.AssertNotError(t, result) asserts.AssertNotError(t, result)
key := utils.RandString(10) key := utils.RandString(10)
value := utils.RandString(10) value := utils.RandString(10)
testCluster.Exec(conn, utils.ToCmdLine("rpush", key, value)) testNodeA.Exec(conn, utils.ToCmdLine("rpush", key, value))
testCluster.Exec(conn, utils.ToCmdLine("lrange", key, "0", "-1")) testNodeA.Exec(conn, utils.ToCmdLine("lrange", key, "0", "-1"))
cmdLines := conn.GetQueuedCmdLine() cmdLines := conn.GetQueuedCmdLine()
rawResp := execMultiOnOtherNode(testCluster, conn, testCluster.self, nil, cmdLines) rawResp := execMultiOnOtherNode(testNodeA, conn, testNodeA.self, nil, cmdLines)
rep := rawResp.(*protocol.MultiRawReply) rep := rawResp.(*protocol.MultiRawReply)
if len(rep.Replies) != 2 { if len(rep.Replies) != 2 {
t.Errorf("expect 2 replies actual %d", len(rep.Replies)) t.Errorf("expect 2 replies actual %d", len(rep.Replies))
@@ -60,53 +60,53 @@ func TestMultiExecOnOthers(t *testing.T) {
func TestWatch(t *testing.T) { func TestWatch(t *testing.T) {
conn := new(connection.FakeConn) conn := new(connection.FakeConn)
testCluster.db.Exec(conn, utils.ToCmdLine("FLUSHALL")) testNodeA.db.Exec(conn, utils.ToCmdLine("FLUSHALL"))
key := utils.RandString(10) key := utils.RandString(10)
value := utils.RandString(10) value := utils.RandString(10)
testCluster.Exec(conn, utils.ToCmdLine("watch", key)) testNodeA.Exec(conn, utils.ToCmdLine("watch", key))
testCluster.Exec(conn, utils.ToCmdLine("set", key, value)) testNodeA.Exec(conn, utils.ToCmdLine("set", key, value))
result := testCluster.Exec(conn, toArgs("MULTI")) result := testNodeA.Exec(conn, toArgs("MULTI"))
asserts.AssertNotError(t, result) asserts.AssertNotError(t, result)
key2 := utils.RandString(10) key2 := utils.RandString(10)
value2 := utils.RandString(10) value2 := utils.RandString(10)
testCluster.Exec(conn, utils.ToCmdLine("set", key2, value2)) testNodeA.Exec(conn, utils.ToCmdLine("set", key2, value2))
result = testCluster.Exec(conn, utils.ToCmdLine("exec")) result = testNodeA.Exec(conn, utils.ToCmdLine("exec"))
asserts.AssertNotError(t, result) asserts.AssertNotError(t, result)
result = testCluster.Exec(conn, utils.ToCmdLine("get", key2)) result = testNodeA.Exec(conn, utils.ToCmdLine("get", key2))
asserts.AssertNullBulk(t, result) asserts.AssertNullBulk(t, result)
testCluster.Exec(conn, utils.ToCmdLine("watch", key)) testNodeA.Exec(conn, utils.ToCmdLine("watch", key))
result = testCluster.Exec(conn, toArgs("MULTI")) result = testNodeA.Exec(conn, toArgs("MULTI"))
asserts.AssertNotError(t, result) asserts.AssertNotError(t, result)
testCluster.Exec(conn, utils.ToCmdLine("set", key2, value2)) testNodeA.Exec(conn, utils.ToCmdLine("set", key2, value2))
result = testCluster.Exec(conn, utils.ToCmdLine("exec")) result = testNodeA.Exec(conn, utils.ToCmdLine("exec"))
asserts.AssertNotError(t, result) asserts.AssertNotError(t, result)
result = testCluster.Exec(conn, utils.ToCmdLine("get", key2)) result = testNodeA.Exec(conn, utils.ToCmdLine("get", key2))
asserts.AssertBulkReply(t, result, value2) asserts.AssertBulkReply(t, result, value2)
} }
func TestWatch2(t *testing.T) { func TestWatch2(t *testing.T) {
conn := new(connection.FakeConn) conn := new(connection.FakeConn)
testCluster.db.Exec(conn, utils.ToCmdLine("FLUSHALL")) testNodeA.db.Exec(conn, utils.ToCmdLine("FLUSHALL"))
key := utils.RandString(10) key := utils.RandString(10)
value := utils.RandString(10) value := utils.RandString(10)
testCluster.Exec(conn, utils.ToCmdLine("watch", key)) testNodeA.Exec(conn, utils.ToCmdLine("watch", key))
testCluster.Exec(conn, utils.ToCmdLine("set", key, value)) testNodeA.Exec(conn, utils.ToCmdLine("set", key, value))
result := testCluster.Exec(conn, toArgs("MULTI")) result := testNodeA.Exec(conn, toArgs("MULTI"))
asserts.AssertNotError(t, result) asserts.AssertNotError(t, result)
key2 := utils.RandString(10) key2 := utils.RandString(10)
value2 := utils.RandString(10) value2 := utils.RandString(10)
testCluster.Exec(conn, utils.ToCmdLine("set", key2, value2)) testNodeA.Exec(conn, utils.ToCmdLine("set", key2, value2))
cmdLines := conn.GetQueuedCmdLine() cmdLines := conn.GetQueuedCmdLine()
execMultiOnOtherNode(testCluster, conn, testCluster.self, conn.GetWatching(), cmdLines) execMultiOnOtherNode(testNodeA, conn, testNodeA.self, conn.GetWatching(), cmdLines)
result = testCluster.Exec(conn, utils.ToCmdLine("get", key2)) result = testNodeA.Exec(conn, utils.ToCmdLine("get", key2))
asserts.AssertNullBulk(t, result) asserts.AssertNullBulk(t, result)
testCluster.Exec(conn, utils.ToCmdLine("watch", key)) testNodeA.Exec(conn, utils.ToCmdLine("watch", key))
result = testCluster.Exec(conn, toArgs("MULTI")) result = testNodeA.Exec(conn, toArgs("MULTI"))
asserts.AssertNotError(t, result) asserts.AssertNotError(t, result)
testCluster.Exec(conn, utils.ToCmdLine("set", key2, value2)) testNodeA.Exec(conn, utils.ToCmdLine("set", key2, value2))
execMultiOnOtherNode(testCluster, conn, testCluster.self, conn.GetWatching(), cmdLines) execMultiOnOtherNode(testNodeA, conn, testNodeA.self, conn.GetWatching(), cmdLines)
result = testCluster.Exec(conn, utils.ToCmdLine("get", key2)) result = testNodeA.Exec(conn, utils.ToCmdLine("get", key2))
asserts.AssertBulkReply(t, result, value2) asserts.AssertBulkReply(t, result, value2)
} }

View File

@@ -12,9 +12,9 @@ func TestPublish(t *testing.T) {
channel := utils.RandString(5) channel := utils.RandString(5)
msg := utils.RandString(5) msg := utils.RandString(5)
conn := &connection.FakeConn{} conn := &connection.FakeConn{}
Subscribe(testCluster, conn, utils.ToCmdLine("SUBSCRIBE", channel)) Subscribe(testNodeA, conn, utils.ToCmdLine("SUBSCRIBE", channel))
conn.Clean() // clean subscribe success conn.Clean() // clean subscribe success
Publish(testCluster, conn, utils.ToCmdLine("PUBLISH", channel, msg)) Publish(testNodeA, conn, utils.ToCmdLine("PUBLISH", channel, msg))
data := conn.Bytes() data := conn.Bytes()
ret, err := parser.ParseOne(data) ret, err := parser.ParseOne(data)
if err != nil { if err != nil {
@@ -28,19 +28,19 @@ func TestPublish(t *testing.T) {
}) })
// unsubscribe // unsubscribe
UnSubscribe(testCluster, conn, utils.ToCmdLine("UNSUBSCRIBE", channel)) UnSubscribe(testNodeA, conn, utils.ToCmdLine("UNSUBSCRIBE", channel))
conn.Clean() conn.Clean()
Publish(testCluster, conn, utils.ToCmdLine("PUBLISH", channel, msg)) Publish(testNodeA, conn, utils.ToCmdLine("PUBLISH", channel, msg))
data = conn.Bytes() data = conn.Bytes()
if len(data) > 0 { if len(data) > 0 {
t.Error("expect no msg") t.Error("expect no msg")
} }
// unsubscribe all // unsubscribe all
Subscribe(testCluster, conn, utils.ToCmdLine("SUBSCRIBE", channel)) Subscribe(testNodeA, conn, utils.ToCmdLine("SUBSCRIBE", channel))
UnSubscribe(testCluster, conn, utils.ToCmdLine("UNSUBSCRIBE")) UnSubscribe(testNodeA, conn, utils.ToCmdLine("UNSUBSCRIBE"))
conn.Clean() conn.Clean()
Publish(testCluster, conn, utils.ToCmdLine("PUBLISH", channel, msg)) Publish(testNodeA, conn, utils.ToCmdLine("PUBLISH", channel, msg))
data = conn.Bytes() data = conn.Bytes()
if len(data) > 0 { if len(data) > 0 {
t.Error("expect no msg") t.Error("expect no msg")

View File

@@ -19,7 +19,6 @@ func Rename(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
if srcNode == destNode { // do fast if srcNode == destNode { // do fast
return cluster.relay(srcNode, c, args) return cluster.relay(srcNode, c, args)
} }
groupMap := map[string][]string{ groupMap := map[string][]string{
srcNode: {srcKey}, srcNode: {srcKey},
destNode: {destKey}, destNode: {destKey},

View File

@@ -1,68 +1,159 @@
package cluster package cluster
import ( import (
"fmt"
"github.com/hdt3213/godis/lib/utils" "github.com/hdt3213/godis/lib/utils"
"github.com/hdt3213/godis/redis/connection" "github.com/hdt3213/godis/redis/connection"
"github.com/hdt3213/godis/redis/protocol"
"github.com/hdt3213/godis/redis/protocol/asserts" "github.com/hdt3213/godis/redis/protocol/asserts"
"testing" "testing"
) )
func TestRename(t *testing.T) { func TestRename(t *testing.T) {
conn := new(connection.FakeConn) conn := new(connection.FakeConn)
testDB := testCluster.db testNodeA.db.Exec(conn, utils.ToCmdLine("FlushALL"))
testDB.Exec(conn, utils.ToCmdLine("FlushALL"))
key := utils.RandString(10) // cross node rename
key := testNodeA.self + utils.RandString(10)
value := utils.RandString(10) value := utils.RandString(10)
newKey := key + utils.RandString(2) newKey := testNodeB.self + utils.RandString(10) // route to testNodeB, see mockPicker.PickNode
testDB.Exec(conn, utils.ToCmdLine("SET", key, value, "ex", "1000")) testNodeA.db.Exec(conn, utils.ToCmdLine("SET", key, value, "ex", "1000"))
result := Rename(testCluster, conn, utils.ToCmdLine("RENAME", key, newKey)) result := Rename(testNodeA, conn, utils.ToCmdLine("RENAME", key, newKey))
if _, ok := result.(*protocol.OkReply); !ok { asserts.AssertStatusReply(t, result, "OK")
t.Error("expect ok") result = testNodeA.db.Exec(conn, utils.ToCmdLine("EXISTS", key))
return asserts.AssertIntReply(t, result, 0)
} result = testNodeB.db.Exec(conn, utils.ToCmdLine("EXISTS", newKey))
//result = testDB.Exec(conn, utils.ToCmdLine("EXISTS", key))
//asserts.AssertIntReply(t, result, 0)
result = testDB.Exec(conn, utils.ToCmdLine("EXISTS", newKey))
asserts.AssertIntReply(t, result, 1) asserts.AssertIntReply(t, result, 1)
// check ttl result = testNodeB.db.Exec(conn, utils.ToCmdLine("TTL", newKey))
result = testDB.Exec(conn, utils.ToCmdLine("TTL", newKey)) asserts.AssertIntReplyGreaterThan(t, result, 0)
intResult, ok := result.(*protocol.IntReply)
if !ok { // same node rename
t.Error(fmt.Sprintf("expected int protocol, actually %s", result.ToBytes())) key = testNodeA.self + utils.RandString(10)
return value = utils.RandString(10)
} newKey = key + utils.RandString(2)
if intResult.Code <= 0 { testNodeA.db.Exec(conn, utils.ToCmdLine("SET", key, value, "ex", "1000"))
t.Errorf("expected ttl more than 0, actual: %d", intResult.Code) result = Rename(testNodeA, conn, utils.ToCmdLine("RENAME", key, newKey))
return asserts.AssertStatusReply(t, result, "OK")
} result = testNodeA.db.Exec(conn, utils.ToCmdLine("EXISTS", key))
// test no src key asserts.AssertIntReply(t, result, 0)
result = testNodeA.db.Exec(conn, utils.ToCmdLine("EXISTS", newKey))
asserts.AssertIntReply(t, result, 1)
result = testNodeA.db.Exec(conn, utils.ToCmdLine("TTL", newKey))
asserts.AssertIntReplyGreaterThan(t, result, 0)
// test src prepare failed
*simulateATimout = true
key = testNodeA.self + utils.RandString(10)
newKey = testNodeB.self + utils.RandString(10) // route to testNodeB, see mockPicker.PickNode
value = utils.RandString(10)
testNodeA.db.Exec(conn, utils.ToCmdLine("SET", key, value, "ex", "1000"))
result = Rename(testNodeB, conn, utils.ToCmdLine("RENAME", key, newKey))
asserts.AssertErrReply(t, result, "ERR timeout")
result = testNodeA.db.Exec(conn, utils.ToCmdLine("EXISTS", key))
asserts.AssertIntReply(t, result, 1)
result = testNodeA.db.Exec(conn, utils.ToCmdLine("TTL", key))
asserts.AssertIntReplyGreaterThan(t, result, 0)
result = testNodeB.db.Exec(conn, utils.ToCmdLine("EXISTS", newKey))
asserts.AssertIntReply(t, result, 0)
*simulateATimout = false
// test dest prepare failed
*simulateBTimout = true
key = testNodeA.self + utils.RandString(10)
newKey = testNodeB.self + utils.RandString(10) // route to testNodeB, see mockPicker.PickNode
value = utils.RandString(10)
testNodeA.db.Exec(conn, utils.ToCmdLine("SET", key, value, "ex", "1000"))
result = Rename(testNodeA, conn, utils.ToCmdLine("RENAME", key, newKey))
asserts.AssertErrReply(t, result, "ERR timeout")
result = testNodeA.db.Exec(conn, utils.ToCmdLine("EXISTS", key))
asserts.AssertIntReply(t, result, 1)
result = testNodeA.db.Exec(conn, utils.ToCmdLine("TTL", key))
asserts.AssertIntReplyGreaterThan(t, result, 0)
result = testNodeB.db.Exec(conn, utils.ToCmdLine("EXISTS", newKey))
asserts.AssertIntReply(t, result, 0)
*simulateBTimout = false
result = Rename(testNodeA, conn, utils.ToCmdLine("RENAME", key))
asserts.AssertErrReply(t, result, "ERR wrong number of arguments for 'rename' command")
} }
func TestRenameNx(t *testing.T) { func TestRenameNx(t *testing.T) {
conn := new(connection.FakeConn) conn := new(connection.FakeConn)
testDB := testCluster.db testNodeA.db.Exec(conn, utils.ToCmdLine("FlushALL"))
testDB.Exec(conn, utils.ToCmdLine("FlushALL")) // cross node rename
key := utils.RandString(10) key := testNodeA.self + utils.RandString(10)
value := utils.RandString(10) value := utils.RandString(10)
newKey := key + utils.RandString(2) newKey := testNodeB.self + utils.RandString(10) // route to testNodeB, see mockPicker.PickNode
testCluster.db.Exec(conn, utils.ToCmdLine("SET", key, value, "ex", "1000")) testNodeA.db.Exec(conn, utils.ToCmdLine("SET", key, value, "ex", "1000"))
result := RenameNx(testCluster, conn, utils.ToCmdLine("RENAMENX", key, newKey)) result := RenameNx(testNodeA, conn, utils.ToCmdLine("RENAMENX", key, newKey))
asserts.AssertIntReply(t, result, 1) asserts.AssertIntReply(t, result, 1)
result = testDB.Exec(conn, utils.ToCmdLine("EXISTS", key)) result = testNodeA.db.Exec(conn, utils.ToCmdLine("EXISTS", key))
asserts.AssertIntReply(t, result, 0) asserts.AssertIntReply(t, result, 0)
result = testDB.Exec(conn, utils.ToCmdLine("EXISTS", newKey)) result = testNodeB.db.Exec(conn, utils.ToCmdLine("EXISTS", newKey))
asserts.AssertIntReply(t, result, 1) asserts.AssertIntReply(t, result, 1)
result = testDB.Exec(conn, utils.ToCmdLine("TTL", newKey)) result = testNodeB.db.Exec(conn, utils.ToCmdLine("TTL", newKey))
intResult, ok := result.(*protocol.IntReply) asserts.AssertIntReplyGreaterThan(t, result, 0)
if !ok {
t.Error(fmt.Sprintf("expected int protocol, actually %s", result.ToBytes())) // cross node rename, dest key exist
return key = testNodeA.self + utils.RandString(10)
} value = utils.RandString(10)
if intResult.Code <= 0 { newKey = testNodeB.self + utils.RandString(10) // route to testNodeB, see mockPicker.PickNode
t.Errorf("expected ttl more than 0, actual: %d", intResult.Code) testNodeA.db.Exec(conn, utils.ToCmdLine("SET", key, value, "ex", "1000"))
return testNodeB.db.Exec(conn, utils.ToCmdLine("SET", newKey, newKey))
} result = RenameNx(testNodeA, conn, utils.ToCmdLine("RENAMENX", key, newKey))
asserts.AssertIntReply(t, result, 0)
result = testNodeA.db.Exec(conn, utils.ToCmdLine("EXISTS", key))
asserts.AssertIntReply(t, result, 1)
result = testNodeA.db.Exec(conn, utils.ToCmdLine("TTL", key))
asserts.AssertIntReplyGreaterThan(t, result, 0)
result = testNodeB.db.Exec(conn, utils.ToCmdLine("GET", newKey))
asserts.AssertBulkReply(t, result, newKey)
// same node rename
key = testNodeA.self + utils.RandString(10)
value = utils.RandString(10)
newKey = key + utils.RandString(2)
testNodeA.db.Exec(conn, utils.ToCmdLine("SET", key, value, "ex", "1000"))
result = RenameNx(testNodeA, conn, utils.ToCmdLine("RENAMENX", key, newKey))
asserts.AssertIntReply(t, result, 1)
result = testNodeA.db.Exec(conn, utils.ToCmdLine("EXISTS", key))
asserts.AssertIntReply(t, result, 0)
result = testNodeA.db.Exec(conn, utils.ToCmdLine("EXISTS", newKey))
asserts.AssertIntReply(t, result, 1)
result = testNodeA.db.Exec(conn, utils.ToCmdLine("TTL", newKey))
asserts.AssertIntReplyGreaterThan(t, result, 0)
// test src prepare failed
*simulateATimout = true
key = testNodeA.self + utils.RandString(10)
newKey = testNodeB.self + utils.RandString(10) // route to testNodeB, see mockPicker.PickNode
value = utils.RandString(10)
testNodeA.db.Exec(conn, utils.ToCmdLine("SET", key, value, "ex", "1000"))
result = RenameNx(testNodeB, conn, utils.ToCmdLine("RENAMENX", key, newKey))
asserts.AssertErrReply(t, result, "ERR timeout")
result = testNodeA.db.Exec(conn, utils.ToCmdLine("EXISTS", key))
asserts.AssertIntReply(t, result, 1)
result = testNodeA.db.Exec(conn, utils.ToCmdLine("TTL", key))
asserts.AssertIntReplyGreaterThan(t, result, 0)
result = testNodeB.db.Exec(conn, utils.ToCmdLine("EXISTS", newKey))
asserts.AssertIntReply(t, result, 0)
*simulateATimout = false
// test dest prepare failed
*simulateBTimout = true
key = testNodeA.self + utils.RandString(10)
newKey = testNodeB.self + utils.RandString(10) // route to testNodeB, see mockPicker.PickNode
value = utils.RandString(10)
testNodeA.db.Exec(conn, utils.ToCmdLine("SET", key, value, "ex", "1000"))
result = RenameNx(testNodeA, conn, utils.ToCmdLine("RENAMENX", key, newKey))
asserts.AssertErrReply(t, result, "ERR timeout")
result = testNodeA.db.Exec(conn, utils.ToCmdLine("EXISTS", key))
asserts.AssertIntReply(t, result, 1)
result = testNodeA.db.Exec(conn, utils.ToCmdLine("TTL", key))
asserts.AssertIntReplyGreaterThan(t, result, 0)
result = testNodeB.db.Exec(conn, utils.ToCmdLine("EXISTS", newKey))
asserts.AssertIntReply(t, result, 0)
*simulateBTimout = false
result = RenameNx(testNodeA, conn, utils.ToCmdLine("RENAMENX", key))
asserts.AssertErrReply(t, result, "ERR wrong number of arguments for 'renamenx' command")
} }

View File

@@ -11,37 +11,37 @@ import (
func TestRollback(t *testing.T) { func TestRollback(t *testing.T) {
// rollback uncommitted transaction // rollback uncommitted transaction
conn := new(connection.FakeConn) conn := new(connection.FakeConn)
FlushAll(testCluster, conn, toArgs("FLUSHALL")) FlushAll(testNodeA, conn, toArgs("FLUSHALL"))
txID := rand.Int63() txID := rand.Int63()
txIDStr := strconv.FormatInt(txID, 10) txIDStr := strconv.FormatInt(txID, 10)
keys := []string{"a", "b"} keys := []string{"a", "b"}
groupMap := testCluster.groupBy(keys) groupMap := testNodeA.groupBy(keys)
args := []string{txIDStr, "DEL"} args := []string{txIDStr, "DEL"}
args = append(args, keys...) args = append(args, keys...)
testCluster.Exec(conn, toArgs("SET", "a", "a")) testNodeA.Exec(conn, toArgs("SET", "a", "a"))
ret := execPrepare(testCluster, conn, makeArgs("Prepare", args...)) ret := execPrepare(testNodeA, conn, makeArgs("Prepare", args...))
asserts.AssertNotError(t, ret) asserts.AssertNotError(t, ret)
requestRollback(testCluster, conn, txID, groupMap) requestRollback(testNodeA, conn, txID, groupMap)
ret = testCluster.Exec(conn, toArgs("GET", "a")) ret = testNodeA.Exec(conn, toArgs("GET", "a"))
asserts.AssertBulkReply(t, ret, "a") asserts.AssertBulkReply(t, ret, "a")
// rollback committed transaction // rollback committed transaction
FlushAll(testCluster, conn, toArgs("FLUSHALL")) FlushAll(testNodeA, conn, toArgs("FLUSHALL"))
txID = rand.Int63() txID = rand.Int63()
txIDStr = strconv.FormatInt(txID, 10) txIDStr = strconv.FormatInt(txID, 10)
args = []string{txIDStr, "DEL"} args = []string{txIDStr, "DEL"}
args = append(args, keys...) args = append(args, keys...)
testCluster.Exec(conn, toArgs("SET", "a", "a")) testNodeA.Exec(conn, toArgs("SET", "a", "a"))
ret = execPrepare(testCluster, conn, makeArgs("Prepare", args...)) ret = execPrepare(testNodeA, conn, makeArgs("Prepare", args...))
asserts.AssertNotError(t, ret) asserts.AssertNotError(t, ret)
_, err := requestCommit(testCluster, conn, txID, groupMap) _, err := requestCommit(testNodeA, conn, txID, groupMap)
if err != nil { if err != nil {
t.Errorf("del failed %v", err) t.Errorf("del failed %v", err)
return return
} }
ret = testCluster.Exec(conn, toArgs("GET", "a")) ret = testNodeA.Exec(conn, toArgs("GET", "a"))
asserts.AssertNullBulk(t, ret) asserts.AssertNullBulk(t, ret)
requestRollback(testCluster, conn, txID, groupMap) requestRollback(testNodeA, conn, txID, groupMap)
ret = testCluster.Exec(conn, toArgs("GET", "a")) ret = testNodeA.Exec(conn, toArgs("GET", "a"))
asserts.AssertBulkReply(t, ret, "a") asserts.AssertBulkReply(t, ret, "a")
} }

50
cluster/utils.go Normal file
View File

@@ -0,0 +1,50 @@
package cluster
import (
"github.com/hdt3213/godis/config"
"github.com/hdt3213/godis/interface/redis"
"github.com/hdt3213/godis/redis/protocol"
"strconv"
)
func ping(cluster *Cluster, c redis.Connection, cmdLine CmdLine) redis.Reply {
return cluster.db.Exec(c, cmdLine)
}
/*----- utils -------*/
func makeArgs(cmd string, args ...string) [][]byte {
result := make([][]byte, len(args)+1)
result[0] = []byte(cmd)
for i, arg := range args {
result[i+1] = []byte(arg)
}
return result
}
// return node -> writeKeys
func (cluster *Cluster) groupBy(keys []string) map[string][]string {
result := make(map[string][]string)
for _, key := range keys {
peer := cluster.peerPicker.PickNode(key)
group, ok := result[peer]
if !ok {
group = make([]string, 0)
}
group = append(group, key)
result[peer] = group
}
return result
}
func execSelect(c redis.Connection, args [][]byte) redis.Reply {
dbIndex, err := strconv.Atoi(string(args[1]))
if err != nil {
return protocol.MakeErrReply("ERR invalid DB index")
}
if dbIndex >= config.Properties.Databases {
return protocol.MakeErrReply("ERR DB index is out of range")
}
c.SelectDB(dbIndex)
return protocol.MakeOkReply()
}

View File

@@ -2,10 +2,86 @@ package cluster
import ( import (
"github.com/hdt3213/godis/config" "github.com/hdt3213/godis/config"
"github.com/hdt3213/godis/interface/redis"
"github.com/hdt3213/godis/redis/protocol"
"math/rand" "math/rand"
"strings"
) )
var testCluster = MakeTestCluster(nil) var testNodeA, testNodeB *Cluster
var simulateATimout, simulateBTimout *bool
type mockPicker struct {
nodes []string
}
func (picker *mockPicker) AddNode(keys ...string) {
picker.nodes = append(picker.nodes, keys...)
}
func (picker *mockPicker) PickNode(key string) string {
for _, n := range picker.nodes {
if strings.Contains(key, n) {
return n
}
}
return picker.nodes[0]
}
func makeMockRelay(peer *Cluster) (*bool, func(cluster *Cluster, node string, c redis.Connection, cmdLine CmdLine) redis.Reply) {
simulateTimeout0 := false
simulateTimeout := &simulateTimeout0
return simulateTimeout, func(cluster *Cluster, node string, c redis.Connection, cmdLine CmdLine) redis.Reply {
if len(cmdLine) == 0 {
return protocol.MakeErrReply("ERR command required")
}
if node == cluster.self {
// to self db
cmdName := strings.ToLower(string(cmdLine[0]))
if cmdName == "prepare" {
return execPrepare(cluster, c, cmdLine)
} else if cmdName == "commit" {
return execCommit(cluster, c, cmdLine)
} else if cmdName == "rollback" {
return execRollback(cluster, c, cmdLine)
}
return cluster.db.Exec(c, cmdLine)
}
if *simulateTimeout {
return protocol.MakeErrReply("ERR timeout")
}
cmdName := strings.ToLower(string(cmdLine[0]))
if cmdName == "prepare" {
return execPrepare(peer, c, cmdLine)
} else if cmdName == "commit" {
return execCommit(peer, c, cmdLine)
} else if cmdName == "rollback" {
return execRollback(peer, c, cmdLine)
}
return peer.db.Exec(c, cmdLine)
}
}
func init() {
if config.Properties == nil {
config.Properties = &config.ServerProperties{}
}
addrA := "127.0.0.1:6399"
addrB := "127.0.0.1:7379"
config.Properties.Self = addrA
config.Properties.Peers = []string{addrB}
testNodeA = MakeCluster()
config.Properties.Self = addrB
config.Properties.Peers = []string{addrA}
testNodeB = MakeCluster()
simulateBTimout, testNodeA.relayImpl = makeMockRelay(testNodeB)
testNodeA.peerPicker = &mockPicker{}
testNodeA.peerPicker.AddNode(addrA, addrB)
simulateATimout, testNodeB.relayImpl = makeMockRelay(testNodeA)
testNodeB.peerPicker = &mockPicker{}
testNodeB.peerPicker.AddNode(addrB, addrA)
}
func MakeTestCluster(peers []string) *Cluster { func MakeTestCluster(peers []string) *Cluster {
if config.Properties == nil { if config.Properties == nil {

View File

@@ -21,6 +21,17 @@ func AssertIntReply(t *testing.T, actual redis.Reply, expected int) {
} }
} }
func AssertIntReplyGreaterThan(t *testing.T, actual redis.Reply, expected int) {
intResult, ok := actual.(*protocol.IntReply)
if !ok {
t.Errorf("expected int protocol, actually %s, %s", actual.ToBytes(), printStack())
return
}
if intResult.Code < int64(expected) {
t.Errorf("expected %d, actually %d, %s", expected, intResult.Code, printStack())
}
}
// AssertBulkReply checks if the given redis.Reply is the expected string // AssertBulkReply checks if the given redis.Reply is the expected string
func AssertBulkReply(t *testing.T, actual redis.Reply, expected string) { func AssertBulkReply(t *testing.T, actual redis.Reply, expected string) {
bulkReply, ok := actual.(*protocol.BulkReply) bulkReply, ok := actual.(*protocol.BulkReply)