mirror of
https://github.com/zgwit/beeq.git
synced 2025-09-26 19:51:13 +08:00
156 lines
3.1 KiB
Go
156 lines
3.1 KiB
Go
package beeq
|
|
|
|
import (
|
|
"git.zgwit.com/iot/beeq/packet"
|
|
"strings"
|
|
"sync"
|
|
)
|
|
|
|
// SubNode is a single level of the topic subscription tree.
//
// A node records the clients whose topic filter terminates at this level
// and links to the nodes for the next level, with dedicated branches for
// the "#" and "+" wildcards.
type SubNode struct {
	// clients maps a clientId (string) to the packet.MsgQos stored for the
	// subscription that ends at this node.
	clients sync.Map

	// children maps the name of the next topic level (string) to the
	// *SubNode that handles it.
	children sync.Map

	// mw is the branch for the multi-level wildcard "#" and sw the branch
	// for the single-level wildcard "+". Each stays nil until a filter
	// using that wildcard is subscribed at this level.
	mw, sw *SubNode
}
|
|
|
|
func (sn *SubNode) Publish(topics []string, subs map[string]packet.MsgQos) {
|
|
if len(topics) == 0 {
|
|
// Publish all matched clients
|
|
sn.clients.Range(func(key, value interface{}) bool {
|
|
clientId := key.(string)
|
|
qos := value.(packet.MsgQos)
|
|
|
|
if sub, ok := subs[clientId]; ok {
|
|
//rewrite by larger Qos
|
|
if sub < qos {
|
|
subs[clientId] = qos
|
|
}
|
|
} else {
|
|
subs[clientId] = qos
|
|
}
|
|
|
|
return true
|
|
})
|
|
} else {
|
|
name := topics[0]
|
|
// Sub-Level
|
|
|
|
if sub, ok := sn.children.Load(name); ok {
|
|
sub.(*SubNode).Publish(topics[1:], subs)
|
|
}
|
|
// Multi wildcard
|
|
if sn.mw != nil {
|
|
sn.mw.Publish(topics[1:1], subs)
|
|
}
|
|
// Single wildcard
|
|
if sn.sw != nil {
|
|
sn.sw.Publish(topics[1:], subs)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (sn *SubNode) Subscribe(topics []string, clientId string, qos packet.MsgQos) {
|
|
if len(topics) == 0 {
|
|
sn.clients.Store(clientId, qos)
|
|
return
|
|
}
|
|
|
|
name := topics[0]
|
|
if name == "#" {
|
|
if sn.mw == nil {
|
|
sn.mw = &SubNode{}
|
|
}
|
|
sn.mw.Subscribe(topics[1:1], clientId, qos)
|
|
} else if name == "+" {
|
|
if sn.sw == nil {
|
|
sn.sw = &SubNode{}
|
|
}
|
|
sn.sw.Subscribe(topics[1:], clientId, qos)
|
|
} else {
|
|
value, _ := sn.children.LoadOrStore(name, &SubNode{})
|
|
value.(*SubNode).Subscribe(topics[1:], clientId, qos)
|
|
}
|
|
}
|
|
|
|
func (sn *SubNode) UnSubscribe(topics []string, clientId string) {
|
|
if len(topics) == 0 {
|
|
sn.clients.Delete(clientId)
|
|
} else {
|
|
name := topics[0]
|
|
if name == "#" {
|
|
if sn.mw != nil {
|
|
sn.mw.UnSubscribe(topics[1:1], clientId)
|
|
}
|
|
} else if name == "+" {
|
|
if sn.sw != nil {
|
|
sn.sw.UnSubscribe(topics[1:], clientId)
|
|
}
|
|
} else {
|
|
sn.children.Range(func(key, value interface{}) bool {
|
|
value.(*SubNode).UnSubscribe(topics[1:], clientId)
|
|
return true
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
func (sn *SubNode) ClearClient(clientId string) {
|
|
sn.clients.Delete(clientId)
|
|
|
|
if sn.mw != nil {
|
|
sn.mw.ClearClient(clientId)
|
|
}
|
|
if sn.sw != nil {
|
|
sn.sw.ClearClient(clientId)
|
|
}
|
|
|
|
sn.children.Range(func(key, value interface{}) bool {
|
|
value.(*SubNode).ClearClient(clientId)
|
|
return true
|
|
})
|
|
}
|
|
|
|
type SubTree struct {
|
|
//tree root
|
|
root SubNode
|
|
}
|
|
|
|
func (st *SubTree) Publish(topic []byte, subs map[string]packet.MsgQos) {
|
|
topics := strings.Split(string(topic), "/")
|
|
if topics[0] == "" {
|
|
topics[0] = "/"
|
|
}
|
|
st.root.Publish(topics, subs)
|
|
}
|
|
|
|
func (st *SubTree) Subscribe(topic []byte, clientId string, qos packet.MsgQos) {
|
|
topics := strings.Split(string(topic), "/")
|
|
if topics[0] == "" {
|
|
topics[0] = "/"
|
|
}
|
|
st.root.Subscribe(topics, clientId, qos)
|
|
}
|
|
|
|
func (st *SubTree) UnSubscribe(topic []byte, clientId string) {
|
|
topics := strings.Split(string(topic), "/")
|
|
if topics[0] == "" {
|
|
topics[0] = "/"
|
|
}
|
|
st.root.UnSubscribe(topics, clientId)
|
|
}
|
|
|
|
func (st *SubTree) ClearClient(clientId string) {
|
|
st.root.ClearClient(clientId)
|
|
}
|