Compare commits

...

2 Commits

Author SHA1 Message Date
JB
a5ff466029 Merge branch 'main' into recommended-minimum-keepalive 2023-10-01 21:44:32 +01:00
mochi-co
2225c25b1e Emit warning if client keepalive is less than recommended minimum 2023-09-29 20:04:39 +01:00
3 changed files with 42 additions and 2 deletions

View File

@@ -8,6 +8,7 @@ import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io"
"net"
@@ -21,8 +22,13 @@ import (
)
const (
defaultKeepalive uint16 = 10 // the default connection keepalive value in seconds
defaultKeepalive uint16 = 10 // the default connection keepalive value in seconds.
defaultClientProtocolVersion byte = 4 // the default mqtt protocol version of connecting clients (if somehow unspecified).
minimumKeepalive uint16 = 5 // the minimum recommended keepalive - values under with display a warning.
)
var (
ErrMinimumKeepalive = errors.New("client keepalive is below minimum recommended value and may exhibit connection instability")
)
// ReadFn is the function signature for the function used for reading and processing new packets.
@@ -211,6 +217,15 @@ func (cl *Client) ParseConnect(lid string, pk packets.Packet) {
cl.Properties.Clean = pk.Connect.Clean
cl.Properties.Props = pk.Properties.Copy(false)
if pk.Connect.Keepalive <= minimumKeepalive {
cl.ops.log.Warn(
ErrMinimumKeepalive.Error(),
"client", cl.ID,
"keepalive", pk.Connect.Keepalive,
"recommended", minimumKeepalive,
)
}
cl.State.Keepalive = pk.Connect.Keepalive // [MQTT-3.2.2-22]
cl.State.Inflight.ResetReceiveQuota(int32(cl.ops.options.Capabilities.ReceiveMaximum)) // server receive max per client
cl.State.Inflight.ResetSendQuota(int32(cl.Properties.Props.ReceiveMaximum)) // client receive max

View File

@@ -5,10 +5,14 @@
package mqtt
import (
"bufio"
"bytes"
"context"
"errors"
"io"
"log/slog"
"net"
"strings"
"sync/atomic"
"testing"
"time"
@@ -210,6 +214,27 @@ func TestClientParseConnectNoID(t *testing.T) {
require.NotEmpty(t, cl.ID)
}
func TestClientParseConnectBelowMinimumKeepalive(t *testing.T) {
cl, _, _ := newTestClient()
var b bytes.Buffer
x := bufio.NewWriter(&b)
cl.ops.log = slog.New(slog.NewTextHandler(x, nil))
pk := packets.Packet{
ProtocolVersion: 4,
Connect: packets.ConnectParams{
ProtocolName: []byte{'M', 'Q', 'T', 'T'},
Keepalive: minimumKeepalive - 1,
ClientIdentifier: "mochi",
},
}
cl.ParseConnect("tcp1", pk)
err := x.Flush()
require.NoError(t, err)
require.True(t, strings.Contains(b.String(), ErrMinimumKeepalive.Error()))
require.NotEmpty(t, cl.ID)
}
func TestClientNextPacketID(t *testing.T) {
cl, _, _ := newTestClient()

View File

@@ -26,7 +26,7 @@ import (
)
const (
Version = "2.4.0" // the current server version.
Version = "2.4.1" // the current server version.
defaultSysTopicInterval int64 = 1 // the interval between $SYS topic publishes
LocalListener = "local"
InlineClientId = "inline"