重新整理解析部分,OK了

This commit is contained in:
Jason
2020-09-14 17:15:53 +08:00
parent 7751f90a37
commit fb5d5e9077
5 changed files with 202 additions and 197 deletions

237
bee.go
View File

@@ -1,7 +1,6 @@
package beeq
import (
"encoding/binary"
"git.zgwit.com/iot/beeq/packet"
"log"
"net"
@@ -33,6 +32,9 @@ type Bee struct {
hive *Hive
//消息发送队列,避免主协程任务过重
msgQueue chan packet.Message
timeout time.Duration
}
@@ -44,104 +46,78 @@ func NewBee(conn net.Conn, hive *Hive) *Bee {
}
}
func (b *Bee) receiver() {
//Abort error
//defer func() {
// if r := recover(); r != nil {
// log.Print("b receiver panic ", r)
// }
//}()
readHead := true
buf := Alloc(6)
offset := 0
total := 0
for {
err := b.conn.SetReadDeadline(time.Now().Add(b.timeout))
if err != nil {
//return 0, err
break
}
if l, err := b.conn.Read(buf[offset:]); err != nil {
log.Print("Receive Failed: ", err)
//net.ErroTim
break
} else {
offset += l
}
//Parse header
if readHead && offset >= 2 {
for i := 1; i <= offset; i++ {
if buf[i] < 0x80 {
rl, rll := binary.Uvarint(buf[1:])
//binary.MaxVarintLen32
//binary.PutUvarint()
remainLength := int(rl)
total = remainLength + rll + 1
if total > 6 {
buf = ReAlloc(buf, total)
}
readHead = false
break
}
}
}
//Parse Message
if !readHead && offset >= total {
readHead = true
bb := Alloc(6)
if msg, l, err := packet.Decode(buf); err != nil {
//TODO log err
log.Println("", err)
offset = 0 //clear data
} else {
//处理消息
b.handleMessage(msg)
//Only message less than 6 bytes
if offset > l {
copy(bb, bb[l:])
offset -= l
} else {
offset = 0 //clear data
}
}
buf = bb
}
}
b.conn = nil
}
func (b *Bee) handleMessage(msg packet.Message) {
log.Printf("Received message from %s: %s QOS(%d) DUP(%t) RETAIN(%t)", b.clientId, msg.Type().Name(), msg.Qos(), msg.Dup(), msg.Retain())
//log.Print("recv Message:", msg.Type().Name())
switch msg.Type() {
case packet.CONNECT:
b.handleConnect(msg.(*packet.Connect))
case packet.PUBLISH:
b.handlePublish(msg.(*packet.Publish))
case packet.PUBACK:
b.handlePubAck(msg.(*packet.PubAck))
case packet.PUBREC:
b.handlePubRec(msg.(*packet.PubRec))
case packet.PUBREL:
b.handlePubRel(msg.(*packet.PubRel))
case packet.PUBCOMP:
b.handlePubComp(msg.(*packet.PubComp))
case packet.SUBSCRIBE:
b.handleSubscribe(msg.(*packet.Subscribe))
case packet.UNSUBSCRIBE:
b.handleUnSubscribe(msg.(*packet.UnSubscribe))
case packet.PINGREQ:
b.handlePingReq(msg.(*packet.PingReq))
case packet.DISCONNECT:
b.handleDisconnect(msg.(*packet.DisConnect))
}
}
//
//func (b *Bee) receiver() {
// //Abort error
// //defer func() {
// // if r := recover(); r != nil {
// // log.Print("b receiver panic ", r)
// // }
// //}()
//
// readHead := true
// buf := Alloc(6)
// offset := 0
// total := 0
// for {
//
// err := b.conn.SetReadDeadline(time.Now().Add(b.timeout))
// if err != nil {
// //return 0, err
// break
// }
// if l, err := b.conn.Read(buf[offset:]); err != nil {
// log.Print("Receive Failed: ", err)
// //net.ErroTim
// break
// } else {
// offset += l
// }
//
// //Parse header
// if readHead && offset >= 2 {
// for i := 1; i <= offset; i++ {
// if buf[i] < 0x80 {
// rl, rll := binary.Uvarint(buf[1:])
// //binary.MaxVarintLen32
// //binary.PutUvarint()
// remainLength := int(rl)
// total = remainLength + rll + 1
// if total > 6 {
// buf = ReAlloc(buf, total)
// }
// readHead = false
// break
// }
// }
// }
//
// //Parse Message
// if !readHead && offset >= total {
// readHead = true
// bb := Alloc(6)
// if msg, l, err := packet.Decode(buf); err != nil {
// //TODO log err
// log.Println("", err)
// offset = 0 //clear data
// } else {
// //处理消息
// b.handleMessage(msg)
//
// //Only message less than 6 bytes
// if offset > l {
// copy(bb, bb[l:])
// offset -= l
// } else {
// offset = 0 //clear data
// }
// }
// buf = bb
// }
// }
//
// b.conn = nil
//}
func (b *Bee) handleConnect(msg *packet.Connect) {
b.clientId = string(msg.ClientId())
@@ -161,58 +137,19 @@ func (b *Bee) handlePublish(msg *packet.Publish) {
//Reply PUBACK
ack := packet.PUBACK.NewMessage().(*packet.PubAck)
ack.SetPacketId(msg.PacketId())
b.dispatchMessage(ack)
b.dispatch(ack)
} else if qos == packet.Qos2 {
//Save & Send PUBREC
b.recvPub2.Store(msg.PacketId(), msg)
ack := packet.PUBREC.NewMessage().(*packet.PubRec)
ack.SetPacketId(msg.PacketId())
b.dispatchMessage(ack)
b.dispatch(ack)
} else {
//error
}
}
func (b *Bee) handlePubAck(msg *packet.PubAck) {
//if _, ok := b.pub1.Load(msg.PacketId()); ok {
b.pub1.Delete(msg.PacketId())
//}
}
func (b *Bee) handlePubRec(msg *packet.PubRec) {
msg.SetType(packet.PUBREL)
b.dispatchMessage(msg)
}
func (b *Bee) handlePubRel(msg *packet.PubRel) {
msg.SetType(packet.PUBCOMP)
b.dispatchMessage(msg)
}
func (b *Bee) handlePubComp(msg *packet.PubComp) {
//if _, ok := b.pub2.Load(msg.PacketId()); ok {
b.pub2.Delete(msg.PacketId())
//}
}
func (b *Bee) handleSubscribe(msg *packet.Subscribe) {
b.hive.handleSubscribe(msg, b)
}
func (b *Bee) handleUnSubscribe(msg *packet.UnSubscribe) {
b.hive.handleUnSubscribe(msg, b)
}
func (b *Bee) handlePingReq(msg *packet.PingReq) {
msg.SetType(packet.PINGRESP)
b.dispatchMessage(msg)
}
func (b *Bee) handleDisconnect(msg *packet.DisConnect) {
b.hive.handleDisconnect(msg, b)
}
func (b *Bee) dispatchMessage(msg packet.Message) {
func (b *Bee) send(msg packet.Message) {
//log.Printf("Send message to %s: %s QOS(%d) DUP(%t) RETAIN(%t)", b.clientId, msg.Type().Name(), msg.Qos(), msg.Dup(), msg.Retain())
if head, payload, err := msg.Encode(); err != nil {
//TODO log
@@ -243,3 +180,21 @@ func (b *Bee) dispatchMessage(msg packet.Message) {
}
}
}
func (b *Bee) dispatch(msg packet.Message) {
if b.msgQueue != nil {
b.msgQueue <- msg
return
}
b.send(msg)
}
func (b *Bee) sender() {
for b.conn != nil {
//TODO select or close
msg := <- b.msgQueue
b.send(msg)
}
}

150
hive.go
View File

@@ -53,61 +53,92 @@ func (h *Hive) Serve(ln net.Listener) {
func (h *Hive) Receive(conn net.Conn) {
//TODO 先解析第一个包而且必须是Connect
bee := NewBee(conn, h)
bufSize := 6
buf := make([]byte, bufSize)
of := 0
for {
n, err := conn.Read(buf[of:])
if err != nil {
log.Println(err)
return
}
ln := of + n
//TODO 解析包头,包体
if ln < 2 {
//TODO error
return
}
//读取Remain Length
rl, rll := binary.Uvarint(buf[1:])
remainLength := int(rl)
packLen := remainLength + rll + 1
b := NewBee(conn, h)
go b.receiver()
}
//读取未读完的包体
if packLen > bufSize {
buf = ReAlloc(buf, packLen)
func (h*Hive) receive(conn net.Conn) {
buf := make([]byte, 6)
n, err := conn.Read(buf)
if err != nil {
log.Println(err)
return
}
//TODO 解析包头,包体
if n < 2 {
//TODO error
return
}
//读取Remain Length
rl, rll := binary.Uvarint(buf[1:])
remainLength := int(rl)
packLen := remainLength + rll + 1
if packLen > 6 {
buf = ReAlloc(buf, packLen)
//直至将全部包体读完
offset := n
for offset< packLen {
n, err = conn.Read(buf[offset:])
if err != nil {
log.Println(err)
return
//直至将全部包体读完
o := n
for o < packLen {
n, err = conn.Read(buf[o:])
if err != nil {
log.Println(err)
return
}
o += n
}
offset += n
}
//解析消息
msg, err := packet.Decode(buf[:packLen])
if err != nil {
log.Println(err)
return
}
//TODO 处理消息
h.handle(msg, bee)
//TODO 剩余内容
if packLen < bufSize {
buf = ReAlloc(buf[packLen:], packLen -bufSize)
} else {
buf = make([]byte, bufSize)
}
}
}
msg, err := packet.Decode(buf[:packLen])
if err != nil {
log.Println(err)
return
func (h *Hive) handle(msg packet.Message, bee *Bee) {
switch msg.Type() {
case packet.CONNECT:
h.handleConnect(msg.(*packet.Connect), bee)
case packet.PUBLISH:
h.handlePublish(msg.(*packet.Publish), bee)
case packet.PUBACK:
bee.pub1.Delete(msg.(*packet.PubAck).PacketId())
case packet.PUBREC:
msg.SetType(packet.PUBREL)
bee.dispatch(msg)
case packet.PUBREL:
msg.SetType(packet.PUBCOMP)
bee.dispatch(msg)
case packet.PUBCOMP:
bee.pub2.Delete(msg.(*packet.PubComp).PacketId())
case packet.SUBSCRIBE:
h.handleSubscribe(msg.(*packet.Subscribe), bee)
case packet.UNSUBSCRIBE:
h.handleUnSubscribe(msg.(*packet.UnSubscribe), bee)
case packet.PINGREQ:
msg.SetType(packet.PINGRESP)
bee.dispatch(msg)
case packet.DISCONNECT:
h.handleDisconnect(msg.(*packet.DisConnect), bee)
}
//TODO 处理消息
h.handle(msg)
//TODO 剩余内容
if packLen < 6 {
b := make([]byte, )
}
}
func (h *Hive) handleConnect(msg *packet.Connect, bee *Bee) {
@@ -161,12 +192,31 @@ func (h *Hive) handleConnect(msg *packet.Connect, bee *Bee) {
}
//ack.SetCode(packet.CONNACK_ACCEPTED)
bee.dispatchMessage(ack)
bee.dispatch(ack)
//TODO 如果发生错误,与客户端断开连接
}
func (h *Hive) handlePublish(msg *packet.Publish, bee *Bee) {
qos := msg.Qos()
if qos == packet.Qos0 {
} else if qos == packet.Qos1 {
//Reply PUBACK
ack := packet.PUBACK.NewMessage().(*packet.PubAck)
ack.SetPacketId(msg.PacketId())
bee.dispatch(ack)
} else if qos == packet.Qos2 {
//Save & Send PUBREC
bee.recvPub2.Store(msg.PacketId(), msg)
ack := packet.PUBREC.NewMessage().(*packet.PubRec)
ack.SetPacketId(msg.PacketId())
bee.dispatch(ack)
} else {
//TODO error
}
if err := ValidTopic(msg.Topic()); err != nil {
//TODO log
log.Print("Topic invalid ", err)
@@ -199,7 +249,7 @@ func (h *Hive) handlePublish(msg *packet.Publish, bee *Bee) {
if msg.Qos() > qos {
pub.SetQos(qos)
}
bb.dispatchMessage(&pub)
bb.dispatch(&pub)
}
}
}
@@ -224,11 +274,11 @@ func (h *Hive) handleSubscribe(msg *packet.Subscribe, bee *Bee) {
if msg.Qos() > st.Qos() {
p.SetQos(st.Qos())
}
bee.dispatchMessage(&p)
bee.dispatch(&p)
})
}
}
bee.dispatchMessage(ack)
bee.dispatch(ack)
}
func (h *Hive) handleUnSubscribe(msg *packet.UnSubscribe, bee *Bee) {
@@ -241,7 +291,7 @@ func (h *Hive) handleUnSubscribe(msg *packet.UnSubscribe, bee *Bee) {
h.subTree.UnSubscribe(t, bee.clientId)
}
}
bee.dispatchMessage(ack)
bee.dispatch(ack)
}
func (h *Hive) handleDisconnect(msg *packet.DisConnect, bee *Bee) {

View File

@@ -14,7 +14,7 @@ func (msg *DisConnect) Decode(buf []byte) error {
//Tips. remain length is fixed 0 & total is fixed 2
total := len(buf)
if total < 2 {
return 0, fmt.Errorf("DisConnect expect fixed 2 bytes, got %d", total)
return fmt.Errorf("DisConnect expect fixed 2 bytes, got %d", total)
}
offset := 0
@@ -27,7 +27,7 @@ func (msg *DisConnect) Decode(buf []byte) error {
if l, n, err := ReadRemainLength(buf[offset:]); err != nil {
return err
} else if l != 0 {
return 0, fmt.Errorf("Remain length must be 0, got %d", l)
return fmt.Errorf("Remain length must be 0, got %d", l)
} else {
msg.remainLength = l
offset += n

View File

@@ -14,7 +14,7 @@ func (msg *PingReq) Decode(buf []byte) error {
//Tips. remain length is fixed 0 & total is fixed 2
total := len(buf)
if total < 2 {
return 0, fmt.Errorf("Ping expect fixed 2 bytes, got %d", total)
return fmt.Errorf("Ping expect fixed 2 bytes, got %d", total)
}
offset := 0
@@ -27,7 +27,7 @@ func (msg *PingReq) Decode(buf []byte) error {
if l, n, err := ReadRemainLength(buf[offset:]); err != nil {
return err
} else if l != 0 {
return 0, fmt.Errorf("Remain length must be 0, got %d", l)
return fmt.Errorf("Remain length must be 0, got %d", l)
} else {
msg.remainLength = l
offset += n

View File

@@ -26,7 +26,7 @@ func (msg *PubAck) Decode(buf []byte) error {
//Tips. remain length is fixed 2 & total is fixed 4
total := len(buf)
if total < 4 {
return 0, fmt.Errorf("Connack expect fixed 4 bytes (%d)", total)
return fmt.Errorf("Connack expect fixed 4 bytes (%d)", total)
}
offset := 0
@@ -39,7 +39,7 @@ func (msg *PubAck) Decode(buf []byte) error {
if l, n, err := ReadRemainLength(buf[offset:]); err != nil {
return err
} else if l != 2 {
return 0, fmt.Errorf("Remain length must be 2, got %d", l)
return fmt.Errorf("Remain length must be 2, got %d", l)
} else {
msg.remainLength = l
offset += n