From 28f659033e695ba0ffb68ce58dcf182d0182eeea Mon Sep 17 00:00:00 2001 From: Kelvin Mwinuka Date: Wed, 20 Mar 2024 00:18:38 +0800 Subject: [PATCH] Removed discord label from README.md. Updated SUBSCRIBE command handler to send psubscribe event to connection when subscribing to a pattern. --- README.md | 3 --- src/modules/pubsub/commands.go | 4 ++-- src/modules/pubsub/commands_test.go | 2 +- src/modules/pubsub/pubsub.go | 14 ++++++++++---- 4 files changed, 13 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 336a0da..8306b1c 100644 --- a/README.md +++ b/README.md @@ -2,10 +2,7 @@ [![Go Report Card](https://goreportcard.com/badge/github.com/echovault/echovault)](https://goreportcard.com/report/github.com/echovault/echovault) [![Go Coverage](https://github.com/EchoVault/EchoVault/wiki/coverage.svg)](https://raw.githack.com/wiki/EchoVault/EchoVault/coverage.html) [![GitHub Release](https://img.shields.io/github/v/release/EchoVault/EchoVault)]() -
[![License: GPL v3](https://img.shields.io/badge/License-GPL_v3-blue.svg)](https://www.gnu.org/licenses/gpl-3.0.en.html) -
-[![Discord](https://img.shields.io/discord/1211815152291414037?style=flat&logo=discord&link=https%3A%2F%2Fdiscord.gg%2Fvt45CKfF)](https://discord.gg/vt45CKfF)
diff --git a/src/modules/pubsub/commands.go b/src/modules/pubsub/commands.go index 05f21ae..b0a4639 100644 --- a/src/modules/pubsub/commands.go +++ b/src/modules/pubsub/commands.go @@ -53,7 +53,7 @@ func handlePublish(ctx context.Context, cmd []string, server utils.Server, conn return []byte(utils.OkResponse), nil } -func handlePubSubChannels(ctx context.Context, cmd []string, server utils.Server, conn *net.Conn) ([]byte, error) { +func handlePubSubChannels(_ context.Context, cmd []string, server utils.Server, conn *net.Conn) ([]byte, error) { if len(cmd) > 3 { return nil, errors.New(utils.WrongArgsResponse) } @@ -68,7 +68,7 @@ func handlePubSubChannels(ctx context.Context, cmd []string, server utils.Server pattern = cmd[2] } - return pubsub.Channels(ctx, pattern), nil + return pubsub.Channels(pattern), nil } func handlePubSubNumPat(ctx context.Context, cmd []string, server utils.Server, conn *net.Conn) ([]byte, error) { diff --git a/src/modules/pubsub/commands_test.go b/src/modules/pubsub/commands_test.go index fe5e9f3..7082eb7 100644 --- a/src/modules/pubsub/commands_test.go +++ b/src/modules/pubsub/commands_test.go @@ -335,7 +335,7 @@ func Test_HandlePublish(t *testing.T) { }() // Verify all the responses for each pattern subscription for i := 0; i < len(patterns); i++ { - verifyEvent(c, r, []string{"subscribe", patterns[i], fmt.Sprintf("%d", i+1)}) + verifyEvent(c, r, []string{"psubscribe", patterns[i], fmt.Sprintf("%d", i+1)}) } } diff --git a/src/modules/pubsub/pubsub.go b/src/modules/pubsub/pubsub.go index 9fecddc..7796c10 100644 --- a/src/modules/pubsub/pubsub.go +++ b/src/modules/pubsub/pubsub.go @@ -26,6 +26,12 @@ func NewPubSub() *PubSub { func (ps *PubSub) Subscribe(ctx context.Context, conn *net.Conn, channels []string, withPattern bool) { r := resp.NewConn(*conn) + + action := "subscribe" + if withPattern { + action = "psubscribe" + } + for i := 0; i < len(channels); i++ { // Check if channel with given name exists // If it does, subscribe the connection to the channel @@ -45,7 +51,7 @@ func (ps *PubSub) Subscribe(ctx context.Context, conn *net.Conn, channels []stri newChan.Start() if newChan.Subscribe(conn) { if err := r.WriteArray([]resp.Value{ - resp.StringValue("subscribe"), + resp.StringValue(action), resp.StringValue(newChan.name), resp.IntegerValue(i + 1), }); err != nil { @@ -57,7 +63,7 @@ func (ps *PubSub) Subscribe(ctx context.Context, conn *net.Conn, channels []stri // Subscribe to existing channel if ps.channels[channelIdx].Subscribe(conn) { if err := r.WriteArray([]resp.Value{ - resp.StringValue("subscribe"), + resp.StringValue(action), resp.StringValue(ps.channels[channelIdx].name), resp.IntegerValue(i + 1), }); err != nil { @@ -172,7 +178,7 @@ func (ps *PubSub) Publish(ctx context.Context, message string, channelName strin } } -func (ps *PubSub) Channels(ctx context.Context, pattern string) []byte { +func (ps *PubSub) Channels(pattern string) []byte { var count int var res string @@ -183,7 +189,6 @@ func (ps *PubSub) Channels(ctx context.Context, pattern string) []byte { count += 1 } } - res = fmt.Sprintf("*%d\r\n%s", count, res) return []byte(res) } @@ -197,6 +202,7 @@ func (ps *PubSub) Channels(ctx context.Context, pattern string) []byte { count += 1 continue } + // Channel is not a pattern channel. Check if the channel name matches the provided glob pattern if g.Match(channel.name) && channel.IsActive() { res += fmt.Sprintf("$%d\r\n%s\r\n", len(channel.name), channel.name) count += 1