mirror of
https://github.com/mochi-mqtt/server.git
synced 2025-09-27 20:42:19 +08:00
Compare commits
2 Commits
v2.4.5
...
recommende
Author | SHA1 | Date | |
---|---|---|---|
![]() |
a5ff466029 | ||
![]() |
2225c25b1e |
17
clients.go
17
clients.go
@@ -8,6 +8,7 @@ import (
|
|||||||
"bufio"
|
"bufio"
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net"
|
"net"
|
||||||
@@ -21,8 +22,13 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
defaultKeepalive uint16 = 10 // the default connection keepalive value in seconds
|
defaultKeepalive uint16 = 10 // the default connection keepalive value in seconds.
|
||||||
defaultClientProtocolVersion byte = 4 // the default mqtt protocol version of connecting clients (if somehow unspecified).
|
defaultClientProtocolVersion byte = 4 // the default mqtt protocol version of connecting clients (if somehow unspecified).
|
||||||
|
minimumKeepalive uint16 = 5 // the minimum recommended keepalive - values under with display a warning.
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
ErrMinimumKeepalive = errors.New("client keepalive is below minimum recommended value and may exhibit connection instability")
|
||||||
)
|
)
|
||||||
|
|
||||||
// ReadFn is the function signature for the function used for reading and processing new packets.
|
// ReadFn is the function signature for the function used for reading and processing new packets.
|
||||||
@@ -211,6 +217,15 @@ func (cl *Client) ParseConnect(lid string, pk packets.Packet) {
|
|||||||
cl.Properties.Clean = pk.Connect.Clean
|
cl.Properties.Clean = pk.Connect.Clean
|
||||||
cl.Properties.Props = pk.Properties.Copy(false)
|
cl.Properties.Props = pk.Properties.Copy(false)
|
||||||
|
|
||||||
|
if pk.Connect.Keepalive <= minimumKeepalive {
|
||||||
|
cl.ops.log.Warn(
|
||||||
|
ErrMinimumKeepalive.Error(),
|
||||||
|
"client", cl.ID,
|
||||||
|
"keepalive", pk.Connect.Keepalive,
|
||||||
|
"recommended", minimumKeepalive,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
cl.State.Keepalive = pk.Connect.Keepalive // [MQTT-3.2.2-22]
|
cl.State.Keepalive = pk.Connect.Keepalive // [MQTT-3.2.2-22]
|
||||||
cl.State.Inflight.ResetReceiveQuota(int32(cl.ops.options.Capabilities.ReceiveMaximum)) // server receive max per client
|
cl.State.Inflight.ResetReceiveQuota(int32(cl.ops.options.Capabilities.ReceiveMaximum)) // server receive max per client
|
||||||
cl.State.Inflight.ResetSendQuota(int32(cl.Properties.Props.ReceiveMaximum)) // client receive max
|
cl.State.Inflight.ResetSendQuota(int32(cl.Properties.Props.ReceiveMaximum)) // client receive max
|
||||||
|
@@ -5,10 +5,14 @@
|
|||||||
package mqtt
|
package mqtt
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bufio"
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
|
"log/slog"
|
||||||
"net"
|
"net"
|
||||||
|
"strings"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@@ -210,6 +214,27 @@ func TestClientParseConnectNoID(t *testing.T) {
|
|||||||
require.NotEmpty(t, cl.ID)
|
require.NotEmpty(t, cl.ID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestClientParseConnectBelowMinimumKeepalive(t *testing.T) {
|
||||||
|
cl, _, _ := newTestClient()
|
||||||
|
var b bytes.Buffer
|
||||||
|
x := bufio.NewWriter(&b)
|
||||||
|
cl.ops.log = slog.New(slog.NewTextHandler(x, nil))
|
||||||
|
|
||||||
|
pk := packets.Packet{
|
||||||
|
ProtocolVersion: 4,
|
||||||
|
Connect: packets.ConnectParams{
|
||||||
|
ProtocolName: []byte{'M', 'Q', 'T', 'T'},
|
||||||
|
Keepalive: minimumKeepalive - 1,
|
||||||
|
ClientIdentifier: "mochi",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
cl.ParseConnect("tcp1", pk)
|
||||||
|
err := x.Flush()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, strings.Contains(b.String(), ErrMinimumKeepalive.Error()))
|
||||||
|
require.NotEmpty(t, cl.ID)
|
||||||
|
}
|
||||||
|
|
||||||
func TestClientNextPacketID(t *testing.T) {
|
func TestClientNextPacketID(t *testing.T) {
|
||||||
cl, _, _ := newTestClient()
|
cl, _, _ := newTestClient()
|
||||||
|
|
||||||
|
@@ -26,7 +26,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
Version = "2.4.0" // the current server version.
|
Version = "2.4.1" // the current server version.
|
||||||
defaultSysTopicInterval int64 = 1 // the interval between $SYS topic publishes
|
defaultSysTopicInterval int64 = 1 // the interval between $SYS topic publishes
|
||||||
LocalListener = "local"
|
LocalListener = "local"
|
||||||
InlineClientId = "inline"
|
InlineClientId = "inline"
|
||||||
|
Reference in New Issue
Block a user