mirror of
https://github.com/mochi-mqtt/server.git
synced 2025-12-24 12:58:05 +08:00
Compare commits
22 Commits
v2.4.4
...
file-based
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d84e10cb20 | ||
|
|
3447af233f | ||
|
|
e42f69987a | ||
|
|
e12bf9f0a4 | ||
|
|
3ff5054303 | ||
|
|
acd7d38fcf | ||
|
|
053f0b1f11 | ||
|
|
42fa81dd5c | ||
|
|
0b366d05eb | ||
|
|
8466fd078b | ||
|
|
26720c2f6e | ||
|
|
d30592b95b | ||
|
|
40e9cdb383 | ||
|
|
e9f72154b6 | ||
|
|
69412dd23c | ||
|
|
686c35ac0c | ||
|
|
65c78534dc | ||
|
|
32c577082d | ||
|
|
83db7fff56 | ||
|
|
10a02ab3c7 | ||
|
|
fedfb92470 | ||
|
|
ac5863644a |
13
Dockerfile
13
Dockerfile
@@ -11,21 +11,12 @@ RUN go mod download
|
||||
|
||||
COPY . ./
|
||||
|
||||
RUN go build -o /app/mochi ./cmd
|
||||
|
||||
RUN go build -o /app/mochi ./cmd/docker
|
||||
|
||||
FROM alpine
|
||||
|
||||
WORKDIR /
|
||||
COPY --from=builder /app/mochi .
|
||||
|
||||
# tcp
|
||||
EXPOSE 1883
|
||||
|
||||
# websockets
|
||||
EXPOSE 1882
|
||||
|
||||
# dashboard
|
||||
EXPOSE 8080
|
||||
|
||||
ENTRYPOINT [ "/mochi" ]
|
||||
CMD ["/cmd/docker", "--config", "config.yaml"]
|
||||
41
README.md
41
README.md
@@ -60,7 +60,6 @@ Unless it's a critical issue, new releases typically go out over the weekend.
|
||||
- Please [open an issue](https://github.com/mochi-mqtt/server/issues) to request new features or event hooks!
|
||||
- Cluster support.
|
||||
- Enhanced Metrics support.
|
||||
- File-based server configuration (supporting docker).
|
||||
|
||||
## Quick Start
|
||||
### Running the Broker with Go
|
||||
@@ -77,18 +76,50 @@ You can now pull and run the [official Mochi MQTT image](https://hub.docker.com/
|
||||
```sh
|
||||
docker pull mochimqtt/server
|
||||
or
|
||||
docker run mochimqtt/server
|
||||
docker run -v $(pwd)/config.yaml:/config.yaml mochimqtt/server
|
||||
```
|
||||
|
||||
This is a work in progress, and a [file-based configuration](https://github.com/orgs/mochi-mqtt/projects/2) is being developed to better support this implementation. _More substantial docker support is being discussed [here](https://github.com/orgs/mochi-mqtt/discussions/281#discussion-5544545) and [here](https://github.com/orgs/mochi-mqtt/discussions/209). Please join the discussion if you use Mochi-MQTT in this environment._
|
||||
For most use cases, you can use File Based Configuration to configure the server, by specifying a valid `yaml` or `json` config file.
|
||||
|
||||
A simple Dockerfile is provided for running the [cmd/main.go](cmd/main.go) Websocket, TCP, and Stats server:
|
||||
A simple Dockerfile is provided for running the [cmd/main.go](cmd/main.go) Websocket, TCP, and Stats server, using the `allow-all` auth hook.
|
||||
|
||||
```sh
|
||||
docker build -t mochi:latest .
|
||||
docker run -p 1883:1883 -p 1882:1882 -p 8080:8080 mochi:latest
|
||||
docker run -p 1883:1883 -p 1882:1882 -p 8080:8080 -v $(pwd)/config.yaml:/config.yaml mochi:latest
|
||||
```
|
||||
|
||||
### File Based Configuration
|
||||
You can use File Based Configuration with either the Docker image (described above), or by running the build binary with the `--config=config.yaml` or `--config=config.json` parameter.
|
||||
|
||||
Configuration files provide a convenient mechanism for easily preparing a server with the most common configurations. You can enable and configure built-in hooks and listeners, and specify server options and compatibilities:
|
||||
|
||||
```yaml
|
||||
listeners:
|
||||
- type: "tcp"
|
||||
id: "tcp12"
|
||||
address: ":1883"
|
||||
- type: "ws"
|
||||
id: "ws1"
|
||||
address: ":1882"
|
||||
- type: "sysinfo"
|
||||
id: "stats"
|
||||
address: ":1880"
|
||||
hooks:
|
||||
auth:
|
||||
allow_all: true
|
||||
options:
|
||||
inline_client: true
|
||||
```
|
||||
|
||||
Please review the examples found in `examples/config` for all available configuration options.
|
||||
|
||||
There are a few conditions to note:
|
||||
1. If you use file-based configuration, you can only have one of each hook type.
|
||||
2. You can only use built in hooks with file-based configuration, as the type and configuration structure needs to be known by the server in order for it to be applied.
|
||||
3. You can only use built in listeners, for the reasons above.
|
||||
|
||||
If you need to implement custom hooks or listeners, please do so using the traditional manner indicated in `cmd/main.go`.
|
||||
|
||||
## Developing with Mochi MQTT
|
||||
### Importing as a package
|
||||
Importing Mochi MQTT as a package requires just a few lines of code to get started.
|
||||
|
||||
19
clients.go
19
clients.go
@@ -215,6 +215,10 @@ func (cl *Client) ParseConnect(lid string, pk packets.Packet) {
|
||||
cl.Properties.Clean = pk.Connect.Clean
|
||||
cl.Properties.Props = pk.Properties.Copy(false)
|
||||
|
||||
if cl.Properties.Props.ReceiveMaximum > cl.ops.options.Capabilities.MaximumInflight { // 3.3.4 Non-normative
|
||||
cl.Properties.Props.ReceiveMaximum = cl.ops.options.Capabilities.MaximumInflight
|
||||
}
|
||||
|
||||
if pk.Connect.Keepalive <= minimumKeepalive {
|
||||
cl.ops.log.Warn(
|
||||
ErrMinimumKeepalive.Error(),
|
||||
@@ -582,7 +586,7 @@ func (cl *Client) WritePacket(pk packets.Packet) error {
|
||||
return packets.ErrPacketTooLarge // [MQTT-3.1.2-24] [MQTT-3.1.2-25]
|
||||
}
|
||||
|
||||
n, err := func() (n int64, err error) {
|
||||
n, err := func() (int64, error) {
|
||||
cl.Lock()
|
||||
defer cl.Unlock()
|
||||
if len(cl.State.outbound) == 0 {
|
||||
@@ -591,23 +595,26 @@ func (cl *Client) WritePacket(pk packets.Packet) error {
|
||||
}
|
||||
|
||||
// first write to buffer, then flush buffer
|
||||
n, _ = buf.WriteTo(cl.Net.outbuf) // will always be successful
|
||||
n, _ := cl.Net.outbuf.Write(buf.Bytes()) // will always be successful
|
||||
err = cl.flushOutbuf()
|
||||
return
|
||||
return int64(n), err
|
||||
}
|
||||
|
||||
// there are more writes in the queue
|
||||
if cl.Net.outbuf == nil {
|
||||
if buf.Len() >= cl.ops.options.ClientNetWriteBufferSize {
|
||||
return buf.WriteTo(cl.Net.Conn)
|
||||
}
|
||||
cl.Net.outbuf = new(bytes.Buffer)
|
||||
}
|
||||
|
||||
n, _ = buf.WriteTo(cl.Net.outbuf) // will always be successful
|
||||
n, _ := cl.Net.outbuf.Write(buf.Bytes()) // will always be successful
|
||||
if cl.Net.outbuf.Len() < cl.ops.options.ClientNetWriteBufferSize {
|
||||
return
|
||||
return int64(n), nil
|
||||
}
|
||||
|
||||
err = cl.flushOutbuf()
|
||||
return
|
||||
return int64(n), err
|
||||
}()
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -37,6 +37,7 @@ func newTestClient() (cl *Client, r net.Conn, w net.Conn) {
|
||||
options: &Options{
|
||||
Capabilities: &Capabilities{
|
||||
ReceiveMaximum: 10,
|
||||
MaximumInflight: 5,
|
||||
TopicAliasMaximum: 10000,
|
||||
MaximumClientWritesPending: 3,
|
||||
maximumPacketID: 10,
|
||||
@@ -183,6 +184,45 @@ func TestClientParseConnect(t *testing.T) {
|
||||
require.Equal(t, int32(pk.Properties.ReceiveMaximum), cl.State.Inflight.maximumSendQuota)
|
||||
}
|
||||
|
||||
func TestClientParseConnectReceiveMaxExceedMaxInflight(t *testing.T) {
|
||||
const MaxInflight uint16 = 1
|
||||
cl, _, _ := newTestClient()
|
||||
cl.ops.options.Capabilities.MaximumInflight = MaxInflight
|
||||
|
||||
pk := packets.Packet{
|
||||
ProtocolVersion: 4,
|
||||
Connect: packets.ConnectParams{
|
||||
ProtocolName: []byte{'M', 'Q', 'T', 'T'},
|
||||
Clean: true,
|
||||
Keepalive: 60,
|
||||
ClientIdentifier: "mochi",
|
||||
WillFlag: true,
|
||||
WillTopic: "lwt",
|
||||
WillPayload: []byte("lol gg"),
|
||||
WillQos: 1,
|
||||
WillRetain: false,
|
||||
},
|
||||
Properties: packets.Properties{
|
||||
ReceiveMaximum: uint16(5),
|
||||
},
|
||||
}
|
||||
|
||||
cl.ParseConnect("tcp1", pk)
|
||||
require.Equal(t, pk.Connect.ClientIdentifier, cl.ID)
|
||||
require.Equal(t, pk.Connect.Keepalive, cl.State.Keepalive)
|
||||
require.Equal(t, pk.Connect.Clean, cl.Properties.Clean)
|
||||
require.Equal(t, pk.Connect.ClientIdentifier, cl.ID)
|
||||
require.Equal(t, pk.Connect.WillTopic, cl.Properties.Will.TopicName)
|
||||
require.Equal(t, pk.Connect.WillPayload, cl.Properties.Will.Payload)
|
||||
require.Equal(t, pk.Connect.WillQos, cl.Properties.Will.Qos)
|
||||
require.Equal(t, pk.Connect.WillRetain, cl.Properties.Will.Retain)
|
||||
require.Equal(t, uint32(1), cl.Properties.Will.Flag)
|
||||
require.Equal(t, int32(cl.ops.options.Capabilities.ReceiveMaximum), cl.State.Inflight.receiveQuota)
|
||||
require.Equal(t, int32(cl.ops.options.Capabilities.ReceiveMaximum), cl.State.Inflight.maximumReceiveQuota)
|
||||
require.Equal(t, int32(MaxInflight), cl.State.Inflight.sendQuota)
|
||||
require.Equal(t, int32(MaxInflight), cl.State.Inflight.maximumSendQuota)
|
||||
}
|
||||
|
||||
func TestClientParseConnectOverrideWillDelay(t *testing.T) {
|
||||
cl, _, _ := newTestClient()
|
||||
|
||||
|
||||
66
cmd/docker/main.go
Normal file
66
cmd/docker/main.go
Normal file
@@ -0,0 +1,66 @@
|
||||
// SPDX-License-Identifier: MIT
|
||||
// SPDX-FileCopyrightText: 2023 mochi-mqtt
|
||||
// SPDX-FileContributor: dgduncan, mochi-co
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"github.com/mochi-mqtt/server/v2/config"
|
||||
"log"
|
||||
"log/slog"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
mqtt "github.com/mochi-mqtt/server/v2"
|
||||
)
|
||||
|
||||
func main() {
|
||||
slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, nil))) // set basic logger to ensure logs before configuration are in a consistent format
|
||||
|
||||
configFile := flag.String("config", "config.yaml", "path to mochi config yaml or json file")
|
||||
flag.Parse()
|
||||
|
||||
entries, err := os.ReadDir("./")
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
for _, e := range entries {
|
||||
fmt.Println(e.Name())
|
||||
}
|
||||
|
||||
sigs := make(chan os.Signal, 1)
|
||||
done := make(chan bool, 1)
|
||||
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
|
||||
go func() {
|
||||
<-sigs
|
||||
done <- true
|
||||
}()
|
||||
|
||||
configBytes, err := os.ReadFile(*configFile)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
options, err := config.FromBytes(configBytes)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
server := mqtt.New(options)
|
||||
|
||||
go func() {
|
||||
err := server.Serve()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
}()
|
||||
|
||||
<-done
|
||||
server.Log.Warn("caught signal, stopping...")
|
||||
_ = server.Close()
|
||||
server.Log.Info("mochi mqtt shutdown complete")
|
||||
}
|
||||
21
cmd/main.go
21
cmd/main.go
@@ -33,19 +33,31 @@ func main() {
|
||||
server := mqtt.New(nil)
|
||||
_ = server.AddHook(new(auth.AllowHook), nil)
|
||||
|
||||
tcp := listeners.NewTCP("t1", *tcpAddr, nil)
|
||||
tcp := listeners.NewTCP(listeners.Config{
|
||||
ID: "t1",
|
||||
Address: *tcpAddr,
|
||||
})
|
||||
err := server.AddListener(tcp)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
ws := listeners.NewWebsocket("ws1", *wsAddr, nil)
|
||||
ws := listeners.NewWebsocket(listeners.Config{
|
||||
ID: "ws1",
|
||||
Address: *wsAddr,
|
||||
})
|
||||
err = server.AddListener(ws)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
stats := listeners.NewHTTPStats("stats", *infoAddr, nil, server.Info)
|
||||
stats := listeners.NewHTTPStats(
|
||||
listeners.Config{
|
||||
ID: "info",
|
||||
Address: *infoAddr,
|
||||
},
|
||||
server.Info,
|
||||
)
|
||||
err = server.AddListener(stats)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
@@ -61,6 +73,5 @@ func main() {
|
||||
<-done
|
||||
server.Log.Warn("caught signal, stopping...")
|
||||
_ = server.Close()
|
||||
server.Log.Info("main.go finished")
|
||||
|
||||
server.Log.Info("mochi mqtt shutdown complete")
|
||||
}
|
||||
|
||||
15
config.yaml
Normal file
15
config.yaml
Normal file
@@ -0,0 +1,15 @@
|
||||
listeners:
|
||||
- type: "tcp"
|
||||
id: "tcp12"
|
||||
address: ":1883"
|
||||
- type: "ws"
|
||||
id: "ws1"
|
||||
address: ":1882"
|
||||
- type: "sysinfo"
|
||||
id: "stats"
|
||||
address: ":1880"
|
||||
hooks:
|
||||
auth:
|
||||
allow_all: true
|
||||
options:
|
||||
inline_client: true
|
||||
144
config/config.go
Normal file
144
config/config.go
Normal file
@@ -0,0 +1,144 @@
|
||||
// SPDX-License-Identifier: MIT
|
||||
// SPDX-FileCopyrightText: 2023 mochi-mqtt, mochi-co
|
||||
// SPDX-FileContributor: mochi-co
|
||||
|
||||
package config
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"github.com/mochi-mqtt/server/v2/hooks/auth"
|
||||
"github.com/mochi-mqtt/server/v2/hooks/debug"
|
||||
"github.com/mochi-mqtt/server/v2/hooks/storage/badger"
|
||||
"github.com/mochi-mqtt/server/v2/hooks/storage/bolt"
|
||||
"github.com/mochi-mqtt/server/v2/hooks/storage/redis"
|
||||
"github.com/mochi-mqtt/server/v2/listeners"
|
||||
"gopkg.in/yaml.v3"
|
||||
|
||||
mqtt "github.com/mochi-mqtt/server/v2"
|
||||
)
|
||||
|
||||
// config defines the structure of configuration data to be parsed from a config source.
|
||||
type config struct {
|
||||
Options mqtt.Options
|
||||
Listeners []listeners.Config `yaml:"listeners" json:"listeners"`
|
||||
HookConfigs HookConfigs `yaml:"hooks" json:"hooks"`
|
||||
}
|
||||
|
||||
// HookConfigs contains configurations to enable individual hooks.
|
||||
type HookConfigs struct {
|
||||
Auth *HookAuthConfig `yaml:"auth" json:"auth"`
|
||||
Storage *HookStorageConfig `yaml:"storage" json:"storage"`
|
||||
Debug *debug.Options `yaml:"debug" json:"debug"`
|
||||
}
|
||||
|
||||
// HookAuthConfig contains configurations for the auth hook.
|
||||
type HookAuthConfig struct {
|
||||
Ledger auth.Ledger `yaml:"ledger" json:"ledger"`
|
||||
AllowAll bool `yaml:"allow_all" json:"allow_all"`
|
||||
}
|
||||
|
||||
// HookStorageConfig contains configurations for the different storage hooks.
|
||||
type HookStorageConfig struct {
|
||||
Badger *badger.Options `yaml:"badger" json:"badger"`
|
||||
Bolt *bolt.Options `yaml:"bolt" json:"bolt"`
|
||||
Redis *redis.Options `yaml:"redis" json:"redis"`
|
||||
}
|
||||
|
||||
// ToHooks converts Hook file configurations into Hooks to be added to the server.
|
||||
func (hc HookConfigs) ToHooks() []mqtt.HookLoadConfig {
|
||||
var hlc []mqtt.HookLoadConfig
|
||||
|
||||
if hc.Auth != nil {
|
||||
hlc = append(hlc, hc.toHooksAuth()...)
|
||||
}
|
||||
|
||||
if hc.Storage != nil {
|
||||
hlc = append(hlc, hc.toHooksStorage()...)
|
||||
}
|
||||
|
||||
if hc.Debug != nil {
|
||||
hlc = append(hlc, mqtt.HookLoadConfig{
|
||||
Hook: new(debug.Hook),
|
||||
Config: hc.Debug,
|
||||
})
|
||||
}
|
||||
|
||||
return hlc
|
||||
}
|
||||
|
||||
// toHooksAuth converts auth hook configurations into auth hooks.
|
||||
func (hc HookConfigs) toHooksAuth() []mqtt.HookLoadConfig {
|
||||
var hlc []mqtt.HookLoadConfig
|
||||
if hc.Auth.AllowAll {
|
||||
hlc = append(hlc, mqtt.HookLoadConfig{
|
||||
Hook: new(auth.AllowHook),
|
||||
})
|
||||
} else {
|
||||
hlc = append(hlc, mqtt.HookLoadConfig{
|
||||
Hook: new(auth.Hook),
|
||||
Config: &auth.Options{
|
||||
Ledger: &auth.Ledger{ // avoid copying sync.Locker
|
||||
Users: hc.Auth.Ledger.Users,
|
||||
Auth: hc.Auth.Ledger.Auth,
|
||||
ACL: hc.Auth.Ledger.ACL,
|
||||
},
|
||||
},
|
||||
})
|
||||
}
|
||||
return hlc
|
||||
}
|
||||
|
||||
// toHooksAuth converts storage hook configurations into storage hooks.
|
||||
func (hc HookConfigs) toHooksStorage() []mqtt.HookLoadConfig {
|
||||
var hlc []mqtt.HookLoadConfig
|
||||
if hc.Storage.Badger != nil {
|
||||
hlc = append(hlc, mqtt.HookLoadConfig{
|
||||
Hook: new(badger.Hook),
|
||||
Config: hc.Storage.Badger,
|
||||
})
|
||||
}
|
||||
|
||||
if hc.Storage.Bolt != nil {
|
||||
hlc = append(hlc, mqtt.HookLoadConfig{
|
||||
Hook: new(bolt.Hook),
|
||||
Config: hc.Storage.Bolt,
|
||||
})
|
||||
}
|
||||
|
||||
if hc.Storage.Redis != nil {
|
||||
hlc = append(hlc, mqtt.HookLoadConfig{
|
||||
Hook: new(redis.Hook),
|
||||
Config: hc.Storage.Redis,
|
||||
})
|
||||
}
|
||||
return hlc
|
||||
}
|
||||
|
||||
// FromBytes unmarshals a byte slice of JSON or YAML config data into a valid server options value.
|
||||
// Any hooks configurations are converted into Hooks using the toHooks methods in this package.
|
||||
func FromBytes(b []byte) (*mqtt.Options, error) {
|
||||
c := new(config)
|
||||
o := mqtt.Options{}
|
||||
|
||||
if len(b) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if b[0] == '{' {
|
||||
err := json.Unmarshal(b, c)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
err := yaml.Unmarshal(b, c)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
o = c.Options
|
||||
o.Hooks = c.HookConfigs.ToHooks()
|
||||
o.Listeners = c.Listeners
|
||||
|
||||
return &o, nil
|
||||
}
|
||||
212
config/config_test.go
Normal file
212
config/config_test.go
Normal file
@@ -0,0 +1,212 @@
|
||||
// SPDX-License-Identifier: MIT
|
||||
// SPDX-FileCopyrightText: 2023 mochi-mqtt, mochi-co
|
||||
// SPDX-FileContributor: mochi-co
|
||||
|
||||
package config
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/mochi-mqtt/server/v2/hooks/auth"
|
||||
"github.com/mochi-mqtt/server/v2/hooks/storage/badger"
|
||||
"github.com/mochi-mqtt/server/v2/hooks/storage/bolt"
|
||||
"github.com/mochi-mqtt/server/v2/hooks/storage/redis"
|
||||
"github.com/mochi-mqtt/server/v2/listeners"
|
||||
|
||||
mqtt "github.com/mochi-mqtt/server/v2"
|
||||
)
|
||||
|
||||
var (
|
||||
yamlBytes = []byte(`
|
||||
listeners:
|
||||
- type: "tcp"
|
||||
id: "file-tcp1"
|
||||
address: ":1883"
|
||||
hooks:
|
||||
auth:
|
||||
allow_all: true
|
||||
options:
|
||||
client_net_write_buffer_size: 2048
|
||||
capabilities:
|
||||
minimum_protocol_version: 3
|
||||
compatibilities:
|
||||
restore_sys_info_on_restart: true
|
||||
`)
|
||||
|
||||
jsonBytes = []byte(`{
|
||||
"listeners": [
|
||||
{
|
||||
"type": "tcp",
|
||||
"id": "file-tcp1",
|
||||
"address": ":1883"
|
||||
}
|
||||
],
|
||||
"hooks": {
|
||||
"auth": {
|
||||
"allow_all": true
|
||||
}
|
||||
},
|
||||
"options": {
|
||||
"client_net_write_buffer_size": 2048,
|
||||
"capabilities": {
|
||||
"minimum_protocol_version": 3,
|
||||
"compatibilities": {
|
||||
"restore_sys_info_on_restart": true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
`)
|
||||
|
||||
parsedOptions = mqtt.Options{
|
||||
Listeners: []listeners.Config{
|
||||
{
|
||||
Type: listeners.TypeTCP,
|
||||
ID: "file-tcp1",
|
||||
Address: ":1883",
|
||||
},
|
||||
},
|
||||
Hooks: []mqtt.HookLoadConfig{
|
||||
{
|
||||
Hook: new(auth.AllowHook),
|
||||
},
|
||||
},
|
||||
ClientNetWriteBufferSize: 2048,
|
||||
Capabilities: &mqtt.Capabilities{
|
||||
MinimumProtocolVersion: 3,
|
||||
Compatibilities: mqtt.Compatibilities{
|
||||
RestoreSysInfoOnRestart: true,
|
||||
},
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
func TestFromBytesEmptyL(t *testing.T) {
|
||||
_, err := FromBytes([]byte{})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestFromBytesYAML(t *testing.T) {
|
||||
o, err := FromBytes(yamlBytes)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, parsedOptions, *o)
|
||||
}
|
||||
|
||||
func TestFromBytesYAMLError(t *testing.T) {
|
||||
_, err := FromBytes(append(yamlBytes, 'a'))
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
func TestFromBytesJSON(t *testing.T) {
|
||||
o, err := FromBytes(jsonBytes)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, parsedOptions, *o)
|
||||
}
|
||||
|
||||
func TestFromBytesJSONError(t *testing.T) {
|
||||
_, err := FromBytes(append(jsonBytes, 'a'))
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
func TestToHooksAuthAllowAll(t *testing.T) {
|
||||
hc := HookConfigs{
|
||||
Auth: &HookAuthConfig{
|
||||
AllowAll: true,
|
||||
},
|
||||
}
|
||||
|
||||
th := hc.toHooksAuth()
|
||||
expect := []mqtt.HookLoadConfig{
|
||||
{Hook: new(auth.AllowHook)},
|
||||
}
|
||||
require.Equal(t, expect, th)
|
||||
}
|
||||
|
||||
func TestToHooksAuthAllowLedger(t *testing.T) {
|
||||
hc := HookConfigs{
|
||||
Auth: &HookAuthConfig{
|
||||
Ledger: auth.Ledger{
|
||||
Auth: auth.AuthRules{
|
||||
{Username: "peach", Password: "password1", Allow: true},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
th := hc.toHooksAuth()
|
||||
expect := []mqtt.HookLoadConfig{
|
||||
{
|
||||
Hook: new(auth.Hook),
|
||||
Config: &auth.Options{
|
||||
Ledger: &auth.Ledger{ // avoid copying sync.Locker
|
||||
Auth: auth.AuthRules{
|
||||
{Username: "peach", Password: "password1", Allow: true},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
require.Equal(t, expect, th)
|
||||
}
|
||||
|
||||
func TestToHooksStorageBadger(t *testing.T) {
|
||||
hc := HookConfigs{
|
||||
Storage: &HookStorageConfig{
|
||||
Badger: &badger.Options{
|
||||
Path: "badger",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
th := hc.toHooksStorage()
|
||||
expect := []mqtt.HookLoadConfig{
|
||||
{
|
||||
Hook: new(badger.Hook),
|
||||
Config: hc.Storage.Badger,
|
||||
},
|
||||
}
|
||||
|
||||
require.Equal(t, expect, th)
|
||||
}
|
||||
|
||||
func TestToHooksStorageBolt(t *testing.T) {
|
||||
hc := HookConfigs{
|
||||
Storage: &HookStorageConfig{
|
||||
Bolt: &bolt.Options{
|
||||
Path: "bolt",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
th := hc.toHooksStorage()
|
||||
expect := []mqtt.HookLoadConfig{
|
||||
{
|
||||
Hook: new(bolt.Hook),
|
||||
Config: hc.Storage.Bolt,
|
||||
},
|
||||
}
|
||||
|
||||
require.Equal(t, expect, th)
|
||||
}
|
||||
|
||||
func TestToHooksStorageRedis(t *testing.T) {
|
||||
hc := HookConfigs{
|
||||
Storage: &HookStorageConfig{
|
||||
Redis: &redis.Options{
|
||||
Username: "test",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
th := hc.toHooksStorage()
|
||||
expect := []mqtt.HookLoadConfig{
|
||||
{
|
||||
Hook: new(redis.Hook),
|
||||
Config: hc.Storage.Redis,
|
||||
},
|
||||
}
|
||||
|
||||
require.Equal(t, expect, th)
|
||||
}
|
||||
@@ -63,7 +63,10 @@ func main() {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
tcp := listeners.NewTCP("t1", ":1883", nil)
|
||||
tcp := listeners.NewTCP(listeners.Config{
|
||||
ID: "t1",
|
||||
Address: ":1883",
|
||||
})
|
||||
err = server.AddListener(tcp)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
|
||||
@@ -45,7 +45,10 @@ func main() {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
tcp := listeners.NewTCP("t1", ":1883", nil)
|
||||
tcp := listeners.NewTCP(listeners.Config{
|
||||
ID: "t1",
|
||||
Address: ":1883",
|
||||
})
|
||||
err = server.AddListener(tcp)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
|
||||
@@ -32,7 +32,10 @@ func main() {
|
||||
server.Options.Capabilities.MaximumClientWritesPending = 16 * 1024
|
||||
_ = server.AddHook(new(auth.AllowHook), nil)
|
||||
|
||||
tcp := listeners.NewTCP("t1", *tcpAddr, nil)
|
||||
tcp := listeners.NewTCP(listeners.Config{
|
||||
ID: "t1",
|
||||
Address: *tcpAddr,
|
||||
})
|
||||
err := server.AddListener(tcp)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
|
||||
92
examples/config/config.json
Normal file
92
examples/config/config.json
Normal file
@@ -0,0 +1,92 @@
|
||||
{
|
||||
"listeners": [
|
||||
{
|
||||
"type": "tcp",
|
||||
"id": "file-tcp1",
|
||||
"address": ":1883"
|
||||
},
|
||||
{
|
||||
"type": "ws",
|
||||
"id": "file-websocket",
|
||||
"address": ":1882"
|
||||
},
|
||||
{
|
||||
"type": "healthcheck",
|
||||
"id": "file-healthcheck",
|
||||
"address": ":1880"
|
||||
}
|
||||
],
|
||||
"hooks": {
|
||||
"debug": {
|
||||
"enable": true
|
||||
},
|
||||
"storage": {
|
||||
"badger": {
|
||||
"path": "badger.db",
|
||||
"gc_interval": 3,
|
||||
"gc_discard_ratio": 0.5
|
||||
},
|
||||
"bolt": {
|
||||
"path": "bolt.db"
|
||||
},
|
||||
"redis": {
|
||||
"h_prefix": "mc",
|
||||
"username": "mochi",
|
||||
"password": "melon",
|
||||
"address": "localhost:6379",
|
||||
"database": 1
|
||||
}
|
||||
},
|
||||
"auth": {
|
||||
"allow_all": false,
|
||||
"ledger": {
|
||||
"auth": [
|
||||
{
|
||||
"username": "peach",
|
||||
"password": "password1",
|
||||
"allow": true
|
||||
}
|
||||
],
|
||||
"acl": [
|
||||
{
|
||||
"remote": "127.0.0.1:*"
|
||||
},
|
||||
{
|
||||
"username": "melon",
|
||||
"filters": null,
|
||||
"melon/#": 3,
|
||||
"updates/#": 2
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
},
|
||||
"options": {
|
||||
"client_net_write_buffer_size": 2048,
|
||||
"client_net_read_buffer_size": 2048,
|
||||
"sys_topic_resend_interval": 10,
|
||||
"inline_client": true,
|
||||
"capabilities": {
|
||||
"maximum_message_expiry_interval": 100,
|
||||
"maximum_client_writes_pending": 8192,
|
||||
"maximum_session_expiry_interval": 86400,
|
||||
"maximum_packet_size": 0,
|
||||
"receive_maximum": 1024,
|
||||
"maximum_inflight": 8192,
|
||||
"topic_alias_maximum": 65535,
|
||||
"shared_sub_available": 1,
|
||||
"minimum_protocol_version": 3,
|
||||
"maximum_qos": 2,
|
||||
"retain_available": 1,
|
||||
"wildcard_sub_available": 1,
|
||||
"sub_id_available": 1,
|
||||
"compatibilities": {
|
||||
"obscure_not_authorized": true,
|
||||
"passive_client_disconnect": false,
|
||||
"always_return_response_info": false,
|
||||
"restore_sys_info_on_restart": false,
|
||||
"no_inherited_properties_on_ack": false
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
64
examples/config/config.yaml
Normal file
64
examples/config/config.yaml
Normal file
@@ -0,0 +1,64 @@
|
||||
listeners:
|
||||
- type: "tcp"
|
||||
id: "file-tcp1"
|
||||
address: ":1883"
|
||||
- type: "ws"
|
||||
id: "file-websocket"
|
||||
address: ":1882"
|
||||
- type: "healthcheck"
|
||||
id: "file-healthcheck"
|
||||
address: ":1880"
|
||||
hooks:
|
||||
debug:
|
||||
enable: true
|
||||
storage:
|
||||
badger:
|
||||
path: badger.db
|
||||
gc_interval: 3
|
||||
gc_discard_ratio: 0.5
|
||||
bolt:
|
||||
path: bolt.db
|
||||
redis:
|
||||
h_prefix: "mc"
|
||||
username: "mochi"
|
||||
password: "melon"
|
||||
address: "localhost:6379"
|
||||
database: 1
|
||||
auth:
|
||||
allow_all: false
|
||||
ledger:
|
||||
auth:
|
||||
- username: peach
|
||||
password: password1
|
||||
allow: true
|
||||
acl:
|
||||
- remote: 127.0.0.1:*
|
||||
- username: melon
|
||||
filters:
|
||||
melon/#: 3
|
||||
updates/#: 2
|
||||
options:
|
||||
client_net_write_buffer_size: 2048
|
||||
client_net_read_buffer_size: 2048
|
||||
sys_topic_resend_interval: 10
|
||||
inline_client: true
|
||||
capabilities:
|
||||
maximum_message_expiry_interval: 100
|
||||
maximum_client_writes_pending: 8192
|
||||
maximum_session_expiry_interval: 86400
|
||||
maximum_packet_size: 0
|
||||
receive_maximum: 1024
|
||||
maximum_inflight: 8192
|
||||
topic_alias_maximum: 65535
|
||||
shared_sub_available: 1
|
||||
minimum_protocol_version: 3
|
||||
maximum_qos: 2
|
||||
retain_available: 1
|
||||
wildcard_sub_available: 1
|
||||
sub_id_available: 1
|
||||
compatibilities:
|
||||
obscure_not_authorized: true
|
||||
passive_client_disconnect: false
|
||||
always_return_response_info: false
|
||||
restore_sys_info_on_restart: false
|
||||
no_inherited_properties_on_ack: false
|
||||
49
examples/config/main.go
Normal file
49
examples/config/main.go
Normal file
@@ -0,0 +1,49 @@
|
||||
// SPDX-License-Identifier: MIT
|
||||
// SPDX-FileCopyrightText: 2023 mochi-mqtt, mochi-co
|
||||
// SPDX-FileContributor: mochi-co
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"github.com/mochi-mqtt/server/v2/config"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
mqtt "github.com/mochi-mqtt/server/v2"
|
||||
)
|
||||
|
||||
func main() {
|
||||
sigs := make(chan os.Signal, 1)
|
||||
done := make(chan bool, 1)
|
||||
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
|
||||
go func() {
|
||||
<-sigs
|
||||
done <- true
|
||||
}()
|
||||
|
||||
configBytes, err := os.ReadFile("config.json")
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
options, err := config.FromBytes(configBytes)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
server := mqtt.New(options)
|
||||
|
||||
go func() {
|
||||
err := server.Serve()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
}()
|
||||
|
||||
<-done
|
||||
server.Log.Warn("caught signal, stopping...")
|
||||
_ = server.Close()
|
||||
server.Log.Info("main.go finished")
|
||||
}
|
||||
@@ -46,7 +46,10 @@ func main() {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
tcp := listeners.NewTCP("t1", ":1883", nil)
|
||||
tcp := listeners.NewTCP(listeners.Config{
|
||||
ID: "t1",
|
||||
Address: ":1883",
|
||||
})
|
||||
err = server.AddListener(tcp)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
|
||||
@@ -33,7 +33,10 @@ func main() {
|
||||
})
|
||||
|
||||
_ = server.AddHook(new(auth.AllowHook), nil)
|
||||
tcp := listeners.NewTCP("t1", ":1883", nil)
|
||||
tcp := listeners.NewTCP(listeners.Config{
|
||||
ID: "t1",
|
||||
Address: ":1883",
|
||||
})
|
||||
err := server.AddListener(tcp)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
|
||||
@@ -31,7 +31,10 @@ func main() {
|
||||
server.Options.Capabilities.Compatibilities.NoInheritedPropertiesOnAck = true
|
||||
|
||||
_ = server.AddHook(new(pahoAuthHook), nil)
|
||||
tcp := listeners.NewTCP("t1", ":1883", nil)
|
||||
tcp := listeners.NewTCP(listeners.Config{
|
||||
ID: "t1",
|
||||
Address: ":1883",
|
||||
})
|
||||
err := server.AddListener(tcp)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
|
||||
@@ -5,15 +5,16 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
badgerdb "github.com/dgraph-io/badger"
|
||||
mqtt "github.com/mochi-mqtt/server/v2"
|
||||
"github.com/mochi-mqtt/server/v2/hooks/auth"
|
||||
"github.com/mochi-mqtt/server/v2/hooks/storage/badger"
|
||||
"github.com/mochi-mqtt/server/v2/listeners"
|
||||
"github.com/timshannon/badgerhold"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
func main() {
|
||||
@@ -31,14 +32,39 @@ func main() {
|
||||
server := mqtt.New(nil)
|
||||
_ = server.AddHook(new(auth.AllowHook), nil)
|
||||
|
||||
// AddHook adds a BadgerDB hook to the server with the specified options.
|
||||
// GcInterval specifies the interval at which BadgerDB garbage collection process runs.
|
||||
// Refer to https://dgraph.io/docs/badger/get-started/#garbage-collection for more information.
|
||||
err := server.AddHook(new(badger.Hook), &badger.Options{
|
||||
Path: badgerPath,
|
||||
|
||||
// Set the interval for garbage collection. Adjust according to your actual scenario.
|
||||
GcInterval: 5 * 60,
|
||||
|
||||
// GcDiscardRatio specifies the ratio of log discard compared to the maximum possible log discard.
|
||||
// Setting it to a higher value would result in fewer space reclaims, while setting it to a lower value
|
||||
// would result in more space reclaims at the cost of increased activity on the LSM tree.
|
||||
// discardRatio must be in the range (0.0, 1.0), both endpoints excluded, otherwise, it will be set to the default value of 0.5.
|
||||
// Adjust according to your actual scenario.
|
||||
GcDiscardRatio: 0.5,
|
||||
|
||||
Options: &badgerhold.Options{
|
||||
// BadgerDB options. Adjust according to your actual scenario.
|
||||
Options: badgerdb.Options{
|
||||
NumCompactors: 2, // Number of compactors. Compactions can be expensive.
|
||||
MaxTableSize: 64 << 20, // Maximum size of each table (64 MB).
|
||||
ValueLogFileSize: 100 * (1 << 20), // Set the default size of the log file to 100 MB.
|
||||
},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
tcp := listeners.NewTCP("t1", ":1883", nil)
|
||||
tcp := listeners.NewTCP(listeners.Config{
|
||||
ID: "t1",
|
||||
Address: ":1883",
|
||||
})
|
||||
err = server.AddListener(tcp)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
|
||||
@@ -40,7 +40,10 @@ func main() {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
tcp := listeners.NewTCP("t1", ":1883", nil)
|
||||
tcp := listeners.NewTCP(listeners.Config{
|
||||
ID: "t1",
|
||||
Address: ":1883",
|
||||
})
|
||||
err = server.AddListener(tcp)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
|
||||
@@ -48,7 +48,10 @@ func main() {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
tcp := listeners.NewTCP("t1", ":1883", nil)
|
||||
tcp := listeners.NewTCP(listeners.Config{
|
||||
ID: "t1",
|
||||
Address: ":1883",
|
||||
})
|
||||
err = server.AddListener(tcp)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
|
||||
@@ -38,7 +38,10 @@ func main() {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
tcp := listeners.NewTCP("t1", ":1883", nil)
|
||||
tcp := listeners.NewTCP(listeners.Config{
|
||||
ID: "t1",
|
||||
Address: ":1883",
|
||||
})
|
||||
err = server.AddListener(tcp)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
|
||||
@@ -79,7 +79,9 @@ func main() {
|
||||
server := mqtt.New(nil)
|
||||
_ = server.AddHook(new(auth.AllowHook), nil)
|
||||
|
||||
tcp := listeners.NewTCP("t1", ":1883", &listeners.Config{
|
||||
tcp := listeners.NewTCP(listeners.Config{
|
||||
ID: "t1",
|
||||
Address: ":1883",
|
||||
TLSConfig: tlsConfig,
|
||||
})
|
||||
err = server.AddListener(tcp)
|
||||
@@ -87,7 +89,9 @@ func main() {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
ws := listeners.NewWebsocket("ws1", ":1882", &listeners.Config{
|
||||
ws := listeners.NewWebsocket(listeners.Config{
|
||||
ID: "ws1",
|
||||
Address: ":1882",
|
||||
TLSConfig: tlsConfig,
|
||||
})
|
||||
err = server.AddListener(ws)
|
||||
@@ -95,9 +99,13 @@ func main() {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
stats := listeners.NewHTTPStats("stats", ":8080", &listeners.Config{
|
||||
TLSConfig: tlsConfig,
|
||||
}, server.Info)
|
||||
stats := listeners.NewHTTPStats(
|
||||
listeners.Config{
|
||||
ID: "stats",
|
||||
Address: ":8080",
|
||||
TLSConfig: tlsConfig,
|
||||
}, server.Info,
|
||||
)
|
||||
err = server.AddListener(stats)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
|
||||
@@ -27,7 +27,10 @@ func main() {
|
||||
server := mqtt.New(nil)
|
||||
_ = server.AddHook(new(auth.AllowHook), nil)
|
||||
|
||||
ws := listeners.NewWebsocket("ws1", ":1882", nil)
|
||||
ws := listeners.NewWebsocket(listeners.Config{
|
||||
ID: "ws1",
|
||||
Address: ":1882",
|
||||
})
|
||||
err := server.AddListener(ws)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
|
||||
2
go.mod
2
go.mod
@@ -33,5 +33,5 @@ require (
|
||||
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 // indirect
|
||||
golang.org/x/net v0.17.0 // indirect
|
||||
golang.org/x/sys v0.13.0 // indirect
|
||||
google.golang.org/protobuf v1.28.1 // indirect
|
||||
google.golang.org/protobuf v1.33.0 // indirect
|
||||
)
|
||||
|
||||
4
go.sum
4
go.sum
@@ -124,8 +124,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T
|
||||
google.golang.org/appengine v1.6.5 h1:tycE03LOZYQNhDpS27tcQdAzLCVMaj7QT2SXxebnpCM=
|
||||
google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
|
||||
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
|
||||
google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w=
|
||||
google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
|
||||
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
|
||||
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
|
||||
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
|
||||
7
hooks.go
7
hooks.go
@@ -62,6 +62,12 @@ var (
|
||||
ErrInvalidConfigType = errors.New("invalid config type provided")
|
||||
)
|
||||
|
||||
// HookLoadConfig contains the hook and configuration as loaded from a configuration (usually file).
|
||||
type HookLoadConfig struct {
|
||||
Hook Hook
|
||||
Config any
|
||||
}
|
||||
|
||||
// Hook provides an interface of handlers for different events which occur
|
||||
// during the lifecycle of the broker.
|
||||
type Hook interface {
|
||||
@@ -70,6 +76,7 @@ type Hook interface {
|
||||
Init(config any) error
|
||||
Stop() error
|
||||
SetOpts(l *slog.Logger, o *HookOptions)
|
||||
|
||||
OnStarted()
|
||||
OnStopped()
|
||||
OnConnectAuthenticate(cl *Client, pk packets.Packet) bool
|
||||
|
||||
@@ -16,9 +16,10 @@ import (
|
||||
|
||||
// Options contains configuration settings for the debug output.
|
||||
type Options struct {
|
||||
ShowPacketData bool // include decoded packet data (default false)
|
||||
ShowPings bool // show ping requests and responses (default false)
|
||||
ShowPasswords bool // show connecting user passwords (default false)
|
||||
Enable bool `yaml:"enable" json:"enable"` // non-zero field for enabling hook using file-based config
|
||||
ShowPacketData bool `yaml:"show_packet_data" json:"show_packet_data"` // include decoded packet data (default false)
|
||||
ShowPings bool `yaml:"show_pings" json:"show_pings"` // show ping requests and responses (default false)
|
||||
ShowPasswords bool `yaml:"show_passwords" json:"show_passwords"` // show connecting user passwords (default false)
|
||||
}
|
||||
|
||||
// Hook is a debugging hook which logs additional low-level information from the server.
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
mqtt "github.com/mochi-mqtt/server/v2"
|
||||
"github.com/mochi-mqtt/server/v2/hooks/storage"
|
||||
@@ -20,7 +21,9 @@ import (
|
||||
|
||||
const (
|
||||
// defaultDbFile is the default file path for the badger db file.
|
||||
defaultDbFile = ".badger"
|
||||
defaultDbFile = ".badger"
|
||||
defaultGcInterval = 5 * 60 // gc interval in seconds
|
||||
defaultGcDiscardRatio = 0.5
|
||||
)
|
||||
|
||||
// clientKey returns a primary key for a client.
|
||||
@@ -51,14 +54,21 @@ func sysInfoKey() string {
|
||||
// Options contains configuration settings for the BadgerDB instance.
|
||||
type Options struct {
|
||||
Options *badgerhold.Options
|
||||
Path string
|
||||
Path string `yaml:"path" json:"path"`
|
||||
// GcDiscardRatio specifies the ratio of log discard compared to the maximum possible log discard.
|
||||
// Setting it to a higher value would result in fewer space reclaims, while setting it to a lower value
|
||||
// would result in more space reclaims at the cost of increased activity on the LSM tree.
|
||||
// discardRatio must be in the range (0.0, 1.0), both endpoints excluded, otherwise, it will be set to the default value of 0.5.
|
||||
GcDiscardRatio float64 `yaml:"gc_discard_ratio" json:"gc_discard_ratio"`
|
||||
GcInterval int64 `yaml:"gc_interval" json:"gc_interval"`
|
||||
}
|
||||
|
||||
// Hook is a persistent storage hook based using BadgerDB file store as a backend.
|
||||
type Hook struct {
|
||||
mqtt.HookBase
|
||||
config *Options // options for configuring the BadgerDB instance.
|
||||
db *badgerhold.Store // the BadgerDB instance.
|
||||
config *Options // options for configuring the BadgerDB instance.
|
||||
gcTicker *time.Ticker // Ticker for BadgerDB garbage collection.
|
||||
db *badgerhold.Store // the BadgerDB instance.
|
||||
}
|
||||
|
||||
// ID returns the id of the hook.
|
||||
@@ -89,6 +99,21 @@ func (h *Hook) Provides(b byte) bool {
|
||||
}, []byte{b})
|
||||
}
|
||||
|
||||
// GcLoop periodically runs the garbage collection process to reclaim space in the value log files.
|
||||
// It uses a ticker to trigger the garbage collection at regular intervals specified by the configuration.
|
||||
// Refer to: https://dgraph.io/docs/badger/get-started/#garbage-collection
|
||||
func (h *Hook) GcLoop() {
|
||||
for range h.gcTicker.C {
|
||||
again:
|
||||
// Run the garbage collection process with a threshold.
|
||||
// If the process returns nil (success), repeat the process.
|
||||
err := h.db.Badger().RunValueLogGC(h.config.GcDiscardRatio)
|
||||
if err == nil {
|
||||
goto again // Retry garbage collection if successful.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Init initializes and connects to the badger instance.
|
||||
func (h *Hook) Init(config any) error {
|
||||
if _, ok := config.(*Options); !ok && config != nil {
|
||||
@@ -104,6 +129,14 @@ func (h *Hook) Init(config any) error {
|
||||
h.config.Path = defaultDbFile
|
||||
}
|
||||
|
||||
if h.config.GcInterval == 0 {
|
||||
h.config.GcInterval = defaultGcInterval
|
||||
}
|
||||
|
||||
if h.config.GcDiscardRatio <= 0.0 || h.config.GcDiscardRatio >= 1.0 {
|
||||
h.config.GcDiscardRatio = defaultGcDiscardRatio
|
||||
}
|
||||
|
||||
options := badgerhold.DefaultOptions
|
||||
options.Dir = h.config.Path
|
||||
options.ValueDir = h.config.Path
|
||||
@@ -115,11 +148,17 @@ func (h *Hook) Init(config any) error {
|
||||
return err
|
||||
}
|
||||
|
||||
h.gcTicker = time.NewTicker(time.Duration(h.config.GcInterval) * time.Second)
|
||||
go h.GcLoop()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop closes the badger instance.
|
||||
func (h *Hook) Stop() error {
|
||||
if h.gcTicker != nil {
|
||||
h.gcTicker.Stop()
|
||||
}
|
||||
return h.db.Close()
|
||||
}
|
||||
|
||||
@@ -182,7 +221,7 @@ func (h *Hook) OnDisconnect(cl *mqtt.Client, _ error, expire bool) {
|
||||
return
|
||||
}
|
||||
|
||||
if cl.StopCause() == packets.ErrSessionTakenOver {
|
||||
if errors.Is(cl.StopCause(), packets.ErrSessionTakenOver) {
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
badgerdb "github.com/dgraph-io/badger"
|
||||
mqtt "github.com/mochi-mqtt/server/v2"
|
||||
"github.com/mochi-mqtt/server/v2/hooks/storage"
|
||||
"github.com/mochi-mqtt/server/v2/packets"
|
||||
@@ -702,3 +703,21 @@ func TestDebugf(t *testing.T) {
|
||||
h.SetOpts(logger, nil)
|
||||
h.Debugf("test", 1, 2, 3)
|
||||
}
|
||||
|
||||
func TestGcLoop(t *testing.T) {
|
||||
h := new(Hook)
|
||||
h.SetOpts(logger, nil)
|
||||
h.Init(&Options{
|
||||
GcInterval: 2, // Set the interval for garbage collection.
|
||||
Options: &badgerhold.Options{
|
||||
// BadgerDB options. Modify as needed.
|
||||
Options: badgerdb.Options{
|
||||
ValueLogFileSize: 1 << 20, // Set the default size of the log file to 1 MB.
|
||||
},
|
||||
},
|
||||
})
|
||||
defer teardown(t, h.config.Path, h)
|
||||
h.OnSessionEstablished(client, packets.Packet{})
|
||||
h.OnDisconnect(client, nil, true)
|
||||
time.Sleep(3 * time.Second)
|
||||
}
|
||||
|
||||
@@ -56,7 +56,7 @@ func sysInfoKey() string {
|
||||
// Options contains configuration settings for the bolt instance.
|
||||
type Options struct {
|
||||
Options *bbolt.Options
|
||||
Path string
|
||||
Path string `yaml:"path" json:"path"`
|
||||
}
|
||||
|
||||
// Hook is a persistent storage hook based using boltdb file store as a backend.
|
||||
|
||||
@@ -51,8 +51,12 @@ func sysInfoKey() string {
|
||||
|
||||
// Options contains configuration settings for the bolt instance.
|
||||
type Options struct {
|
||||
HPrefix string
|
||||
Options *redis.Options
|
||||
Address string `yaml:"address" json:"address"`
|
||||
Username string `yaml:"username" json:"username"`
|
||||
Password string `yaml:"password" json:"password"`
|
||||
Database int `yaml:"database" json:"database"`
|
||||
HPrefix string `yaml:"h_prefix" json:"h_prefix"`
|
||||
Options *redis.Options
|
||||
}
|
||||
|
||||
// Hook is a persistent storage hook based using Redis as a backend.
|
||||
@@ -105,23 +109,31 @@ func (h *Hook) Init(config any) error {
|
||||
h.ctx = context.Background()
|
||||
|
||||
if config == nil {
|
||||
config = &Options{
|
||||
Options: &redis.Options{
|
||||
Addr: defaultAddr,
|
||||
},
|
||||
config = new(Options)
|
||||
}
|
||||
h.config = config.(*Options)
|
||||
if h.config.Options == nil {
|
||||
h.config.Options = &redis.Options{
|
||||
Addr: defaultAddr,
|
||||
}
|
||||
h.config.Options.Addr = h.config.Address
|
||||
h.config.Options.DB = h.config.Database
|
||||
h.config.Options.Username = h.config.Username
|
||||
h.config.Options.Password = h.config.Password
|
||||
}
|
||||
|
||||
h.config = config.(*Options)
|
||||
if h.config.HPrefix == "" {
|
||||
h.config.HPrefix = defaultHPrefix
|
||||
}
|
||||
|
||||
h.Log.Info("connecting to redis service",
|
||||
h.Log.Info(
|
||||
"connecting to redis service",
|
||||
"prefix", h.config.HPrefix,
|
||||
"address", h.config.Options.Addr,
|
||||
"username", h.config.Options.Username,
|
||||
"password-len", len(h.config.Options.Password),
|
||||
"db", h.config.Options.DB)
|
||||
"db", h.config.Options.DB,
|
||||
)
|
||||
|
||||
h.db = redis.NewClient(h.config.Options)
|
||||
_, err := h.db.Ping(context.Background()).Result()
|
||||
|
||||
@@ -135,6 +135,29 @@ func TestInitUseDefaults(t *testing.T) {
|
||||
require.Equal(t, defaultAddr, h.config.Options.Addr)
|
||||
}
|
||||
|
||||
func TestInitUsePassConfig(t *testing.T) {
|
||||
s := miniredis.RunT(t)
|
||||
s.StartAddr(defaultAddr)
|
||||
defer s.Close()
|
||||
|
||||
h := newHook(t, "")
|
||||
h.SetOpts(logger, nil)
|
||||
|
||||
err := h.Init(&Options{
|
||||
Address: defaultAddr,
|
||||
Username: "username",
|
||||
Password: "password",
|
||||
Database: 2,
|
||||
})
|
||||
require.Error(t, err)
|
||||
h.db.FlushAll(h.ctx)
|
||||
|
||||
require.Equal(t, defaultAddr, h.config.Options.Addr)
|
||||
require.Equal(t, "username", h.config.Options.Username)
|
||||
require.Equal(t, "password", h.config.Options.Password)
|
||||
require.Equal(t, 2, h.config.Options.DB)
|
||||
}
|
||||
|
||||
func TestInitBadConfig(t *testing.T) {
|
||||
h := new(Hook)
|
||||
h.SetOpts(logger, nil)
|
||||
|
||||
@@ -13,24 +13,23 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
const TypeHealthCheck = "healthcheck"
|
||||
|
||||
// HTTPHealthCheck is a listener for providing an HTTP healthcheck endpoint.
|
||||
type HTTPHealthCheck struct {
|
||||
sync.RWMutex
|
||||
id string // the internal id of the listener
|
||||
address string // the network address to bind to
|
||||
config *Config // configuration values for the listener
|
||||
config Config // configuration values for the listener
|
||||
listen *http.Server // the http server
|
||||
end uint32 // ensure the close methods are only called once
|
||||
}
|
||||
|
||||
// NewHTTPHealthCheck initialises and returns a new HTTP listener, listening on an address.
|
||||
func NewHTTPHealthCheck(id, address string, config *Config) *HTTPHealthCheck {
|
||||
if config == nil {
|
||||
config = new(Config)
|
||||
}
|
||||
// NewHTTPHealthCheck initializes and returns a new HTTP listener, listening on an address.
|
||||
func NewHTTPHealthCheck(config Config) *HTTPHealthCheck {
|
||||
return &HTTPHealthCheck{
|
||||
id: id,
|
||||
address: address,
|
||||
id: config.ID,
|
||||
address: config.Address,
|
||||
config: config,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,47 +14,44 @@ import (
|
||||
)
|
||||
|
||||
func TestNewHTTPHealthCheck(t *testing.T) {
|
||||
l := NewHTTPHealthCheck("healthcheck", testAddr, nil)
|
||||
require.Equal(t, "healthcheck", l.id)
|
||||
require.Equal(t, testAddr, l.address)
|
||||
l := NewHTTPHealthCheck(basicConfig)
|
||||
require.Equal(t, basicConfig.ID, l.id)
|
||||
require.Equal(t, basicConfig.Address, l.address)
|
||||
}
|
||||
|
||||
func TestHTTPHealthCheckID(t *testing.T) {
|
||||
l := NewHTTPHealthCheck("healthcheck", testAddr, nil)
|
||||
require.Equal(t, "healthcheck", l.ID())
|
||||
l := NewHTTPHealthCheck(basicConfig)
|
||||
require.Equal(t, basicConfig.ID, l.ID())
|
||||
}
|
||||
|
||||
func TestHTTPHealthCheckAddress(t *testing.T) {
|
||||
l := NewHTTPHealthCheck("healthcheck", testAddr, nil)
|
||||
require.Equal(t, testAddr, l.Address())
|
||||
l := NewHTTPHealthCheck(basicConfig)
|
||||
require.Equal(t, basicConfig.Address, l.Address())
|
||||
}
|
||||
|
||||
func TestHTTPHealthCheckProtocol(t *testing.T) {
|
||||
l := NewHTTPHealthCheck("healthcheck", testAddr, nil)
|
||||
l := NewHTTPHealthCheck(basicConfig)
|
||||
require.Equal(t, "http", l.Protocol())
|
||||
}
|
||||
|
||||
func TestHTTPHealthCheckTLSProtocol(t *testing.T) {
|
||||
l := NewHTTPHealthCheck("healthcheck", testAddr, &Config{
|
||||
TLSConfig: tlsConfigBasic,
|
||||
})
|
||||
|
||||
l := NewHTTPHealthCheck(tlsConfig)
|
||||
_ = l.Init(logger)
|
||||
require.Equal(t, "https", l.Protocol())
|
||||
}
|
||||
|
||||
func TestHTTPHealthCheckInit(t *testing.T) {
|
||||
l := NewHTTPHealthCheck("healthcheck", testAddr, nil)
|
||||
l := NewHTTPHealthCheck(basicConfig)
|
||||
err := l.Init(logger)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NotNil(t, l.listen)
|
||||
require.Equal(t, testAddr, l.listen.Addr)
|
||||
require.Equal(t, basicConfig.Address, l.listen.Addr)
|
||||
}
|
||||
|
||||
func TestHTTPHealthCheckServeAndClose(t *testing.T) {
|
||||
// setup http stats listener
|
||||
l := NewHTTPHealthCheck("healthcheck", testAddr, nil)
|
||||
l := NewHTTPHealthCheck(basicConfig)
|
||||
err := l.Init(logger)
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -90,7 +87,7 @@ func TestHTTPHealthCheckServeAndClose(t *testing.T) {
|
||||
|
||||
func TestHTTPHealthCheckServeAndCloseMethodNotAllowed(t *testing.T) {
|
||||
// setup http stats listener
|
||||
l := NewHTTPHealthCheck("healthcheck", testAddr, nil)
|
||||
l := NewHTTPHealthCheck(basicConfig)
|
||||
err := l.Init(logger)
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -125,10 +122,7 @@ func TestHTTPHealthCheckServeAndCloseMethodNotAllowed(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestHTTPHealthCheckServeTLSAndClose(t *testing.T) {
|
||||
l := NewHTTPHealthCheck("healthcheck", testAddr, &Config{
|
||||
TLSConfig: tlsConfigBasic,
|
||||
})
|
||||
|
||||
l := NewHTTPHealthCheck(tlsConfig)
|
||||
err := l.Init(logger)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
||||
@@ -17,27 +17,26 @@ import (
|
||||
"github.com/mochi-mqtt/server/v2/system"
|
||||
)
|
||||
|
||||
const TypeSysInfo = "sysinfo"
|
||||
|
||||
// HTTPStats is a listener for presenting the server $SYS stats on a JSON http endpoint.
|
||||
type HTTPStats struct {
|
||||
sync.RWMutex
|
||||
id string // the internal id of the listener
|
||||
address string // the network address to bind to
|
||||
config *Config // configuration values for the listener
|
||||
config Config // configuration values for the listener
|
||||
listen *http.Server // the http server
|
||||
sysInfo *system.Info // pointers to the server data
|
||||
log *slog.Logger // server logger
|
||||
end uint32 // ensure the close methods are only called once
|
||||
}
|
||||
|
||||
// NewHTTPStats initialises and returns a new HTTP listener, listening on an address.
|
||||
func NewHTTPStats(id, address string, config *Config, sysInfo *system.Info) *HTTPStats {
|
||||
if config == nil {
|
||||
config = new(Config)
|
||||
}
|
||||
// NewHTTPStats initializes and returns a new HTTP listener, listening on an address.
|
||||
func NewHTTPStats(config Config, sysInfo *system.Info) *HTTPStats {
|
||||
return &HTTPStats{
|
||||
id: id,
|
||||
address: address,
|
||||
sysInfo: sysInfo,
|
||||
id: config.ID,
|
||||
address: config.Address,
|
||||
config: config,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,38 +17,35 @@ import (
|
||||
)
|
||||
|
||||
func TestNewHTTPStats(t *testing.T) {
|
||||
l := NewHTTPStats("t1", testAddr, nil, nil)
|
||||
l := NewHTTPStats(basicConfig, nil)
|
||||
require.Equal(t, "t1", l.id)
|
||||
require.Equal(t, testAddr, l.address)
|
||||
}
|
||||
|
||||
func TestHTTPStatsID(t *testing.T) {
|
||||
l := NewHTTPStats("t1", testAddr, nil, nil)
|
||||
l := NewHTTPStats(basicConfig, nil)
|
||||
require.Equal(t, "t1", l.ID())
|
||||
}
|
||||
|
||||
func TestHTTPStatsAddress(t *testing.T) {
|
||||
l := NewHTTPStats("t1", testAddr, nil, nil)
|
||||
l := NewHTTPStats(basicConfig, nil)
|
||||
require.Equal(t, testAddr, l.Address())
|
||||
}
|
||||
|
||||
func TestHTTPStatsProtocol(t *testing.T) {
|
||||
l := NewHTTPStats("t1", testAddr, nil, nil)
|
||||
l := NewHTTPStats(basicConfig, nil)
|
||||
require.Equal(t, "http", l.Protocol())
|
||||
}
|
||||
|
||||
func TestHTTPStatsTLSProtocol(t *testing.T) {
|
||||
l := NewHTTPStats("t1", testAddr, &Config{
|
||||
TLSConfig: tlsConfigBasic,
|
||||
}, nil)
|
||||
|
||||
l := NewHTTPStats(tlsConfig, nil)
|
||||
_ = l.Init(logger)
|
||||
require.Equal(t, "https", l.Protocol())
|
||||
}
|
||||
|
||||
func TestHTTPStatsInit(t *testing.T) {
|
||||
sysInfo := new(system.Info)
|
||||
l := NewHTTPStats("t1", testAddr, nil, sysInfo)
|
||||
l := NewHTTPStats(basicConfig, sysInfo)
|
||||
err := l.Init(logger)
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -64,7 +61,7 @@ func TestHTTPStatsServeAndClose(t *testing.T) {
|
||||
}
|
||||
|
||||
// setup http stats listener
|
||||
l := NewHTTPStats("t1", testAddr, nil, sysInfo)
|
||||
l := NewHTTPStats(basicConfig, sysInfo)
|
||||
err := l.Init(logger)
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -109,9 +106,7 @@ func TestHTTPStatsServeTLSAndClose(t *testing.T) {
|
||||
Version: "test",
|
||||
}
|
||||
|
||||
l := NewHTTPStats("t1", testAddr, &Config{
|
||||
TLSConfig: tlsConfigBasic,
|
||||
}, sysInfo)
|
||||
l := NewHTTPStats(tlsConfig, sysInfo)
|
||||
|
||||
err := l.Init(logger)
|
||||
require.NoError(t, err)
|
||||
@@ -132,7 +127,9 @@ func TestHTTPStatsFailedToServe(t *testing.T) {
|
||||
}
|
||||
|
||||
// setup http stats listener
|
||||
l := NewHTTPStats("t1", "wrong_addr", nil, sysInfo)
|
||||
config := basicConfig
|
||||
config.Address = "wrong_addr"
|
||||
l := NewHTTPStats(config, sysInfo)
|
||||
err := l.Init(logger)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
||||
@@ -14,8 +14,10 @@ import (
|
||||
|
||||
// Config contains configuration values for a listener.
|
||||
type Config struct {
|
||||
// TLSConfig is a tls.Config configuration to be used with the listener.
|
||||
// See examples folder for basic and mutual-tls use.
|
||||
Type string
|
||||
ID string
|
||||
Address string
|
||||
// TLSConfig is a tls.Config configuration to be used with the listener. See examples folder for basic and mutual-tls use.
|
||||
TLSConfig *tls.Config
|
||||
}
|
||||
|
||||
|
||||
@@ -19,6 +19,9 @@ import (
|
||||
const testAddr = ":22222"
|
||||
|
||||
var (
|
||||
basicConfig = Config{ID: "t1", Address: testAddr}
|
||||
tlsConfig = Config{ID: "t1", Address: testAddr, TLSConfig: tlsConfigBasic}
|
||||
|
||||
logger = slog.New(slog.NewTextHandler(os.Stdout, nil))
|
||||
|
||||
testCertificate = []byte(`-----BEGIN CERTIFICATE-----
|
||||
@@ -65,6 +68,7 @@ func init() {
|
||||
MinVersion: tls.VersionTLS12,
|
||||
Certificates: []tls.Certificate{cert},
|
||||
}
|
||||
tlsConfig.TLSConfig = tlsConfigBasic
|
||||
}
|
||||
|
||||
func TestNew(t *testing.T) {
|
||||
|
||||
@@ -12,6 +12,8 @@ import (
|
||||
"log/slog"
|
||||
)
|
||||
|
||||
const TypeMock = "mock"
|
||||
|
||||
// MockEstablisher is a function signature which can be used in testing.
|
||||
func MockEstablisher(id string, c net.Conn) error {
|
||||
return nil
|
||||
|
||||
@@ -13,26 +13,24 @@ import (
|
||||
"log/slog"
|
||||
)
|
||||
|
||||
const TypeTCP = "tcp"
|
||||
|
||||
// TCP is a listener for establishing client connections on basic TCP protocol.
|
||||
type TCP struct { // [MQTT-4.2.0-1]
|
||||
sync.RWMutex
|
||||
id string // the internal id of the listener
|
||||
address string // the network address to bind to
|
||||
listen net.Listener // a net.Listener which will listen for new clients
|
||||
config *Config // configuration values for the listener
|
||||
config Config // configuration values for the listener
|
||||
log *slog.Logger // server logger
|
||||
end uint32 // ensure the close methods are only called once
|
||||
}
|
||||
|
||||
// NewTCP initialises and returns a new TCP listener, listening on an address.
|
||||
func NewTCP(id, address string, config *Config) *TCP {
|
||||
if config == nil {
|
||||
config = new(Config)
|
||||
}
|
||||
|
||||
// NewTCP initializes and returns a new TCP listener, listening on an address.
|
||||
func NewTCP(config Config) *TCP {
|
||||
return &TCP{
|
||||
id: id,
|
||||
address: address,
|
||||
id: config.ID,
|
||||
address: config.Address,
|
||||
config: config,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,45 +14,40 @@ import (
|
||||
)
|
||||
|
||||
func TestNewTCP(t *testing.T) {
|
||||
l := NewTCP("t1", testAddr, nil)
|
||||
l := NewTCP(basicConfig)
|
||||
require.Equal(t, "t1", l.id)
|
||||
require.Equal(t, testAddr, l.address)
|
||||
}
|
||||
|
||||
func TestTCPID(t *testing.T) {
|
||||
l := NewTCP("t1", testAddr, nil)
|
||||
l := NewTCP(basicConfig)
|
||||
require.Equal(t, "t1", l.ID())
|
||||
}
|
||||
|
||||
func TestTCPAddress(t *testing.T) {
|
||||
l := NewTCP("t1", testAddr, nil)
|
||||
l := NewTCP(basicConfig)
|
||||
require.Equal(t, testAddr, l.Address())
|
||||
}
|
||||
|
||||
func TestTCPProtocol(t *testing.T) {
|
||||
l := NewTCP("t1", testAddr, nil)
|
||||
l := NewTCP(basicConfig)
|
||||
require.Equal(t, "tcp", l.Protocol())
|
||||
}
|
||||
|
||||
func TestTCPProtocolTLS(t *testing.T) {
|
||||
l := NewTCP("t1", testAddr, &Config{
|
||||
TLSConfig: tlsConfigBasic,
|
||||
})
|
||||
|
||||
l := NewTCP(tlsConfig)
|
||||
_ = l.Init(logger)
|
||||
defer l.listen.Close()
|
||||
require.Equal(t, "tcp", l.Protocol())
|
||||
}
|
||||
|
||||
func TestTCPInit(t *testing.T) {
|
||||
l := NewTCP("t1", testAddr, nil)
|
||||
l := NewTCP(basicConfig)
|
||||
err := l.Init(logger)
|
||||
l.Close(MockCloser)
|
||||
require.NoError(t, err)
|
||||
|
||||
l2 := NewTCP("t2", testAddr, &Config{
|
||||
TLSConfig: tlsConfigBasic,
|
||||
})
|
||||
l2 := NewTCP(tlsConfig)
|
||||
err = l2.Init(logger)
|
||||
l2.Close(MockCloser)
|
||||
require.NoError(t, err)
|
||||
@@ -60,7 +55,7 @@ func TestTCPInit(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestTCPServeAndClose(t *testing.T) {
|
||||
l := NewTCP("t1", testAddr, nil)
|
||||
l := NewTCP(basicConfig)
|
||||
err := l.Init(logger)
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -85,9 +80,7 @@ func TestTCPServeAndClose(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestTCPServeTLSAndClose(t *testing.T) {
|
||||
l := NewTCP("t1", testAddr, &Config{
|
||||
TLSConfig: tlsConfigBasic,
|
||||
})
|
||||
l := NewTCP(tlsConfig)
|
||||
err := l.Init(logger)
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -109,7 +102,7 @@ func TestTCPServeTLSAndClose(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestTCPEstablishThenEnd(t *testing.T) {
|
||||
l := NewTCP("t1", testAddr, nil)
|
||||
l := NewTCP(basicConfig)
|
||||
err := l.Init(logger)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
||||
@@ -13,21 +13,25 @@ import (
|
||||
"log/slog"
|
||||
)
|
||||
|
||||
const TypeUnix = "unix"
|
||||
|
||||
// UnixSock is a listener for establishing client connections on basic UnixSock protocol.
|
||||
type UnixSock struct {
|
||||
sync.RWMutex
|
||||
id string // the internal id of the listener.
|
||||
address string // the network address to bind to.
|
||||
config Config // configuration values for the listener
|
||||
listen net.Listener // a net.Listener which will listen for new clients.
|
||||
log *slog.Logger // server logger
|
||||
end uint32 // ensure the close methods are only called once.
|
||||
}
|
||||
|
||||
// NewUnixSock initialises and returns a new UnixSock listener, listening on an address.
|
||||
func NewUnixSock(id, address string) *UnixSock {
|
||||
// NewUnixSock initializes and returns a new UnixSock listener, listening on an address.
|
||||
func NewUnixSock(config Config) *UnixSock {
|
||||
return &UnixSock{
|
||||
id: id,
|
||||
address: address,
|
||||
id: config.ID,
|
||||
address: config.Address,
|
||||
config: config,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -15,41 +15,47 @@ import (
|
||||
|
||||
const testUnixAddr = "mochi.sock"
|
||||
|
||||
var (
|
||||
unixConfig = Config{ID: "t1", Address: testUnixAddr}
|
||||
)
|
||||
|
||||
func TestNewUnixSock(t *testing.T) {
|
||||
l := NewUnixSock("t1", testUnixAddr)
|
||||
l := NewUnixSock(unixConfig)
|
||||
require.Equal(t, "t1", l.id)
|
||||
require.Equal(t, testUnixAddr, l.address)
|
||||
}
|
||||
|
||||
func TestUnixSockID(t *testing.T) {
|
||||
l := NewUnixSock("t1", testUnixAddr)
|
||||
l := NewUnixSock(unixConfig)
|
||||
require.Equal(t, "t1", l.ID())
|
||||
}
|
||||
|
||||
func TestUnixSockAddress(t *testing.T) {
|
||||
l := NewUnixSock("t1", testUnixAddr)
|
||||
l := NewUnixSock(unixConfig)
|
||||
require.Equal(t, testUnixAddr, l.Address())
|
||||
}
|
||||
|
||||
func TestUnixSockProtocol(t *testing.T) {
|
||||
l := NewUnixSock("t1", testUnixAddr)
|
||||
l := NewUnixSock(unixConfig)
|
||||
require.Equal(t, "unix", l.Protocol())
|
||||
}
|
||||
|
||||
func TestUnixSockInit(t *testing.T) {
|
||||
l := NewUnixSock("t1", testUnixAddr)
|
||||
l := NewUnixSock(unixConfig)
|
||||
err := l.Init(logger)
|
||||
l.Close(MockCloser)
|
||||
require.NoError(t, err)
|
||||
|
||||
l2 := NewUnixSock("t2", testUnixAddr)
|
||||
t2Config := unixConfig
|
||||
t2Config.ID = "t2"
|
||||
l2 := NewUnixSock(t2Config)
|
||||
err = l2.Init(logger)
|
||||
l2.Close(MockCloser)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestUnixSockServeAndClose(t *testing.T) {
|
||||
l := NewUnixSock("t1", testUnixAddr)
|
||||
l := NewUnixSock(unixConfig)
|
||||
err := l.Init(logger)
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -74,7 +80,7 @@ func TestUnixSockServeAndClose(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestUnixSockEstablishThenEnd(t *testing.T) {
|
||||
l := NewUnixSock("t1", testUnixAddr)
|
||||
l := NewUnixSock(unixConfig)
|
||||
err := l.Init(logger)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
||||
@@ -19,6 +19,8 @@ import (
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
const TypeWS = "ws"
|
||||
|
||||
var (
|
||||
// ErrInvalidMessage indicates that a message payload was not valid.
|
||||
ErrInvalidMessage = errors.New("message type not binary")
|
||||
@@ -29,7 +31,7 @@ type Websocket struct { // [MQTT-4.2.0-1]
|
||||
sync.RWMutex
|
||||
id string // the internal id of the listener
|
||||
address string // the network address to bind to
|
||||
config *Config // configuration values for the listener
|
||||
config Config // configuration values for the listener
|
||||
listen *http.Server // a http server for serving websocket connections
|
||||
log *slog.Logger // server logger
|
||||
establish EstablishFn // the server's establish connection handler
|
||||
@@ -37,15 +39,11 @@ type Websocket struct { // [MQTT-4.2.0-1]
|
||||
end uint32 // ensure the close methods are only called once
|
||||
}
|
||||
|
||||
// NewWebsocket initialises and returns a new Websocket listener, listening on an address.
|
||||
func NewWebsocket(id, address string, config *Config) *Websocket {
|
||||
if config == nil {
|
||||
config = new(Config)
|
||||
}
|
||||
|
||||
// NewWebsocket initializes and returns a new Websocket listener, listening on an address.
|
||||
func NewWebsocket(config Config) *Websocket {
|
||||
return &Websocket{
|
||||
id: id,
|
||||
address: address,
|
||||
id: config.ID,
|
||||
address: config.Address,
|
||||
config: config,
|
||||
upgrader: &websocket.Upgrader{
|
||||
Subprotocols: []string{"mqtt"},
|
||||
|
||||
@@ -17,35 +17,33 @@ import (
|
||||
)
|
||||
|
||||
func TestNewWebsocket(t *testing.T) {
|
||||
l := NewWebsocket("t1", testAddr, nil)
|
||||
l := NewWebsocket(basicConfig)
|
||||
require.Equal(t, "t1", l.id)
|
||||
require.Equal(t, testAddr, l.address)
|
||||
}
|
||||
|
||||
func TestWebsocketID(t *testing.T) {
|
||||
l := NewWebsocket("t1", testAddr, nil)
|
||||
l := NewWebsocket(basicConfig)
|
||||
require.Equal(t, "t1", l.ID())
|
||||
}
|
||||
|
||||
func TestWebsocketAddress(t *testing.T) {
|
||||
l := NewWebsocket("t1", testAddr, nil)
|
||||
l := NewWebsocket(basicConfig)
|
||||
require.Equal(t, testAddr, l.Address())
|
||||
}
|
||||
|
||||
func TestWebsocketProtocol(t *testing.T) {
|
||||
l := NewWebsocket("t1", testAddr, nil)
|
||||
l := NewWebsocket(basicConfig)
|
||||
require.Equal(t, "ws", l.Protocol())
|
||||
}
|
||||
|
||||
func TestWebsocketProtocolTLS(t *testing.T) {
|
||||
l := NewWebsocket("t1", testAddr, &Config{
|
||||
TLSConfig: tlsConfigBasic,
|
||||
})
|
||||
l := NewWebsocket(tlsConfig)
|
||||
require.Equal(t, "wss", l.Protocol())
|
||||
}
|
||||
|
||||
func TestWebsocketInit(t *testing.T) {
|
||||
l := NewWebsocket("t1", testAddr, nil)
|
||||
l := NewWebsocket(basicConfig)
|
||||
require.Nil(t, l.listen)
|
||||
err := l.Init(logger)
|
||||
require.NoError(t, err)
|
||||
@@ -53,7 +51,7 @@ func TestWebsocketInit(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestWebsocketServeAndClose(t *testing.T) {
|
||||
l := NewWebsocket("t1", testAddr, nil)
|
||||
l := NewWebsocket(basicConfig)
|
||||
_ = l.Init(logger)
|
||||
|
||||
o := make(chan bool)
|
||||
@@ -74,9 +72,7 @@ func TestWebsocketServeAndClose(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestWebsocketServeTLSAndClose(t *testing.T) {
|
||||
l := NewWebsocket("t1", testAddr, &Config{
|
||||
TLSConfig: tlsConfigBasic,
|
||||
})
|
||||
l := NewWebsocket(tlsConfig)
|
||||
err := l.Init(logger)
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -96,9 +92,9 @@ func TestWebsocketServeTLSAndClose(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestWebsocketFailedToServe(t *testing.T) {
|
||||
l := NewWebsocket("t1", "wrong_addr", &Config{
|
||||
TLSConfig: tlsConfigBasic,
|
||||
})
|
||||
config := tlsConfig
|
||||
config.Address = "wrong_addr"
|
||||
l := NewWebsocket(config)
|
||||
err := l.Init(logger)
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -117,7 +113,7 @@ func TestWebsocketFailedToServe(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestWebsocketUpgrade(t *testing.T) {
|
||||
l := NewWebsocket("t1", testAddr, nil)
|
||||
l := NewWebsocket(basicConfig)
|
||||
_ = l.Init(logger)
|
||||
|
||||
e := make(chan bool)
|
||||
@@ -136,7 +132,7 @@ func TestWebsocketUpgrade(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestWebsocketConnectionReads(t *testing.T) {
|
||||
l := NewWebsocket("t1", testAddr, nil)
|
||||
l := NewWebsocket(basicConfig)
|
||||
_ = l.Init(nil)
|
||||
|
||||
recv := make(chan []byte)
|
||||
|
||||
@@ -348,7 +348,7 @@ func (pk *Packet) ConnectEncode(buf *bytes.Buffer) error {
|
||||
|
||||
pk.FixedHeader.Remaining = nb.Len()
|
||||
pk.FixedHeader.Encode(buf)
|
||||
_, _ = nb.WriteTo(buf)
|
||||
buf.Write(nb.Bytes())
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -512,7 +512,8 @@ func (pk *Packet) ConnackEncode(buf *bytes.Buffer) error {
|
||||
|
||||
pk.FixedHeader.Remaining = nb.Len()
|
||||
pk.FixedHeader.Encode(buf)
|
||||
_, _ = nb.WriteTo(buf)
|
||||
buf.Write(nb.Bytes())
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -557,7 +558,7 @@ func (pk *Packet) DisconnectEncode(buf *bytes.Buffer) error {
|
||||
|
||||
pk.FixedHeader.Remaining = nb.Len()
|
||||
pk.FixedHeader.Encode(buf)
|
||||
_, _ = nb.WriteTo(buf)
|
||||
buf.Write(nb.Bytes())
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -628,7 +629,7 @@ func (pk *Packet) PublishEncode(buf *bytes.Buffer) error {
|
||||
|
||||
pk.FixedHeader.Remaining = nb.Len() + len(pk.Payload)
|
||||
pk.FixedHeader.Encode(buf)
|
||||
_, _ = nb.WriteTo(buf)
|
||||
buf.Write(nb.Bytes())
|
||||
buf.Write(pk.Payload)
|
||||
|
||||
return nil
|
||||
@@ -719,7 +720,7 @@ func (pk *Packet) encodePubAckRelRecComp(buf *bytes.Buffer) error {
|
||||
|
||||
pk.FixedHeader.Remaining = nb.Len()
|
||||
pk.FixedHeader.Encode(buf)
|
||||
_, _ = nb.WriteTo(buf)
|
||||
buf.Write(nb.Bytes())
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -858,7 +859,7 @@ func (pk *Packet) SubackEncode(buf *bytes.Buffer) error {
|
||||
|
||||
pk.FixedHeader.Remaining = nb.Len()
|
||||
pk.FixedHeader.Encode(buf)
|
||||
_, _ = nb.WriteTo(buf)
|
||||
buf.Write(nb.Bytes())
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -918,7 +919,7 @@ func (pk *Packet) SubscribeEncode(buf *bytes.Buffer) error {
|
||||
|
||||
pk.FixedHeader.Remaining = nb.Len()
|
||||
pk.FixedHeader.Encode(buf)
|
||||
_, _ = nb.WriteTo(buf)
|
||||
buf.Write(nb.Bytes())
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -1009,13 +1010,12 @@ func (pk *Packet) UnsubackEncode(buf *bytes.Buffer) error {
|
||||
defer mempool.PutBuffer(pb)
|
||||
pk.Properties.Encode(pk.FixedHeader.Type, pk.Mods, pb, nb.Len())
|
||||
nb.Write(pb.Bytes())
|
||||
nb.Write(pk.ReasonCodes)
|
||||
}
|
||||
|
||||
nb.Write(pk.ReasonCodes)
|
||||
|
||||
pk.FixedHeader.Remaining = nb.Len()
|
||||
pk.FixedHeader.Encode(buf)
|
||||
_, _ = nb.WriteTo(buf)
|
||||
buf.Write(nb.Bytes())
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -1071,7 +1071,7 @@ func (pk *Packet) UnsubscribeEncode(buf *bytes.Buffer) error {
|
||||
|
||||
pk.FixedHeader.Remaining = nb.Len()
|
||||
pk.FixedHeader.Encode(buf)
|
||||
_, _ = nb.WriteTo(buf)
|
||||
buf.Write(nb.Bytes())
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -1133,7 +1133,7 @@ func (pk *Packet) AuthEncode(buf *bytes.Buffer) error {
|
||||
|
||||
pk.FixedHeader.Remaining = nb.Len()
|
||||
pk.FixedHeader.Encode(buf)
|
||||
_, _ = nb.WriteTo(buf)
|
||||
buf.Write(nb.Bytes())
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -8,6 +8,8 @@ import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/mochi-mqtt/server/v2/mempool"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -199,7 +201,8 @@ func (p *Properties) Encode(pkt byte, mods Mods, b *bytes.Buffer, n int) {
|
||||
return
|
||||
}
|
||||
|
||||
var buf bytes.Buffer
|
||||
buf := mempool.GetBuffer()
|
||||
defer mempool.PutBuffer(buf)
|
||||
if p.canEncode(pkt, PropPayloadFormat) && p.PayloadFormatFlag {
|
||||
buf.WriteByte(PropPayloadFormat)
|
||||
buf.WriteByte(p.PayloadFormat)
|
||||
@@ -230,7 +233,7 @@ func (p *Properties) Encode(pkt byte, mods Mods, b *bytes.Buffer, n int) {
|
||||
for _, v := range p.SubscriptionIdentifier {
|
||||
if v > 0 {
|
||||
buf.WriteByte(PropSubscriptionIdentifier)
|
||||
encodeLength(&buf, int64(v))
|
||||
encodeLength(buf, int64(v))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -321,7 +324,8 @@ func (p *Properties) Encode(pkt byte, mods Mods, b *bytes.Buffer, n int) {
|
||||
}
|
||||
|
||||
if !mods.DisallowProblemInfo && p.canEncode(pkt, PropUser) {
|
||||
pb := bytes.NewBuffer([]byte{})
|
||||
pb := mempool.GetBuffer()
|
||||
defer mempool.PutBuffer(pb)
|
||||
for _, v := range p.User {
|
||||
pb.WriteByte(PropUser)
|
||||
pb.Write(encodeString(v.Key))
|
||||
@@ -355,7 +359,7 @@ func (p *Properties) Encode(pkt byte, mods Mods, b *bytes.Buffer, n int) {
|
||||
}
|
||||
|
||||
encodeLength(b, int64(buf.Len()))
|
||||
_, _ = buf.WriteTo(b) // [MQTT-3.1.3-10]
|
||||
b.Write(buf.Bytes()) // [MQTT-3.1.3-10]
|
||||
}
|
||||
|
||||
// Decode decodes property bytes into a properties struct.
|
||||
|
||||
223
server.go
223
server.go
@@ -14,6 +14,7 @@ import (
|
||||
"runtime"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
@@ -26,91 +27,106 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
Version = "2.4.4" // the current server version.
|
||||
Version = "2.6.0" // the current server version.
|
||||
defaultSysTopicInterval int64 = 1 // the interval between $SYS topic publishes
|
||||
LocalListener = "local"
|
||||
InlineClientId = "inline"
|
||||
)
|
||||
|
||||
var (
|
||||
// DefaultServerCapabilities defines the default features and capabilities provided by the server.
|
||||
DefaultServerCapabilities = &Capabilities{
|
||||
MaximumSessionExpiryInterval: math.MaxUint32, // maximum number of seconds to keep disconnected sessions
|
||||
MaximumMessageExpiryInterval: 60 * 60 * 24, // maximum message expiry if message expiry is 0 or over
|
||||
ReceiveMaximum: 1024, // maximum number of concurrent qos messages per client
|
||||
MaximumQos: 2, // maximum qos value available to clients
|
||||
RetainAvailable: 1, // retain messages is available
|
||||
MaximumPacketSize: 0, // no maximum packet size
|
||||
TopicAliasMaximum: math.MaxUint16, // maximum topic alias value
|
||||
WildcardSubAvailable: 1, // wildcard subscriptions are available
|
||||
SubIDAvailable: 1, // subscription identifiers are available
|
||||
SharedSubAvailable: 1, // shared subscriptions are available
|
||||
MinimumProtocolVersion: 3, // minimum supported mqtt version (3.0.0)
|
||||
MaximumClientWritesPending: 1024 * 8, // maximum number of pending message writes for a client
|
||||
}
|
||||
// Deprecated: Use NewDefaultServerCapabilities to avoid data race issue.
|
||||
DefaultServerCapabilities = NewDefaultServerCapabilities()
|
||||
|
||||
ErrListenerIDExists = errors.New("listener id already exists") // a listener with the same id already exists
|
||||
ErrConnectionClosed = errors.New("connection not open") // connection is closed
|
||||
ErrInlineClientNotEnabled = errors.New("please set Options.InlineClient=true to use this feature") // inline client is not enabled by default
|
||||
ErrOptionsUnreadable = errors.New("unable to read options from bytes")
|
||||
)
|
||||
|
||||
// Capabilities indicates the capabilities and features provided by the server.
|
||||
type Capabilities struct {
|
||||
MaximumMessageExpiryInterval int64
|
||||
MaximumClientWritesPending int32
|
||||
MaximumSessionExpiryInterval uint32
|
||||
MaximumPacketSize uint32
|
||||
maximumPacketID uint32 // unexported, used for testing only
|
||||
ReceiveMaximum uint16
|
||||
TopicAliasMaximum uint16
|
||||
SharedSubAvailable byte
|
||||
MinimumProtocolVersion byte
|
||||
Compatibilities Compatibilities
|
||||
MaximumQos byte
|
||||
RetainAvailable byte
|
||||
WildcardSubAvailable byte
|
||||
SubIDAvailable byte
|
||||
MaximumMessageExpiryInterval int64 `yaml:"maximum_message_expiry_interval" json:"maximum_message_expiry_interval"` // maximum message expiry if message expiry is 0 or over
|
||||
MaximumClientWritesPending int32 `yaml:"maximum_client_writes_pending" json:"maximum_client_writes_pending"` // maximum number of pending message writes for a client
|
||||
MaximumSessionExpiryInterval uint32 `yaml:"maximum_session_expiry_interval" json:"maximum_session_expiry_interval"` // maximum number of seconds to keep disconnected sessions
|
||||
MaximumPacketSize uint32 `yaml:"maximum_packet_size" json:"maximum_packet_size"` // maximum packet size, no limit if 0
|
||||
maximumPacketID uint32 // unexported, used for testing only
|
||||
ReceiveMaximum uint16 `yaml:"receive_maximum" json:"receive_maximum"` // maximum number of concurrent qos messages per client
|
||||
MaximumInflight uint16 `yaml:"maximum_inflight" json:"maximum_inflight"` // maximum number of qos > 0 messages can be stored, 0(=8192)-65535
|
||||
TopicAliasMaximum uint16 `yaml:"topic_alias_maximum" json:"topic_alias_maximum"` // maximum topic alias value
|
||||
SharedSubAvailable byte `yaml:"shared_sub_available" json:"shared_sub_available"` // support of shared subscriptions
|
||||
MinimumProtocolVersion byte `yaml:"minimum_protocol_version" json:"minimum_protocol_version"` // minimum supported mqtt version
|
||||
Compatibilities Compatibilities `yaml:"compatibilities" json:"compatibilities"` // version compatibilities the server provides
|
||||
MaximumQos byte `yaml:"maximum_qos" json:"maximum_qos"` // maximum qos value available to clients
|
||||
RetainAvailable byte `yaml:"retain_available" json:"retain_available"` // support of retain messages
|
||||
WildcardSubAvailable byte `yaml:"wildcard_sub_available" json:"wildcard_sub_available"` // support of wildcard subscriptions
|
||||
SubIDAvailable byte `yaml:"sub_id_available" json:"sub_id_available"` // support of subscription identifiers
|
||||
}
|
||||
|
||||
// NewDefaultServerCapabilities defines the default features and capabilities provided by the server.
|
||||
func NewDefaultServerCapabilities() *Capabilities {
|
||||
return &Capabilities{
|
||||
MaximumMessageExpiryInterval: 60 * 60 * 24, // maximum message expiry if message expiry is 0 or over
|
||||
MaximumClientWritesPending: 1024 * 8, // maximum number of pending message writes for a client
|
||||
MaximumSessionExpiryInterval: math.MaxUint32, // maximum number of seconds to keep disconnected sessions
|
||||
MaximumPacketSize: 0, // no maximum packet size
|
||||
maximumPacketID: math.MaxUint16,
|
||||
ReceiveMaximum: 1024, // maximum number of concurrent qos messages per client
|
||||
MaximumInflight: 1024 * 8, // maximum number of qos > 0 messages can be stored
|
||||
TopicAliasMaximum: math.MaxUint16, // maximum topic alias value
|
||||
SharedSubAvailable: 1, // shared subscriptions are available
|
||||
MinimumProtocolVersion: 3, // minimum supported mqtt version (3.0.0)
|
||||
MaximumQos: 2, // maximum qos value available to clients
|
||||
RetainAvailable: 1, // retain messages is available
|
||||
WildcardSubAvailable: 1, // wildcard subscriptions are available
|
||||
SubIDAvailable: 1, // subscription identifiers are available
|
||||
}
|
||||
}
|
||||
|
||||
// Compatibilities provides flags for using compatibility modes.
|
||||
type Compatibilities struct {
|
||||
ObscureNotAuthorized bool // return unspecified errors instead of not authorized
|
||||
PassiveClientDisconnect bool // don't disconnect the client forcefully after sending disconnect packet (paho - spec violation)
|
||||
AlwaysReturnResponseInfo bool // always return response info (useful for testing)
|
||||
RestoreSysInfoOnRestart bool // restore system info from store as if server never stopped
|
||||
NoInheritedPropertiesOnAck bool // don't allow inherited user properties on ack (paho - spec violation)
|
||||
ObscureNotAuthorized bool `yaml:"obscure_not_authorized" json:"obscure_not_authorized"` // return unspecified errors instead of not authorized
|
||||
PassiveClientDisconnect bool `yaml:"passive_client_disconnect" json:"passive_client_disconnect"` // don't disconnect the client forcefully after sending disconnect packet (paho - spec violation)
|
||||
AlwaysReturnResponseInfo bool `yaml:"always_return_response_info" json:"always_return_response_info"` // always return response info (useful for testing)
|
||||
RestoreSysInfoOnRestart bool `yaml:"restore_sys_info_on_restart" json:"restore_sys_info_on_restart"` // restore system info from store as if server never stopped
|
||||
NoInheritedPropertiesOnAck bool `yaml:"no_inherited_properties_on_ack" json:"no_inherited_properties_on_ack"` // don't allow inherited user properties on ack (paho - spec violation)
|
||||
}
|
||||
|
||||
// Options contains configurable options for the server.
|
||||
type Options struct {
|
||||
// Listeners specifies any listeners which should be dynamically added on serve. Used when setting listeners by config.
|
||||
Listeners []listeners.Config `yaml:"listeners" json:"listeners"`
|
||||
|
||||
// Hooks specifies any hooks which should be dynamically added on serve. Used when setting hooks by config.
|
||||
Hooks []HookLoadConfig `yaml:"hooks" json:"hooks"`
|
||||
|
||||
// Capabilities defines the server features and behaviour. If you only wish to modify
|
||||
// several of these values, set them explicitly - e.g.
|
||||
// server.Options.Capabilities.MaximumClientWritesPending = 16 * 1024
|
||||
Capabilities *Capabilities
|
||||
Capabilities *Capabilities `yaml:"capabilities" json:"capabilities"`
|
||||
|
||||
// ClientNetWriteBufferSize specifies the size of the client *bufio.Writer write buffer.
|
||||
ClientNetWriteBufferSize int
|
||||
ClientNetWriteBufferSize int `yaml:"client_net_write_buffer_size" json:"client_net_write_buffer_size"`
|
||||
|
||||
// ClientNetReadBufferSize specifies the size of the client *bufio.Reader read buffer.
|
||||
ClientNetReadBufferSize int
|
||||
ClientNetReadBufferSize int `yaml:"client_net_read_buffer_size" json:"client_net_read_buffer_size"`
|
||||
|
||||
// Logger specifies a custom configured implementation of zerolog to override
|
||||
// the servers default logger configuration. If you wish to change the log level,
|
||||
// of the default logger, you can do so by setting
|
||||
// server := mqtt.New(nil)
|
||||
// of the default logger, you can do so by setting:
|
||||
// server := mqtt.New(nil)
|
||||
// level := new(slog.LevelVar)
|
||||
// server.Slog = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
|
||||
// Level: level,
|
||||
// }))
|
||||
// level.Set(slog.LevelDebug)
|
||||
Logger *slog.Logger
|
||||
Logger *slog.Logger `yaml:"-" json:"-"`
|
||||
|
||||
// SysTopicResendInterval specifies the interval between $SYS topic updates in seconds.
|
||||
SysTopicResendInterval int64
|
||||
SysTopicResendInterval int64 `yaml:"sys_topic_resend_interval" json:"sys_topic_resend_interval"`
|
||||
|
||||
// Enable Inline client to allow direct subscribing and publishing from the parent codebase,
|
||||
// with negligible performance difference (disabled by default to prevent confusion in statistics).
|
||||
InlineClient bool
|
||||
InlineClient bool `yaml:"inline_client" json:"inline_client"`
|
||||
}
|
||||
|
||||
// Server is an MQTT broker server. It should be created with server.New()
|
||||
@@ -190,11 +206,15 @@ func New(opts *Options) *Server {
|
||||
// ensureDefaults ensures that the server starts with sane default values, if none are provided.
|
||||
func (o *Options) ensureDefaults() {
|
||||
if o.Capabilities == nil {
|
||||
o.Capabilities = DefaultServerCapabilities
|
||||
o.Capabilities = NewDefaultServerCapabilities()
|
||||
}
|
||||
|
||||
o.Capabilities.maximumPacketID = math.MaxUint16 // spec maximum is 65535
|
||||
|
||||
if o.Capabilities.MaximumInflight == 0 {
|
||||
o.Capabilities.MaximumInflight = 1024 * 8
|
||||
}
|
||||
|
||||
if o.SysTopicResendInterval == 0 {
|
||||
o.SysTopicResendInterval = defaultSysTopicInterval
|
||||
}
|
||||
@@ -250,6 +270,17 @@ func (s *Server) AddHook(hook Hook, config any) error {
|
||||
return s.hooks.Add(hook, config)
|
||||
}
|
||||
|
||||
// AddHooksFromConfig adds hooks to the server which were specified in the hooks config (usually from a config file).
|
||||
// New built-in hooks should be added to this list.
|
||||
func (s *Server) AddHooksFromConfig(hooks []HookLoadConfig) error {
|
||||
for _, h := range hooks {
|
||||
if err := s.AddHook(h.Hook, h.Config); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// AddListener adds a new network listener to the server, for receiving incoming client connections.
|
||||
func (s *Server) AddListener(l listeners.Listener) error {
|
||||
if _, ok := s.Listeners.Get(l.ID()); ok {
|
||||
@@ -268,12 +299,55 @@ func (s *Server) AddListener(l listeners.Listener) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// AddListenersFromConfig adds listeners to the server which were specified in the listeners config (usually from a config file).
|
||||
// New built-in listeners should be added to this list.
|
||||
func (s *Server) AddListenersFromConfig(configs []listeners.Config) error {
|
||||
for _, conf := range configs {
|
||||
var l listeners.Listener
|
||||
switch strings.ToLower(conf.Type) {
|
||||
case listeners.TypeTCP:
|
||||
l = listeners.NewTCP(conf)
|
||||
case listeners.TypeWS:
|
||||
l = listeners.NewWebsocket(conf)
|
||||
case listeners.TypeUnix:
|
||||
l = listeners.NewUnixSock(conf)
|
||||
case listeners.TypeHealthCheck:
|
||||
l = listeners.NewHTTPHealthCheck(conf)
|
||||
case listeners.TypeSysInfo:
|
||||
l = listeners.NewHTTPStats(conf, s.Info)
|
||||
case listeners.TypeMock:
|
||||
l = listeners.NewMockListener(conf.ID, conf.Address)
|
||||
default:
|
||||
s.Log.Error("listener type unavailable by config", "listener", conf.Type)
|
||||
continue
|
||||
}
|
||||
if err := s.AddListener(l); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Serve starts the event loops responsible for establishing client connections
|
||||
// on all attached listeners, publishing the system topics, and starting all hooks.
|
||||
func (s *Server) Serve() error {
|
||||
s.Log.Info("mochi mqtt starting", "version", Version)
|
||||
defer s.Log.Info("mochi mqtt server started")
|
||||
|
||||
if len(s.Options.Listeners) > 0 {
|
||||
err := s.AddListenersFromConfig(s.Options.Listeners)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if len(s.Options.Hooks) > 0 {
|
||||
err := s.AddHooksFromConfig(s.Options.Hooks)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if s.hooks.Provides(
|
||||
StoredClients,
|
||||
StoredInflightMessages,
|
||||
@@ -969,9 +1043,17 @@ func (s *Server) publishToClient(cl *Client, sub packets.Subscription, pk packet
|
||||
}
|
||||
|
||||
if out.FixedHeader.Qos > 0 {
|
||||
if cl.State.Inflight.Len() >= int(s.Options.Capabilities.MaximumInflight) {
|
||||
// add hook?
|
||||
atomic.AddInt64(&s.Info.InflightDropped, 1)
|
||||
s.Log.Warn("client store quota reached", "client", cl.ID, "listener", cl.Net.Listener)
|
||||
return out, packets.ErrQuotaExceeded
|
||||
}
|
||||
|
||||
i, err := cl.NextPacketID() // [MQTT-4.3.2-1] [MQTT-4.3.3-1]
|
||||
if err != nil {
|
||||
s.hooks.OnPacketIDExhausted(cl, pk)
|
||||
atomic.AddInt64(&s.Info.InflightDropped, 1)
|
||||
s.Log.Warn("packet ids exhausted", "error", err, "client", cl.ID, "listener", cl.Net.Listener)
|
||||
return out, packets.ErrQuotaExceeded
|
||||
}
|
||||
@@ -1002,8 +1084,10 @@ func (s *Server) publishToClient(cl *Client, sub packets.Subscription, pk packet
|
||||
default:
|
||||
atomic.AddInt64(&s.Info.MessagesDropped, 1)
|
||||
cl.ops.hooks.OnPublishDropped(cl, pk)
|
||||
cl.State.Inflight.Delete(out.PacketID) // packet was dropped due to irregular circumstances, so rollback inflight.
|
||||
cl.State.Inflight.IncreaseSendQuota()
|
||||
if out.FixedHeader.Qos > 0 {
|
||||
cl.State.Inflight.Delete(out.PacketID) // packet was dropped due to irregular circumstances, so rollback inflight.
|
||||
cl.State.Inflight.IncreaseSendQuota()
|
||||
}
|
||||
return out, packets.ErrPendingClientWritesExceeded
|
||||
}
|
||||
|
||||
@@ -1351,27 +1435,28 @@ func (s *Server) publishSysTopics() {
|
||||
atomic.StoreInt64(&s.Info.ClientsTotal, int64(s.Clients.Len()))
|
||||
atomic.StoreInt64(&s.Info.ClientsDisconnected, atomic.LoadInt64(&s.Info.ClientsTotal)-atomic.LoadInt64(&s.Info.ClientsConnected))
|
||||
|
||||
info := s.Info.Clone()
|
||||
topics := map[string]string{
|
||||
SysPrefix + "/broker/version": s.Info.Version,
|
||||
SysPrefix + "/broker/time": AtomicItoa(&s.Info.Time),
|
||||
SysPrefix + "/broker/uptime": AtomicItoa(&s.Info.Uptime),
|
||||
SysPrefix + "/broker/started": AtomicItoa(&s.Info.Started),
|
||||
SysPrefix + "/broker/load/bytes/received": AtomicItoa(&s.Info.BytesReceived),
|
||||
SysPrefix + "/broker/load/bytes/sent": AtomicItoa(&s.Info.BytesSent),
|
||||
SysPrefix + "/broker/clients/connected": AtomicItoa(&s.Info.ClientsConnected),
|
||||
SysPrefix + "/broker/clients/disconnected": AtomicItoa(&s.Info.ClientsDisconnected),
|
||||
SysPrefix + "/broker/clients/maximum": AtomicItoa(&s.Info.ClientsMaximum),
|
||||
SysPrefix + "/broker/clients/total": AtomicItoa(&s.Info.ClientsTotal),
|
||||
SysPrefix + "/broker/packets/received": AtomicItoa(&s.Info.PacketsReceived),
|
||||
SysPrefix + "/broker/packets/sent": AtomicItoa(&s.Info.PacketsSent),
|
||||
SysPrefix + "/broker/messages/received": AtomicItoa(&s.Info.MessagesReceived),
|
||||
SysPrefix + "/broker/messages/sent": AtomicItoa(&s.Info.MessagesSent),
|
||||
SysPrefix + "/broker/messages/dropped": AtomicItoa(&s.Info.MessagesDropped),
|
||||
SysPrefix + "/broker/messages/inflight": AtomicItoa(&s.Info.Inflight),
|
||||
SysPrefix + "/broker/retained": AtomicItoa(&s.Info.Retained),
|
||||
SysPrefix + "/broker/subscriptions": AtomicItoa(&s.Info.Subscriptions),
|
||||
SysPrefix + "/broker/system/memory": AtomicItoa(&s.Info.MemoryAlloc),
|
||||
SysPrefix + "/broker/system/threads": AtomicItoa(&s.Info.Threads),
|
||||
SysPrefix + "/broker/time": Int64toa(info.Time),
|
||||
SysPrefix + "/broker/uptime": Int64toa(info.Uptime),
|
||||
SysPrefix + "/broker/started": Int64toa(info.Started),
|
||||
SysPrefix + "/broker/load/bytes/received": Int64toa(info.BytesReceived),
|
||||
SysPrefix + "/broker/load/bytes/sent": Int64toa(info.BytesSent),
|
||||
SysPrefix + "/broker/clients/connected": Int64toa(info.ClientsConnected),
|
||||
SysPrefix + "/broker/clients/disconnected": Int64toa(info.ClientsDisconnected),
|
||||
SysPrefix + "/broker/clients/maximum": Int64toa(info.ClientsMaximum),
|
||||
SysPrefix + "/broker/clients/total": Int64toa(info.ClientsTotal),
|
||||
SysPrefix + "/broker/packets/received": Int64toa(info.PacketsReceived),
|
||||
SysPrefix + "/broker/packets/sent": Int64toa(info.PacketsSent),
|
||||
SysPrefix + "/broker/messages/received": Int64toa(info.MessagesReceived),
|
||||
SysPrefix + "/broker/messages/sent": Int64toa(info.MessagesSent),
|
||||
SysPrefix + "/broker/messages/dropped": Int64toa(info.MessagesDropped),
|
||||
SysPrefix + "/broker/messages/inflight": Int64toa(info.Inflight),
|
||||
SysPrefix + "/broker/retained": Int64toa(info.Retained),
|
||||
SysPrefix + "/broker/subscriptions": Int64toa(info.Subscriptions),
|
||||
SysPrefix + "/broker/system/memory": Int64toa(info.MemoryAlloc),
|
||||
SysPrefix + "/broker/system/threads": Int64toa(info.Threads),
|
||||
}
|
||||
|
||||
for topic, payload := range topics {
|
||||
@@ -1381,7 +1466,7 @@ func (s *Server) publishSysTopics() {
|
||||
s.publishToSubscribers(pk)
|
||||
}
|
||||
|
||||
s.hooks.OnSysInfoTick(s.Info)
|
||||
s.hooks.OnSysInfoTick(info)
|
||||
}
|
||||
|
||||
// Close attempts to gracefully shut down the server, all listeners, clients, and stores.
|
||||
@@ -1650,7 +1735,7 @@ func (s *Server) sendDelayedLWT(dt int64) {
|
||||
}
|
||||
}
|
||||
|
||||
// AtomicItoa converts an int64 point to a string.
|
||||
func AtomicItoa(ptr *int64) string {
|
||||
return strconv.FormatInt(atomic.LoadInt64(ptr), 10)
|
||||
// Int64toa converts an int64 to a string.
|
||||
func Int64toa(v int64) string {
|
||||
return strconv.FormatInt(v, 10)
|
||||
}
|
||||
|
||||
192
server_test.go
192
server_test.go
@@ -96,24 +96,24 @@ func (h *DelayHook) OnDisconnect(cl *Client, err error, expire bool) {
|
||||
}
|
||||
|
||||
func newServer() *Server {
|
||||
cc := *DefaultServerCapabilities
|
||||
cc := NewDefaultServerCapabilities()
|
||||
cc.MaximumMessageExpiryInterval = 0
|
||||
cc.ReceiveMaximum = 0
|
||||
s := New(&Options{
|
||||
Logger: logger,
|
||||
Capabilities: &cc,
|
||||
Capabilities: cc,
|
||||
})
|
||||
_ = s.AddHook(new(AllowHook), nil)
|
||||
return s
|
||||
}
|
||||
|
||||
func newServerWithInlineClient() *Server {
|
||||
cc := *DefaultServerCapabilities
|
||||
cc := NewDefaultServerCapabilities()
|
||||
cc.MaximumMessageExpiryInterval = 0
|
||||
cc.ReceiveMaximum = 0
|
||||
s := New(&Options{
|
||||
Logger: logger,
|
||||
Capabilities: &cc,
|
||||
Capabilities: cc,
|
||||
InlineClient: true,
|
||||
})
|
||||
_ = s.AddHook(new(AllowHook), nil)
|
||||
@@ -125,7 +125,7 @@ func TestOptionsSetDefaults(t *testing.T) {
|
||||
opts.ensureDefaults()
|
||||
|
||||
require.Equal(t, defaultSysTopicInterval, opts.SysTopicResendInterval)
|
||||
require.Equal(t, DefaultServerCapabilities, opts.Capabilities)
|
||||
require.Equal(t, NewDefaultServerCapabilities(), opts.Capabilities)
|
||||
|
||||
opts = new(Options)
|
||||
opts.ensureDefaults()
|
||||
@@ -220,6 +220,34 @@ func TestServerAddListener(t *testing.T) {
|
||||
require.Equal(t, ErrListenerIDExists, err)
|
||||
}
|
||||
|
||||
func TestServerAddHooksFromConfig(t *testing.T) {
|
||||
s := newServer()
|
||||
defer s.Close()
|
||||
require.NotNil(t, s)
|
||||
s.Log = logger
|
||||
|
||||
hooks := []HookLoadConfig{
|
||||
{Hook: new(modifiedHookBase)},
|
||||
}
|
||||
|
||||
err := s.AddHooksFromConfig(hooks)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestServerAddHooksFromConfigError(t *testing.T) {
|
||||
s := newServer()
|
||||
defer s.Close()
|
||||
require.NotNil(t, s)
|
||||
s.Log = logger
|
||||
|
||||
hooks := []HookLoadConfig{
|
||||
{Hook: new(modifiedHookBase), Config: map[string]interface{}{}},
|
||||
}
|
||||
|
||||
err := s.AddHooksFromConfig(hooks)
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
func TestServerAddListenerInitFailure(t *testing.T) {
|
||||
s := newServer()
|
||||
defer s.Close()
|
||||
@@ -232,6 +260,60 @@ func TestServerAddListenerInitFailure(t *testing.T) {
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
func TestServerAddListenersFromConfig(t *testing.T) {
|
||||
s := newServer()
|
||||
defer s.Close()
|
||||
require.NotNil(t, s)
|
||||
s.Log = logger
|
||||
|
||||
lc := []listeners.Config{
|
||||
{Type: listeners.TypeTCP, ID: "tcp", Address: ":1883"},
|
||||
{Type: listeners.TypeWS, ID: "ws", Address: ":1882"},
|
||||
{Type: listeners.TypeHealthCheck, ID: "health", Address: ":1881"},
|
||||
{Type: listeners.TypeSysInfo, ID: "info", Address: ":1880"},
|
||||
{Type: listeners.TypeUnix, ID: "unix", Address: "mochi.sock"},
|
||||
{Type: listeners.TypeMock, ID: "mock", Address: "0"},
|
||||
{Type: "unknown", ID: "unknown"},
|
||||
}
|
||||
|
||||
err := s.AddListenersFromConfig(lc)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 6, s.Listeners.Len())
|
||||
|
||||
tcp, _ := s.Listeners.Get("tcp")
|
||||
require.Equal(t, "[::]:1883", tcp.Address())
|
||||
|
||||
ws, _ := s.Listeners.Get("ws")
|
||||
require.Equal(t, ":1882", ws.Address())
|
||||
|
||||
health, _ := s.Listeners.Get("health")
|
||||
require.Equal(t, ":1881", health.Address())
|
||||
|
||||
info, _ := s.Listeners.Get("info")
|
||||
require.Equal(t, ":1880", info.Address())
|
||||
|
||||
unix, _ := s.Listeners.Get("unix")
|
||||
require.Equal(t, "mochi.sock", unix.Address())
|
||||
|
||||
mock, _ := s.Listeners.Get("mock")
|
||||
require.Equal(t, "0", mock.Address())
|
||||
}
|
||||
|
||||
func TestServerAddListenersFromConfigError(t *testing.T) {
|
||||
s := newServer()
|
||||
defer s.Close()
|
||||
require.NotNil(t, s)
|
||||
s.Log = logger
|
||||
|
||||
lc := []listeners.Config{
|
||||
{Type: listeners.TypeTCP, ID: "tcp", Address: "x"},
|
||||
}
|
||||
|
||||
err := s.AddListenersFromConfig(lc)
|
||||
require.Error(t, err)
|
||||
require.Equal(t, 0, s.Listeners.Len())
|
||||
}
|
||||
|
||||
func TestServerServe(t *testing.T) {
|
||||
s := newServer()
|
||||
defer s.Close()
|
||||
@@ -253,6 +335,57 @@ func TestServerServe(t *testing.T) {
|
||||
require.Equal(t, true, listener.(*listeners.MockListener).IsServing())
|
||||
}
|
||||
|
||||
func TestServerServeFromConfig(t *testing.T) {
|
||||
s := newServer()
|
||||
defer s.Close()
|
||||
require.NotNil(t, s)
|
||||
|
||||
s.Options.Listeners = []listeners.Config{
|
||||
{Type: listeners.TypeMock, ID: "mock", Address: "0"},
|
||||
}
|
||||
|
||||
s.Options.Hooks = []HookLoadConfig{
|
||||
{Hook: new(modifiedHookBase)},
|
||||
}
|
||||
|
||||
err := s.Serve()
|
||||
require.NoError(t, err)
|
||||
|
||||
time.Sleep(time.Millisecond)
|
||||
|
||||
require.Equal(t, 1, s.Listeners.Len())
|
||||
listener, ok := s.Listeners.Get("mock")
|
||||
|
||||
require.Equal(t, true, ok)
|
||||
require.Equal(t, true, listener.(*listeners.MockListener).IsServing())
|
||||
}
|
||||
|
||||
func TestServerServeFromConfigListenerError(t *testing.T) {
|
||||
s := newServer()
|
||||
defer s.Close()
|
||||
require.NotNil(t, s)
|
||||
|
||||
s.Options.Listeners = []listeners.Config{
|
||||
{Type: listeners.TypeTCP, ID: "tcp", Address: "x"},
|
||||
}
|
||||
|
||||
err := s.Serve()
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
func TestServerServeFromConfigHookError(t *testing.T) {
|
||||
s := newServer()
|
||||
defer s.Close()
|
||||
require.NotNil(t, s)
|
||||
|
||||
s.Options.Hooks = []HookLoadConfig{
|
||||
{Hook: new(modifiedHookBase), Config: map[string]interface{}{}},
|
||||
}
|
||||
|
||||
err := s.Serve()
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
func TestServerServeReadStoreFailure(t *testing.T) {
|
||||
s := newServer()
|
||||
defer s.Close()
|
||||
@@ -1529,10 +1662,10 @@ func TestServerProcessPublishACLCheckDeny(t *testing.T) {
|
||||
|
||||
for _, tx := range tt {
|
||||
t.Run(tx.name, func(t *testing.T) {
|
||||
cc := *DefaultServerCapabilities
|
||||
cc := NewDefaultServerCapabilities()
|
||||
s := New(&Options{
|
||||
Logger: logger,
|
||||
Capabilities: &cc,
|
||||
Capabilities: cc,
|
||||
})
|
||||
_ = s.AddHook(new(DenyHook), nil)
|
||||
_ = s.Serve()
|
||||
@@ -1907,6 +2040,7 @@ func TestPublishToClientSubscriptionDowngradeQos(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestPublishToClientExceedClientWritesPending(t *testing.T) {
|
||||
var sendQuota uint16 = 5
|
||||
s := newServer()
|
||||
|
||||
_, w := net.Pipe()
|
||||
@@ -1917,9 +2051,12 @@ func TestPublishToClientExceedClientWritesPending(t *testing.T) {
|
||||
options: &Options{
|
||||
Capabilities: &Capabilities{
|
||||
MaximumClientWritesPending: 3,
|
||||
maximumPacketID: 10,
|
||||
},
|
||||
},
|
||||
})
|
||||
cl.Properties.Props.ReceiveMaximum = sendQuota
|
||||
cl.State.Inflight.ResetSendQuota(int32(cl.Properties.Props.ReceiveMaximum))
|
||||
|
||||
s.Clients.Add(cl)
|
||||
|
||||
@@ -1928,9 +2065,20 @@ func TestPublishToClientExceedClientWritesPending(t *testing.T) {
|
||||
atomic.AddInt32(&cl.State.outboundQty, 1)
|
||||
}
|
||||
|
||||
id, _ := cl.NextPacketID()
|
||||
cl.State.Inflight.Set(packets.Packet{PacketID: uint16(id)})
|
||||
cl.State.Inflight.DecreaseSendQuota()
|
||||
sendQuota--
|
||||
|
||||
_, err := s.publishToClient(cl, packets.Subscription{Filter: "a/b/c", Qos: 2}, packets.Packet{})
|
||||
require.Error(t, err)
|
||||
require.ErrorIs(t, packets.ErrPendingClientWritesExceeded, err)
|
||||
require.Equal(t, int32(sendQuota), atomic.LoadInt32(&cl.State.Inflight.sendQuota))
|
||||
|
||||
_, err = s.publishToClient(cl, packets.Subscription{Filter: "a/b/c", Qos: 2}, packets.Packet{FixedHeader: packets.FixedHeader{Qos: 1}})
|
||||
require.Error(t, err)
|
||||
require.ErrorIs(t, packets.ErrPendingClientWritesExceeded, err)
|
||||
require.Equal(t, int32(sendQuota), atomic.LoadInt32(&cl.State.Inflight.sendQuota))
|
||||
}
|
||||
|
||||
func TestPublishToClientServerTopicAlias(t *testing.T) {
|
||||
@@ -1986,6 +2134,22 @@ func TestPublishToClientMqtt5RetainAsPublishedTrueLeverageNoConn(t *testing.T) {
|
||||
require.ErrorIs(t, err, packets.CodeDisconnect)
|
||||
}
|
||||
|
||||
func TestPublishToClientExceedMaximumInflight(t *testing.T) {
|
||||
const MaxInflight uint16 = 5
|
||||
s := newServer()
|
||||
cl, _, _ := newTestClient()
|
||||
s.Options.Capabilities.MaximumInflight = MaxInflight
|
||||
cl.ops.options.Capabilities.MaximumInflight = MaxInflight
|
||||
for i := uint16(0); i < MaxInflight; i++ {
|
||||
cl.State.Inflight.Set(packets.Packet{PacketID: i})
|
||||
}
|
||||
|
||||
_, err := s.publishToClient(cl, packets.Subscription{Filter: "a/b/c", Qos: 1}, *packets.TPacketData[packets.Publish].Get(packets.TPublishQos1).Packet)
|
||||
require.Error(t, err)
|
||||
require.ErrorIs(t, err, packets.ErrQuotaExceeded)
|
||||
require.Equal(t, int64(1), atomic.LoadInt64(&s.Info.InflightDropped))
|
||||
}
|
||||
|
||||
func TestPublishToClientExhaustedPacketID(t *testing.T) {
|
||||
s := newServer()
|
||||
cl, _, _ := newTestClient()
|
||||
@@ -1996,6 +2160,7 @@ func TestPublishToClientExhaustedPacketID(t *testing.T) {
|
||||
_, err := s.publishToClient(cl, packets.Subscription{Filter: "a/b/c", Qos: 1}, *packets.TPacketData[packets.Publish].Get(packets.TPublishQos1).Packet)
|
||||
require.Error(t, err)
|
||||
require.ErrorIs(t, err, packets.ErrQuotaExceeded)
|
||||
require.Equal(t, int64(1), atomic.LoadInt64(&s.Info.InflightDropped))
|
||||
}
|
||||
|
||||
func TestPublishToClientACLNotAuthorized(t *testing.T) {
|
||||
@@ -3131,22 +3296,22 @@ func TestServerLoadClients(t *testing.T) {
|
||||
{ID: "v3-clean", ProtocolVersion: 4, Clean: true},
|
||||
{ID: "v3-not-clean", ProtocolVersion: 4, Clean: false},
|
||||
{
|
||||
ID: "v5-clean",
|
||||
ID: "v5-clean",
|
||||
ProtocolVersion: 5,
|
||||
Clean: true,
|
||||
Clean: true,
|
||||
Properties: storage.ClientProperties{
|
||||
SessionExpiryInterval: 10,
|
||||
},
|
||||
},
|
||||
{
|
||||
ID: "v5-expire-interval-0",
|
||||
ID: "v5-expire-interval-0",
|
||||
ProtocolVersion: 5,
|
||||
Properties: storage.ClientProperties{
|
||||
SessionExpiryInterval: 0,
|
||||
},
|
||||
},
|
||||
{
|
||||
ID: "v5-expire-interval-not-0",
|
||||
ID: "v5-expire-interval-not-0",
|
||||
ProtocolVersion: 5,
|
||||
Properties: storage.ClientProperties{
|
||||
SessionExpiryInterval: 10,
|
||||
@@ -3388,10 +3553,9 @@ func TestLoadServerInfoRestoreOnRestart(t *testing.T) {
|
||||
require.Equal(t, int64(60), s.Info.BytesReceived)
|
||||
}
|
||||
|
||||
func TestAtomicItoa(t *testing.T) {
|
||||
func TestItoa(t *testing.T) {
|
||||
i := int64(22)
|
||||
ip := &i
|
||||
require.Equal(t, "22", AtomicItoa(ip))
|
||||
require.Equal(t, "22", Int64toa(i))
|
||||
}
|
||||
|
||||
func TestServerSubscribe(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user