修改,将订阅树改为sync.Map

This commit is contained in:
Jason
2020-09-17 10:14:58 +08:00
parent 5285df49f8
commit 9b4d8508f7
4 changed files with 78 additions and 86 deletions

View File

@@ -1,9 +1,16 @@
package main package main
import "git.zgwit.com/iot/beeq" import (
"git.zgwit.com/iot/beeq"
"time"
)
func main() { func main() {
hive := beeq.NewHive() hive := beeq.NewHive()
hive.ListenAndServe(":1883") hive.ListenAndServe(":1883")
for {
time.Sleep(time.Minute)
}
} }

12
hive.go
View File

@@ -19,10 +19,10 @@ func reAlloc(buf []byte, l int) []byte {
type Hive struct { type Hive struct {
//Subscribe tree //Subscribe tree
subTree *SubTree subTree SubTree
//Retain tree //Retain tree
retainTree *RetainTree retainTree RetainTree
//ClientId->Bee //ClientId->Bee
bees sync.Map // map[string]*Bee bees sync.Map // map[string]*Bee
@@ -34,11 +34,9 @@ type Hive struct {
onDisconnect func(*packet.DisConnect, *Bee) onDisconnect func(*packet.DisConnect, *Bee)
} }
//TODO 添加参数
func NewHive() *Hive { func NewHive() *Hive {
return &Hive{ return &Hive{}
subTree: NewSubTree(),
retainTree: NewRetainTree(),
}
} }
func (h *Hive) ListenAndServe(addr string) error { func (h *Hive) ListenAndServe(addr string) error {
@@ -46,7 +44,7 @@ func (h *Hive) ListenAndServe(addr string) error {
if err != nil { if err != nil {
return err return err
} }
h.Serve(ln) go h.Serve(ln)
return nil return nil
} }

View File

@@ -3,53 +3,53 @@ package beeq
import ( import (
"git.zgwit.com/iot/beeq/packet" "git.zgwit.com/iot/beeq/packet"
"strings" "strings"
"sync"
) )
type RetainNode struct { type RetainNode struct {
//Subscribed retains //Subscribed retains
//clientId //clientId
retains map[string]*packet.Publish //retains map[string]*packet.Publish
retains sync.Map
//Sub level //Sub level
//topic->children //topic->children
children map[string]*RetainNode //children map[string]*RetainNode
} children sync.Map
func NewRetainNode() *RetainNode {
return &RetainNode{
retains: make(map[string]*packet.Publish),
children: make(map[string]*RetainNode),
}
} }
func (rn *RetainNode) Fetch(topics []string, cb func(clientId string, pub *packet.Publish)) { func (rn *RetainNode) Fetch(topics []string, cb func(clientId string, pub *packet.Publish)) {
if len(topics) == 0 { if len(topics) == 0 {
// Publish all matched retains // Publish all matched retains
for clientId, pub := range rn.retains { rn.retains.Range(func(key, value interface{}) bool {
cb(clientId, pub) cb(key.(string), value.(*packet.Publish))
} return true
})
} else { } else {
name := topics[0] name := topics[0]
if name == "#" { if name == "#" {
//All retains //All retains
for clientId, pub := range rn.retains { rn.retains.Range(func(key, value interface{}) bool {
cb(clientId, pub) cb(key.(string), value.(*packet.Publish))
} return true
})
//And all children //And all children
for _, sub := range rn.children { rn.children.Range(func(key, value interface{}) bool {
sub.Fetch(topics, cb) value.(*RetainNode).Fetch(topics, cb)
} return true
})
} else if name == "+" { } else if name == "+" {
//Children //Children
for _, sub := range rn.children { rn.children.Range(func(key, value interface{}) bool {
sub.Fetch(topics[1:], cb) value.(*RetainNode).Fetch(topics[1:], cb)
} return true
})
} else { } else {
// Sub-Level // Sub-Level
if sub, ok := rn.children[name]; ok { if value, ok := rn.children.Load(name); ok {
sub.Fetch(topics[1:], cb) value.(*RetainNode).Fetch(topics[1:], cb)
} }
} }
} }
@@ -58,33 +58,27 @@ func (rn *RetainNode) Fetch(topics []string, cb func(clientId string, pub *packe
func (rn *RetainNode) Retain(topics []string, clientId string, pub *packet.Publish) *RetainNode { func (rn *RetainNode) Retain(topics []string, clientId string, pub *packet.Publish) *RetainNode {
if len(topics) == 0 { if len(topics) == 0 {
// Publish to specific client // Publish to specific client
rn.retains[clientId] = pub rn.retains.Store(clientId, pub)
return rn return rn
} else { } else {
name := topics[0] name := topics[0]
// Sub-Level // Sub-Level
if _, ok := rn.children[name]; !ok { value, _ := rn.children.LoadOrStore(name, &RetainNode{})
rn.children[name] = NewRetainNode() return value.(*RetainNode).Retain(topics[1:], clientId, pub)
}
return rn.children[name].Retain(topics[1:], clientId, pub)
} }
} }
type RetainTree struct { type RetainTree struct {
//root //root
root *RetainNode root RetainNode
//tree index //tree index
//ClientId -> Node (hold Publish message) //ClientId -> Node (hold Publish message)
retains map[string]*RetainNode //retains map[string]*RetainNode
retains sync.Map
} }
func NewRetainTree() *RetainTree {
return &RetainTree{
root: NewRetainNode(),
}
}
func (rt *RetainTree) Fetch(topic []byte, cb func(clientId string, pub *packet.Publish)) { func (rt *RetainTree) Fetch(topic []byte, cb func(clientId string, pub *packet.Publish)) {
topics := strings.Split(string(topic), "/") topics := strings.Split(string(topic), "/")
@@ -105,12 +99,13 @@ func (rt *RetainTree) Retain(topic []byte, clientId string, pub *packet.Publish)
node := rt.root.Retain(topics, clientId, pub) node := rt.root.Retain(topics, clientId, pub)
//indexed node //indexed node
rt.retains[clientId] = node rt.retains.Store(clientId, node)
} }
func (rt *RetainTree) UnRetain(clientId string) { func (rt *RetainTree) UnRetain(clientId string) {
if node, ok := rt.retains[clientId]; ok { if value, ok := rt.retains.Load(clientId); ok {
delete(node.retains, clientId) node := value.(*RetainNode)
delete(rt.retains, clientId) node.retains.Delete(clientId)
rt.retains.Delete(clientId)
} }
} }

View File

@@ -3,16 +3,19 @@ package beeq
import ( import (
"git.zgwit.com/iot/beeq/packet" "git.zgwit.com/iot/beeq/packet"
"strings" "strings"
"sync"
) )
type SubNode struct { type SubNode struct {
//Subscribed clients //Subscribed clients
//clientId //clientId
clients map[string]packet.MsgQos //clients map[string]packet.MsgQos
clients sync.Map
//Sub level //Sub level
//topic->children //topic->children
children map[string]*SubNode //children map[string]*SubNode
children sync.Map
//Multi Wildcard # //Multi Wildcard #
mw *SubNode mw *SubNode
@@ -21,19 +24,13 @@ type SubNode struct {
sw *SubNode sw *SubNode
} }
func NewSubNode() *SubNode {
return &SubNode{
clients: make(map[string]packet.MsgQos),
children: make(map[string]*SubNode),
//mw: NewSubNode(),
//sw: NewSubNode(),
}
}
func (sn *SubNode) Publish(topics []string, subs map[string]packet.MsgQos) { func (sn *SubNode) Publish(topics []string, subs map[string]packet.MsgQos) {
if len(topics) == 0 { if len(topics) == 0 {
// Publish all matched clients // Publish all matched clients
for clientId, qos := range sn.clients { sn.clients.Range(func(key, value interface{}) bool {
clientId := key.(string)
qos := value.(packet.MsgQos)
if sub, ok := subs[clientId]; ok { if sub, ok := subs[clientId]; ok {
//rewrite by larger Qos //rewrite by larger Qos
if sub < qos { if sub < qos {
@@ -42,12 +39,15 @@ func (sn *SubNode) Publish(topics []string, subs map[string]packet.MsgQos) {
} else { } else {
subs[clientId] = qos subs[clientId] = qos
} }
}
return true
})
} else { } else {
name := topics[0] name := topics[0]
// Sub-Level // Sub-Level
if sub, ok := sn.children[name]; ok {
sub.Publish(topics[1:], subs) if sub, ok := sn.children.Load(name); ok {
sub.(*SubNode).Publish(topics[1:], subs)
} }
// Multi wildcard // Multi wildcard
if sn.mw != nil { if sn.mw != nil {
@@ -62,32 +62,30 @@ func (sn *SubNode) Publish(topics []string, subs map[string]packet.MsgQos) {
func (sn *SubNode) Subscribe(topics []string, clientId string, qos packet.MsgQos) { func (sn *SubNode) Subscribe(topics []string, clientId string, qos packet.MsgQos) {
if len(topics) == 0 { if len(topics) == 0 {
sn.clients[clientId] = qos sn.clients.Store(clientId, qos)
return return
} }
name := topics[0] name := topics[0]
if name == "#" { if name == "#" {
if sn.mw == nil { if sn.mw == nil {
sn.mw = NewSubNode() sn.mw = &SubNode{}
} }
sn.mw.Subscribe(topics[1:1], clientId, qos) sn.mw.Subscribe(topics[1:1], clientId, qos)
} else if name == "+" { } else if name == "+" {
if sn.sw == nil { if sn.sw == nil {
sn.sw = NewSubNode() sn.sw = &SubNode{}
} }
sn.sw.Subscribe(topics[1:], clientId, qos) sn.sw.Subscribe(topics[1:], clientId, qos)
} else { } else {
if _, ok := sn.children[name]; !ok { value, _ := sn.children.LoadOrStore(name, &SubNode{})
sn.children[name] = NewSubNode() value.(*SubNode).Subscribe(topics[1:], clientId, qos)
}
sn.children[name].Subscribe(topics[1:], clientId, qos)
} }
} }
func (sn *SubNode) UnSubscribe(topics []string, clientId string) { func (sn *SubNode) UnSubscribe(topics []string, clientId string) {
if len(topics) == 0 { if len(topics) == 0 {
delete(sn.clients, clientId) sn.clients.Delete(clientId)
} else { } else {
name := topics[0] name := topics[0]
if name == "#" { if name == "#" {
@@ -99,17 +97,16 @@ func (sn *SubNode) UnSubscribe(topics []string, clientId string) {
sn.sw.UnSubscribe(topics[1:], clientId) sn.sw.UnSubscribe(topics[1:], clientId)
} }
} else { } else {
if sub, ok := sn.children[name]; ok { sn.children.Range(func(key, value interface{}) bool {
sub.UnSubscribe(topics[1:], clientId) value.(*SubNode).UnSubscribe(topics[1:], clientId)
} return true
})
} }
} }
} }
func (sn *SubNode) ClearClient(clientId string) { func (sn *SubNode) ClearClient(clientId string) {
if _, ok := sn.clients[clientId]; ok { sn.clients.Delete(clientId)
delete(sn.clients, clientId)
}
if sn.mw != nil { if sn.mw != nil {
sn.mw.ClearClient(clientId) sn.mw.ClearClient(clientId)
@@ -118,20 +115,15 @@ func (sn *SubNode) ClearClient(clientId string) {
sn.sw.ClearClient(clientId) sn.sw.ClearClient(clientId)
} }
for _, sub := range sn.children { sn.children.Range(func(key, value interface{}) bool {
sub.ClearClient(clientId) value.(*SubNode).ClearClient(clientId)
} return true
})
} }
type SubTree struct { type SubTree struct {
//tree root //tree root
root *SubNode root SubNode
}
func NewSubTree() *SubTree {
return &SubTree{
root: NewSubNode(),
}
} }
func (st *SubTree) Publish(topic []byte, subs map[string]packet.MsgQos) { func (st *SubTree) Publish(topic []byte, subs map[string]packet.MsgQos) {