Removed client subdirectory from monorepo into its own repository.

Renamed "server" subdirectory to "src" as this entire derectory now only has server code.
Removed "bin" directory to project root.
Moved Dockerfile to project root.
This commit is contained in:
Kelvin Clement Mwinuka
2023-11-10 22:42:23 +08:00
parent e505361ef6
commit 974fc24617
21 changed files with 22 additions and 398 deletions

1
.gitignore vendored
View File

@@ -1,3 +1,4 @@
.idea
bin bin
openssl openssl
config*.json config*.json

View File

@@ -4,8 +4,8 @@ RUN mkdir -p /usr/local/lib/memstore
RUN mkdir -p /opt/memstore/bin RUN mkdir -p /opt/memstore/bin
RUN mkdir -p /etc/ssl/certs/memstore RUN mkdir -p /etc/ssl/certs/memstore
COPY ./server/bin/linux/x86_64/plugins /usr/local/lib/memstore COPY ./bin/linux/x86_64/plugins /usr/local/lib/memstore
COPY ./server/bin/linux/x86_64/server /opt/memstore/bin COPY ./bin/linux/x86_64/server /opt/memstore/bin
COPY ./openssl/server /etc/ssl/certs/memstore COPY ./openssl/server /etc/ssl/certs/memstore
WORKDIR /opt/memstore/bin WORKDIR /opt/memstore/bin

View File

@@ -1,10 +1,10 @@
build-plugins: build-plugins:
CGO_ENABLED=$(CGO_ENABLED) CC=$(CC) GOOS=$(GOOS) GOARCH=$(GOARCH) go build -buildmode=plugin -o $(DEST)/commands/command_ping.so plugins/commands/ping/command.go CGO_ENABLED=$(CGO_ENABLED) CC=$(CC) GOOS=$(GOOS) GOARCH=$(GOARCH) go build -buildmode=plugin -o $(DEST)/commands/command_ping.so ./src/plugins/commands/ping/command.go
CGO_ENABLED=$(CGO_ENABLED) CC=$(CC) GOOS=$(GOOS) GOARCH=$(GOARCH) go build -buildmode=plugin -o $(DEST)/commands/command_setget.so plugins/commands/setget/command.go CGO_ENABLED=$(CGO_ENABLED) CC=$(CC) GOOS=$(GOOS) GOARCH=$(GOARCH) go build -buildmode=plugin -o $(DEST)/commands/command_setget.so ./src/plugins/commands/setget/command.go
CGO_ENABLED=$(CGO_ENABLED) CC=$(CC) GOOS=$(GOOS) GOARCH=$(GOARCH) go build -buildmode=plugin -o $(DEST)/commands/command_list.so plugins/commands/list/command.go CGO_ENABLED=$(CGO_ENABLED) CC=$(CC) GOOS=$(GOOS) GOARCH=$(GOARCH) go build -buildmode=plugin -o $(DEST)/commands/command_list.so ./src/plugins/commands/list/command.go
build-server: build-server:
CGO_ENABLED=$(CGO_ENABLED) CC=$(CC) GOOS=$(GOOS) GOARCH=$(GOARCH) go build -o $(DEST)/server ./*.go CGO_ENABLED=$(CGO_ENABLED) CC=$(CC) GOOS=$(GOOS) GOARCH=$(GOARCH) go build -o $(DEST)/server ./src/*.go
build: build:
env CGO_ENABLED=1 CC=x86_64-linux-musl-gcc GOOS=linux GOARCH=amd64 DEST=bin/linux/x86_64/plugins make build-plugins env CGO_ENABLED=1 CC=x86_64-linux-musl-gcc GOOS=linux GOARCH=amd64 DEST=bin/linux/x86_64/plugins make build-plugins

View File

@@ -1,5 +0,0 @@
build:
go build -o ./bin/client ./*.go
run:
./bin/client --config config.json

View File

@@ -1,66 +0,0 @@
package main
import (
"encoding/json"
"flag"
"os"
"path"
"gopkg.in/yaml.v3"
)
type Config struct {
TLS bool `json:"tls" yaml:"tls"`
Key string `json:"key" yaml:"key"`
Cert string `json:"cert" yaml:"cert"`
Port uint16 `json:"port" yaml:"port"`
Addr string `json:"addr" yaml:"addr"`
}
func GetConfig() Config {
// Shared
tls := flag.Bool("tls", false, "Start the server in TLS mode. Default is false")
key := flag.String("key", "", "The private key file path.")
cert := flag.String("cert", "", "The signed certificate file path.")
port := flag.Int("port", 7480, "Port to use. Default is 7480")
config := flag.String(
"config",
"",
`File path to a JSON or YAML config file.The values in this config file will override the flag values.`,
)
addr := flag.String("addr", "127.0.0.1", "On client, this is the address of a server node to connect to.")
flag.Parse()
var conf Config
if len(*config) > 0 {
// Load config from config file
if f, err := os.Open(*config); err != nil {
panic(err)
} else {
defer f.Close()
ext := path.Ext(f.Name())
if ext == ".json" {
json.NewDecoder(f).Decode(&conf)
}
if ext == ".yaml" || ext == ".yml" {
yaml.NewDecoder(f).Decode(&conf)
}
}
} else {
conf = Config{
TLS: *tls,
Key: *key,
Cert: *cert,
Addr: *addr,
Port: uint16(*port),
}
}
return conf
}

View File

@@ -1,146 +0,0 @@
package main
import (
"bufio"
"bytes"
"crypto/tls"
"crypto/x509"
"fmt"
"io"
"net"
"os"
"strings"
)
func main() {
conf := GetConfig()
var conn net.Conn
var err error
if !conf.TLS {
fmt.Println("Starting client in TCP mode...")
conn, err = net.Dial("tcp", fmt.Sprintf("%s:%d", conf.Addr, conf.Port))
if err != nil {
panic(err)
}
} else {
// Dial TLS
fmt.Println("Starting client in TLS mode...")
f, err := os.Open(conf.Cert)
if err != nil {
panic(err)
}
cert, err := io.ReadAll(bufio.NewReader(f))
if err != nil {
panic(err)
}
rootCAs := x509.NewCertPool()
ok := rootCAs.AppendCertsFromPEM(cert)
if !ok {
panic("Failed to parse certificate")
}
conn, err = tls.Dial("tcp", fmt.Sprintf("%s:%d", conf.Addr, conf.Port), &tls.Config{
RootCAs: rootCAs,
})
if err != nil {
panic(fmt.Sprintf("Handshake Error: %s", err.Error()))
}
}
defer conn.Close()
done := make(chan struct{})
connRW := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
stdioRW := bufio.NewReadWriter(bufio.NewReader(os.Stdin), bufio.NewWriter(os.Stdout))
go func() {
for {
stdioRW.Write([]byte("\n> "))
stdioRW.Flush()
if in, err := stdioRW.ReadBytes(byte('\n')); err != nil {
fmt.Println(err)
} else {
in := bytes.TrimSpace(in)
// Check for quit command
if bytes.Equal(bytes.ToLower(in), []byte("quit")) {
break
}
// Serialize command and send to connection
encoded, err := Encode(string(in))
if err != nil {
fmt.Println(err)
continue
}
connRW.Write([]byte(encoded))
connRW.Flush()
// Read response from server
message, err := ReadMessage(connRW)
if err != nil && err == io.EOF {
fmt.Println(err)
break
} else if err != nil {
fmt.Println(err)
}
decoded, err := Decode(message)
if err != nil {
fmt.Println(err)
continue
}
fmt.Println(decoded)
if strings.ToLower(decoded[0]) == "subscribed" {
// If we're subscribed to a channel, listen for messages from the channel
func() {
for {
var message string
if msg, err := ReadMessage(connRW); err != nil {
if err == io.EOF {
return
}
fmt.Println(err)
continue
} else {
message = msg
}
if decoded, err := Decode(message); err != nil {
fmt.Println(err)
continue
} else {
connRW.Write([]byte("+ACK\r\n\n"))
connRW.Flush()
fmt.Println(decoded)
}
}
}()
}
}
}
done <- struct{}{}
}()
<-done
}

View File

@@ -1,155 +0,0 @@
package main
import (
"bufio"
"bytes"
"encoding/csv"
"errors"
"fmt"
"math"
"math/big"
"reflect"
"strings"
"github.com/tidwall/resp"
)
func Contains[T comparable](arr []T, elem T) bool {
for _, v := range arr {
if v == elem {
return true
}
}
return false
}
func ContainsMutual[T comparable](arr1 []T, arr2 []T) (bool, T) {
for _, a := range arr1 {
for _, b := range arr2 {
if a == b {
return true, a
}
}
}
return false, arr1[0]
}
func IsInteger(n float64) bool {
return math.Mod(n, 1.0) == 0
}
func AdaptType(s string) interface{} {
// Adapt the type of the parameter to string, float64 or int
n, _, err := big.ParseFloat(s, 10, 256, big.RoundingMode(big.Exact))
if err != nil {
return s
}
if n.IsInt() {
i, _ := n.Int64()
return i
}
return n
}
func IncrBy(num interface{}, by interface{}) (interface{}, error) {
if !Contains[string]([]string{"int", "float64"}, reflect.TypeOf(num).String()) {
return nil, errors.New("can only increment number")
}
if !Contains[string]([]string{"int", "float64"}, reflect.TypeOf(by).String()) {
return nil, errors.New("can only increment by number")
}
n, _ := num.(float64)
b, _ := by.(float64)
res := n + b
if IsInteger(res) {
return int(res), nil
}
return res, nil
}
func Filter[T comparable](arr []T, test func(elem T) bool) (res []T) {
for _, e := range arr {
if test(e) {
res = append(res, e)
}
}
return
}
func tokenize(comm string) ([]string, error) {
r := csv.NewReader(strings.NewReader(comm))
r.Comma = ' '
return r.Read()
}
func Encode(comm string) (string, error) {
tokens, err := tokenize(comm)
if err != nil {
return "", errors.New("could not parse command")
}
str := fmt.Sprintf("*%d\r\n", len(tokens))
for i, token := range tokens {
if i == 0 {
str += fmt.Sprintf("$%d\r\n%s\r\n", len(token), strings.ToUpper(token))
} else {
str += fmt.Sprintf("$%d\r\n%s\r\n", len(token), token)
}
}
str += "\n"
return str, nil
}
func Decode(raw string) ([]string, error) {
rd := resp.NewReader(bytes.NewBufferString(raw))
res := []string{}
v, _, err := rd.ReadValue()
if err != nil {
return nil, err
}
if Contains[string]([]string{"SimpleString", "BulkString", "Integer", "Error"}, v.Type().String()) {
return []string{v.String()}, nil
}
if v.Type().String() == "Array" {
for _, elem := range v.Array() {
res = append(res, elem.String())
}
}
return res, nil
}
func ReadMessage(r *bufio.ReadWriter) (message string, err error) {
var line [][]byte
for {
b, _, err := r.ReadLine()
if err != nil {
return "", err
}
if bytes.Equal(b, []byte("")) {
// End of message
break
}
line = append(line, b)
}
return fmt.Sprintf("%s\r\n", string(bytes.Join(line, []byte("\r\n")))), nil
}

View File

@@ -20,7 +20,7 @@ import (
"github.com/hashicorp/memberlist" "github.com/hashicorp/memberlist"
"github.com/hashicorp/raft" "github.com/hashicorp/raft"
"github.com/kelvinmwinuka/memstore/server/utils" "github.com/kelvinmwinuka/memstore/src/utils"
) )
type Plugin interface { type Plugin interface {
@@ -97,7 +97,7 @@ func (server *Server) handleConnection(conn net.Conn) {
continue continue
} }
connRW.Write([]byte("+SUBSCRIBED\r\n\n")) connRW.Write([]byte("+OK\r\n\n"))
connRW.Flush() connRW.Flush()
continue continue
} }
@@ -115,7 +115,7 @@ func (server *Server) handleConnection(conn net.Conn) {
continue continue
} }
connRW.Write([]byte(":1\r\n\n")) connRW.Write([]byte("+OK\r\n\n"))
connRW.Flush() connRW.Flush()
continue continue
} }
@@ -155,6 +155,8 @@ func (server *Server) handleConnection(conn net.Conn) {
connRW.Write(r.Response) connRW.Write(r.Response)
connRW.Flush() connRW.Flush()
// TODO: Add command to AOF
} else { } else {
// TODO: Forward message to leader and wait for a response // TODO: Forward message to leader and wait for a response
connRW.Write([]byte("-Error not cluster leader, cannot carry out command\r\n\n")) connRW.Write([]byte("-Error not cluster leader, cannot carry out command\r\n\n"))

View File

@@ -9,7 +9,7 @@ import (
"github.com/hashicorp/memberlist" "github.com/hashicorp/memberlist"
"github.com/hashicorp/raft" "github.com/hashicorp/raft"
"github.com/kelvinmwinuka/memstore/server/utils" "github.com/kelvinmwinuka/memstore/src/utils"
"github.com/sethvargo/go-retry" "github.com/sethvargo/go-retry"
) )

View File

@@ -6,7 +6,7 @@ import (
"math" "math"
"strings" "strings"
"github.com/kelvinmwinuka/memstore/server/utils" "github.com/kelvinmwinuka/memstore/src/utils"
) )
const ( const (

View File

@@ -6,7 +6,7 @@ import (
"strings" "strings"
"testing" "testing"
"github.com/kelvinmwinuka/memstore/server/utils" "github.com/kelvinmwinuka/memstore/src/utils"
) )
func TestHandleCommandSuccess(t *testing.T) { func TestHandleCommandSuccess(t *testing.T) {

View File

@@ -5,7 +5,7 @@ import (
"fmt" "fmt"
"strings" "strings"
"github.com/kelvinmwinuka/memstore/server/utils" "github.com/kelvinmwinuka/memstore/src/utils"
) )
type Server interface { type Server interface {

View File

@@ -6,7 +6,7 @@ import (
"strings" "strings"
"testing" "testing"
"github.com/kelvinmwinuka/memstore/server/utils" "github.com/kelvinmwinuka/memstore/src/utils"
) )
const ( const (

View File

@@ -9,7 +9,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/kelvinmwinuka/memstore/server/utils" "github.com/kelvinmwinuka/memstore/src/utils"
) )
// Consumer group allows multiple subscribers to share the consumption load of a channel. // Consumer group allows multiple subscribers to share the consumption load of a channel.
@@ -98,8 +98,6 @@ func (cg *ConsumerGroup) Unsubscribe(conn *net.Conn) {
cg.subscribersRWMut.Lock() cg.subscribersRWMut.Lock()
defer cg.subscribersRWMut.Unlock() defer cg.subscribersRWMut.Unlock()
curr := cg.subscribers.Value
for i := 0; i < cg.subscribers.Len(); i++ { for i := 0; i < cg.subscribers.Len(); i++ {
if cg.subscribers.Value == conn { if cg.subscribers.Value == conn {
cg.subscribers = cg.subscribers.Prev() cg.subscribers = cg.subscribers.Prev()
@@ -108,14 +106,6 @@ func (cg *ConsumerGroup) Unsubscribe(conn *net.Conn) {
} }
cg.subscribers = cg.subscribers.Next() cg.subscribers = cg.subscribers.Next()
} }
// Restore the pointer back to the previous location
for i := 0; i < cg.subscribers.Len(); i++ {
if cg.subscribers.Value == curr {
return
}
cg.subscribers = cg.subscribers.Next()
}
} }
func (cg *ConsumerGroup) Publish(message string) { func (cg *ConsumerGroup) Publish(message string) {

View File

@@ -15,7 +15,7 @@ import (
"github.com/hashicorp/raft" "github.com/hashicorp/raft"
raftboltdb "github.com/hashicorp/raft-boltdb" raftboltdb "github.com/hashicorp/raft-boltdb"
"github.com/kelvinmwinuka/memstore/server/utils" "github.com/kelvinmwinuka/memstore/src/utils"
) )
func (server *Server) RaftInit() { func (server *Server) RaftInit() {
@@ -87,6 +87,7 @@ func (server *Server) RaftInit() {
server.raft = raftServer server.raft = raftServer
// TODO: Only bootstrap cluster if --bootstrapCluster=true config is set
if conf.JoinAddr == "" { if conf.JoinAddr == "" {
// Bootstrap raft cluster // Bootstrap raft cluster
if err := server.raft.BootstrapCluster(raft.Configuration{ if err := server.raft.BootstrapCluster(raft.Configuration{
@@ -212,6 +213,8 @@ func (server *Server) Persist(sink raft.SnapshotSink) error {
return err return err
} }
// TODO: Store data in separate snapshot file
return nil return nil
} }