Buffer optimizations (#355)

* Avoid creating buffer if pkt larger than ClientNetWriteBufferSize

* Use mempool for Properties Encode

* Use the more efficient Write instead of Write for Buffer to Buffer write

---------

Co-authored-by: JB <28275108+mochi-co@users.noreply.github.com>
This commit is contained in:
thedevop
2024-01-10 00:15:06 -08:00
committed by GitHub
parent 10a02ab3c7
commit 83db7fff56
3 changed files with 28 additions and 20 deletions

View File

@@ -582,7 +582,7 @@ func (cl *Client) WritePacket(pk packets.Packet) error {
return packets.ErrPacketTooLarge // [MQTT-3.1.2-24] [MQTT-3.1.2-25]
}
n, err := func() (n int64, err error) {
n, err := func() (int64, error) {
cl.Lock()
defer cl.Unlock()
if len(cl.State.outbound) == 0 {
@@ -591,23 +591,26 @@ func (cl *Client) WritePacket(pk packets.Packet) error {
}
// first write to buffer, then flush buffer
n, _ = buf.WriteTo(cl.Net.outbuf) // will always be successful
n, _ := cl.Net.outbuf.Write(buf.Bytes()) // will always be successful
err = cl.flushOutbuf()
return
return int64(n), err
}
// there are more writes in the queue
if cl.Net.outbuf == nil {
if buf.Len() >= cl.ops.options.ClientNetWriteBufferSize {
return buf.WriteTo(cl.Net.Conn)
}
cl.Net.outbuf = new(bytes.Buffer)
}
n, _ = buf.WriteTo(cl.Net.outbuf) // will always be successful
n, _ := cl.Net.outbuf.Write(buf.Bytes()) // will always be successful
if cl.Net.outbuf.Len() < cl.ops.options.ClientNetWriteBufferSize {
return
return int64(n), nil
}
err = cl.flushOutbuf()
return
return int64(n), err
}()
if err != nil {
return err

View File

@@ -348,7 +348,7 @@ func (pk *Packet) ConnectEncode(buf *bytes.Buffer) error {
pk.FixedHeader.Remaining = nb.Len()
pk.FixedHeader.Encode(buf)
_, _ = nb.WriteTo(buf)
buf.Write(nb.Bytes())
return nil
}
@@ -512,7 +512,8 @@ func (pk *Packet) ConnackEncode(buf *bytes.Buffer) error {
pk.FixedHeader.Remaining = nb.Len()
pk.FixedHeader.Encode(buf)
_, _ = nb.WriteTo(buf)
buf.Write(nb.Bytes())
return nil
}
@@ -557,7 +558,7 @@ func (pk *Packet) DisconnectEncode(buf *bytes.Buffer) error {
pk.FixedHeader.Remaining = nb.Len()
pk.FixedHeader.Encode(buf)
_, _ = nb.WriteTo(buf)
buf.Write(nb.Bytes())
return nil
}
@@ -628,7 +629,7 @@ func (pk *Packet) PublishEncode(buf *bytes.Buffer) error {
pk.FixedHeader.Remaining = nb.Len() + len(pk.Payload)
pk.FixedHeader.Encode(buf)
_, _ = nb.WriteTo(buf)
buf.Write(nb.Bytes())
buf.Write(pk.Payload)
return nil
@@ -719,7 +720,7 @@ func (pk *Packet) encodePubAckRelRecComp(buf *bytes.Buffer) error {
pk.FixedHeader.Remaining = nb.Len()
pk.FixedHeader.Encode(buf)
_, _ = nb.WriteTo(buf)
buf.Write(nb.Bytes())
return nil
}
@@ -858,7 +859,7 @@ func (pk *Packet) SubackEncode(buf *bytes.Buffer) error {
pk.FixedHeader.Remaining = nb.Len()
pk.FixedHeader.Encode(buf)
_, _ = nb.WriteTo(buf)
buf.Write(nb.Bytes())
return nil
}
@@ -918,7 +919,7 @@ func (pk *Packet) SubscribeEncode(buf *bytes.Buffer) error {
pk.FixedHeader.Remaining = nb.Len()
pk.FixedHeader.Encode(buf)
_, _ = nb.WriteTo(buf)
buf.Write(nb.Bytes())
return nil
}
@@ -1014,7 +1015,7 @@ func (pk *Packet) UnsubackEncode(buf *bytes.Buffer) error {
pk.FixedHeader.Remaining = nb.Len()
pk.FixedHeader.Encode(buf)
_, _ = nb.WriteTo(buf)
buf.Write(nb.Bytes())
return nil
}
@@ -1070,7 +1071,7 @@ func (pk *Packet) UnsubscribeEncode(buf *bytes.Buffer) error {
pk.FixedHeader.Remaining = nb.Len()
pk.FixedHeader.Encode(buf)
_, _ = nb.WriteTo(buf)
buf.Write(nb.Bytes())
return nil
}
@@ -1132,7 +1133,7 @@ func (pk *Packet) AuthEncode(buf *bytes.Buffer) error {
pk.FixedHeader.Remaining = nb.Len()
pk.FixedHeader.Encode(buf)
_, _ = nb.WriteTo(buf)
buf.Write(nb.Bytes())
return nil
}

View File

@@ -8,6 +8,8 @@ import (
"bytes"
"fmt"
"strings"
"github.com/mochi-mqtt/server/v2/mempool"
)
const (
@@ -199,7 +201,8 @@ func (p *Properties) Encode(pkt byte, mods Mods, b *bytes.Buffer, n int) {
return
}
var buf bytes.Buffer
buf := mempool.GetBuffer()
defer mempool.PutBuffer(buf)
if p.canEncode(pkt, PropPayloadFormat) && p.PayloadFormatFlag {
buf.WriteByte(PropPayloadFormat)
buf.WriteByte(p.PayloadFormat)
@@ -230,7 +233,7 @@ func (p *Properties) Encode(pkt byte, mods Mods, b *bytes.Buffer, n int) {
for _, v := range p.SubscriptionIdentifier {
if v > 0 {
buf.WriteByte(PropSubscriptionIdentifier)
encodeLength(&buf, int64(v))
encodeLength(buf, int64(v))
}
}
}
@@ -321,7 +324,8 @@ func (p *Properties) Encode(pkt byte, mods Mods, b *bytes.Buffer, n int) {
}
if !mods.DisallowProblemInfo && p.canEncode(pkt, PropUser) {
pb := bytes.NewBuffer([]byte{})
pb := mempool.GetBuffer()
defer mempool.PutBuffer(pb)
for _, v := range p.User {
pb.WriteByte(PropUser)
pb.Write(encodeString(v.Key))
@@ -355,7 +359,7 @@ func (p *Properties) Encode(pkt byte, mods Mods, b *bytes.Buffer, n int) {
}
encodeLength(b, int64(buf.Len()))
_, _ = buf.WriteTo(b) // [MQTT-3.1.3-10]
b.Write(buf.Bytes()) // [MQTT-3.1.3-10]
}
// Decode decodes property bytes into a properties struct.