mirror of
https://github.com/zgwit/beeq.git
synced 2025-09-26 19:51:13 +08:00
一些简单的优化
This commit is contained in:
21
bee.go
21
bee.go
@@ -61,23 +61,22 @@ func (b *Bee) Close() error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (b *Bee) send(msg packet.Message) {
|
||||
func (b *Bee) send(msg packet.Message) error{
|
||||
//log.Printf("Send message to %s: %s QOS(%d) DUP(%t) RETAIN(%t)", b.clientId, msg.Type().Name(), msg.Qos(), msg.Dup(), msg.Retain())
|
||||
if head, payload, err := msg.Encode(); err != nil {
|
||||
//TODO log
|
||||
log.Print("Message encode error: ", err)
|
||||
return err
|
||||
} else {
|
||||
//err := b.conn.SetWriteDeadline(time.Now().Add(b.timeout))
|
||||
_, err = b.conn.Write(head)
|
||||
if err != nil {
|
||||
//TODO 关闭bee
|
||||
//return err
|
||||
// 关闭bee
|
||||
return err
|
||||
}
|
||||
if payload != nil && len(payload) > 0 {
|
||||
_, err = b.conn.Write(payload)
|
||||
if err != nil {
|
||||
//TODO 关闭bee
|
||||
//return err
|
||||
// 关闭bee
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -91,6 +90,8 @@ func (b *Bee) send(msg packet.Message) {
|
||||
b.pub2.Store(pub.PacketId(), pub)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *Bee) dispatch(msg packet.Message) {
|
||||
@@ -99,7 +100,11 @@ func (b *Bee) dispatch(msg packet.Message) {
|
||||
return
|
||||
}
|
||||
|
||||
b.send(msg)
|
||||
err := b.send(msg)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
//TODO 关闭
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
9
hive.go
9
hive.go
@@ -260,7 +260,7 @@ func (h *Hive) handlePublish(msg *packet.Publish, bee *Bee) {
|
||||
|
||||
if err := ValidTopic(msg.Topic()); err != nil {
|
||||
//TODO log
|
||||
log.Print("Topic invalid ", err)
|
||||
log.Println("Topic invalid ", err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -280,7 +280,7 @@ func (h *Hive) handlePublish(msg *packet.Publish, bee *Bee) {
|
||||
for clientId, qos := range subs {
|
||||
if b, ok := h.bees.Load(clientId); ok {
|
||||
bb := b.(*Bee)
|
||||
if bb.conn == nil {
|
||||
if bb.closed {
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -310,7 +310,7 @@ func (h *Hive) handleSubscribe(msg *packet.Subscribe, bee *Bee) {
|
||||
}
|
||||
|
||||
for _, st := range msg.Topics() {
|
||||
log.Print("Subscribe ", string(st.Topic()))
|
||||
//log.Print("Subscribe ", string(st.Topic()))
|
||||
if err := ValidSubscribe(st.Topic()); err != nil {
|
||||
log.Println("Invalid topic ", err)
|
||||
//log error
|
||||
@@ -341,9 +341,10 @@ func (h *Hive) handleUnSubscribe(msg *packet.UnSubscribe, bee *Bee) {
|
||||
|
||||
ack := packet.UNSUBACK.NewMessage().(*packet.UnSubAck)
|
||||
for _, t := range msg.Topics() {
|
||||
log.Print("UnSubscribe ", string(t))
|
||||
//log.Print("UnSubscribe ", string(t))
|
||||
if err := ValidSubscribe(t); err != nil {
|
||||
//TODO log
|
||||
log.Println(err)
|
||||
} else {
|
||||
h.subTree.UnSubscribe(t, bee.clientId)
|
||||
}
|
||||
|
Reference in New Issue
Block a user