diff --git a/hive.go b/hive.go index 399c63f..83f519b 100644 --- a/hive.go +++ b/hive.go @@ -64,11 +64,37 @@ func (h *Hive) Serve(ln net.Listener) { func (h *Hive) Receive(conn net.Conn) { //TODO 先解析第一个包,而且必须是Connect bee := NewBee(conn) + var parser packet.Parser + + buf := make([]byte, 1024) + for { + n, err := conn.Read(buf) + if err != nil { + log.Println(err) + break + } + + ms := parser.Parse(buf[:n]) + + //处理消息 + //TODO 可以放入队列 + for _, msg := range ms { + h.handle(msg, bee) + } + } + + _ = bee.Close() +} + +func (h *Hive) Receive2(conn net.Conn) { + //TODO 先解析第一个包,而且必须是Connect + bee := NewBee(conn) bufSize := 6 buf := make([]byte, bufSize) of := 0 for { + //TODO 先解析 n, err := conn.Read(buf[of:]) if err != nil { log.Println(err) diff --git a/packet/parser.go b/packet/parser.go new file mode 100644 index 0000000..1d8f4af --- /dev/null +++ b/packet/parser.go @@ -0,0 +1,75 @@ +package packet + +import ( + "encoding/binary" + "log" +) + +type Parser struct { + buf []byte +} + +func (p *Parser) Parse(buf []byte) []Message { + var b []byte + + //上次剩余 + if p.buf != nil { + b = append(p.buf, buf...) + p.buf = nil + } else { + //复制内存,避免覆盖 + b := make([]byte, len(buf)) + copy(b, buf) + + } + + //解析 + return p.parse(b) +} + + +func (p *Parser) parse(buf []byte) []Message { + + messages := make([]Message, 0) + + for { + remain := len(buf) + + if remain < 2 { + //包头都不够,等待剩余内容 + //可能需要 超时处理 + break + } + + + //读取Remain Length + rl, rll := binary.Uvarint(buf[1:]) + //TODO 判断是否够 + + remainLength := int(rl) + packLen := remainLength + rll + 1 + if packLen > remain { + //等待包体 + break + } + + msg, err := Decode(buf[:packLen]) + if err != nil { + log.Println(err) + buf = buf[packLen:] + continue + } + + messages = append(messages, msg) + + //切片,继续解析 + buf = buf[packLen:] + } + + if len(buf) > 0 { + //p.buf = buf[:] + p.buf = buf + } + + return messages +}