From e7c7cf1553e54d4e67233209de19fa7abece6ac2 Mon Sep 17 00:00:00 2001 From: Kelvin Clement Mwinuka Date: Wed, 13 Dec 2023 01:56:55 +0800 Subject: [PATCH] Deleted plugin folder. Removed nested plugin in command list for modules. Moved PubSub feature from plugin folder to modules folder. --- Makefile | 4 - src/main.go | 10 +- src/modules/acl/commands.go | 14 +-- src/modules/ping/commands.go | 2 - src/modules/pubsub/commands.go | 133 +++++++++++++++++++++ src/{plugins => modules}/pubsub/pubsub.go | 23 ++-- src/modules/string/commands.go | 4 - src/plugins/pubsub/command.go | 139 ---------------------- src/plugins/pubsub/utils.go | 46 ------- 9 files changed, 155 insertions(+), 220 deletions(-) rename src/{plugins => modules}/pubsub/pubsub.go (90%) delete mode 100644 src/plugins/pubsub/command.go delete mode 100644 src/plugins/pubsub/utils.go diff --git a/Makefile b/Makefile index 376b862..8dbf496 100644 --- a/Makefile +++ b/Makefile @@ -1,9 +1,5 @@ -build-plugins: - CGO_ENABLED=$(CGO_ENABLED) CC=$(CC) GOOS=$(GOOS) GOARCH=$(GOARCH) go build -buildmode=plugin -o $(DEST)/command_pubsub.so ./src/plugins/pubsub/*.go - build-server: CGO_ENABLED=$(CGO_ENABLED) CC=$(CC) GOOS=$(GOOS) GOARCH=$(GOARCH) go build -o $(DEST)/server ./src/*.go build: - env CGO_ENABLED=1 CC=x86_64-linux-musl-gcc GOOS=linux GOARCH=amd64 DEST=bin/linux/x86_64/plugins make build-plugins env CGO_ENABLED=1 CC=x86_64-linux-musl-gcc GOOS=linux GOARCH=amd64 DEST=bin/linux/x86_64 make build-server diff --git a/src/main.go b/src/main.go index e0f82cb..ed80650 100644 --- a/src/main.go +++ b/src/main.go @@ -10,6 +10,7 @@ import ( "github.com/kelvinmwinuka/memstore/src/modules/get" "github.com/kelvinmwinuka/memstore/src/modules/list" "github.com/kelvinmwinuka/memstore/src/modules/ping" + "github.com/kelvinmwinuka/memstore/src/modules/pubsub" "github.com/kelvinmwinuka/memstore/src/modules/set" str "github.com/kelvinmwinuka/memstore/src/modules/string" "io" @@ -48,7 +49,8 @@ type Server struct { cancelCh *chan os.Signal - ACL *acl.ACL + ACL *acl.ACL + PubSub *pubsub.PubSub } func (server *Server) KeyLock(ctx context.Context, key string) (bool, error) { @@ -305,8 +307,9 @@ func (server *Server) StartHTTP(ctx context.Context) { } func (server *Server) LoadModules(ctx context.Context) { - server.commands = append(server.commands, ping.NewModule().GetCommands()...) server.commands = append(server.commands, acl.NewModule(server.ACL).GetCommands()...) + server.commands = append(server.commands, pubsub.NewModule(server.PubSub).GetCommands()...) + server.commands = append(server.commands, ping.NewModule().GetCommands()...) server.commands = append(server.commands, set.NewModule().GetCommands()...) server.commands = append(server.commands, str.NewModule().GetCommands()...) server.commands = append(server.commands, get.NewModule().GetCommands()...) @@ -379,7 +382,8 @@ func main() { broadcastQueue: new(memberlist.TransmitLimitedQueue), numOfNodes: 0, - ACL: acl.NewACL(config), + ACL: acl.NewACL(config), + PubSub: pubsub.NewPubSub(), cancelCh: &cancelCh, } diff --git a/src/modules/acl/commands.go b/src/modules/acl/commands.go index ed921ec..15824de 100644 --- a/src/modules/acl/commands.go +++ b/src/modules/acl/commands.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "github.com/kelvinmwinuka/memstore/src/utils" "net" "strings" @@ -119,6 +120,7 @@ func (p Plugin) handleLoad(ctx context.Context, cmd []string, server utils.Serve } func (p Plugin) handleSave(ctx context.Context, cmd []string, server utils.Server) ([]byte, error) { + fmt.Println(p.acl) return nil, errors.New("ACL SAVE not implemented") } @@ -133,7 +135,6 @@ func NewModule(acl *ACL) Plugin { Description: "List all the categories and commands inside a category", HandleWithConnection: false, Sync: false, - Plugin: ACLPlugin, }, { Command: "cat", @@ -141,7 +142,6 @@ func NewModule(acl *ACL) Plugin { Description: "List all the categories and commands inside a category", HandleWithConnection: false, Sync: false, - Plugin: ACLPlugin, }, { Command: "auth", @@ -149,7 +149,6 @@ func NewModule(acl *ACL) Plugin { Description: "Authenticates the connection", HandleWithConnection: true, Sync: false, - Plugin: ACLPlugin, }, { Command: "users", @@ -157,7 +156,6 @@ func NewModule(acl *ACL) Plugin { Description: "List all ACL users", HandleWithConnection: false, Sync: false, - Plugin: ACLPlugin, }, { Command: "setuser", @@ -165,7 +163,6 @@ func NewModule(acl *ACL) Plugin { Description: "Configure a new or existing user", HandleWithConnection: false, Sync: true, - Plugin: ACLPlugin, }, { Command: "getuser", @@ -173,7 +170,6 @@ func NewModule(acl *ACL) Plugin { Description: "List the ACL rules of a user", HandleWithConnection: true, Sync: false, - Plugin: ACLPlugin, }, { Command: "deluser", @@ -181,7 +177,6 @@ func NewModule(acl *ACL) Plugin { Description: "Deletes users and terminates their connections", HandleWithConnection: false, Sync: true, - Plugin: ACLPlugin, }, { Command: "whoami", @@ -189,7 +184,6 @@ func NewModule(acl *ACL) Plugin { Description: "Returns the authenticated user of the current connection", HandleWithConnection: false, Sync: true, - Plugin: ACLPlugin, }, { @@ -198,7 +192,6 @@ func NewModule(acl *ACL) Plugin { Description: "Generates a password that can be used to identify a user", HandleWithConnection: false, Sync: true, - Plugin: ACLPlugin, }, { Command: "list", @@ -206,7 +199,6 @@ func NewModule(acl *ACL) Plugin { Description: "Dumps effective acl rules in acl config file format", HandleWithConnection: false, Sync: true, - Plugin: ACLPlugin, }, { Command: "load", @@ -214,7 +206,6 @@ func NewModule(acl *ACL) Plugin { Description: "Reloads the rules from the configured ACL config file", HandleWithConnection: false, Sync: true, - Plugin: ACLPlugin, }, { Command: "save", @@ -222,7 +213,6 @@ func NewModule(acl *ACL) Plugin { Description: "Saves the effective ACL rules the configured ACL config file", HandleWithConnection: false, Sync: true, - Plugin: ACLPlugin, }, }, description: "Internal plugin to handle ACL commands", diff --git a/src/modules/ping/commands.go b/src/modules/ping/commands.go index cf734be..288e45f 100644 --- a/src/modules/ping/commands.go +++ b/src/modules/ping/commands.go @@ -73,7 +73,6 @@ func NewModule() Plugin { Description: "", HandleWithConnection: false, Sync: false, - Plugin: PingModule, }, { Command: "ack", @@ -81,7 +80,6 @@ func NewModule() Plugin { Description: "", HandleWithConnection: false, Sync: false, - Plugin: PingModule, }, }, description: "Handle PING command", diff --git a/src/modules/pubsub/commands.go b/src/modules/pubsub/commands.go index bdd8147..a3eb97d 100644 --- a/src/modules/pubsub/commands.go +++ b/src/modules/pubsub/commands.go @@ -1 +1,134 @@ package pubsub + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "github.com/kelvinmwinuka/memstore/src/utils" + "net" + "strings" +) + +const ( + OK = "+OK\r\n\n" +) + +type Plugin struct { + name string + commands []utils.Command + description string + pubSub *PubSub +} + +var PubSubModule Plugin + +func (p Plugin) Name() string { + return p.name +} + +func (p Plugin) Commands() ([]byte, error) { + return json.Marshal(p.commands) +} + +func (p Plugin) GetCommands() []utils.Command { + fmt.Println(p) + return p.commands +} + +func (p Plugin) Description() string { + return p.description +} + +func (p Plugin) HandleCommandWithConnection(ctx context.Context, cmd []string, server utils.Server, conn *net.Conn) ([]byte, error) { + switch strings.ToLower(cmd[0]) { + default: + return nil, errors.New("command unknown") + case "subscribe": + return handleSubscribe(ctx, p, cmd, server, conn) + case "unsubscribe": + return handleUnsubscribe(ctx, p, cmd, server, conn) + } +} + +func (p Plugin) HandleCommand(ctx context.Context, cmd []string, server utils.Server) ([]byte, error) { + switch strings.ToLower(cmd[0]) { + default: + return nil, errors.New("command unknown") + case "publish": + return handlePublish(ctx, p, cmd, server) + } +} + +func handleSubscribe(ctx context.Context, p Plugin, cmd []string, s utils.Server, conn *net.Conn) ([]byte, error) { + switch len(cmd) { + case 1: + // Subscribe to all channels + p.pubSub.Subscribe(ctx, conn, nil, nil) + case 2: + // Subscribe to specified channel + p.pubSub.Subscribe(ctx, conn, cmd[1], nil) + case 3: + // Subscribe to specified channel and specified consumer group + fmt.Println(p.pubSub) + p.pubSub.Subscribe(ctx, conn, cmd[1], cmd[2]) + default: + return nil, errors.New("wrong number of arguments") + } + return []byte("+SUBSCRIBE_OK\r\n\n"), nil +} + +func handleUnsubscribe(ctx context.Context, p Plugin, cmd []string, s utils.Server, conn *net.Conn) ([]byte, error) { + switch len(cmd) { + case 1: + p.pubSub.Unsubscribe(ctx, conn, nil) + case 2: + p.pubSub.Unsubscribe(ctx, conn, cmd[1]) + default: + return nil, errors.New("wrong number of arguments") + } + return []byte("+OK\r\n\n"), nil +} + +func handlePublish(ctx context.Context, p Plugin, cmd []string, s utils.Server) ([]byte, error) { + if len(cmd) == 3 { + p.pubSub.Publish(ctx, cmd[2], cmd[1]) + } else if len(cmd) == 2 { + p.pubSub.Publish(ctx, cmd[1], nil) + } else { + return nil, errors.New("wrong number of arguments") + } + return []byte("+PUBLISH_OK\r\n\n"), nil +} + +func NewModule(pubsub *PubSub) Plugin { + PubSubModule := Plugin{ + pubSub: pubsub, + name: "PubSubCommands", + commands: []utils.Command{ + { + Command: "publish", + Categories: []string{}, + Description: "", + HandleWithConnection: false, + Sync: true, + }, + { + Command: "subscribe", + Categories: []string{}, + Description: "", + HandleWithConnection: true, + Sync: false, + }, + { + Command: "unsubscribe", + Categories: []string{}, + Description: "", + HandleWithConnection: true, + Sync: false, + }, + }, + description: "Handle PUBSUB functionality", + } + return PubSubModule +} diff --git a/src/plugins/pubsub/pubsub.go b/src/modules/pubsub/pubsub.go similarity index 90% rename from src/plugins/pubsub/pubsub.go rename to src/modules/pubsub/pubsub.go index 007501e..a7ea523 100644 --- a/src/plugins/pubsub/pubsub.go +++ b/src/modules/pubsub/pubsub.go @@ -1,10 +1,11 @@ -package main +package pubsub import ( "bufio" "container/ring" "context" "fmt" + "github.com/kelvinmwinuka/memstore/src/utils" "net" "strings" "sync" @@ -45,7 +46,7 @@ func (cg *ConsumerGroup) SendMessage(message string) { // Wait for an ACK // If no ACK is received within a time limit, remove this connection from subscribers and retry (*conn).SetReadDeadline(time.Now().Add(250 * time.Millisecond)) - if msg, err := ReadMessage(rw); err != nil { + if msg, err := utils.ReadMessage(rw); err != nil { // Remove the connection from subscribers list cg.Unsubscribe(conn) // Reset the deadline @@ -160,7 +161,7 @@ func (ch *Channel) Start() { (*conn).SetReadDeadline(time.Time{}) }() - if msg, err := ReadMessage(rw); err != nil { + if msg, err := utils.ReadMessage(rw); err != nil { ch.Unsubscribe(conn) } else { if strings.TrimSpace(msg) != "+ACK" { @@ -176,14 +177,14 @@ func (ch *Channel) Start() { } func (ch *Channel) Subscribe(conn *net.Conn, consumerGroupName interface{}) { - if consumerGroupName == nil && !Contains[*net.Conn](ch.subscribers, conn) { + if consumerGroupName == nil && !utils.Contains[*net.Conn](ch.subscribers, conn) { ch.subscribersRWMut.Lock() defer ch.subscribersRWMut.Unlock() ch.subscribers = append(ch.subscribers, conn) return } - groups := Filter[*ConsumerGroup](ch.consumerGroups, func(group *ConsumerGroup) bool { + groups := utils.Filter[*ConsumerGroup](ch.consumerGroups, func(group *ConsumerGroup) bool { return group.name == consumerGroupName.(string) }) @@ -206,7 +207,7 @@ func (ch *Channel) Unsubscribe(conn *net.Conn) { ch.subscribersRWMut.Lock() defer ch.subscribersRWMut.Unlock() - ch.subscribers = Filter[*net.Conn](ch.subscribers, func(c *net.Conn) bool { + ch.subscribers = utils.Filter[*net.Conn](ch.subscribers, func(c *net.Conn) bool { return c != conn }) @@ -229,7 +230,9 @@ type PubSub struct { func NewPubSub() *PubSub { return &PubSub{ - channels: []*Channel{}, + channels: []*Channel{ + NewChannel("chan"), + }, } } @@ -245,7 +248,7 @@ func (ps *PubSub) Subscribe(ctx context.Context, conn *net.Conn, channelName int // Check if channel with given name exists // If it does, subscribe the connection to the channel // If it does not, create the channel and subscribe to it - channels := Filter[*Channel](ps.channels, func(c *Channel) bool { + channels := utils.Filter[*Channel](ps.channels, func(c *Channel) bool { return c.name == channelName }) @@ -272,7 +275,7 @@ func (ps *PubSub) Unsubscribe(ctx context.Context, conn *net.Conn, channelName i return } - channels := Filter[*Channel](ps.channels, func(c *Channel) bool { + channels := utils.Filter[*Channel](ps.channels, func(c *Channel) bool { return c.name == channelName }) @@ -289,7 +292,7 @@ func (ps *PubSub) Publish(ctx context.Context, message string, channelName inter return } - channels := Filter[*Channel](ps.channels, func(c *Channel) bool { + channels := utils.Filter[*Channel](ps.channels, func(c *Channel) bool { return c.name == channelName }) diff --git a/src/modules/string/commands.go b/src/modules/string/commands.go index 9336b6f..0ab3905 100644 --- a/src/modules/string/commands.go +++ b/src/modules/string/commands.go @@ -217,7 +217,6 @@ func NewModule() Plugin { Description: "(SETRANGE key offset value) Overwrites part of a string value with another by offset. Creates the key if it doesn't exist.", HandleWithConnection: false, Sync: true, - Plugin: StringModule, }, { Command: "strlen", @@ -225,7 +224,6 @@ func NewModule() Plugin { Description: "(STRLEN key) Returns length of the key's value if it's a string.", HandleWithConnection: false, Sync: false, - Plugin: StringModule, }, { Command: "substr", @@ -233,7 +231,6 @@ func NewModule() Plugin { Description: "(SUBSTR key start end) Returns a substring from the string value.", HandleWithConnection: false, Sync: false, - Plugin: StringModule, }, { Command: "getrange", @@ -241,7 +238,6 @@ func NewModule() Plugin { Description: "(GETRANGE key start end) Returns a substring from the string value.", HandleWithConnection: false, Sync: false, - Plugin: StringModule, }, }, description: "Handle basic STRING commands", diff --git a/src/plugins/pubsub/command.go b/src/plugins/pubsub/command.go deleted file mode 100644 index e71662c..0000000 --- a/src/plugins/pubsub/command.go +++ /dev/null @@ -1,139 +0,0 @@ -package main - -import ( - "context" - "encoding/json" - "errors" - "net" - "strings" -) - -const ( - OK = "+OK\r\n\n" -) - -type Server interface { - KeyLock(ctx context.Context, key string) (bool, error) - KeyUnlock(key string) - KeyRLock(ctx context.Context, key string) (bool, error) - KeyRUnlock(key string) - KeyExists(key string) bool - CreateKeyAndLock(ctx context.Context, key string) (bool, error) - GetValue(key string) interface{} - SetValue(ctx context.Context, key string, value interface{}) -} - -type Command struct { - Command string `json:"Command"` - Categories []string `json:"Categories"` - Description string `json:"Description"` - HandleWithConnection bool `json:"HandleWithConnection"` - Sync bool `json:"Sync"` -} - -type plugin struct { - name string - commands []Command - description string - pubSub *PubSub -} - -var Plugin plugin - -func (p *plugin) Name() string { - return p.name -} - -func (p *plugin) Commands() ([]byte, error) { - return json.Marshal(p.commands) -} - -func (p *plugin) Description() string { - return p.description -} - -func (p *plugin) HandleCommandWithConnection(ctx context.Context, cmd []string, server interface{}, conn *net.Conn) ([]byte, error) { - switch strings.ToLower(cmd[0]) { - default: - return nil, errors.New("command unknown") - case "subscribe": - return handleSubscribe(ctx, p, cmd, server.(Server), conn) - case "unsubscribe": - return handleUnsubscribe(ctx, p, cmd, server.(Server), conn) - } -} - -func (p *plugin) HandleCommand(ctx context.Context, cmd []string, server interface{}) ([]byte, error) { - switch strings.ToLower(cmd[0]) { - default: - return nil, errors.New("command unknown") - case "publish": - return handlePublish(ctx, p, cmd, server.(Server)) - } -} - -func handleSubscribe(ctx context.Context, p *plugin, cmd []string, s Server, conn *net.Conn) ([]byte, error) { - switch len(cmd) { - case 1: - p.pubSub.Subscribe(ctx, conn, nil, nil) - case 2: - p.pubSub.Subscribe(ctx, conn, cmd[1], nil) - case 3: - p.pubSub.Subscribe(ctx, conn, cmd[1], cmd[2]) - default: - return nil, errors.New("wrong number of arguments") - } - return []byte("+SUBSCRIBE_OK\r\n\n"), nil -} - -func handleUnsubscribe(ctx context.Context, p *plugin, cmd []string, s Server, conn *net.Conn) ([]byte, error) { - switch len(cmd) { - case 1: - p.pubSub.Unsubscribe(ctx, conn, nil) - case 2: - p.pubSub.Unsubscribe(ctx, conn, cmd[1]) - default: - return nil, errors.New("wrong number of arguments") - } - return []byte("+OK\r\n\n"), nil -} - -func handlePublish(ctx context.Context, p *plugin, cmd []string, s Server) ([]byte, error) { - if len(cmd) == 3 { - p.pubSub.Publish(ctx, cmd[2], cmd[1]) - } else if len(cmd) == 2 { - p.pubSub.Publish(ctx, cmd[1], nil) - } else { - return nil, errors.New("wrong number of arguments") - } - return []byte("+PUBLISH_OK\r\n\n"), nil -} - -func init() { - Plugin.name = "PubSubCommands" - Plugin.commands = []Command{ - { - Command: "publish", - Categories: []string{}, - Description: "", - HandleWithConnection: false, - Sync: true, - }, - { - Command: "subscribe", - Categories: []string{}, - Description: "", - HandleWithConnection: true, - Sync: false, - }, - { - Command: "unsubscribe", - Categories: []string{}, - Description: "", - HandleWithConnection: true, - Sync: false, - }, - } - Plugin.description = "Handle PUBSUB functionality." - Plugin.pubSub = NewPubSub() -} diff --git a/src/plugins/pubsub/utils.go b/src/plugins/pubsub/utils.go deleted file mode 100644 index f324892..0000000 --- a/src/plugins/pubsub/utils.go +++ /dev/null @@ -1,46 +0,0 @@ -package main - -import ( - "bufio" - "bytes" - "fmt" -) - -func ReadMessage(r *bufio.ReadWriter) (message string, err error) { - var line [][]byte - - for { - b, _, err := r.ReadLine() - - if err != nil { - return "", err - } - - if bytes.Equal(b, []byte("")) { - // End of message - break - } - - line = append(line, b) - } - - return fmt.Sprintf("%s\r\n", string(bytes.Join(line, []byte("\r\n")))), nil -} - -func Contains[T comparable](arr []T, elem T) bool { - for _, v := range arr { - if v == elem { - return true - } - } - return false -} - -func Filter[T comparable](arr []T, test func(elem T) bool) (res []T) { - for _, e := range arr { - if test(e) { - res = append(res, e) - } - } - return -}