Deleted plugin folder.

Removed nested plugin in command list for modules.
Moved PubSub feature from plugin folder to modules folder.
This commit is contained in:
Kelvin Clement Mwinuka
2023-12-13 01:56:55 +08:00
parent f82508e964
commit e7c7cf1553
9 changed files with 155 additions and 220 deletions

View File

@@ -1,9 +1,5 @@
build-plugins:
CGO_ENABLED=$(CGO_ENABLED) CC=$(CC) GOOS=$(GOOS) GOARCH=$(GOARCH) go build -buildmode=plugin -o $(DEST)/command_pubsub.so ./src/plugins/pubsub/*.go
build-server: build-server:
CGO_ENABLED=$(CGO_ENABLED) CC=$(CC) GOOS=$(GOOS) GOARCH=$(GOARCH) go build -o $(DEST)/server ./src/*.go CGO_ENABLED=$(CGO_ENABLED) CC=$(CC) GOOS=$(GOOS) GOARCH=$(GOARCH) go build -o $(DEST)/server ./src/*.go
build: build:
env CGO_ENABLED=1 CC=x86_64-linux-musl-gcc GOOS=linux GOARCH=amd64 DEST=bin/linux/x86_64/plugins make build-plugins
env CGO_ENABLED=1 CC=x86_64-linux-musl-gcc GOOS=linux GOARCH=amd64 DEST=bin/linux/x86_64 make build-server env CGO_ENABLED=1 CC=x86_64-linux-musl-gcc GOOS=linux GOARCH=amd64 DEST=bin/linux/x86_64 make build-server

View File

@@ -10,6 +10,7 @@ import (
"github.com/kelvinmwinuka/memstore/src/modules/get" "github.com/kelvinmwinuka/memstore/src/modules/get"
"github.com/kelvinmwinuka/memstore/src/modules/list" "github.com/kelvinmwinuka/memstore/src/modules/list"
"github.com/kelvinmwinuka/memstore/src/modules/ping" "github.com/kelvinmwinuka/memstore/src/modules/ping"
"github.com/kelvinmwinuka/memstore/src/modules/pubsub"
"github.com/kelvinmwinuka/memstore/src/modules/set" "github.com/kelvinmwinuka/memstore/src/modules/set"
str "github.com/kelvinmwinuka/memstore/src/modules/string" str "github.com/kelvinmwinuka/memstore/src/modules/string"
"io" "io"
@@ -49,6 +50,7 @@ type Server struct {
cancelCh *chan os.Signal cancelCh *chan os.Signal
ACL *acl.ACL ACL *acl.ACL
PubSub *pubsub.PubSub
} }
func (server *Server) KeyLock(ctx context.Context, key string) (bool, error) { func (server *Server) KeyLock(ctx context.Context, key string) (bool, error) {
@@ -305,8 +307,9 @@ func (server *Server) StartHTTP(ctx context.Context) {
} }
func (server *Server) LoadModules(ctx context.Context) { func (server *Server) LoadModules(ctx context.Context) {
server.commands = append(server.commands, ping.NewModule().GetCommands()...)
server.commands = append(server.commands, acl.NewModule(server.ACL).GetCommands()...) server.commands = append(server.commands, acl.NewModule(server.ACL).GetCommands()...)
server.commands = append(server.commands, pubsub.NewModule(server.PubSub).GetCommands()...)
server.commands = append(server.commands, ping.NewModule().GetCommands()...)
server.commands = append(server.commands, set.NewModule().GetCommands()...) server.commands = append(server.commands, set.NewModule().GetCommands()...)
server.commands = append(server.commands, str.NewModule().GetCommands()...) server.commands = append(server.commands, str.NewModule().GetCommands()...)
server.commands = append(server.commands, get.NewModule().GetCommands()...) server.commands = append(server.commands, get.NewModule().GetCommands()...)
@@ -380,6 +383,7 @@ func main() {
numOfNodes: 0, numOfNodes: 0,
ACL: acl.NewACL(config), ACL: acl.NewACL(config),
PubSub: pubsub.NewPubSub(),
cancelCh: &cancelCh, cancelCh: &cancelCh,
} }

View File

@@ -4,6 +4,7 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt"
"github.com/kelvinmwinuka/memstore/src/utils" "github.com/kelvinmwinuka/memstore/src/utils"
"net" "net"
"strings" "strings"
@@ -119,6 +120,7 @@ func (p Plugin) handleLoad(ctx context.Context, cmd []string, server utils.Serve
} }
func (p Plugin) handleSave(ctx context.Context, cmd []string, server utils.Server) ([]byte, error) { func (p Plugin) handleSave(ctx context.Context, cmd []string, server utils.Server) ([]byte, error) {
fmt.Println(p.acl)
return nil, errors.New("ACL SAVE not implemented") return nil, errors.New("ACL SAVE not implemented")
} }
@@ -133,7 +135,6 @@ func NewModule(acl *ACL) Plugin {
Description: "List all the categories and commands inside a category", Description: "List all the categories and commands inside a category",
HandleWithConnection: false, HandleWithConnection: false,
Sync: false, Sync: false,
Plugin: ACLPlugin,
}, },
{ {
Command: "cat", Command: "cat",
@@ -141,7 +142,6 @@ func NewModule(acl *ACL) Plugin {
Description: "List all the categories and commands inside a category", Description: "List all the categories and commands inside a category",
HandleWithConnection: false, HandleWithConnection: false,
Sync: false, Sync: false,
Plugin: ACLPlugin,
}, },
{ {
Command: "auth", Command: "auth",
@@ -149,7 +149,6 @@ func NewModule(acl *ACL) Plugin {
Description: "Authenticates the connection", Description: "Authenticates the connection",
HandleWithConnection: true, HandleWithConnection: true,
Sync: false, Sync: false,
Plugin: ACLPlugin,
}, },
{ {
Command: "users", Command: "users",
@@ -157,7 +156,6 @@ func NewModule(acl *ACL) Plugin {
Description: "List all ACL users", Description: "List all ACL users",
HandleWithConnection: false, HandleWithConnection: false,
Sync: false, Sync: false,
Plugin: ACLPlugin,
}, },
{ {
Command: "setuser", Command: "setuser",
@@ -165,7 +163,6 @@ func NewModule(acl *ACL) Plugin {
Description: "Configure a new or existing user", Description: "Configure a new or existing user",
HandleWithConnection: false, HandleWithConnection: false,
Sync: true, Sync: true,
Plugin: ACLPlugin,
}, },
{ {
Command: "getuser", Command: "getuser",
@@ -173,7 +170,6 @@ func NewModule(acl *ACL) Plugin {
Description: "List the ACL rules of a user", Description: "List the ACL rules of a user",
HandleWithConnection: true, HandleWithConnection: true,
Sync: false, Sync: false,
Plugin: ACLPlugin,
}, },
{ {
Command: "deluser", Command: "deluser",
@@ -181,7 +177,6 @@ func NewModule(acl *ACL) Plugin {
Description: "Deletes users and terminates their connections", Description: "Deletes users and terminates their connections",
HandleWithConnection: false, HandleWithConnection: false,
Sync: true, Sync: true,
Plugin: ACLPlugin,
}, },
{ {
Command: "whoami", Command: "whoami",
@@ -189,7 +184,6 @@ func NewModule(acl *ACL) Plugin {
Description: "Returns the authenticated user of the current connection", Description: "Returns the authenticated user of the current connection",
HandleWithConnection: false, HandleWithConnection: false,
Sync: true, Sync: true,
Plugin: ACLPlugin,
}, },
{ {
@@ -198,7 +192,6 @@ func NewModule(acl *ACL) Plugin {
Description: "Generates a password that can be used to identify a user", Description: "Generates a password that can be used to identify a user",
HandleWithConnection: false, HandleWithConnection: false,
Sync: true, Sync: true,
Plugin: ACLPlugin,
}, },
{ {
Command: "list", Command: "list",
@@ -206,7 +199,6 @@ func NewModule(acl *ACL) Plugin {
Description: "Dumps effective acl rules in acl config file format", Description: "Dumps effective acl rules in acl config file format",
HandleWithConnection: false, HandleWithConnection: false,
Sync: true, Sync: true,
Plugin: ACLPlugin,
}, },
{ {
Command: "load", Command: "load",
@@ -214,7 +206,6 @@ func NewModule(acl *ACL) Plugin {
Description: "Reloads the rules from the configured ACL config file", Description: "Reloads the rules from the configured ACL config file",
HandleWithConnection: false, HandleWithConnection: false,
Sync: true, Sync: true,
Plugin: ACLPlugin,
}, },
{ {
Command: "save", Command: "save",
@@ -222,7 +213,6 @@ func NewModule(acl *ACL) Plugin {
Description: "Saves the effective ACL rules the configured ACL config file", Description: "Saves the effective ACL rules the configured ACL config file",
HandleWithConnection: false, HandleWithConnection: false,
Sync: true, Sync: true,
Plugin: ACLPlugin,
}, },
}, },
description: "Internal plugin to handle ACL commands", description: "Internal plugin to handle ACL commands",

View File

@@ -73,7 +73,6 @@ func NewModule() Plugin {
Description: "", Description: "",
HandleWithConnection: false, HandleWithConnection: false,
Sync: false, Sync: false,
Plugin: PingModule,
}, },
{ {
Command: "ack", Command: "ack",
@@ -81,7 +80,6 @@ func NewModule() Plugin {
Description: "", Description: "",
HandleWithConnection: false, HandleWithConnection: false,
Sync: false, Sync: false,
Plugin: PingModule,
}, },
}, },
description: "Handle PING command", description: "Handle PING command",

View File

@@ -1 +1,134 @@
package pubsub package pubsub
import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/kelvinmwinuka/memstore/src/utils"
"net"
"strings"
)
const (
OK = "+OK\r\n\n"
)
type Plugin struct {
name string
commands []utils.Command
description string
pubSub *PubSub
}
var PubSubModule Plugin
func (p Plugin) Name() string {
return p.name
}
func (p Plugin) Commands() ([]byte, error) {
return json.Marshal(p.commands)
}
func (p Plugin) GetCommands() []utils.Command {
fmt.Println(p)
return p.commands
}
func (p Plugin) Description() string {
return p.description
}
func (p Plugin) HandleCommandWithConnection(ctx context.Context, cmd []string, server utils.Server, conn *net.Conn) ([]byte, error) {
switch strings.ToLower(cmd[0]) {
default:
return nil, errors.New("command unknown")
case "subscribe":
return handleSubscribe(ctx, p, cmd, server, conn)
case "unsubscribe":
return handleUnsubscribe(ctx, p, cmd, server, conn)
}
}
func (p Plugin) HandleCommand(ctx context.Context, cmd []string, server utils.Server) ([]byte, error) {
switch strings.ToLower(cmd[0]) {
default:
return nil, errors.New("command unknown")
case "publish":
return handlePublish(ctx, p, cmd, server)
}
}
func handleSubscribe(ctx context.Context, p Plugin, cmd []string, s utils.Server, conn *net.Conn) ([]byte, error) {
switch len(cmd) {
case 1:
// Subscribe to all channels
p.pubSub.Subscribe(ctx, conn, nil, nil)
case 2:
// Subscribe to specified channel
p.pubSub.Subscribe(ctx, conn, cmd[1], nil)
case 3:
// Subscribe to specified channel and specified consumer group
fmt.Println(p.pubSub)
p.pubSub.Subscribe(ctx, conn, cmd[1], cmd[2])
default:
return nil, errors.New("wrong number of arguments")
}
return []byte("+SUBSCRIBE_OK\r\n\n"), nil
}
func handleUnsubscribe(ctx context.Context, p Plugin, cmd []string, s utils.Server, conn *net.Conn) ([]byte, error) {
switch len(cmd) {
case 1:
p.pubSub.Unsubscribe(ctx, conn, nil)
case 2:
p.pubSub.Unsubscribe(ctx, conn, cmd[1])
default:
return nil, errors.New("wrong number of arguments")
}
return []byte("+OK\r\n\n"), nil
}
func handlePublish(ctx context.Context, p Plugin, cmd []string, s utils.Server) ([]byte, error) {
if len(cmd) == 3 {
p.pubSub.Publish(ctx, cmd[2], cmd[1])
} else if len(cmd) == 2 {
p.pubSub.Publish(ctx, cmd[1], nil)
} else {
return nil, errors.New("wrong number of arguments")
}
return []byte("+PUBLISH_OK\r\n\n"), nil
}
func NewModule(pubsub *PubSub) Plugin {
PubSubModule := Plugin{
pubSub: pubsub,
name: "PubSubCommands",
commands: []utils.Command{
{
Command: "publish",
Categories: []string{},
Description: "",
HandleWithConnection: false,
Sync: true,
},
{
Command: "subscribe",
Categories: []string{},
Description: "",
HandleWithConnection: true,
Sync: false,
},
{
Command: "unsubscribe",
Categories: []string{},
Description: "",
HandleWithConnection: true,
Sync: false,
},
},
description: "Handle PUBSUB functionality",
}
return PubSubModule
}

View File

@@ -1,10 +1,11 @@
package main package pubsub
import ( import (
"bufio" "bufio"
"container/ring" "container/ring"
"context" "context"
"fmt" "fmt"
"github.com/kelvinmwinuka/memstore/src/utils"
"net" "net"
"strings" "strings"
"sync" "sync"
@@ -45,7 +46,7 @@ func (cg *ConsumerGroup) SendMessage(message string) {
// Wait for an ACK // Wait for an ACK
// If no ACK is received within a time limit, remove this connection from subscribers and retry // If no ACK is received within a time limit, remove this connection from subscribers and retry
(*conn).SetReadDeadline(time.Now().Add(250 * time.Millisecond)) (*conn).SetReadDeadline(time.Now().Add(250 * time.Millisecond))
if msg, err := ReadMessage(rw); err != nil { if msg, err := utils.ReadMessage(rw); err != nil {
// Remove the connection from subscribers list // Remove the connection from subscribers list
cg.Unsubscribe(conn) cg.Unsubscribe(conn)
// Reset the deadline // Reset the deadline
@@ -160,7 +161,7 @@ func (ch *Channel) Start() {
(*conn).SetReadDeadline(time.Time{}) (*conn).SetReadDeadline(time.Time{})
}() }()
if msg, err := ReadMessage(rw); err != nil { if msg, err := utils.ReadMessage(rw); err != nil {
ch.Unsubscribe(conn) ch.Unsubscribe(conn)
} else { } else {
if strings.TrimSpace(msg) != "+ACK" { if strings.TrimSpace(msg) != "+ACK" {
@@ -176,14 +177,14 @@ func (ch *Channel) Start() {
} }
func (ch *Channel) Subscribe(conn *net.Conn, consumerGroupName interface{}) { func (ch *Channel) Subscribe(conn *net.Conn, consumerGroupName interface{}) {
if consumerGroupName == nil && !Contains[*net.Conn](ch.subscribers, conn) { if consumerGroupName == nil && !utils.Contains[*net.Conn](ch.subscribers, conn) {
ch.subscribersRWMut.Lock() ch.subscribersRWMut.Lock()
defer ch.subscribersRWMut.Unlock() defer ch.subscribersRWMut.Unlock()
ch.subscribers = append(ch.subscribers, conn) ch.subscribers = append(ch.subscribers, conn)
return return
} }
groups := Filter[*ConsumerGroup](ch.consumerGroups, func(group *ConsumerGroup) bool { groups := utils.Filter[*ConsumerGroup](ch.consumerGroups, func(group *ConsumerGroup) bool {
return group.name == consumerGroupName.(string) return group.name == consumerGroupName.(string)
}) })
@@ -206,7 +207,7 @@ func (ch *Channel) Unsubscribe(conn *net.Conn) {
ch.subscribersRWMut.Lock() ch.subscribersRWMut.Lock()
defer ch.subscribersRWMut.Unlock() defer ch.subscribersRWMut.Unlock()
ch.subscribers = Filter[*net.Conn](ch.subscribers, func(c *net.Conn) bool { ch.subscribers = utils.Filter[*net.Conn](ch.subscribers, func(c *net.Conn) bool {
return c != conn return c != conn
}) })
@@ -229,7 +230,9 @@ type PubSub struct {
func NewPubSub() *PubSub { func NewPubSub() *PubSub {
return &PubSub{ return &PubSub{
channels: []*Channel{}, channels: []*Channel{
NewChannel("chan"),
},
} }
} }
@@ -245,7 +248,7 @@ func (ps *PubSub) Subscribe(ctx context.Context, conn *net.Conn, channelName int
// Check if channel with given name exists // Check if channel with given name exists
// If it does, subscribe the connection to the channel // If it does, subscribe the connection to the channel
// If it does not, create the channel and subscribe to it // If it does not, create the channel and subscribe to it
channels := Filter[*Channel](ps.channels, func(c *Channel) bool { channels := utils.Filter[*Channel](ps.channels, func(c *Channel) bool {
return c.name == channelName return c.name == channelName
}) })
@@ -272,7 +275,7 @@ func (ps *PubSub) Unsubscribe(ctx context.Context, conn *net.Conn, channelName i
return return
} }
channels := Filter[*Channel](ps.channels, func(c *Channel) bool { channels := utils.Filter[*Channel](ps.channels, func(c *Channel) bool {
return c.name == channelName return c.name == channelName
}) })
@@ -289,7 +292,7 @@ func (ps *PubSub) Publish(ctx context.Context, message string, channelName inter
return return
} }
channels := Filter[*Channel](ps.channels, func(c *Channel) bool { channels := utils.Filter[*Channel](ps.channels, func(c *Channel) bool {
return c.name == channelName return c.name == channelName
}) })

View File

@@ -217,7 +217,6 @@ func NewModule() Plugin {
Description: "(SETRANGE key offset value) Overwrites part of a string value with another by offset. Creates the key if it doesn't exist.", Description: "(SETRANGE key offset value) Overwrites part of a string value with another by offset. Creates the key if it doesn't exist.",
HandleWithConnection: false, HandleWithConnection: false,
Sync: true, Sync: true,
Plugin: StringModule,
}, },
{ {
Command: "strlen", Command: "strlen",
@@ -225,7 +224,6 @@ func NewModule() Plugin {
Description: "(STRLEN key) Returns length of the key's value if it's a string.", Description: "(STRLEN key) Returns length of the key's value if it's a string.",
HandleWithConnection: false, HandleWithConnection: false,
Sync: false, Sync: false,
Plugin: StringModule,
}, },
{ {
Command: "substr", Command: "substr",
@@ -233,7 +231,6 @@ func NewModule() Plugin {
Description: "(SUBSTR key start end) Returns a substring from the string value.", Description: "(SUBSTR key start end) Returns a substring from the string value.",
HandleWithConnection: false, HandleWithConnection: false,
Sync: false, Sync: false,
Plugin: StringModule,
}, },
{ {
Command: "getrange", Command: "getrange",
@@ -241,7 +238,6 @@ func NewModule() Plugin {
Description: "(GETRANGE key start end) Returns a substring from the string value.", Description: "(GETRANGE key start end) Returns a substring from the string value.",
HandleWithConnection: false, HandleWithConnection: false,
Sync: false, Sync: false,
Plugin: StringModule,
}, },
}, },
description: "Handle basic STRING commands", description: "Handle basic STRING commands",

View File

@@ -1,139 +0,0 @@
package main
import (
"context"
"encoding/json"
"errors"
"net"
"strings"
)
const (
OK = "+OK\r\n\n"
)
type Server interface {
KeyLock(ctx context.Context, key string) (bool, error)
KeyUnlock(key string)
KeyRLock(ctx context.Context, key string) (bool, error)
KeyRUnlock(key string)
KeyExists(key string) bool
CreateKeyAndLock(ctx context.Context, key string) (bool, error)
GetValue(key string) interface{}
SetValue(ctx context.Context, key string, value interface{})
}
type Command struct {
Command string `json:"Command"`
Categories []string `json:"Categories"`
Description string `json:"Description"`
HandleWithConnection bool `json:"HandleWithConnection"`
Sync bool `json:"Sync"`
}
type plugin struct {
name string
commands []Command
description string
pubSub *PubSub
}
var Plugin plugin
func (p *plugin) Name() string {
return p.name
}
func (p *plugin) Commands() ([]byte, error) {
return json.Marshal(p.commands)
}
func (p *plugin) Description() string {
return p.description
}
func (p *plugin) HandleCommandWithConnection(ctx context.Context, cmd []string, server interface{}, conn *net.Conn) ([]byte, error) {
switch strings.ToLower(cmd[0]) {
default:
return nil, errors.New("command unknown")
case "subscribe":
return handleSubscribe(ctx, p, cmd, server.(Server), conn)
case "unsubscribe":
return handleUnsubscribe(ctx, p, cmd, server.(Server), conn)
}
}
func (p *plugin) HandleCommand(ctx context.Context, cmd []string, server interface{}) ([]byte, error) {
switch strings.ToLower(cmd[0]) {
default:
return nil, errors.New("command unknown")
case "publish":
return handlePublish(ctx, p, cmd, server.(Server))
}
}
func handleSubscribe(ctx context.Context, p *plugin, cmd []string, s Server, conn *net.Conn) ([]byte, error) {
switch len(cmd) {
case 1:
p.pubSub.Subscribe(ctx, conn, nil, nil)
case 2:
p.pubSub.Subscribe(ctx, conn, cmd[1], nil)
case 3:
p.pubSub.Subscribe(ctx, conn, cmd[1], cmd[2])
default:
return nil, errors.New("wrong number of arguments")
}
return []byte("+SUBSCRIBE_OK\r\n\n"), nil
}
func handleUnsubscribe(ctx context.Context, p *plugin, cmd []string, s Server, conn *net.Conn) ([]byte, error) {
switch len(cmd) {
case 1:
p.pubSub.Unsubscribe(ctx, conn, nil)
case 2:
p.pubSub.Unsubscribe(ctx, conn, cmd[1])
default:
return nil, errors.New("wrong number of arguments")
}
return []byte("+OK\r\n\n"), nil
}
func handlePublish(ctx context.Context, p *plugin, cmd []string, s Server) ([]byte, error) {
if len(cmd) == 3 {
p.pubSub.Publish(ctx, cmd[2], cmd[1])
} else if len(cmd) == 2 {
p.pubSub.Publish(ctx, cmd[1], nil)
} else {
return nil, errors.New("wrong number of arguments")
}
return []byte("+PUBLISH_OK\r\n\n"), nil
}
func init() {
Plugin.name = "PubSubCommands"
Plugin.commands = []Command{
{
Command: "publish",
Categories: []string{},
Description: "",
HandleWithConnection: false,
Sync: true,
},
{
Command: "subscribe",
Categories: []string{},
Description: "",
HandleWithConnection: true,
Sync: false,
},
{
Command: "unsubscribe",
Categories: []string{},
Description: "",
HandleWithConnection: true,
Sync: false,
},
}
Plugin.description = "Handle PUBSUB functionality."
Plugin.pubSub = NewPubSub()
}

View File

@@ -1,46 +0,0 @@
package main
import (
"bufio"
"bytes"
"fmt"
)
func ReadMessage(r *bufio.ReadWriter) (message string, err error) {
var line [][]byte
for {
b, _, err := r.ReadLine()
if err != nil {
return "", err
}
if bytes.Equal(b, []byte("")) {
// End of message
break
}
line = append(line, b)
}
return fmt.Sprintf("%s\r\n", string(bytes.Join(line, []byte("\r\n")))), nil
}
func Contains[T comparable](arr []T, elem T) bool {
for _, v := range arr {
if v == elem {
return true
}
}
return false
}
func Filter[T comparable](arr []T, test func(elem T) bool) (res []T) {
for _, e := range arr {
if test(e) {
res = append(res, e)
}
}
return
}