mirror of
https://github.com/HDT3213/godis.git
synced 2025-10-05 16:57:06 +08:00
add unittest for cluster
This commit is contained in:
2
.gitignore
vendored
2
.gitignore
vendored
@@ -4,6 +4,8 @@
|
|||||||
*.dll
|
*.dll
|
||||||
*.so
|
*.so
|
||||||
*.dylib
|
*.dylib
|
||||||
|
*.aof
|
||||||
|
*.rdb
|
||||||
|
|
||||||
# Test binary, build with `go test -c`
|
# Test binary, build with `go test -c`
|
||||||
*.test
|
*.test
|
||||||
|
@@ -19,7 +19,7 @@ import (
|
|||||||
type Cluster struct {
|
type Cluster struct {
|
||||||
self string
|
self string
|
||||||
|
|
||||||
peers []string
|
nodes []string
|
||||||
peerPicker *consistenthash.Map
|
peerPicker *consistenthash.Map
|
||||||
peerConnection map[string]*pool.ObjectPool
|
peerConnection map[string]*pool.ObjectPool
|
||||||
|
|
||||||
@@ -34,6 +34,9 @@ const (
|
|||||||
lockSize = 64
|
lockSize = 64
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// if only one node involved in a transaction, just execute the command don't apply tcc procedure
|
||||||
|
var allowFastTransaction = true
|
||||||
|
|
||||||
// start current processing as a node of cluster
|
// start current processing as a node of cluster
|
||||||
func MakeCluster() *Cluster {
|
func MakeCluster() *Cluster {
|
||||||
cluster := &Cluster{
|
cluster := &Cluster{
|
||||||
@@ -58,12 +61,12 @@ func MakeCluster() *Cluster {
|
|||||||
nodes = append(nodes, config.Properties.Self)
|
nodes = append(nodes, config.Properties.Self)
|
||||||
cluster.peerPicker.AddNode(nodes...)
|
cluster.peerPicker.AddNode(nodes...)
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
for _, peer := range nodes {
|
for _, peer := range config.Properties.Peers {
|
||||||
cluster.peerConnection[peer] = pool.NewObjectPoolWithDefaultConfig(ctx, &ConnectionFactory{
|
cluster.peerConnection[peer] = pool.NewObjectPoolWithDefaultConfig(ctx, &ConnectionFactory{
|
||||||
Peer: peer,
|
Peer: peer,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
cluster.peers = nodes
|
cluster.nodes = nodes
|
||||||
return cluster
|
return cluster
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -94,17 +97,10 @@ func (cluster *Cluster) Exec(c redis.Connection, args [][]byte) (result redis.Re
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (cluster *Cluster) AfterClientClose(c redis.Connection) {
|
func (cluster *Cluster) AfterClientClose(c redis.Connection) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func Ping(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
func Ping(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||||
if len(args) == 1 {
|
return db.Ping(cluster.db, args[1:])
|
||||||
return &reply.PongReply{}
|
|
||||||
} else if len(args) == 2 {
|
|
||||||
return reply.MakeStatusReply("\"" + string(args[1]) + "\"")
|
|
||||||
} else {
|
|
||||||
return reply.MakeErrReply("ERR wrong number of arguments for 'ping' command")
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*----- utils -------*/
|
/*----- utils -------*/
|
||||||
|
@@ -52,9 +52,9 @@ func (cluster *Cluster) Relay(peer string, c redis.Connection, args [][]byte) re
|
|||||||
}
|
}
|
||||||
|
|
||||||
// broadcast command to all node in cluster
|
// broadcast command to all node in cluster
|
||||||
func (cluster *Cluster) Broadcast(peer string, c redis.Connection, args [][]byte) map[string]redis.Reply {
|
func (cluster *Cluster) Broadcast(c redis.Connection, args [][]byte) map[string]redis.Reply {
|
||||||
result := make(map[string]redis.Reply)
|
result := make(map[string]redis.Reply)
|
||||||
for _, node := range cluster.peers {
|
for _, node := range cluster.nodes {
|
||||||
reply := cluster.Relay(node, c, args)
|
reply := cluster.Relay(node, c, args)
|
||||||
result[node] = reply
|
result[node] = reply
|
||||||
}
|
}
|
||||||
|
37
cluster/com_test.go
Normal file
37
cluster/com_test.go
Normal file
@@ -0,0 +1,37 @@
|
|||||||
|
package cluster
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/hdt3213/godis/redis/reply/asserts"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestExec(t *testing.T) {
|
||||||
|
testCluster2 := MakeTestCluster([]string{"127.0.0.1:6379"})
|
||||||
|
for i := 0; i < 1000; i++ {
|
||||||
|
key := RandString(4)
|
||||||
|
value := RandString(4)
|
||||||
|
testCluster2.Exec(nil, toArgs("SET", key, value))
|
||||||
|
ret := testCluster2.Exec(nil, toArgs("GET", key))
|
||||||
|
asserts.AssertBulkReply(t, ret, value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRelay(t *testing.T) {
|
||||||
|
testCluster2 := MakeTestCluster([]string{"127.0.0.1:6379"})
|
||||||
|
key := RandString(4)
|
||||||
|
value := RandString(4)
|
||||||
|
ret := testCluster2.Relay("127.0.0.1:6379", nil, toArgs("SET", key, value))
|
||||||
|
asserts.AssertNotError(t, ret)
|
||||||
|
ret = testCluster2.Relay("127.0.0.1:6379", nil, toArgs("GET", key))
|
||||||
|
asserts.AssertBulkReply(t, ret, value)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBroadcast(t *testing.T) {
|
||||||
|
testCluster2 := MakeTestCluster([]string{"127.0.0.1:6379"})
|
||||||
|
key := RandString(4)
|
||||||
|
value := RandString(4)
|
||||||
|
rets := testCluster2.Broadcast(nil, toArgs("SET", key, value))
|
||||||
|
for _, v := range rets {
|
||||||
|
asserts.AssertNotError(t, v)
|
||||||
|
}
|
||||||
|
}
|
@@ -15,7 +15,7 @@ func Del(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
|||||||
keys[i-1] = string(args[i])
|
keys[i-1] = string(args[i])
|
||||||
}
|
}
|
||||||
groupMap := cluster.groupBy(keys)
|
groupMap := cluster.groupBy(keys)
|
||||||
if len(groupMap) == 1 { // do fast
|
if len(groupMap) == 1 && allowFastTransaction { // do fast
|
||||||
for peer, group := range groupMap { // only one group
|
for peer, group := range groupMap { // only one group
|
||||||
return cluster.Relay(peer, c, makeArgs("DEL", group...))
|
return cluster.Relay(peer, c, makeArgs("DEL", group...))
|
||||||
}
|
}
|
||||||
|
15
cluster/del_test.go
Normal file
15
cluster/del_test.go
Normal file
@@ -0,0 +1,15 @@
|
|||||||
|
package cluster
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/hdt3213/godis/redis/reply/asserts"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestDel(t *testing.T) {
|
||||||
|
allowFastTransaction = false
|
||||||
|
testCluster.Exec(nil, toArgs("SET", "a", "a"))
|
||||||
|
ret := Del(testCluster, nil, toArgs("DEL", "a", "b", "c"))
|
||||||
|
asserts.AssertNotError(t, ret)
|
||||||
|
ret = testCluster.Exec(nil, toArgs("GET", "a"))
|
||||||
|
asserts.AssertNullBulk(t, ret)
|
||||||
|
}
|
25
cluster/keys.go
Normal file
25
cluster/keys.go
Normal file
@@ -0,0 +1,25 @@
|
|||||||
|
package cluster
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/hdt3213/godis/interface/redis"
|
||||||
|
"github.com/hdt3213/godis/redis/reply"
|
||||||
|
)
|
||||||
|
|
||||||
|
func FlushDB(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||||
|
replies := cluster.Broadcast(c, args)
|
||||||
|
var errReply reply.ErrorReply
|
||||||
|
for _, v := range replies {
|
||||||
|
if reply.IsErrorReply(v) {
|
||||||
|
errReply = v.(reply.ErrorReply)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if errReply == nil {
|
||||||
|
return &reply.OkReply{}
|
||||||
|
}
|
||||||
|
return reply.MakeErrReply("error occurs: " + errReply.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
func FlushAll(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||||
|
return FlushDB(cluster, c, args)
|
||||||
|
}
|
@@ -95,7 +95,7 @@ func MSet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
|||||||
}
|
}
|
||||||
|
|
||||||
groupMap := cluster.groupBy(keys)
|
groupMap := cluster.groupBy(keys)
|
||||||
if len(groupMap) == 1 { // do fast
|
if len(groupMap) == 1 && allowFastTransaction { // do fast
|
||||||
for peer := range groupMap {
|
for peer := range groupMap {
|
||||||
return cluster.Relay(peer, c, args)
|
return cluster.Relay(peer, c, args)
|
||||||
}
|
}
|
||||||
|
21
cluster/mset_test.go
Normal file
21
cluster/mset_test.go
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
package cluster
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/hdt3213/godis/redis/reply/asserts"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestMSet(t *testing.T) {
|
||||||
|
allowFastTransaction = false
|
||||||
|
ret := MSet(testCluster, nil, toArgs("MSET", "a", "a", "b", "b"))
|
||||||
|
asserts.AssertNotError(t, ret)
|
||||||
|
ret = testCluster.Exec(nil, toArgs("MGET", "a", "b"))
|
||||||
|
asserts.AssertMultiBulkReply(t, ret, []string{"a", "b"})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMSetNx(t *testing.T) {
|
||||||
|
allowFastTransaction = false
|
||||||
|
FlushAll(testCluster, nil, toArgs("FLUSHALL"))
|
||||||
|
ret := MSetNX(testCluster, nil, toArgs("MSETNX", "a", "a", "b", "b"))
|
||||||
|
asserts.AssertNotError(t, ret)
|
||||||
|
}
|
@@ -19,7 +19,7 @@ var (
|
|||||||
// broadcast msg to all peers in cluster when receive publish command from client
|
// broadcast msg to all peers in cluster when receive publish command from client
|
||||||
func Publish(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
func Publish(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
|
||||||
var count int64 = 0
|
var count int64 = 0
|
||||||
for _, peer := range cluster.peers {
|
for _, peer := range cluster.nodes {
|
||||||
var re redis.Reply
|
var re redis.Reply
|
||||||
if peer == cluster.self {
|
if peer == cluster.self {
|
||||||
args0 := make([][]byte, len(args))
|
args0 := make([][]byte, len(args))
|
||||||
|
@@ -106,8 +106,8 @@ func MakeRouter() map[string]CmdFunc {
|
|||||||
routerMap["subscribe"] = Subscribe
|
routerMap["subscribe"] = Subscribe
|
||||||
routerMap["unsubscribe"] = UnSubscribe
|
routerMap["unsubscribe"] = UnSubscribe
|
||||||
|
|
||||||
//routerMap["flushdb"] = FlushDB
|
routerMap["flushdb"] = FlushDB
|
||||||
//routerMap["flushall"] = FlushAll
|
routerMap["flushall"] = FlushAll
|
||||||
//routerMap["keys"] = Keys
|
//routerMap["keys"] = Keys
|
||||||
|
|
||||||
return routerMap
|
return routerMap
|
||||||
|
45
cluster/transaction_test.go
Normal file
45
cluster/transaction_test.go
Normal file
@@ -0,0 +1,45 @@
|
|||||||
|
package cluster
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/hdt3213/godis/redis/reply/asserts"
|
||||||
|
"math/rand"
|
||||||
|
"strconv"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestRollback(t *testing.T) {
|
||||||
|
// rollback uncommitted transaction
|
||||||
|
FlushAll(testCluster, nil, toArgs("FLUSHALL"))
|
||||||
|
txId := rand.Int63()
|
||||||
|
txIdStr := strconv.FormatInt(txId, 10)
|
||||||
|
keys := []string{"a", "b"}
|
||||||
|
groupMap := testCluster.groupBy(keys)
|
||||||
|
args := []string{txIdStr}
|
||||||
|
args = append(args, keys...)
|
||||||
|
testCluster.Exec(nil, toArgs("SET", "a", "a"))
|
||||||
|
ret := PrepareDel(testCluster, nil, makeArgs("PrepareDel", args...))
|
||||||
|
asserts.AssertNotError(t, ret)
|
||||||
|
RequestRollback(testCluster, nil, txId, groupMap)
|
||||||
|
ret = testCluster.Exec(nil, toArgs("GET", "a"))
|
||||||
|
asserts.AssertBulkReply(t, ret, "a")
|
||||||
|
|
||||||
|
// rollback committed transaction
|
||||||
|
FlushAll(testCluster, nil, toArgs("FLUSHALL"))
|
||||||
|
txId = rand.Int63()
|
||||||
|
txIdStr = strconv.FormatInt(txId, 10)
|
||||||
|
args = []string{txIdStr}
|
||||||
|
args = append(args, keys...)
|
||||||
|
testCluster.Exec(nil, toArgs("SET", "a", "a"))
|
||||||
|
ret = PrepareDel(testCluster, nil, makeArgs("PrepareDel", args...))
|
||||||
|
asserts.AssertNotError(t, ret)
|
||||||
|
_, err := RequestCommit(testCluster, nil, txId, groupMap)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("del failed %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
ret = testCluster.Exec(nil, toArgs("GET", "a"))
|
||||||
|
asserts.AssertNullBulk(t, ret)
|
||||||
|
RequestRollback(testCluster, nil, txId, groupMap)
|
||||||
|
ret = testCluster.Exec(nil, toArgs("GET", "a"))
|
||||||
|
asserts.AssertBulkReply(t, ret, "a")
|
||||||
|
}
|
35
cluster/utils_test.go
Normal file
35
cluster/utils_test.go
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
package cluster
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/hdt3213/godis/config"
|
||||||
|
"math/rand"
|
||||||
|
)
|
||||||
|
|
||||||
|
var testCluster = MakeTestCluster(nil)
|
||||||
|
|
||||||
|
func MakeTestCluster(peers []string) *Cluster {
|
||||||
|
if config.Properties == nil {
|
||||||
|
config.Properties = &config.PropertyHolder{}
|
||||||
|
}
|
||||||
|
config.Properties.Self = "127.0.0.1:6399"
|
||||||
|
config.Properties.Peers = peers
|
||||||
|
return MakeCluster()
|
||||||
|
}
|
||||||
|
|
||||||
|
func toArgs(cmd ...string) [][]byte {
|
||||||
|
args := make([][]byte, len(cmd))
|
||||||
|
for i, s := range cmd {
|
||||||
|
args[i] = []byte(s)
|
||||||
|
}
|
||||||
|
return args
|
||||||
|
}
|
||||||
|
|
||||||
|
var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")
|
||||||
|
|
||||||
|
func RandString(n int) string {
|
||||||
|
b := make([]rune, n)
|
||||||
|
for i := range b {
|
||||||
|
b[i] = letters[rand.Intn(len(letters))]
|
||||||
|
}
|
||||||
|
return string(b)
|
||||||
|
}
|
@@ -4,7 +4,6 @@ import (
|
|||||||
"bufio"
|
"bufio"
|
||||||
"github.com/hdt3213/godis/lib/logger"
|
"github.com/hdt3213/godis/lib/logger"
|
||||||
"io"
|
"io"
|
||||||
"log"
|
|
||||||
"os"
|
"os"
|
||||||
"reflect"
|
"reflect"
|
||||||
"strconv"
|
"strconv"
|
||||||
@@ -93,7 +92,7 @@ func parse(src io.Reader) *PropertyHolder {
|
|||||||
func SetupConfig(configFilename string) {
|
func SetupConfig(configFilename string) {
|
||||||
file, err := os.Open(configFilename)
|
file, err := os.Open(configFilename)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Print(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
defer file.Close()
|
defer file.Close()
|
||||||
Properties = parse(file)
|
Properties = parse(file)
|
||||||
|
@@ -1,14 +1,8 @@
|
|||||||
package lock
|
package lock
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"runtime"
|
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
|
||||||
"strings"
|
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@@ -121,32 +115,3 @@ func (locks *Locks) RUnLocks(keys ...string) {
|
|||||||
mu.RUnlock()
|
mu.RUnlock()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func GoID() int {
|
|
||||||
var buf [64]byte
|
|
||||||
n := runtime.Stack(buf[:], false)
|
|
||||||
idField := strings.Fields(strings.TrimPrefix(string(buf[:n]), "goroutine "))[0]
|
|
||||||
id, err := strconv.Atoi(idField)
|
|
||||||
if err != nil {
|
|
||||||
panic(fmt.Sprintf("cannot get goroutine id: %v", err))
|
|
||||||
}
|
|
||||||
return id
|
|
||||||
}
|
|
||||||
|
|
||||||
func debug(testing.T) {
|
|
||||||
lm := Locks{}
|
|
||||||
size := 10
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
wg.Add(size)
|
|
||||||
for i := 0; i < size; i++ {
|
|
||||||
go func(i int) {
|
|
||||||
lm.Locks("1", "2")
|
|
||||||
println("go: " + strconv.Itoa(GoID()))
|
|
||||||
time.Sleep(time.Second)
|
|
||||||
println("go: " + strconv.Itoa(GoID()))
|
|
||||||
lm.UnLocks("1", "2")
|
|
||||||
wg.Done()
|
|
||||||
}(i)
|
|
||||||
}
|
|
||||||
wg.Wait()
|
|
||||||
}
|
|
||||||
|
@@ -62,6 +62,37 @@ func AssertErrReply(t *testing.T, actual redis.Reply, expected string) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func AssertNotError(t *testing.T, result redis.Reply) {
|
||||||
|
if result == nil {
|
||||||
|
t.Errorf("result is nil %s", printStack())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
bytes := result.ToBytes()
|
||||||
|
if len(bytes) == 0 {
|
||||||
|
t.Errorf("result is empty %s", printStack())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if bytes[0] == '-' {
|
||||||
|
t.Errorf("result is err reply %s", printStack())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func AssertNullBulk(t *testing.T, result redis.Reply) {
|
||||||
|
if result == nil {
|
||||||
|
t.Errorf("result is nil %s", printStack())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
bytes := result.ToBytes()
|
||||||
|
if len(bytes) == 0 {
|
||||||
|
t.Errorf("result is empty %s", printStack())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
expect := (&reply.NullBulkReply{}).ToBytes()
|
||||||
|
if !utils.BytesEquals(expect, bytes) {
|
||||||
|
t.Errorf("result is not null-bulk-reply %s", printStack())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func AssertMultiBulkReply(t *testing.T, actual redis.Reply, expected []string) {
|
func AssertMultiBulkReply(t *testing.T, actual redis.Reply, expected []string) {
|
||||||
multiBulk, ok := actual.(*reply.MultiBulkReply)
|
multiBulk, ok := actual.(*reply.MultiBulkReply)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
Reference in New Issue
Block a user