From 974fc246178e06404ee20c28fca1966b388f2690 Mon Sep 17 00:00:00 2001 From: Kelvin Clement Mwinuka Date: Fri, 10 Nov 2023 22:42:23 +0800 Subject: [PATCH] Removed client subdirectory from monorepo into its own repository. Renamed "server" subdirectory to "src" as this entire derectory now only has server code. Removed "bin" directory to project root. Moved Dockerfile to project root. --- .gitignore | 1 + server/Dockerfile => Dockerfile | 4 +- server/Makefile => Makefile | 8 +- client/Makefile | 5 - client/config.go | 66 -------- client/main.go | 146 ----------------- client/utils.go | 155 ------------------ {server => src}/main.go | 8 +- {server => src}/memberlist.go | 2 +- {server => src}/mock.go | 0 .../plugins/commands/list/command.go | 2 +- .../plugins/commands/ping/command.go | 0 .../plugins/commands/ping/command_test.go | 2 +- .../plugins/commands/setget/command.go | 2 +- .../plugins/commands/setget/command_test.go | 2 +- {server => src}/pubsub.go | 12 +- {server => src}/raft.go | 5 +- {server => src}/utils/config.go | 0 {server => src}/utils/mock.go | 0 {server => src}/utils/types.go | 0 {server => src}/utils/utils.go | 0 21 files changed, 22 insertions(+), 398 deletions(-) rename server/Dockerfile => Dockerfile (83%) rename server/Makefile => Makefile (80%) delete mode 100644 client/Makefile delete mode 100644 client/config.go delete mode 100644 client/main.go delete mode 100644 client/utils.go rename {server => src}/main.go (97%) rename {server => src}/memberlist.go (98%) rename {server => src}/mock.go (100%) rename {server => src}/plugins/commands/list/command.go (99%) rename {server => src}/plugins/commands/ping/command.go (100%) rename {server => src}/plugins/commands/ping/command_test.go (95%) rename {server => src}/plugins/commands/setget/command.go (97%) rename {server => src}/plugins/commands/setget/command_test.go (97%) rename {server => src}/pubsub.go (96%) rename {server => src}/raft.go (97%) rename {server => src}/utils/config.go (100%) rename {server => src}/utils/mock.go (100%) rename {server => src}/utils/types.go (100%) rename {server => src}/utils/utils.go (100%) diff --git a/.gitignore b/.gitignore index 808df43..ff8beaa 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +.idea bin openssl config*.json diff --git a/server/Dockerfile b/Dockerfile similarity index 83% rename from server/Dockerfile rename to Dockerfile index 93c775b..b5a4395 100644 --- a/server/Dockerfile +++ b/Dockerfile @@ -4,8 +4,8 @@ RUN mkdir -p /usr/local/lib/memstore RUN mkdir -p /opt/memstore/bin RUN mkdir -p /etc/ssl/certs/memstore -COPY ./server/bin/linux/x86_64/plugins /usr/local/lib/memstore -COPY ./server/bin/linux/x86_64/server /opt/memstore/bin +COPY ./bin/linux/x86_64/plugins /usr/local/lib/memstore +COPY ./bin/linux/x86_64/server /opt/memstore/bin COPY ./openssl/server /etc/ssl/certs/memstore WORKDIR /opt/memstore/bin diff --git a/server/Makefile b/Makefile similarity index 80% rename from server/Makefile rename to Makefile index fbbe394..0e68537 100644 --- a/server/Makefile +++ b/Makefile @@ -1,10 +1,10 @@ build-plugins: - CGO_ENABLED=$(CGO_ENABLED) CC=$(CC) GOOS=$(GOOS) GOARCH=$(GOARCH) go build -buildmode=plugin -o $(DEST)/commands/command_ping.so plugins/commands/ping/command.go - CGO_ENABLED=$(CGO_ENABLED) CC=$(CC) GOOS=$(GOOS) GOARCH=$(GOARCH) go build -buildmode=plugin -o $(DEST)/commands/command_setget.so plugins/commands/setget/command.go - CGO_ENABLED=$(CGO_ENABLED) CC=$(CC) GOOS=$(GOOS) GOARCH=$(GOARCH) go build -buildmode=plugin -o $(DEST)/commands/command_list.so plugins/commands/list/command.go + CGO_ENABLED=$(CGO_ENABLED) CC=$(CC) GOOS=$(GOOS) GOARCH=$(GOARCH) go build -buildmode=plugin -o $(DEST)/commands/command_ping.so ./src/plugins/commands/ping/command.go + CGO_ENABLED=$(CGO_ENABLED) CC=$(CC) GOOS=$(GOOS) GOARCH=$(GOARCH) go build -buildmode=plugin -o $(DEST)/commands/command_setget.so ./src/plugins/commands/setget/command.go + CGO_ENABLED=$(CGO_ENABLED) CC=$(CC) GOOS=$(GOOS) GOARCH=$(GOARCH) go build -buildmode=plugin -o $(DEST)/commands/command_list.so ./src/plugins/commands/list/command.go build-server: - CGO_ENABLED=$(CGO_ENABLED) CC=$(CC) GOOS=$(GOOS) GOARCH=$(GOARCH) go build -o $(DEST)/server ./*.go + CGO_ENABLED=$(CGO_ENABLED) CC=$(CC) GOOS=$(GOOS) GOARCH=$(GOARCH) go build -o $(DEST)/server ./src/*.go build: env CGO_ENABLED=1 CC=x86_64-linux-musl-gcc GOOS=linux GOARCH=amd64 DEST=bin/linux/x86_64/plugins make build-plugins diff --git a/client/Makefile b/client/Makefile deleted file mode 100644 index fcab375..0000000 --- a/client/Makefile +++ /dev/null @@ -1,5 +0,0 @@ -build: - go build -o ./bin/client ./*.go - -run: - ./bin/client --config config.json \ No newline at end of file diff --git a/client/config.go b/client/config.go deleted file mode 100644 index fe1b8b9..0000000 --- a/client/config.go +++ /dev/null @@ -1,66 +0,0 @@ -package main - -import ( - "encoding/json" - "flag" - "os" - "path" - - "gopkg.in/yaml.v3" -) - -type Config struct { - TLS bool `json:"tls" yaml:"tls"` - Key string `json:"key" yaml:"key"` - Cert string `json:"cert" yaml:"cert"` - Port uint16 `json:"port" yaml:"port"` - Addr string `json:"addr" yaml:"addr"` -} - -func GetConfig() Config { - // Shared - tls := flag.Bool("tls", false, "Start the server in TLS mode. Default is false") - key := flag.String("key", "", "The private key file path.") - cert := flag.String("cert", "", "The signed certificate file path.") - port := flag.Int("port", 7480, "Port to use. Default is 7480") - config := flag.String( - "config", - "", - `File path to a JSON or YAML config file.The values in this config file will override the flag values.`, - ) - addr := flag.String("addr", "127.0.0.1", "On client, this is the address of a server node to connect to.") - - flag.Parse() - - var conf Config - - if len(*config) > 0 { - // Load config from config file - if f, err := os.Open(*config); err != nil { - panic(err) - } else { - defer f.Close() - - ext := path.Ext(f.Name()) - - if ext == ".json" { - json.NewDecoder(f).Decode(&conf) - } - - if ext == ".yaml" || ext == ".yml" { - yaml.NewDecoder(f).Decode(&conf) - } - } - - } else { - conf = Config{ - TLS: *tls, - Key: *key, - Cert: *cert, - Addr: *addr, - Port: uint16(*port), - } - } - - return conf -} diff --git a/client/main.go b/client/main.go deleted file mode 100644 index 20436c7..0000000 --- a/client/main.go +++ /dev/null @@ -1,146 +0,0 @@ -package main - -import ( - "bufio" - "bytes" - "crypto/tls" - "crypto/x509" - "fmt" - "io" - "net" - "os" - "strings" -) - -func main() { - conf := GetConfig() - - var conn net.Conn - var err error - - if !conf.TLS { - fmt.Println("Starting client in TCP mode...") - - conn, err = net.Dial("tcp", fmt.Sprintf("%s:%d", conf.Addr, conf.Port)) - if err != nil { - panic(err) - } - } else { - // Dial TLS - fmt.Println("Starting client in TLS mode...") - - f, err := os.Open(conf.Cert) - - if err != nil { - panic(err) - } - - cert, err := io.ReadAll(bufio.NewReader(f)) - - if err != nil { - panic(err) - } - - rootCAs := x509.NewCertPool() - - ok := rootCAs.AppendCertsFromPEM(cert) - if !ok { - panic("Failed to parse certificate") - } - - conn, err = tls.Dial("tcp", fmt.Sprintf("%s:%d", conf.Addr, conf.Port), &tls.Config{ - RootCAs: rootCAs, - }) - - if err != nil { - panic(fmt.Sprintf("Handshake Error: %s", err.Error())) - } - } - - defer conn.Close() - - done := make(chan struct{}) - - connRW := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) - stdioRW := bufio.NewReadWriter(bufio.NewReader(os.Stdin), bufio.NewWriter(os.Stdout)) - - go func() { - for { - stdioRW.Write([]byte("\n> ")) - stdioRW.Flush() - - if in, err := stdioRW.ReadBytes(byte('\n')); err != nil { - fmt.Println(err) - } else { - in := bytes.TrimSpace(in) - - // Check for quit command - if bytes.Equal(bytes.ToLower(in), []byte("quit")) { - break - } - - // Serialize command and send to connection - encoded, err := Encode(string(in)) - - if err != nil { - fmt.Println(err) - continue - } - - connRW.Write([]byte(encoded)) - connRW.Flush() - - // Read response from server - message, err := ReadMessage(connRW) - - if err != nil && err == io.EOF { - fmt.Println(err) - break - } else if err != nil { - fmt.Println(err) - } - - decoded, err := Decode(message) - - if err != nil { - fmt.Println(err) - continue - } - - fmt.Println(decoded) - - if strings.ToLower(decoded[0]) == "subscribed" { - // If we're subscribed to a channel, listen for messages from the channel - func() { - for { - var message string - - if msg, err := ReadMessage(connRW); err != nil { - if err == io.EOF { - return - } - fmt.Println(err) - continue - } else { - message = msg - } - - if decoded, err := Decode(message); err != nil { - fmt.Println(err) - continue - } else { - connRW.Write([]byte("+ACK\r\n\n")) - connRW.Flush() - - fmt.Println(decoded) - } - } - }() - } - } - } - done <- struct{}{} - }() - - <-done -} diff --git a/client/utils.go b/client/utils.go deleted file mode 100644 index 249754f..0000000 --- a/client/utils.go +++ /dev/null @@ -1,155 +0,0 @@ -package main - -import ( - "bufio" - "bytes" - "encoding/csv" - "errors" - "fmt" - "math" - "math/big" - "reflect" - "strings" - - "github.com/tidwall/resp" -) - -func Contains[T comparable](arr []T, elem T) bool { - for _, v := range arr { - if v == elem { - return true - } - } - return false -} - -func ContainsMutual[T comparable](arr1 []T, arr2 []T) (bool, T) { - for _, a := range arr1 { - for _, b := range arr2 { - if a == b { - return true, a - } - } - } - return false, arr1[0] -} - -func IsInteger(n float64) bool { - return math.Mod(n, 1.0) == 0 -} - -func AdaptType(s string) interface{} { - // Adapt the type of the parameter to string, float64 or int - n, _, err := big.ParseFloat(s, 10, 256, big.RoundingMode(big.Exact)) - - if err != nil { - return s - } - - if n.IsInt() { - i, _ := n.Int64() - return i - } - - return n -} - -func IncrBy(num interface{}, by interface{}) (interface{}, error) { - if !Contains[string]([]string{"int", "float64"}, reflect.TypeOf(num).String()) { - return nil, errors.New("can only increment number") - } - if !Contains[string]([]string{"int", "float64"}, reflect.TypeOf(by).String()) { - return nil, errors.New("can only increment by number") - } - - n, _ := num.(float64) - b, _ := by.(float64) - res := n + b - - if IsInteger(res) { - return int(res), nil - } - - return res, nil -} - -func Filter[T comparable](arr []T, test func(elem T) bool) (res []T) { - for _, e := range arr { - if test(e) { - res = append(res, e) - } - } - return -} - -func tokenize(comm string) ([]string, error) { - r := csv.NewReader(strings.NewReader(comm)) - r.Comma = ' ' - return r.Read() -} - -func Encode(comm string) (string, error) { - tokens, err := tokenize(comm) - - if err != nil { - return "", errors.New("could not parse command") - } - - str := fmt.Sprintf("*%d\r\n", len(tokens)) - - for i, token := range tokens { - if i == 0 { - str += fmt.Sprintf("$%d\r\n%s\r\n", len(token), strings.ToUpper(token)) - } else { - str += fmt.Sprintf("$%d\r\n%s\r\n", len(token), token) - } - } - - str += "\n" - - return str, nil -} - -func Decode(raw string) ([]string, error) { - rd := resp.NewReader(bytes.NewBufferString(raw)) - res := []string{} - - v, _, err := rd.ReadValue() - - if err != nil { - return nil, err - } - - if Contains[string]([]string{"SimpleString", "BulkString", "Integer", "Error"}, v.Type().String()) { - return []string{v.String()}, nil - } - - if v.Type().String() == "Array" { - for _, elem := range v.Array() { - res = append(res, elem.String()) - } - } - - return res, nil -} - -func ReadMessage(r *bufio.ReadWriter) (message string, err error) { - var line [][]byte - - for { - b, _, err := r.ReadLine() - - if err != nil { - return "", err - } - - if bytes.Equal(b, []byte("")) { - // End of message - break - } - - line = append(line, b) - } - - return fmt.Sprintf("%s\r\n", string(bytes.Join(line, []byte("\r\n")))), nil -} diff --git a/server/main.go b/src/main.go similarity index 97% rename from server/main.go rename to src/main.go index 2d1f548..9c38e31 100644 --- a/server/main.go +++ b/src/main.go @@ -20,7 +20,7 @@ import ( "github.com/hashicorp/memberlist" "github.com/hashicorp/raft" - "github.com/kelvinmwinuka/memstore/server/utils" + "github.com/kelvinmwinuka/memstore/src/utils" ) type Plugin interface { @@ -97,7 +97,7 @@ func (server *Server) handleConnection(conn net.Conn) { continue } - connRW.Write([]byte("+SUBSCRIBED\r\n\n")) + connRW.Write([]byte("+OK\r\n\n")) connRW.Flush() continue } @@ -115,7 +115,7 @@ func (server *Server) handleConnection(conn net.Conn) { continue } - connRW.Write([]byte(":1\r\n\n")) + connRW.Write([]byte("+OK\r\n\n")) connRW.Flush() continue } @@ -155,6 +155,8 @@ func (server *Server) handleConnection(conn net.Conn) { connRW.Write(r.Response) connRW.Flush() + + // TODO: Add command to AOF } else { // TODO: Forward message to leader and wait for a response connRW.Write([]byte("-Error not cluster leader, cannot carry out command\r\n\n")) diff --git a/server/memberlist.go b/src/memberlist.go similarity index 98% rename from server/memberlist.go rename to src/memberlist.go index 683ab8c..190f196 100644 --- a/server/memberlist.go +++ b/src/memberlist.go @@ -9,7 +9,7 @@ import ( "github.com/hashicorp/memberlist" "github.com/hashicorp/raft" - "github.com/kelvinmwinuka/memstore/server/utils" + "github.com/kelvinmwinuka/memstore/src/utils" "github.com/sethvargo/go-retry" ) diff --git a/server/mock.go b/src/mock.go similarity index 100% rename from server/mock.go rename to src/mock.go diff --git a/server/plugins/commands/list/command.go b/src/plugins/commands/list/command.go similarity index 99% rename from server/plugins/commands/list/command.go rename to src/plugins/commands/list/command.go index e4d3049..f3913d8 100644 --- a/server/plugins/commands/list/command.go +++ b/src/plugins/commands/list/command.go @@ -6,7 +6,7 @@ import ( "math" "strings" - "github.com/kelvinmwinuka/memstore/server/utils" + "github.com/kelvinmwinuka/memstore/src/utils" ) const ( diff --git a/server/plugins/commands/ping/command.go b/src/plugins/commands/ping/command.go similarity index 100% rename from server/plugins/commands/ping/command.go rename to src/plugins/commands/ping/command.go diff --git a/server/plugins/commands/ping/command_test.go b/src/plugins/commands/ping/command_test.go similarity index 95% rename from server/plugins/commands/ping/command_test.go rename to src/plugins/commands/ping/command_test.go index 2a960f7..515da66 100644 --- a/server/plugins/commands/ping/command_test.go +++ b/src/plugins/commands/ping/command_test.go @@ -6,7 +6,7 @@ import ( "strings" "testing" - "github.com/kelvinmwinuka/memstore/server/utils" + "github.com/kelvinmwinuka/memstore/src/utils" ) func TestHandleCommandSuccess(t *testing.T) { diff --git a/server/plugins/commands/setget/command.go b/src/plugins/commands/setget/command.go similarity index 97% rename from server/plugins/commands/setget/command.go rename to src/plugins/commands/setget/command.go index 113f5d7..6ddc493 100644 --- a/server/plugins/commands/setget/command.go +++ b/src/plugins/commands/setget/command.go @@ -5,7 +5,7 @@ import ( "fmt" "strings" - "github.com/kelvinmwinuka/memstore/server/utils" + "github.com/kelvinmwinuka/memstore/src/utils" ) type Server interface { diff --git a/server/plugins/commands/setget/command_test.go b/src/plugins/commands/setget/command_test.go similarity index 97% rename from server/plugins/commands/setget/command_test.go rename to src/plugins/commands/setget/command_test.go index 3c72421..99b20e3 100644 --- a/server/plugins/commands/setget/command_test.go +++ b/src/plugins/commands/setget/command_test.go @@ -6,7 +6,7 @@ import ( "strings" "testing" - "github.com/kelvinmwinuka/memstore/server/utils" + "github.com/kelvinmwinuka/memstore/src/utils" ) const ( diff --git a/server/pubsub.go b/src/pubsub.go similarity index 96% rename from server/pubsub.go rename to src/pubsub.go index ee0d8dc..dd214dd 100644 --- a/server/pubsub.go +++ b/src/pubsub.go @@ -9,7 +9,7 @@ import ( "sync" "time" - "github.com/kelvinmwinuka/memstore/server/utils" + "github.com/kelvinmwinuka/memstore/src/utils" ) // Consumer group allows multiple subscribers to share the consumption load of a channel. @@ -98,8 +98,6 @@ func (cg *ConsumerGroup) Unsubscribe(conn *net.Conn) { cg.subscribersRWMut.Lock() defer cg.subscribersRWMut.Unlock() - curr := cg.subscribers.Value - for i := 0; i < cg.subscribers.Len(); i++ { if cg.subscribers.Value == conn { cg.subscribers = cg.subscribers.Prev() @@ -108,14 +106,6 @@ func (cg *ConsumerGroup) Unsubscribe(conn *net.Conn) { } cg.subscribers = cg.subscribers.Next() } - - // Restore the pointer back to the previous location - for i := 0; i < cg.subscribers.Len(); i++ { - if cg.subscribers.Value == curr { - return - } - cg.subscribers = cg.subscribers.Next() - } } func (cg *ConsumerGroup) Publish(message string) { diff --git a/server/raft.go b/src/raft.go similarity index 97% rename from server/raft.go rename to src/raft.go index a195ae6..29c1545 100644 --- a/server/raft.go +++ b/src/raft.go @@ -15,7 +15,7 @@ import ( "github.com/hashicorp/raft" raftboltdb "github.com/hashicorp/raft-boltdb" - "github.com/kelvinmwinuka/memstore/server/utils" + "github.com/kelvinmwinuka/memstore/src/utils" ) func (server *Server) RaftInit() { @@ -87,6 +87,7 @@ func (server *Server) RaftInit() { server.raft = raftServer + // TODO: Only bootstrap cluster if --bootstrapCluster=true config is set if conf.JoinAddr == "" { // Bootstrap raft cluster if err := server.raft.BootstrapCluster(raft.Configuration{ @@ -212,6 +213,8 @@ func (server *Server) Persist(sink raft.SnapshotSink) error { return err } + // TODO: Store data in separate snapshot file + return nil } diff --git a/server/utils/config.go b/src/utils/config.go similarity index 100% rename from server/utils/config.go rename to src/utils/config.go diff --git a/server/utils/mock.go b/src/utils/mock.go similarity index 100% rename from server/utils/mock.go rename to src/utils/mock.go diff --git a/server/utils/types.go b/src/utils/types.go similarity index 100% rename from server/utils/types.go rename to src/utils/types.go diff --git a/server/utils/utils.go b/src/utils/utils.go similarity index 100% rename from server/utils/utils.go rename to src/utils/utils.go