mirror of
https://github.com/HDT3213/godis.git
synced 2025-10-05 16:57:06 +08:00
159 lines
3.9 KiB
Go
159 lines
3.9 KiB
Go
package pubsub
|
|
|
|
import (
|
|
"github.com/hdt3213/godis/datastruct/list"
|
|
"github.com/hdt3213/godis/interface/redis"
|
|
"github.com/hdt3213/godis/lib/utils"
|
|
"github.com/hdt3213/godis/redis/protocol"
|
|
"strconv"
|
|
)
|
|
|
|
var (
|
|
_subscribe = "subscribe"
|
|
_unsubscribe = "unsubscribe"
|
|
messageBytes = []byte("message")
|
|
unSubscribeNothing = []byte("*3\r\n$11\r\nunsubscribe\r\n$-1\n:0\r\n")
|
|
)
|
|
|
|
func makeMsg(t string, channel string, code int64) []byte {
|
|
return []byte("*3\r\n$" + strconv.FormatInt(int64(len(t)), 10) + protocol.CRLF + t + protocol.CRLF +
|
|
"$" + strconv.FormatInt(int64(len(channel)), 10) + protocol.CRLF + channel + protocol.CRLF +
|
|
":" + strconv.FormatInt(code, 10) + protocol.CRLF)
|
|
}
|
|
|
|
/*
|
|
* invoker should lock channel
|
|
* return: is new subscribed
|
|
*/
|
|
func subscribe0(hub *Hub, channel string, client redis.Connection) bool {
|
|
client.Subscribe(channel)
|
|
|
|
// add into hub.subs
|
|
raw, ok := hub.subs.Get(channel)
|
|
var subscribers *list.LinkedList
|
|
if ok {
|
|
subscribers, _ = raw.(*list.LinkedList)
|
|
} else {
|
|
subscribers = list.Make()
|
|
hub.subs.Put(channel, subscribers)
|
|
}
|
|
if subscribers.Contains(func(a interface{}) bool {
|
|
return a == client
|
|
}) {
|
|
return false
|
|
}
|
|
subscribers.Add(client)
|
|
return true
|
|
}
|
|
|
|
/*
|
|
* invoker should lock channel
|
|
* return: is actually un-subscribe
|
|
*/
|
|
func unsubscribe0(hub *Hub, channel string, client redis.Connection) bool {
|
|
client.UnSubscribe(channel)
|
|
|
|
// remove from hub.subs
|
|
raw, ok := hub.subs.Get(channel)
|
|
if ok {
|
|
subscribers, _ := raw.(*list.LinkedList)
|
|
subscribers.RemoveAllByVal(func(a interface{}) bool {
|
|
return utils.Equals(a, client)
|
|
})
|
|
|
|
if subscribers.Len() == 0 {
|
|
// clean
|
|
hub.subs.Remove(channel)
|
|
}
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
// Subscribe puts the given connection into the given channel
|
|
func Subscribe(hub *Hub, c redis.Connection, args [][]byte) redis.Reply {
|
|
channels := make([]string, len(args))
|
|
for i, b := range args {
|
|
channels[i] = string(b)
|
|
}
|
|
|
|
hub.subsLocker.Locks(channels...)
|
|
defer hub.subsLocker.UnLocks(channels...)
|
|
|
|
for _, channel := range channels {
|
|
if subscribe0(hub, channel, c) {
|
|
_ = c.Write(makeMsg(_subscribe, channel, int64(c.SubsCount())))
|
|
}
|
|
}
|
|
return &protocol.NoReply{}
|
|
}
|
|
|
|
// UnsubscribeAll removes the given connection from all subscribing channel
|
|
func UnsubscribeAll(hub *Hub, c redis.Connection) {
|
|
channels := c.GetChannels()
|
|
|
|
hub.subsLocker.Locks(channels...)
|
|
defer hub.subsLocker.UnLocks(channels...)
|
|
|
|
for _, channel := range channels {
|
|
unsubscribe0(hub, channel, c)
|
|
}
|
|
|
|
}
|
|
|
|
// UnSubscribe removes the given connection from the given channel
|
|
func UnSubscribe(db *Hub, c redis.Connection, args [][]byte) redis.Reply {
|
|
var channels []string
|
|
if len(args) > 0 {
|
|
channels = make([]string, len(args))
|
|
for i, b := range args {
|
|
channels[i] = string(b)
|
|
}
|
|
} else {
|
|
channels = c.GetChannels()
|
|
}
|
|
|
|
db.subsLocker.Locks(channels...)
|
|
defer db.subsLocker.UnLocks(channels...)
|
|
|
|
if len(channels) == 0 {
|
|
_ = c.Write(unSubscribeNothing)
|
|
return &protocol.NoReply{}
|
|
}
|
|
|
|
for _, channel := range channels {
|
|
if unsubscribe0(db, channel, c) {
|
|
_ = c.Write(makeMsg(_unsubscribe, channel, int64(c.SubsCount())))
|
|
}
|
|
}
|
|
return &protocol.NoReply{}
|
|
}
|
|
|
|
// Publish send msg to all subscribing client
|
|
func Publish(hub *Hub, args [][]byte) redis.Reply {
|
|
if len(args) != 2 {
|
|
return &protocol.ArgNumErrReply{Cmd: "publish"}
|
|
}
|
|
channel := string(args[0])
|
|
message := args[1]
|
|
|
|
hub.subsLocker.Lock(channel)
|
|
defer hub.subsLocker.UnLock(channel)
|
|
|
|
raw, ok := hub.subs.Get(channel)
|
|
if !ok {
|
|
return protocol.MakeIntReply(0)
|
|
}
|
|
subscribers, _ := raw.(*list.LinkedList)
|
|
subscribers.ForEach(func(i int, c interface{}) bool {
|
|
client, _ := c.(redis.Connection)
|
|
replyArgs := make([][]byte, 3)
|
|
replyArgs[0] = messageBytes
|
|
replyArgs[1] = []byte(channel)
|
|
replyArgs[2] = message
|
|
_ = client.Write(protocol.MakeMultiBulkReply(replyArgs).ToBytes())
|
|
return true
|
|
})
|
|
return protocol.MakeIntReply(int64(subscribers.Len()))
|
|
}
|