Added godoc comments for api_pubsub.go functions

This commit is contained in:
Kelvin Mwinuka
2024-04-03 03:34:39 +08:00
parent 155a1b8c30
commit 62792a4033
3 changed files with 1233 additions and 1169 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -30,7 +30,17 @@ var conns map[string]connMap
type ReadPubSubMessage func() []string
func (server *EchoVault) SUBSCRIBE(name string, channels ...string) ReadPubSubMessage {
// SUBSCRIBE subscribes the caller to the list of provided channels.
//
// Parameters:
//
// `tag` - string - The tag used to identify this subscription instance.
//
// `channels` - ...string - The list of channels to subscribe to.
//
// Returns: ReadPubSubMessage function which reads the next message sent to the subscription instance.
// This function is blocking.
func (server *EchoVault) SUBSCRIBE(tag string, channels ...string) ReadPubSubMessage {
// Initialize connection tracker if calling subscribe for the first time
if conns == nil {
conns = make(map[string]connMap)
@@ -39,9 +49,9 @@ func (server *EchoVault) SUBSCRIBE(name string, channels ...string) ReadPubSubMe
// If connection with this name does not exist, create new connection it
var readConn net.Conn
var writeConn net.Conn
if _, ok := conns[name]; !ok {
if _, ok := conns[tag]; !ok {
readConn, writeConn = net.Pipe()
conns[name] = connMap{
conns[tag] = connMap{
readConn: &readConn,
writeConn: &writeConn,
}
@@ -50,7 +60,7 @@ func (server *EchoVault) SUBSCRIBE(name string, channels ...string) ReadPubSubMe
// Subscribe connection to the provided channels
cmd := append([]string{"SUBSCRIBE"}, channels...)
go func() {
_, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), conns[name].writeConn, false)
_, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), conns[tag].writeConn, false)
}()
return func() []string {
@@ -66,20 +76,37 @@ func (server *EchoVault) SUBSCRIBE(name string, channels ...string) ReadPubSubMe
}
}
func (server *EchoVault) UNSUBSCRIBE(name string, channels ...string) {
// UNSUBSCRIBE unsubscribes the caller from the given channels.
//
// Parameters:
//
// `tag` - string - The tag used to identify this subscription instance.
//
// `channels` - ...string - The list of channels to unsubscribe from.
func (server *EchoVault) UNSUBSCRIBE(tag string, channels ...string) {
if conns == nil {
return
}
if _, ok := conns[name]; !ok {
if _, ok := conns[tag]; !ok {
return
}
cmd := append([]string{"UNSUBSCRIBE"}, channels...)
_, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), conns[name].writeConn, false)
_, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), conns[tag].writeConn, false)
}
func (server *EchoVault) PSUBSCRIBE(name string, patterns ...string) ReadPubSubMessage {
// PSUBSCRIBE subscribes the caller to the list of provided glob patterns.
//
// Parameters:
//
// `tag` - string - The tag used to identify this subscription instance.
//
// `patterns` - ...string - The list of glob patterns to subscribe to.
//
// Returns: ReadPubSubMessage function which reads the next message sent to the subscription instance.
// This function is blocking.
func (server *EchoVault) PSUBSCRIBE(tag string, patterns ...string) ReadPubSubMessage {
// Initialize connection tracker if calling subscribe for the first time
if conns == nil {
conns = make(map[string]connMap)
@@ -88,9 +115,9 @@ func (server *EchoVault) PSUBSCRIBE(name string, patterns ...string) ReadPubSubM
// If connection with this name does not exist, create new connection it
var readConn net.Conn
var writeConn net.Conn
if _, ok := conns[name]; !ok {
if _, ok := conns[tag]; !ok {
readConn, writeConn = net.Pipe()
conns[name] = connMap{
conns[tag] = connMap{
readConn: &readConn,
writeConn: &writeConn,
}
@@ -99,7 +126,7 @@ func (server *EchoVault) PSUBSCRIBE(name string, patterns ...string) ReadPubSubM
// Subscribe connection to the provided channels
cmd := append([]string{"PSUBSCRIBE"}, patterns...)
go func() {
_, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), conns[name].writeConn, false)
_, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), conns[tag].writeConn, false)
}()
return func() []string {
@@ -115,19 +142,36 @@ func (server *EchoVault) PSUBSCRIBE(name string, patterns ...string) ReadPubSubM
}
}
func (server *EchoVault) PUNSUBSCRIBE(name string, patterns ...string) {
// PUNSUBSCRIBE unsubscribes the caller from the given glob patterns.
//
// Parameters:
//
// `tag` - string - The tag used to identify this subscription instance.
//
// `patterns` - ...string - The list of glob patterns to unsubscribe from.
func (server *EchoVault) PUNSUBSCRIBE(tag string, patterns ...string) {
if conns == nil {
return
}
if _, ok := conns[name]; !ok {
if _, ok := conns[tag]; !ok {
return
}
cmd := append([]string{"PUNSUBSCRIBE"}, patterns...)
_, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), conns[name].writeConn, false)
_, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), conns[tag].writeConn, false)
}
// PUBLISH publishes a message to the given channel.
//
// Parameters:
//
// `channel` - string - The channel to publish the message to.
//
// `message` - string - The message to publish to the specified channel.
//
// Returns: "OK" when the publish is successful. This does not indicate whether each subscriber has received the message,
// only that the message has been published.
func (server *EchoVault) PUBLISH(channel, message string) (string, error) {
b, err := server.handleCommand(server.context, internal.EncodeCommand([]string{"PUBLISH", channel, message}), nil, false)
if err != nil {
@@ -136,6 +180,13 @@ func (server *EchoVault) PUBLISH(channel, message string) (string, error) {
return internal.ParseStringResponse(b)
}
// PUBSUB_CHANNELS returns the list of channels & patterns that match the glob pattern provided.
//
// Parameters:
//
// `pattern` - string - The glob pattern used to match the channel names.
//
// Returns: A string slice of all the active channels and patterns (i.e. channels and patterns that have 1 or more subscribers).
func (server *EchoVault) PUBSUB_CHANNELS(pattern string) ([]string, error) {
cmd := []string{"PUBSUB", "CHANNELS"}
if pattern != "" {
@@ -148,6 +199,9 @@ func (server *EchoVault) PUBSUB_CHANNELS(pattern string) ([]string, error) {
return internal.ParseStringArrayResponse(b)
}
// PUBSUB_NUMPAT returns the list of active patterns.
//
// Returns: An integer representing the number of all the active patterns (i.e. patterns that have 1 or more subscribers).
func (server *EchoVault) PUBSUB_NUMPAT() (int, error) {
b, err := server.handleCommand(server.context, internal.EncodeCommand([]string{"PUBSUB", "NUMPAT"}), nil, false)
if err != nil {
@@ -156,6 +210,13 @@ func (server *EchoVault) PUBSUB_NUMPAT() (int, error) {
return internal.ParseIntegerResponse(b)
}
// PUBSUB_NUMSUB returns the number of subscribers for each of the specified channels.
//
// Parameters:
//
// `channels` - ...string - The list of channels whose number of subscribers is to be checked.
//
// Returns: A map of map[string]int where the key is the channel name and the value is the number of subscribers.
func (server *EchoVault) PUBSUB_NUMSUB(channels ...string) (map[string]int, error) {
cmd := append([]string{"PUBSUB", "NUMSUB"}, channels...)

View File

@@ -434,7 +434,7 @@ func TestEchoVault_SINTERCARD(t *testing.T) {
name string
presetValues map[string]interface{}
keys []string
limit int
limit uint
want int
wantErr bool
}{
@@ -918,7 +918,7 @@ func TestEchoVault_SPOP(t *testing.T) {
name string
presetValue interface{}
key string
count int
count uint
want []string
wantErr bool
}{