Removed discord label from README.md. Updated SUBSCRIBE command handler to send psubscribe event to connection when subscribing to a pattern.

This commit is contained in:
Kelvin Mwinuka
2024-03-20 00:18:38 +08:00
parent 59a03aec2c
commit 28f659033e
4 changed files with 13 additions and 10 deletions

View File

@@ -2,10 +2,7 @@
[![Go Report Card](https://goreportcard.com/badge/github.com/echovault/echovault)](https://goreportcard.com/report/github.com/echovault/echovault) [![Go Report Card](https://goreportcard.com/badge/github.com/echovault/echovault)](https://goreportcard.com/report/github.com/echovault/echovault)
[![Go Coverage](https://github.com/EchoVault/EchoVault/wiki/coverage.svg)](https://raw.githack.com/wiki/EchoVault/EchoVault/coverage.html) [![Go Coverage](https://github.com/EchoVault/EchoVault/wiki/coverage.svg)](https://raw.githack.com/wiki/EchoVault/EchoVault/coverage.html)
[![GitHub Release](https://img.shields.io/github/v/release/EchoVault/EchoVault)]() [![GitHub Release](https://img.shields.io/github/v/release/EchoVault/EchoVault)]()
<br/>
[![License: GPL v3](https://img.shields.io/badge/License-GPL_v3-blue.svg)](https://www.gnu.org/licenses/gpl-3.0.en.html) [![License: GPL v3](https://img.shields.io/badge/License-GPL_v3-blue.svg)](https://www.gnu.org/licenses/gpl-3.0.en.html)
<br/>
[![Discord](https://img.shields.io/discord/1211815152291414037?style=flat&logo=discord&link=https%3A%2F%2Fdiscord.gg%2Fvt45CKfF)](https://discord.gg/vt45CKfF)
<hr/> <hr/>

View File

@@ -53,7 +53,7 @@ func handlePublish(ctx context.Context, cmd []string, server utils.Server, conn
return []byte(utils.OkResponse), nil return []byte(utils.OkResponse), nil
} }
func handlePubSubChannels(ctx context.Context, cmd []string, server utils.Server, conn *net.Conn) ([]byte, error) { func handlePubSubChannels(_ context.Context, cmd []string, server utils.Server, conn *net.Conn) ([]byte, error) {
if len(cmd) > 3 { if len(cmd) > 3 {
return nil, errors.New(utils.WrongArgsResponse) return nil, errors.New(utils.WrongArgsResponse)
} }
@@ -68,7 +68,7 @@ func handlePubSubChannels(ctx context.Context, cmd []string, server utils.Server
pattern = cmd[2] pattern = cmd[2]
} }
return pubsub.Channels(ctx, pattern), nil return pubsub.Channels(pattern), nil
} }
func handlePubSubNumPat(ctx context.Context, cmd []string, server utils.Server, conn *net.Conn) ([]byte, error) { func handlePubSubNumPat(ctx context.Context, cmd []string, server utils.Server, conn *net.Conn) ([]byte, error) {

View File

@@ -335,7 +335,7 @@ func Test_HandlePublish(t *testing.T) {
}() }()
// Verify all the responses for each pattern subscription // Verify all the responses for each pattern subscription
for i := 0; i < len(patterns); i++ { for i := 0; i < len(patterns); i++ {
verifyEvent(c, r, []string{"subscribe", patterns[i], fmt.Sprintf("%d", i+1)}) verifyEvent(c, r, []string{"psubscribe", patterns[i], fmt.Sprintf("%d", i+1)})
} }
} }

View File

@@ -26,6 +26,12 @@ func NewPubSub() *PubSub {
func (ps *PubSub) Subscribe(ctx context.Context, conn *net.Conn, channels []string, withPattern bool) { func (ps *PubSub) Subscribe(ctx context.Context, conn *net.Conn, channels []string, withPattern bool) {
r := resp.NewConn(*conn) r := resp.NewConn(*conn)
action := "subscribe"
if withPattern {
action = "psubscribe"
}
for i := 0; i < len(channels); i++ { for i := 0; i < len(channels); i++ {
// Check if channel with given name exists // Check if channel with given name exists
// If it does, subscribe the connection to the channel // If it does, subscribe the connection to the channel
@@ -45,7 +51,7 @@ func (ps *PubSub) Subscribe(ctx context.Context, conn *net.Conn, channels []stri
newChan.Start() newChan.Start()
if newChan.Subscribe(conn) { if newChan.Subscribe(conn) {
if err := r.WriteArray([]resp.Value{ if err := r.WriteArray([]resp.Value{
resp.StringValue("subscribe"), resp.StringValue(action),
resp.StringValue(newChan.name), resp.StringValue(newChan.name),
resp.IntegerValue(i + 1), resp.IntegerValue(i + 1),
}); err != nil { }); err != nil {
@@ -57,7 +63,7 @@ func (ps *PubSub) Subscribe(ctx context.Context, conn *net.Conn, channels []stri
// Subscribe to existing channel // Subscribe to existing channel
if ps.channels[channelIdx].Subscribe(conn) { if ps.channels[channelIdx].Subscribe(conn) {
if err := r.WriteArray([]resp.Value{ if err := r.WriteArray([]resp.Value{
resp.StringValue("subscribe"), resp.StringValue(action),
resp.StringValue(ps.channels[channelIdx].name), resp.StringValue(ps.channels[channelIdx].name),
resp.IntegerValue(i + 1), resp.IntegerValue(i + 1),
}); err != nil { }); err != nil {
@@ -172,7 +178,7 @@ func (ps *PubSub) Publish(ctx context.Context, message string, channelName strin
} }
} }
func (ps *PubSub) Channels(ctx context.Context, pattern string) []byte { func (ps *PubSub) Channels(pattern string) []byte {
var count int var count int
var res string var res string
@@ -183,7 +189,6 @@ func (ps *PubSub) Channels(ctx context.Context, pattern string) []byte {
count += 1 count += 1
} }
} }
res = fmt.Sprintf("*%d\r\n%s", count, res) res = fmt.Sprintf("*%d\r\n%s", count, res)
return []byte(res) return []byte(res)
} }
@@ -197,6 +202,7 @@ func (ps *PubSub) Channels(ctx context.Context, pattern string) []byte {
count += 1 count += 1
continue continue
} }
// Channel is not a pattern channel. Check if the channel name matches the provided glob pattern
if g.Match(channel.name) && channel.IsActive() { if g.Match(channel.name) && channel.IsActive() {
res += fmt.Sprintf("$%d\r\n%s\r\n", len(channel.name), channel.name) res += fmt.Sprintf("$%d\r\n%s\r\n", len(channel.name), channel.name)
count += 1 count += 1