diff --git a/cmd/main.go b/cmd/main.go index f0016ae..cc927a4 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -1,9 +1,16 @@ package main -import "git.zgwit.com/iot/beeq" +import ( + "git.zgwit.com/iot/beeq" + "time" +) func main() { hive := beeq.NewHive() hive.ListenAndServe(":1883") + + for { + time.Sleep(time.Minute) + } } \ No newline at end of file diff --git a/hive.go b/hive.go index db298e9..4da2e80 100644 --- a/hive.go +++ b/hive.go @@ -19,10 +19,10 @@ func reAlloc(buf []byte, l int) []byte { type Hive struct { //Subscribe tree - subTree *SubTree + subTree SubTree //Retain tree - retainTree *RetainTree + retainTree RetainTree //ClientId->Bee bees sync.Map // map[string]*Bee @@ -34,11 +34,9 @@ type Hive struct { onDisconnect func(*packet.DisConnect, *Bee) } +//TODO 添加参数 func NewHive() *Hive { - return &Hive{ - subTree: NewSubTree(), - retainTree: NewRetainTree(), - } + return &Hive{} } func (h *Hive) ListenAndServe(addr string) error { @@ -46,7 +44,7 @@ func (h *Hive) ListenAndServe(addr string) error { if err != nil { return err } - h.Serve(ln) + go h.Serve(ln) return nil } diff --git a/retaintree.go b/retaintree.go index a197fbe..1829ee8 100644 --- a/retaintree.go +++ b/retaintree.go @@ -3,53 +3,53 @@ package beeq import ( "git.zgwit.com/iot/beeq/packet" "strings" + "sync" ) type RetainNode struct { //Subscribed retains //clientId - retains map[string]*packet.Publish + //retains map[string]*packet.Publish + retains sync.Map //Sub level //topic->children - children map[string]*RetainNode -} - -func NewRetainNode() *RetainNode { - return &RetainNode{ - retains: make(map[string]*packet.Publish), - children: make(map[string]*RetainNode), - } + //children map[string]*RetainNode + children sync.Map } func (rn *RetainNode) Fetch(topics []string, cb func(clientId string, pub *packet.Publish)) { if len(topics) == 0 { // Publish all matched retains - for clientId, pub := range rn.retains { - cb(clientId, pub) - } + rn.retains.Range(func(key, value interface{}) bool { + cb(key.(string), value.(*packet.Publish)) + return true + }) } else { name := topics[0] if name == "#" { //All retains - for clientId, pub := range rn.retains { - cb(clientId, pub) - } + rn.retains.Range(func(key, value interface{}) bool { + cb(key.(string), value.(*packet.Publish)) + return true + }) //And all children - for _, sub := range rn.children { - sub.Fetch(topics, cb) - } + rn.children.Range(func(key, value interface{}) bool { + value.(*RetainNode).Fetch(topics, cb) + return true + }) } else if name == "+" { //Children - for _, sub := range rn.children { - sub.Fetch(topics[1:], cb) - } + rn.children.Range(func(key, value interface{}) bool { + value.(*RetainNode).Fetch(topics[1:], cb) + return true + }) } else { // Sub-Level - if sub, ok := rn.children[name]; ok { - sub.Fetch(topics[1:], cb) + if value, ok := rn.children.Load(name); ok { + value.(*RetainNode).Fetch(topics[1:], cb) } } } @@ -58,33 +58,27 @@ func (rn *RetainNode) Fetch(topics []string, cb func(clientId string, pub *packe func (rn *RetainNode) Retain(topics []string, clientId string, pub *packet.Publish) *RetainNode { if len(topics) == 0 { // Publish to specific client - rn.retains[clientId] = pub + rn.retains.Store(clientId, pub) return rn } else { name := topics[0] // Sub-Level - if _, ok := rn.children[name]; !ok { - rn.children[name] = NewRetainNode() - } - return rn.children[name].Retain(topics[1:], clientId, pub) + value, _ := rn.children.LoadOrStore(name, &RetainNode{}) + return value.(*RetainNode).Retain(topics[1:], clientId, pub) } } type RetainTree struct { //root - root *RetainNode + root RetainNode //tree index //ClientId -> Node (hold Publish message) - retains map[string]*RetainNode + //retains map[string]*RetainNode + retains sync.Map } -func NewRetainTree() *RetainTree { - return &RetainTree{ - root: NewRetainNode(), - } -} func (rt *RetainTree) Fetch(topic []byte, cb func(clientId string, pub *packet.Publish)) { topics := strings.Split(string(topic), "/") @@ -105,12 +99,13 @@ func (rt *RetainTree) Retain(topic []byte, clientId string, pub *packet.Publish) node := rt.root.Retain(topics, clientId, pub) //indexed node - rt.retains[clientId] = node + rt.retains.Store(clientId, node) } func (rt *RetainTree) UnRetain(clientId string) { - if node, ok := rt.retains[clientId]; ok { - delete(node.retains, clientId) - delete(rt.retains, clientId) + if value, ok := rt.retains.Load(clientId); ok { + node := value.(*RetainNode) + node.retains.Delete(clientId) + rt.retains.Delete(clientId) } } diff --git a/subtree.go b/subtree.go index 157e1b5..fd44319 100644 --- a/subtree.go +++ b/subtree.go @@ -3,16 +3,19 @@ package beeq import ( "git.zgwit.com/iot/beeq/packet" "strings" + "sync" ) type SubNode struct { //Subscribed clients //clientId - clients map[string]packet.MsgQos + //clients map[string]packet.MsgQos + clients sync.Map //Sub level //topic->children - children map[string]*SubNode + //children map[string]*SubNode + children sync.Map //Multi Wildcard # mw *SubNode @@ -21,19 +24,13 @@ type SubNode struct { sw *SubNode } -func NewSubNode() *SubNode { - return &SubNode{ - clients: make(map[string]packet.MsgQos), - children: make(map[string]*SubNode), - //mw: NewSubNode(), - //sw: NewSubNode(), - } -} - func (sn *SubNode) Publish(topics []string, subs map[string]packet.MsgQos) { if len(topics) == 0 { // Publish all matched clients - for clientId, qos := range sn.clients { + sn.clients.Range(func(key, value interface{}) bool { + clientId := key.(string) + qos := value.(packet.MsgQos) + if sub, ok := subs[clientId]; ok { //rewrite by larger Qos if sub < qos { @@ -42,12 +39,15 @@ func (sn *SubNode) Publish(topics []string, subs map[string]packet.MsgQos) { } else { subs[clientId] = qos } - } + + return true + }) } else { name := topics[0] // Sub-Level - if sub, ok := sn.children[name]; ok { - sub.Publish(topics[1:], subs) + + if sub, ok := sn.children.Load(name); ok { + sub.(*SubNode).Publish(topics[1:], subs) } // Multi wildcard if sn.mw != nil { @@ -62,32 +62,30 @@ func (sn *SubNode) Publish(topics []string, subs map[string]packet.MsgQos) { func (sn *SubNode) Subscribe(topics []string, clientId string, qos packet.MsgQos) { if len(topics) == 0 { - sn.clients[clientId] = qos + sn.clients.Store(clientId, qos) return } name := topics[0] if name == "#" { if sn.mw == nil { - sn.mw = NewSubNode() + sn.mw = &SubNode{} } sn.mw.Subscribe(topics[1:1], clientId, qos) } else if name == "+" { if sn.sw == nil { - sn.sw = NewSubNode() + sn.sw = &SubNode{} } sn.sw.Subscribe(topics[1:], clientId, qos) } else { - if _, ok := sn.children[name]; !ok { - sn.children[name] = NewSubNode() - } - sn.children[name].Subscribe(topics[1:], clientId, qos) + value, _ := sn.children.LoadOrStore(name, &SubNode{}) + value.(*SubNode).Subscribe(topics[1:], clientId, qos) } } func (sn *SubNode) UnSubscribe(topics []string, clientId string) { if len(topics) == 0 { - delete(sn.clients, clientId) + sn.clients.Delete(clientId) } else { name := topics[0] if name == "#" { @@ -99,17 +97,16 @@ func (sn *SubNode) UnSubscribe(topics []string, clientId string) { sn.sw.UnSubscribe(topics[1:], clientId) } } else { - if sub, ok := sn.children[name]; ok { - sub.UnSubscribe(topics[1:], clientId) - } + sn.children.Range(func(key, value interface{}) bool { + value.(*SubNode).UnSubscribe(topics[1:], clientId) + return true + }) } } } func (sn *SubNode) ClearClient(clientId string) { - if _, ok := sn.clients[clientId]; ok { - delete(sn.clients, clientId) - } + sn.clients.Delete(clientId) if sn.mw != nil { sn.mw.ClearClient(clientId) @@ -118,20 +115,15 @@ func (sn *SubNode) ClearClient(clientId string) { sn.sw.ClearClient(clientId) } - for _, sub := range sn.children { - sub.ClearClient(clientId) - } + sn.children.Range(func(key, value interface{}) bool { + value.(*SubNode).ClearClient(clientId) + return true + }) } type SubTree struct { //tree root - root *SubNode -} - -func NewSubTree() *SubTree { - return &SubTree{ - root: NewSubNode(), - } + root SubNode } func (st *SubTree) Publish(topic []byte, subs map[string]packet.MsgQos) {