Added happy path test case for data replication

Kelvin Clement Mwinuka
2024-05-22 16:11:00 +08:00
parent 7e585dc256
commit f894a531b0
9 changed files with 1590 additions and 1403 deletions

View File

@@ -27,7 +27,7 @@ jobs:
         run: go build -v ./cmd/main.go
       - name: Test
-        run: make test-unit
+        run: make test
       - name: Test for Data Race
         run: make test-race

View File

@@ -17,7 +17,7 @@ run:
 	make build && \
 	docker-compose up --build
 
-test-unit:
+test:
 	env RACE=false OUT=internal/modules/admin/testdata make build-modules-test && \
 	env RACE=false OUT=echovault/testdata make build-modules-test && \
 	go clean -testcache && \
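The test-unit target is renamed to test here, matching the workflow change above, so the new cluster-replication test added below runs through the same make test entry point that CI uses.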

File diff suppressed because it is too large

View File

@@ -63,9 +63,9 @@ services:
       - RAFT_PORT=8000
       - ML_PORT=7946
       - SERVER_ID=1
-      - JOIN_ADDR=cluster_node_2:7946
+      - JOIN_ADDR=2/cluster_node_2:7946
       - DATA_DIR=/var/lib/echovault
-      - IN_MEMORY=false
+      - IN_MEMORY=true
       - TLS=false
       - MTLS=false
       - BOOTSTRAP_CLUSTER=true
@@ -110,9 +110,9 @@ services:
       - RAFT_PORT=8000
       - ML_PORT=7946
       - SERVER_ID=2
-      - JOIN_ADDR=cluster_node_3:7946
+      - JOIN_ADDR=3/cluster_node_3:7946
       - DATA_DIR=/var/lib/echovault
-      - IN_MEMORY=false
+      - IN_MEMORY=true
       - TLS=false
       - MTLS=false
       - BOOTSTRAP_CLUSTER=false
@@ -157,9 +157,9 @@ services:
       - RAFT_PORT=8000
       - ML_PORT=7946
       - SERVER_ID=3
-      - JOIN_ADDR=cluster_node_4:7946
+      - JOIN_ADDR=4/cluster_node_4:7946
       - DATA_DIR=/var/lib/echovault
-      - IN_MEMORY=false
+      - IN_MEMORY=true
       - TLS=false
       - MTLS=false
       - BOOTSTRAP_CLUSTER=false
@@ -204,9 +204,9 @@ services:
       - RAFT_PORT=8000
       - ML_PORT=7946
       - SERVER_ID=4
-      - JOIN_ADDR=cluster_node_5:7946
+      - JOIN_ADDR=5/cluster_node_5:7946
       - DATA_DIR=/var/lib/echovault
-      - IN_MEMORY=false
+      - IN_MEMORY=true
       - TLS=false
       - MTLS=false
       - BOOTSTRAP_CLUSTER=false
@@ -251,9 +251,9 @@ services:
       - RAFT_PORT=8000
       - ML_PORT=7946
       - SERVER_ID=5
-      - JOIN_ADDR=cluster_node_1:7946
+      - JOIN_ADDR=1/cluster_node_1:7946
       - DATA_DIR=/var/lib/echovault
-      - IN_MEMORY=false
+      - IN_MEMORY=true
       - TLS=false
       - MTLS=false
       - BOOTSTRAP_CLUSTER=false
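Every node's JOIN_ADDR is now qualified with the server ID of the node it joins, taking the form <server-id>/<host>:<port>, and the cluster runs with IN_MEMORY=true. The new address format matches the joinAddr the test below builds with fmt.Sprintf("%s/%s:%d", ...), and it pairs with the memberlist change at the end of this commit, which requires node names.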

View File

@@ -15,7 +15,6 @@
 package echovault
 
 import (
-	"fmt"
 	"github.com/echovault/echovault/internal"
 	"strconv"
 	"strings"
@@ -34,7 +33,6 @@ import (
 // "LLen command on non-list item" - when the provided key exists but is not a list.
 func (server *EchoVault) LLen(key string) (int, error) {
 	b, err := server.handleCommand(server.context, internal.EncodeCommand([]string{"LLEN", key}), nil, false, true)
-	fmt.Println(key, string(b), err)
 	if err != nil {
 		return 0, err
 	}

echovault/echovault_test.go (new file, 188 lines)
View File

@@ -0,0 +1,188 @@
package echovault

import (
	"fmt"
	"net"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/echovault/echovault/internal"
	"github.com/tidwall/resp"
)

type ClientServerPair struct {
	serverId         string
	bindAddr         string
	port             int
	raftPort         int
	mlPort           int
	bootstrapCluster bool
	client           *resp.Conn
	server           *EchoVault
}

var bindLock sync.Mutex
var bindNum byte = 10

// getBindAddrNet hands out a distinct loopback address (127.0.<network>.10 and up)
// per call so that several server instances can run side by side on one host.
func getBindAddrNet(network byte) net.IP {
	bindLock.Lock()
	defer bindLock.Unlock()

	result := net.IPv4(127, 0, network, bindNum)

	bindNum++
	if bindNum == 0 { // bindNum is a byte, so it wraps after 255; skip the reserved low addresses.
		bindNum = 10
	}

	return result
}

func getBindAddr() net.IP {
	return getBindAddrNet(0)
}

func setupServer(
	serverId string,
	bootstrapCluster bool,
	bindAddr,
	joinAddr string,
	port,
	raftPort,
	mlPort int,
) (*EchoVault, error) {
	config := DefaultConfig()
	config.DataDir = "./testdata"
	config.BindAddr = bindAddr
	config.JoinAddr = joinAddr
	config.Port = uint16(port)
	config.InMemory = true
	config.ServerID = serverId
	config.RaftBindPort = uint16(raftPort)
	config.MemberListBindPort = uint16(mlPort)
	config.BootstrapCluster = bootstrapCluster

	return NewEchoVault(WithConfig(config))
}
func Test_ClusterReplication(t *testing.T) {
	pairs := make([]ClientServerPair, 3)

	for i := 0; i < len(pairs); i++ {
		serverId := fmt.Sprintf("SERVER-%d", i)
		bindAddr := getBindAddr().String()
		bootstrapCluster := i == 0
		joinAddr := ""
		if !bootstrapCluster {
			// Followers join through the bootstrap node, using the
			// name-qualified "<server-id>/<host>:<port>" address format.
			joinAddr = fmt.Sprintf("%s/%s:%d", pairs[0].serverId, pairs[0].bindAddr, pairs[0].mlPort)
		}

		port, err := internal.GetFreePort()
		if err != nil {
			t.Fatalf("could not get free port: %v", err)
		}
		raftPort, err := internal.GetFreePort()
		if err != nil {
			t.Fatalf("could not get free raft port: %v", err)
		}
		memberlistPort, err := internal.GetFreePort()
		if err != nil {
			t.Fatalf("could not get free memberlist port: %v", err)
		}

		server, err := setupServer(serverId, bootstrapCluster, bindAddr, joinAddr, port, raftPort, memberlistPort)
		if err != nil {
			t.Fatalf("could not start server: %v", err)
		}

		// Start the server and give it time to bootstrap or join the cluster.
		go func() {
			server.Start()
		}()
		<-time.After(5 * time.Second)

		// Set up the client connection.
		conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", bindAddr, port))
		if err != nil {
			t.Fatalf("could not open tcp connection: %v", err)
		}
		client := resp.NewConn(conn)

		pairs[i] = ClientServerPair{
			serverId:         serverId,
			bindAddr:         bindAddr,
			port:             port,
			raftPort:         raftPort,
			mlPort:           memberlistPort,
			bootstrapCluster: bootstrapCluster,
			client:           client,
			server:           server,
		}
	}
	// Prepare the write data for the cluster.
	tests := []struct {
		key   string
		value string
	}{
		{key: "key1", value: "value1"},
		{key: "key2", value: "value2"},
		{key: "key3", value: "value3"},
	}

	// Write all the data to the cluster leader.
	for i, test := range tests {
		node := pairs[0]
		if err := node.client.WriteArray([]resp.Value{
			resp.StringValue("SET"),
			resp.StringValue(test.key),
			resp.StringValue(test.value),
		}); err != nil {
			t.Errorf("could not write data to leader node (test %d): %v", i, err)
		}
		// Read the response and make sure we received an "OK" reply.
		rd, _, err := node.client.ReadValue()
		if err != nil {
			t.Errorf("could not read response from leader node (test %d): %v", i, err)
		}
		if !strings.EqualFold(rd.String(), "ok") {
			t.Errorf("expected response for test %d to be \"OK\", got %s", i, rd.String())
		}
	}
	// On each follower node, read the values back and check that they have been replicated.
	for i, test := range tests {
		for j := 1; j < len(pairs); j++ {
			node := pairs[j]
			if err := node.client.WriteArray([]resp.Value{
				resp.StringValue("GET"),
				resp.StringValue(test.key),
			}); err != nil {
				t.Errorf("could not write data to follower node %d (test %d): %v", j, i, err)
			}
			rd, _, err := node.client.ReadValue()
			if err != nil {
				t.Errorf("could not read data from follower node %d (test %d): %v", j, i, err)
			}
			if rd.String() != test.value {
				t.Errorf("expected value \"%s\" for follower node %d (test %d), got \"%s\"", test.value, j, i, rd.String())
			}
		}
	}
}
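With the renamed target above, this test runs as part of make test; running it in isolation should also be possible with something like go test ./echovault -run Test_ClusterReplication, assuming the standard Go toolchain layout implied by the file path.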

View File

@@ -307,7 +307,7 @@ func (server *EchoVault) DeleteKey(ctx context.Context, key string) error {
 		server.lruCache.cache.Delete(key)
 	}
 
-	log.Printf("deleted key %s\n", key)
+	// log.Printf("deleted key %s\n", key) // TODO: Uncomment this
 
 	return nil
 }
@@ -575,7 +575,7 @@ func (server *EchoVault) evictKeysWithExpiredTTL(ctx context.Context) error {
 	// If sampleSize is 0, there's no need to calculate deleted percentage.
 	if sampleSize == 0 {
-		log.Println("no keys to sample, skipping eviction")
+		// log.Println("no keys to sample, skipping eviction") // TODO: Uncomment this
 		return nil
 	}
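Both log statements are silenced rather than deleted, each flagged with a TODO; a plausible reading is that per-key deletion logging becomes noisy once DEL commands are replicated across the cluster during tests.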

View File

@@ -60,7 +60,9 @@ func NewMemberList(opts Opts) *MemberList {
 }
 
 func (m *MemberList) MemberListInit(ctx context.Context) {
-	cfg := memberlist.DefaultLocalConfig()
+	cfg := memberlist.DefaultLANConfig()
+	cfg.RequireNodeNames = true
+	cfg.Name = m.options.Config.ServerID
 	cfg.BindAddr = m.options.Config.BindAddr
 	cfg.BindPort = int(m.options.Config.MemberListBindPort)
 	cfg.Delegate = NewDelegate(DelegateOpts{
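Switching from DefaultLocalConfig to DefaultLANConfig trades single-host defaults for timeouts tuned to a LAN, which fits the docker-compose cluster above. Setting RequireNodeNames (with cfg.Name set to the server ID) makes memberlist address members by name, which is why the JOIN_ADDR values gained the <server-id>/ prefix: hashicorp/memberlist accepts name-qualified join addresses of the form <name>/<host>:<port> when node names are required. A minimal standalone sketch of that join handshake, assuming the stock memberlist API rather than EchoVault's actual wiring:

package main

import (
	"log"

	"github.com/hashicorp/memberlist"
)

func main() {
	// With RequireNodeNames set, members are addressed by name and join
	// addresses must be name-qualified: "<name>/<host>:<port>".
	cfg := memberlist.DefaultLANConfig()
	cfg.RequireNodeNames = true
	cfg.Name = "2" // this node's server ID

	list, err := memberlist.Create(cfg)
	if err != nil {
		log.Fatalf("failed to create memberlist: %v", err)
	}

	// Join the cluster through node "1" (cf. JOIN_ADDR=1/cluster_node_1:7946).
	if _, err := list.Join([]string{"1/cluster_node_1:7946"}); err != nil {
		log.Fatalf("failed to join cluster: %v", err)
	}
}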

View File

@@ -19,7 +19,6 @@ import (
 	"fmt"
 	"github.com/echovault/echovault/internal"
 	"github.com/echovault/echovault/internal/constants"
-	"log"
 	"strconv"
 	"strings"
 	"time"
@@ -233,7 +232,7 @@ func handleDel(params internal.HandlerFuncParams) ([]byte, error) {
 	for _, key := range keys.WriteKeys {
 		err = params.DeleteKey(params.Context, key)
 		if err != nil {
-			log.Printf("could not delete key %s due to error: %+v\n", key, err)
+			// log.Printf("could not delete key %s due to error: %+v\n", key, err) // TODO: Uncomment this
 			continue
 		}
 		count += 1