Files
beeq/subtree.go
2020-09-17 10:14:58 +08:00

156 lines
3.1 KiB
Go

package beeq
import (
"git.zgwit.com/iot/beeq/packet"
"strings"
"sync"
)
type SubNode struct {
//Subscribed clients
//clientId
//clients map[string]packet.MsgQos
clients sync.Map
//Sub level
//topic->children
//children map[string]*SubNode
children sync.Map
//Multi Wildcard #
mw *SubNode
//Single Wildcard +
sw *SubNode
}
func (sn *SubNode) Publish(topics []string, subs map[string]packet.MsgQos) {
if len(topics) == 0 {
// Publish all matched clients
sn.clients.Range(func(key, value interface{}) bool {
clientId := key.(string)
qos := value.(packet.MsgQos)
if sub, ok := subs[clientId]; ok {
//rewrite by larger Qos
if sub < qos {
subs[clientId] = qos
}
} else {
subs[clientId] = qos
}
return true
})
} else {
name := topics[0]
// Sub-Level
if sub, ok := sn.children.Load(name); ok {
sub.(*SubNode).Publish(topics[1:], subs)
}
// Multi wildcard
if sn.mw != nil {
sn.mw.Publish(topics[1:1], subs)
}
// Single wildcard
if sn.sw != nil {
sn.sw.Publish(topics[1:], subs)
}
}
}
func (sn *SubNode) Subscribe(topics []string, clientId string, qos packet.MsgQos) {
if len(topics) == 0 {
sn.clients.Store(clientId, qos)
return
}
name := topics[0]
if name == "#" {
if sn.mw == nil {
sn.mw = &SubNode{}
}
sn.mw.Subscribe(topics[1:1], clientId, qos)
} else if name == "+" {
if sn.sw == nil {
sn.sw = &SubNode{}
}
sn.sw.Subscribe(topics[1:], clientId, qos)
} else {
value, _ := sn.children.LoadOrStore(name, &SubNode{})
value.(*SubNode).Subscribe(topics[1:], clientId, qos)
}
}
func (sn *SubNode) UnSubscribe(topics []string, clientId string) {
if len(topics) == 0 {
sn.clients.Delete(clientId)
} else {
name := topics[0]
if name == "#" {
if sn.mw != nil {
sn.mw.UnSubscribe(topics[1:1], clientId)
}
} else if name == "+" {
if sn.sw != nil {
sn.sw.UnSubscribe(topics[1:], clientId)
}
} else {
sn.children.Range(func(key, value interface{}) bool {
value.(*SubNode).UnSubscribe(topics[1:], clientId)
return true
})
}
}
}
func (sn *SubNode) ClearClient(clientId string) {
sn.clients.Delete(clientId)
if sn.mw != nil {
sn.mw.ClearClient(clientId)
}
if sn.sw != nil {
sn.sw.ClearClient(clientId)
}
sn.children.Range(func(key, value interface{}) bool {
value.(*SubNode).ClearClient(clientId)
return true
})
}
type SubTree struct {
//tree root
root SubNode
}
func (st *SubTree) Publish(topic []byte, subs map[string]packet.MsgQos) {
topics := strings.Split(string(topic), "/")
if topics[0] == "" {
topics[0] = "/"
}
st.root.Publish(topics, subs)
}
func (st *SubTree) Subscribe(topic []byte, clientId string, qos packet.MsgQos) {
topics := strings.Split(string(topic), "/")
if topics[0] == "" {
topics[0] = "/"
}
st.root.Subscribe(topics, clientId, qos)
}
func (st *SubTree) UnSubscribe(topic []byte, clientId string) {
topics := strings.Split(string(topic), "/")
if topics[0] == "" {
topics[0] = "/"
}
st.root.UnSubscribe(topics, clientId)
}
func (st *SubTree) ClearClient(clientId string) {
st.root.ClearClient(clientId)
}