mirror of
https://github.com/mochi-mqtt/server.git
synced 2025-10-05 00:02:52 +08:00
Compare commits
9 Commits
Author | SHA1 | Date | |
---|---|---|---|
![]() |
5966c7fe0d | ||
![]() |
b26e03a433 | ||
![]() |
6fc4027a78 | ||
![]() |
b9d2dfb824 | ||
![]() |
d5d9b02b28 | ||
![]() |
64ea905c41 | ||
![]() |
57997ef0c1 | ||
![]() |
21491d9b4e | ||
![]() |
cb217cd3b3 |
@@ -149,7 +149,7 @@ func main() {
|
||||
_ = server.AddHook(new(auth.AllowHook), nil)
|
||||
|
||||
// 在标1883端口上创建一个 TCP 服务端。
|
||||
tcp := listeners.NewTCP("t1", ":1883", nil)
|
||||
tcp := listeners.NewTCP(listeners.Config{ID: "t1", Address: ":1883"})
|
||||
err := server.AddListener(tcp)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
@@ -414,7 +414,7 @@ if err != nil {
|
||||
|
||||
### 内联客户端 (Inline Client v2.4.0+支持)
|
||||
|
||||
现在可以通过使用内联客户端功能直接在服务端上订阅主题和发布消息。内联客户端是内置在服务端中的特殊的客户端,可以在服务端的配置中启用:
|
||||
现在可以通过使用内联客户端功能直接在服务端上订阅主题和发布消息。目前,内联客户端暂时还不支持共享订阅。内联客户端是内置在服务端中的特殊的客户端,可以在服务端的配置中启用:
|
||||
|
||||
```go
|
||||
server := mqtt.New(&mqtt.Options{
|
||||
|
@@ -119,7 +119,7 @@ func main() {
|
||||
_ = server.AddHook(new(auth.AllowHook), nil)
|
||||
|
||||
// Create a TCP listener on a standard port.
|
||||
tcp := listeners.NewTCP("t1", ":1883", nil)
|
||||
tcp := listeners.NewTCP(listeners.Config{ID: "t1", Address: ":1883"})
|
||||
err := server.AddListener(tcp)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
|
@@ -149,7 +149,7 @@ func main() {
|
||||
_ = server.AddHook(new(auth.AllowHook), nil)
|
||||
|
||||
// Create a TCP listener on a standard port.
|
||||
tcp := listeners.NewTCP("t1", ":1883", nil)
|
||||
tcp := listeners.NewTCP(listeners.Config{ID: "t1", Address: ":1883"})
|
||||
err := server.AddListener(tcp)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
@@ -399,7 +399,7 @@ The function signatures for all the hooks and `mqtt.Hook` interface can be found
|
||||
If you are building a persistent storage hook, see the existing persistent hooks for inspiration and patterns. If you are building an auth hook, you will need `OnACLCheck` and `OnConnectAuthenticate`.
|
||||
|
||||
### Inline Client (v2.4.0+)
|
||||
It's now possible to subscribe and publish to topics directly from the embedding code, by using the `inline client` feature. The Inline Client is an embedded client which operates as part of the server, and can be enabled in the server options:
|
||||
It's now possible to subscribe and publish to topics directly from the embedding code, by using the `inline client` feature. Currently, the inline client does not support shared subscriptions. The Inline Client is an embedded client which operates as part of the server, and can be enabled in the server options:
|
||||
```go
|
||||
server := mqtt.New(&mqtt.Options{
|
||||
InlineClient: true,
|
||||
|
@@ -6,6 +6,8 @@ package config
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"log/slog"
|
||||
"os"
|
||||
|
||||
"github.com/mochi-mqtt/server/v2/hooks/auth"
|
||||
"github.com/mochi-mqtt/server/v2/hooks/debug"
|
||||
@@ -21,9 +23,27 @@ import (
|
||||
|
||||
// config defines the structure of configuration data to be parsed from a config source.
|
||||
type config struct {
|
||||
Options mqtt.Options
|
||||
Listeners []listeners.Config `yaml:"listeners" json:"listeners"`
|
||||
HookConfigs HookConfigs `yaml:"hooks" json:"hooks"`
|
||||
Options mqtt.Options
|
||||
Listeners []listeners.Config `yaml:"listeners" json:"listeners"`
|
||||
HookConfigs HookConfigs `yaml:"hooks" json:"hooks"`
|
||||
LoggingConfig LoggingConfig `yaml:"logging" json:"logging"`
|
||||
}
|
||||
|
||||
type LoggingConfig struct {
|
||||
Level string
|
||||
}
|
||||
|
||||
func (lc LoggingConfig) ToLogger() *slog.Logger {
|
||||
var level slog.Level
|
||||
if err := level.UnmarshalText([]byte(lc.Level)); err != nil {
|
||||
level = slog.LevelInfo
|
||||
}
|
||||
|
||||
leveler := new(slog.LevelVar)
|
||||
leveler.Set(level)
|
||||
return slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
|
||||
Level: leveler,
|
||||
}))
|
||||
}
|
||||
|
||||
// HookConfigs contains configurations to enable individual hooks.
|
||||
@@ -149,6 +169,7 @@ func FromBytes(b []byte) (*mqtt.Options, error) {
|
||||
o = c.Options
|
||||
o.Hooks = c.HookConfigs.ToHooks()
|
||||
o.Listeners = c.Listeners
|
||||
o.Logger = c.LoggingConfig.ToLogger()
|
||||
|
||||
return &o, nil
|
||||
}
|
||||
|
@@ -5,6 +5,8 @@
|
||||
package config
|
||||
|
||||
import (
|
||||
"log/slog"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
@@ -60,7 +62,6 @@ options:
|
||||
}
|
||||
}
|
||||
`)
|
||||
|
||||
parsedOptions = mqtt.Options{
|
||||
Listeners: []listeners.Config{
|
||||
{
|
||||
@@ -81,6 +82,9 @@ options:
|
||||
RestoreSysInfoOnRestart: true,
|
||||
},
|
||||
},
|
||||
Logger: slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
|
||||
Level: new(slog.LevelVar),
|
||||
})),
|
||||
}
|
||||
)
|
||||
|
||||
@@ -176,7 +180,8 @@ func TestToHooksStorageBolt(t *testing.T) {
|
||||
hc := HookConfigs{
|
||||
Storage: &HookStorageConfig{
|
||||
Bolt: &bolt.Options{
|
||||
Path: "bolt",
|
||||
Path: "bolt",
|
||||
Bucket: "mochi",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
@@ -31,7 +31,8 @@
|
||||
"gc_discard_ratio": 0.5
|
||||
},
|
||||
"bolt": {
|
||||
"path": "bolt.db"
|
||||
"path": "bolt.db",
|
||||
"bucket": "mochi"
|
||||
},
|
||||
"redis": {
|
||||
"h_prefix": "mc",
|
||||
|
@@ -21,6 +21,7 @@ hooks:
|
||||
mode: "NoSync"
|
||||
bolt:
|
||||
path: bolt.db
|
||||
bucket: "mochi"
|
||||
redis:
|
||||
h_prefix: "mc"
|
||||
username: "mochi"
|
||||
@@ -65,3 +66,5 @@ options:
|
||||
always_return_response_info: false
|
||||
restore_sys_info_on_restart: false
|
||||
no_inherited_properties_on_ack: false
|
||||
logging:
|
||||
level: INFO
|
||||
|
@@ -1,6 +1,6 @@
|
||||
// SPDX-License-Identifier: MIT
|
||||
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
|
||||
// SPDX-FileContributor: mochi-co
|
||||
// SPDX-FileContributor: mochi-co, werbenhu
|
||||
|
||||
package main
|
||||
|
||||
|
@@ -1,6 +1,6 @@
|
||||
// SPDX-License-Identifier: MIT
|
||||
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
|
||||
// SPDX-FileContributor: mochi-co
|
||||
// SPDX-FileContributor: mochi-co, werbenhu
|
||||
|
||||
package main
|
||||
|
||||
@@ -19,6 +19,9 @@ import (
|
||||
)
|
||||
|
||||
func main() {
|
||||
boltPath := ".bolt"
|
||||
defer os.RemoveAll(boltPath) // remove the example db files at the end
|
||||
|
||||
sigs := make(chan os.Signal, 1)
|
||||
done := make(chan bool, 1)
|
||||
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
|
||||
@@ -31,7 +34,7 @@ func main() {
|
||||
_ = server.AddHook(new(auth.AllowHook), nil)
|
||||
|
||||
err := server.AddHook(new(bolt.Hook), &bolt.Options{
|
||||
Path: "bolt.db",
|
||||
Path: boltPath,
|
||||
Options: &bbolt.Options{
|
||||
Timeout: 500 * time.Millisecond,
|
||||
},
|
||||
|
8
go.mod
8
go.mod
@@ -4,8 +4,6 @@ go 1.21
|
||||
|
||||
require (
|
||||
github.com/alicebob/miniredis/v2 v2.23.0
|
||||
github.com/asdine/storm v2.1.2+incompatible
|
||||
github.com/asdine/storm/v3 v3.2.1
|
||||
github.com/cockroachdb/pebble v1.1.0
|
||||
github.com/dgraph-io/badger/v4 v4.2.0
|
||||
github.com/go-redis/redis/v8 v8.11.5
|
||||
@@ -51,8 +49,8 @@ require (
|
||||
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 // indirect
|
||||
go.opencensus.io v0.22.5 // indirect
|
||||
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df // indirect
|
||||
golang.org/x/net v0.17.0 // indirect
|
||||
golang.org/x/sys v0.13.0 // indirect
|
||||
golang.org/x/text v0.13.0 // indirect
|
||||
golang.org/x/net v0.23.0 // indirect
|
||||
golang.org/x/sys v0.18.0 // indirect
|
||||
golang.org/x/text v0.14.0 // indirect
|
||||
google.golang.org/protobuf v1.33.0 // indirect
|
||||
)
|
||||
|
25
go.sum
25
go.sum
@@ -33,11 +33,8 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9
|
||||
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
|
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
|
||||
github.com/DataDog/zstd v1.4.1/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
|
||||
github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ=
|
||||
github.com/DataDog/zstd v1.4.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
|
||||
github.com/Sereal/Sereal v0.0.0-20190618215532-0b8ac451a863 h1:BRrxwOZBolJN4gIwvZMJY1tzqBvQgpaZiQRuIDD40jM=
|
||||
github.com/Sereal/Sereal v0.0.0-20190618215532-0b8ac451a863/go.mod h1:D0JMgToj/WdxCgd30Kc1UcA9E+WdZoJqeVOuYW7iTBM=
|
||||
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
|
||||
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
|
||||
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
|
||||
@@ -47,10 +44,6 @@ github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZp
|
||||
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
|
||||
github.com/alicebob/miniredis/v2 v2.23.0 h1:+lwAJYjvvdIVg6doFHuotFjueJ/7KY10xo/vm3X3Scw=
|
||||
github.com/alicebob/miniredis/v2 v2.23.0/go.mod h1:XNqvJdQJv5mSuVMc0ynneafpnL/zv52acZ6kqeS0t88=
|
||||
github.com/asdine/storm v2.1.2+incompatible h1:dczuIkyqwY2LrtXPz8ixMrU/OFgZp71kbKTHGrXYt/Q=
|
||||
github.com/asdine/storm v2.1.2+incompatible/go.mod h1:RarYDc9hq1UPLImuiXK3BIWPJLdIygvV3PsInK0FbVQ=
|
||||
github.com/asdine/storm/v3 v3.2.1 h1:I5AqhkPK6nBZ/qJXySdI7ot5BlXSZ7qvDY1zAn5ZJac=
|
||||
github.com/asdine/storm/v3 v3.2.1/go.mod h1:LEpXwGt4pIqrE/XcTvCnZHT5MgZCV6Ub9q7yQzOFWr0=
|
||||
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
|
||||
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
|
||||
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
|
||||
@@ -147,7 +140,6 @@ github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw
|
||||
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
|
||||
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
|
||||
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
|
||||
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
||||
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
|
||||
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
||||
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
|
||||
@@ -275,15 +267,12 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
|
||||
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
|
||||
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
|
||||
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
|
||||
github.com/vmihailenco/msgpack v4.0.4+incompatible h1:dSLoQfGFAo3F6OoNhwUmLwVgaUXK79GlxNBwueZn0xI=
|
||||
github.com/vmihailenco/msgpack v4.0.4+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6AcGMlsjHatGryHQYUTf1ShIgkk=
|
||||
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 h1:k/gmLsJDWwWqbLCur2yWnJzwQEKRcAHXo6seXGuSwWw=
|
||||
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9/go.mod h1:E1AXubJBdNmFERAOucpDIxNzeGfLzg0mYh+UfMWdChA=
|
||||
go.etcd.io/bbolt v1.3.4/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
|
||||
go.etcd.io/bbolt v1.3.5 h1:XAzx9gjCb0Rxj7EoqcClPD1d5ZBxZJk0jbuoPHenBt0=
|
||||
go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
|
||||
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
|
||||
@@ -345,7 +334,6 @@ golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLL
|
||||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20191105084925-a882066a44e0/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
@@ -362,8 +350,8 @@ golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81R
|
||||
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
|
||||
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
|
||||
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
||||
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
|
||||
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
|
||||
golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs=
|
||||
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
|
||||
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
|
||||
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
|
||||
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
|
||||
@@ -421,8 +409,8 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w
|
||||
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
|
||||
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
|
||||
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
@@ -430,8 +418,8 @@ golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3
|
||||
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
|
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
|
||||
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
|
||||
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
|
||||
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
|
||||
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
@@ -502,7 +490,6 @@ google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7
|
||||
google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
|
||||
google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0=
|
||||
google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
|
||||
google.golang.org/appengine v1.6.6 h1:lMO5rYAqUxkmaj76jAkRUvt5JZgFymx/+Q5Mzfivuhc=
|
||||
google.golang.org/appengine v1.6.6/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
|
||||
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
|
||||
google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
|
||||
|
@@ -1,6 +1,6 @@
|
||||
// SPDX-License-Identifier: MIT
|
||||
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
|
||||
// SPDX-FileContributor: mochi-co, gsagula
|
||||
// SPDX-FileContributor: mochi-co, gsagula, werbenhu
|
||||
|
||||
package badger
|
||||
|
||||
@@ -490,23 +490,23 @@ func (h *Hook) StoredSysInfo() (v storage.SystemInfo, err error) {
|
||||
}
|
||||
|
||||
// Errorf satisfies the badger interface for an error logger.
|
||||
func (h *Hook) Errorf(m string, v ...interface{}) {
|
||||
func (h *Hook) Errorf(m string, v ...any) {
|
||||
h.Log.Error(fmt.Sprintf(strings.ToLower(strings.Trim(m, "\n")), v...), "v", v)
|
||||
|
||||
}
|
||||
|
||||
// Warningf satisfies the badger interface for a warning logger.
|
||||
func (h *Hook) Warningf(m string, v ...interface{}) {
|
||||
func (h *Hook) Warningf(m string, v ...any) {
|
||||
h.Log.Warn(fmt.Sprintf(strings.ToLower(strings.Trim(m, "\n")), v...), "v", v)
|
||||
}
|
||||
|
||||
// Infof satisfies the badger interface for an info logger.
|
||||
func (h *Hook) Infof(m string, v ...interface{}) {
|
||||
func (h *Hook) Infof(m string, v ...any) {
|
||||
h.Log.Info(fmt.Sprintf(strings.ToLower(strings.Trim(m, "\n")), v...), "v", v)
|
||||
}
|
||||
|
||||
// Debugf satisfies the badger interface for a debug logger.
|
||||
func (h *Hook) Debugf(m string, v ...interface{}) {
|
||||
func (h *Hook) Debugf(m string, v ...any) {
|
||||
h.Log.Debug(fmt.Sprintf(strings.ToLower(strings.Trim(m, "\n")), v...), "v", v)
|
||||
}
|
||||
|
||||
|
@@ -1,6 +1,6 @@
|
||||
// SPDX-License-Identifier: MIT
|
||||
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
|
||||
// SPDX-FileContributor: mochi-co
|
||||
// SPDX-FileContributor: mochi-co, werbenhu
|
||||
|
||||
package badger
|
||||
|
||||
|
@@ -1,6 +1,6 @@
|
||||
// SPDX-License-Identifier: MIT
|
||||
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
|
||||
// SPDX-FileContributor: mochi-co
|
||||
// SPDX-FileContributor: mochi-co, werbenhu
|
||||
|
||||
// Package bolt is provided for historical compatibility and may not be actively updated, you should use the badger hook instead.
|
||||
package bolt
|
||||
@@ -14,23 +14,27 @@ import (
|
||||
"github.com/mochi-mqtt/server/v2/hooks/storage"
|
||||
"github.com/mochi-mqtt/server/v2/packets"
|
||||
"github.com/mochi-mqtt/server/v2/system"
|
||||
|
||||
sgob "github.com/asdine/storm/codec/gob"
|
||||
"github.com/asdine/storm/v3"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrBucketNotFound = errors.New("bucket not found")
|
||||
ErrKeyNotFound = errors.New("key not found")
|
||||
)
|
||||
|
||||
const (
|
||||
// defaultDbFile is the default file path for the boltdb file.
|
||||
defaultDbFile = "bolt.db"
|
||||
defaultDbFile = ".bolt"
|
||||
|
||||
// defaultTimeout is the default time to hold a connection to the file.
|
||||
defaultTimeout = 250 * time.Millisecond
|
||||
|
||||
defaultBucket = "mochi"
|
||||
)
|
||||
|
||||
// clientKey returns a primary key for a client.
|
||||
func clientKey(cl *mqtt.Client) string {
|
||||
return cl.ID
|
||||
return storage.ClientKey + "_" + cl.ID
|
||||
}
|
||||
|
||||
// subscriptionKey returns a primary key for a subscription.
|
||||
@@ -56,6 +60,7 @@ func sysInfoKey() string {
|
||||
// Options contains configuration settings for the bolt instance.
|
||||
type Options struct {
|
||||
Options *bbolt.Options
|
||||
Bucket string `yaml:"bucket" json:"bucket"`
|
||||
Path string `yaml:"path" json:"path"`
|
||||
}
|
||||
|
||||
@@ -63,7 +68,7 @@ type Options struct {
|
||||
type Hook struct {
|
||||
mqtt.HookBase
|
||||
config *Options // options for configuring the boltdb instance.
|
||||
db *storm.DB // the boltdb instance.
|
||||
db *bbolt.DB // the boltdb instance.
|
||||
}
|
||||
|
||||
// ID returns the id of the hook.
|
||||
@@ -110,22 +115,32 @@ func (h *Hook) Init(config any) error {
|
||||
Timeout: defaultTimeout,
|
||||
}
|
||||
}
|
||||
if h.config.Path == "" {
|
||||
if len(h.config.Path) == 0 {
|
||||
h.config.Path = defaultDbFile
|
||||
}
|
||||
|
||||
if len(h.config.Bucket) == 0 {
|
||||
h.config.Bucket = defaultBucket
|
||||
}
|
||||
|
||||
var err error
|
||||
h.db, err = storm.Open(h.config.Path, storm.BoltOptions(0600, h.config.Options), storm.Codec(sgob.Codec))
|
||||
h.db, err = bbolt.Open(h.config.Path, 0600, h.config.Options)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
err = h.db.Update(func(tx *bbolt.Tx) error {
|
||||
_, err := tx.CreateBucketIfNotExists([]byte(h.config.Bucket))
|
||||
return err
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
// Stop closes the boltdb instance.
|
||||
func (h *Hook) Stop() error {
|
||||
return h.db.Close()
|
||||
err := h.db.Close()
|
||||
h.db = nil
|
||||
return err
|
||||
}
|
||||
|
||||
// OnSessionEstablished adds a client to the store when their session is established.
|
||||
@@ -147,7 +162,7 @@ func (h *Hook) updateClient(cl *mqtt.Client) {
|
||||
|
||||
props := cl.Properties.Props.Copy(false)
|
||||
in := &storage.Client{
|
||||
ID: clientKey(cl),
|
||||
ID: cl.ID,
|
||||
T: storage.ClientKey,
|
||||
Remote: cl.Net.Remote,
|
||||
Listener: cl.Net.Listener,
|
||||
@@ -167,10 +182,8 @@ func (h *Hook) updateClient(cl *mqtt.Client) {
|
||||
},
|
||||
Will: storage.ClientWill(cl.Properties.Will),
|
||||
}
|
||||
err := h.db.Save(in)
|
||||
if err != nil {
|
||||
h.Log.Error("failed to save client data", "error", err, "data", in)
|
||||
}
|
||||
|
||||
_ = h.setKv(clientKey(cl), in)
|
||||
}
|
||||
|
||||
// OnDisconnect removes a client from the store if they were using a clean session.
|
||||
@@ -188,10 +201,7 @@ func (h *Hook) OnDisconnect(cl *mqtt.Client, _ error, expire bool) {
|
||||
return
|
||||
}
|
||||
|
||||
err := h.db.DeleteStruct(&storage.Client{ID: clientKey(cl)})
|
||||
if err != nil && !errors.Is(err, storm.ErrNotFound) {
|
||||
h.Log.Error("failed to delete client", "error", err, "id", clientKey(cl))
|
||||
}
|
||||
_ = h.delKv(clientKey(cl))
|
||||
}
|
||||
|
||||
// OnSubscribed adds one or more client subscriptions to the store.
|
||||
@@ -214,11 +224,7 @@ func (h *Hook) OnSubscribed(cl *mqtt.Client, pk packets.Packet, reasonCodes []by
|
||||
RetainHandling: pk.Filters[i].RetainHandling,
|
||||
RetainAsPublished: pk.Filters[i].RetainAsPublished,
|
||||
}
|
||||
|
||||
err := h.db.Save(in)
|
||||
if err != nil {
|
||||
h.Log.Error("failed to save subscription data", "error", err, "client", cl.ID, "data", in)
|
||||
}
|
||||
_ = h.setKv(in.ID, in)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -230,12 +236,7 @@ func (h *Hook) OnUnsubscribed(cl *mqtt.Client, pk packets.Packet) {
|
||||
}
|
||||
|
||||
for i := 0; i < len(pk.Filters); i++ {
|
||||
err := h.db.DeleteStruct(&storage.Subscription{
|
||||
ID: subscriptionKey(cl, pk.Filters[i].Filter),
|
||||
})
|
||||
if err != nil {
|
||||
h.Log.Error("failed to delete client", "error", err, "id", subscriptionKey(cl, pk.Filters[i].Filter))
|
||||
}
|
||||
_ = h.delKv(subscriptionKey(cl, pk.Filters[i].Filter))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -247,12 +248,7 @@ func (h *Hook) OnRetainMessage(cl *mqtt.Client, pk packets.Packet, r int64) {
|
||||
}
|
||||
|
||||
if r == -1 {
|
||||
err := h.db.DeleteStruct(&storage.Message{
|
||||
ID: retainedKey(pk.TopicName),
|
||||
})
|
||||
if err != nil {
|
||||
h.Log.Error("failed to delete retained publish", "error", err, "id", retainedKey(pk.TopicName))
|
||||
}
|
||||
_ = h.delKv(retainedKey(pk.TopicName))
|
||||
return
|
||||
}
|
||||
|
||||
@@ -276,10 +272,8 @@ func (h *Hook) OnRetainMessage(cl *mqtt.Client, pk packets.Packet, r int64) {
|
||||
User: props.User,
|
||||
},
|
||||
}
|
||||
err := h.db.Save(in)
|
||||
if err != nil {
|
||||
h.Log.Error("failed to save retained publish data", "error", err, "client", cl.ID, "data", in)
|
||||
}
|
||||
|
||||
_ = h.setKv(in.ID, in)
|
||||
}
|
||||
|
||||
// OnQosPublish adds or updates an inflight message in the store.
|
||||
@@ -311,10 +305,7 @@ func (h *Hook) OnQosPublish(cl *mqtt.Client, pk packets.Packet, sent int64, rese
|
||||
},
|
||||
}
|
||||
|
||||
err := h.db.Save(in)
|
||||
if err != nil {
|
||||
h.Log.Error("failed to save qos inflight data", "error", err, "client", cl.ID, "data", in)
|
||||
}
|
||||
_ = h.setKv(in.ID, in)
|
||||
}
|
||||
|
||||
// OnQosComplete removes a resolved inflight message from the store.
|
||||
@@ -324,12 +315,7 @@ func (h *Hook) OnQosComplete(cl *mqtt.Client, pk packets.Packet) {
|
||||
return
|
||||
}
|
||||
|
||||
err := h.db.DeleteStruct(&storage.Message{
|
||||
ID: inflightKey(cl, pk),
|
||||
})
|
||||
if err != nil {
|
||||
h.Log.Error("failed to delete inflight data", "error", err, "id", inflightKey(cl, pk))
|
||||
}
|
||||
_ = h.delKv(inflightKey(cl, pk))
|
||||
}
|
||||
|
||||
// OnQosDropped removes a dropped inflight message from the store.
|
||||
@@ -354,10 +340,7 @@ func (h *Hook) OnSysInfoTick(sys *system.Info) {
|
||||
Info: *sys,
|
||||
}
|
||||
|
||||
err := h.db.Save(in)
|
||||
if err != nil {
|
||||
h.Log.Error("failed to save $SYS data", "error", err, "data", in)
|
||||
}
|
||||
_ = h.setKv(in.ID, in)
|
||||
}
|
||||
|
||||
// OnRetainedExpired deletes expired retained messages from the store.
|
||||
@@ -366,10 +349,7 @@ func (h *Hook) OnRetainedExpired(filter string) {
|
||||
h.Log.Error("", "error", storage.ErrDBFileNotOpen)
|
||||
return
|
||||
}
|
||||
|
||||
if err := h.db.DeleteStruct(&storage.Message{ID: retainedKey(filter)}); err != nil {
|
||||
h.Log.Error("failed to delete retained publish", "error", err, "id", retainedKey(filter))
|
||||
}
|
||||
_ = h.delKv(retainedKey(filter))
|
||||
}
|
||||
|
||||
// OnClientExpired deleted expired clients from the store.
|
||||
@@ -378,25 +358,24 @@ func (h *Hook) OnClientExpired(cl *mqtt.Client) {
|
||||
h.Log.Error("", "error", storage.ErrDBFileNotOpen)
|
||||
return
|
||||
}
|
||||
|
||||
err := h.db.DeleteStruct(&storage.Client{ID: clientKey(cl)})
|
||||
if err != nil && !errors.Is(err, storm.ErrNotFound) {
|
||||
h.Log.Error("failed to delete expired client", "error", err, "id", clientKey(cl))
|
||||
}
|
||||
_ = h.delKv(clientKey(cl))
|
||||
}
|
||||
|
||||
// StoredClients returns all stored clients from the store.
|
||||
func (h *Hook) StoredClients() (v []storage.Client, err error) {
|
||||
if h.db == nil {
|
||||
h.Log.Error("", "error", storage.ErrDBFileNotOpen)
|
||||
return
|
||||
}
|
||||
|
||||
err = h.db.Find("T", storage.ClientKey, &v)
|
||||
if err != nil && !errors.Is(err, storm.ErrNotFound) {
|
||||
return
|
||||
return v, storage.ErrDBFileNotOpen
|
||||
}
|
||||
|
||||
err = h.iterKv(storage.ClientKey, func(value []byte) error {
|
||||
obj := storage.Client{}
|
||||
err = obj.UnmarshalBinary(value)
|
||||
if err == nil {
|
||||
v = append(v, obj)
|
||||
}
|
||||
return err
|
||||
})
|
||||
return v, nil
|
||||
}
|
||||
|
||||
@@ -404,58 +383,142 @@ func (h *Hook) StoredClients() (v []storage.Client, err error) {
|
||||
func (h *Hook) StoredSubscriptions() (v []storage.Subscription, err error) {
|
||||
if h.db == nil {
|
||||
h.Log.Error("", "error", storage.ErrDBFileNotOpen)
|
||||
return
|
||||
return v, storage.ErrDBFileNotOpen
|
||||
}
|
||||
|
||||
err = h.db.Find("T", storage.SubscriptionKey, &v)
|
||||
if err != nil && !errors.Is(err, storm.ErrNotFound) {
|
||||
return
|
||||
}
|
||||
|
||||
return v, nil
|
||||
v = make([]storage.Subscription, 0)
|
||||
err = h.iterKv(storage.SubscriptionKey, func(value []byte) error {
|
||||
obj := storage.Subscription{}
|
||||
err = obj.UnmarshalBinary(value)
|
||||
if err == nil {
|
||||
v = append(v, obj)
|
||||
}
|
||||
return err
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// StoredRetainedMessages returns all stored retained messages from the store.
|
||||
func (h *Hook) StoredRetainedMessages() (v []storage.Message, err error) {
|
||||
if h.db == nil {
|
||||
h.Log.Error("", "error", storage.ErrDBFileNotOpen)
|
||||
return
|
||||
return v, storage.ErrDBFileNotOpen
|
||||
}
|
||||
|
||||
err = h.db.Find("T", storage.RetainedKey, &v)
|
||||
if err != nil && !errors.Is(err, storm.ErrNotFound) {
|
||||
return
|
||||
}
|
||||
|
||||
return v, nil
|
||||
v = make([]storage.Message, 0)
|
||||
err = h.iterKv(storage.RetainedKey, func(value []byte) error {
|
||||
obj := storage.Message{}
|
||||
err = obj.UnmarshalBinary(value)
|
||||
if err == nil {
|
||||
v = append(v, obj)
|
||||
}
|
||||
return err
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// StoredInflightMessages returns all stored inflight messages from the store.
|
||||
func (h *Hook) StoredInflightMessages() (v []storage.Message, err error) {
|
||||
if h.db == nil {
|
||||
h.Log.Error("", "error", storage.ErrDBFileNotOpen)
|
||||
return
|
||||
return v, storage.ErrDBFileNotOpen
|
||||
}
|
||||
|
||||
err = h.db.Find("T", storage.InflightKey, &v)
|
||||
if err != nil && !errors.Is(err, storm.ErrNotFound) {
|
||||
return
|
||||
}
|
||||
|
||||
return v, nil
|
||||
v = make([]storage.Message, 0)
|
||||
err = h.iterKv(storage.InflightKey, func(value []byte) error {
|
||||
obj := storage.Message{}
|
||||
err = obj.UnmarshalBinary(value)
|
||||
if err == nil {
|
||||
v = append(v, obj)
|
||||
}
|
||||
return err
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// StoredSysInfo returns the system info from the store.
|
||||
func (h *Hook) StoredSysInfo() (v storage.SystemInfo, err error) {
|
||||
if h.db == nil {
|
||||
h.Log.Error("", "error", storage.ErrDBFileNotOpen)
|
||||
return
|
||||
return v, storage.ErrDBFileNotOpen
|
||||
}
|
||||
|
||||
err = h.db.One("ID", storage.SysInfoKey, &v)
|
||||
if err != nil && !errors.Is(err, storm.ErrNotFound) {
|
||||
err = h.getKv(storage.SysInfoKey, &v)
|
||||
if err != nil && !errors.Is(err, ErrKeyNotFound) {
|
||||
return
|
||||
}
|
||||
|
||||
return v, nil
|
||||
}
|
||||
|
||||
// setKv stores a key-value pair in the database.
|
||||
func (h *Hook) setKv(k string, v storage.Serializable) error {
|
||||
err := h.db.Update(func(tx *bbolt.Tx) error {
|
||||
|
||||
bucket := tx.Bucket([]byte(h.config.Bucket))
|
||||
data, _ := v.MarshalBinary()
|
||||
err := bucket.Put([]byte(k), data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
h.Log.Error("failed to upsert data", "error", err, "key", k)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// delKv deletes a key-value pair from the database.
|
||||
func (h *Hook) delKv(k string) error {
|
||||
err := h.db.Update(func(tx *bbolt.Tx) error {
|
||||
bucket := tx.Bucket([]byte(h.config.Bucket))
|
||||
err := bucket.Delete([]byte(k))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
h.Log.Error("failed to delete data", "error", err, "key", k)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// getKv retrieves the value associated with a key from the database.
|
||||
func (h *Hook) getKv(k string, v storage.Serializable) error {
|
||||
err := h.db.View(func(tx *bbolt.Tx) error {
|
||||
bucket := tx.Bucket([]byte(h.config.Bucket))
|
||||
|
||||
value := bucket.Get([]byte(k))
|
||||
if value == nil {
|
||||
return ErrKeyNotFound
|
||||
}
|
||||
|
||||
return v.UnmarshalBinary(value)
|
||||
})
|
||||
if err != nil {
|
||||
h.Log.Error("failed to get data", "error", err, "key", k)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// iterKv iterates over key-value pairs with keys having the specified prefix in the database.
|
||||
func (h *Hook) iterKv(prefix string, visit func([]byte) error) error {
|
||||
err := h.db.View(func(tx *bbolt.Tx) error {
|
||||
bucket := tx.Bucket([]byte(h.config.Bucket))
|
||||
|
||||
c := bucket.Cursor()
|
||||
for k, v := c.Seek([]byte(prefix)); k != nil && string(k[:len(prefix)]) == prefix; k, v = c.Next() {
|
||||
if err := visit(v); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
h.Log.Error("failed to iter data", "error", err, "prefix", prefix)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
@@ -1,10 +1,11 @@
|
||||
// SPDX-License-Identifier: MIT
|
||||
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
|
||||
// SPDX-FileContributor: mochi-co
|
||||
// SPDX-FileContributor: mochi-co, werbenhu
|
||||
|
||||
package bolt
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"log/slog"
|
||||
"os"
|
||||
"testing"
|
||||
@@ -15,7 +16,6 @@ import (
|
||||
"github.com/mochi-mqtt/server/v2/packets"
|
||||
"github.com/mochi-mqtt/server/v2/system"
|
||||
|
||||
"github.com/asdine/storm/v3"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
@@ -45,7 +45,7 @@ func teardown(t *testing.T, path string, h *Hook) {
|
||||
|
||||
func TestClientKey(t *testing.T) {
|
||||
k := clientKey(&mqtt.Client{ID: "cl1"})
|
||||
require.Equal(t, "cl1", k)
|
||||
require.Equal(t, "CL_cl1", k)
|
||||
}
|
||||
|
||||
func TestSubscriptionKey(t *testing.T) {
|
||||
@@ -130,7 +130,7 @@ func TestOnSessionEstablishedThenOnDisconnect(t *testing.T) {
|
||||
h.OnSessionEstablished(client, packets.Packet{})
|
||||
|
||||
r := new(storage.Client)
|
||||
err = h.db.One("ID", clientKey(client), r)
|
||||
err = h.getKv(clientKey(client), r)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, client.ID, r.ID)
|
||||
require.Equal(t, client.Net.Remote, r.Remote)
|
||||
@@ -141,15 +141,15 @@ func TestOnSessionEstablishedThenOnDisconnect(t *testing.T) {
|
||||
|
||||
h.OnDisconnect(client, nil, false)
|
||||
r2 := new(storage.Client)
|
||||
err = h.db.One("ID", clientKey(client), r2)
|
||||
err = h.getKv(clientKey(client), r2)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, client.ID, r.ID)
|
||||
|
||||
h.OnDisconnect(client, nil, true)
|
||||
r3 := new(storage.Client)
|
||||
err = h.db.One("ID", clientKey(client), r3)
|
||||
err = h.getKv(clientKey(client), r3)
|
||||
require.Error(t, err)
|
||||
require.ErrorIs(t, storm.ErrNotFound, err)
|
||||
require.ErrorIs(t, ErrKeyNotFound, err)
|
||||
require.Empty(t, r3.ID)
|
||||
}
|
||||
|
||||
@@ -180,7 +180,7 @@ func TestOnWillSent(t *testing.T) {
|
||||
h.OnWillSent(c1, packets.Packet{})
|
||||
|
||||
r := new(storage.Client)
|
||||
err = h.db.One("ID", clientKey(client), r)
|
||||
err = h.getKv(clientKey(client), r)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, uint32(1), r.Will.Flag)
|
||||
@@ -197,18 +197,18 @@ func TestOnClientExpired(t *testing.T) {
|
||||
cl := &mqtt.Client{ID: "cl1"}
|
||||
clientKey := clientKey(cl)
|
||||
|
||||
err = h.db.Save(&storage.Client{ID: cl.ID})
|
||||
err = h.setKv(clientKey, &storage.Client{ID: cl.ID})
|
||||
require.NoError(t, err)
|
||||
|
||||
r := new(storage.Client)
|
||||
err = h.db.One("ID", clientKey, r)
|
||||
err = h.getKv(clientKey, r)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, cl.ID, r.ID)
|
||||
|
||||
h.OnClientExpired(cl)
|
||||
err = h.db.One("ID", clientKey, r)
|
||||
err = h.getKv(clientKey, r)
|
||||
require.Error(t, err)
|
||||
require.ErrorIs(t, storm.ErrNotFound, err)
|
||||
require.ErrorIs(t, ErrKeyNotFound, err)
|
||||
}
|
||||
|
||||
func TestOnClientExpiredClosedDB(t *testing.T) {
|
||||
@@ -274,16 +274,16 @@ func TestOnSubscribedThenOnUnsubscribed(t *testing.T) {
|
||||
h.OnSubscribed(client, pkf, []byte{0})
|
||||
r := new(storage.Subscription)
|
||||
|
||||
err = h.db.One("ID", subscriptionKey(client, pkf.Filters[0].Filter), r)
|
||||
err = h.getKv(subscriptionKey(client, pkf.Filters[0].Filter), r)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, client.ID, r.Client)
|
||||
require.Equal(t, pkf.Filters[0].Filter, r.Filter)
|
||||
require.Equal(t, byte(0), r.Qos)
|
||||
|
||||
h.OnUnsubscribed(client, pkf)
|
||||
err = h.db.One("ID", subscriptionKey(client, pkf.Filters[0].Filter), r)
|
||||
err = h.getKv(subscriptionKey(client, pkf.Filters[0].Filter), r)
|
||||
require.Error(t, err)
|
||||
require.Equal(t, storm.ErrNotFound, err)
|
||||
require.Equal(t, ErrKeyNotFound, err)
|
||||
}
|
||||
|
||||
func TestOnSubscribedNoDB(t *testing.T) {
|
||||
@@ -334,21 +334,21 @@ func TestOnRetainMessageThenUnset(t *testing.T) {
|
||||
h.OnRetainMessage(client, pk, 1)
|
||||
|
||||
r := new(storage.Message)
|
||||
err = h.db.One("ID", retainedKey(pk.TopicName), r)
|
||||
err = h.getKv(retainedKey(pk.TopicName), r)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, pk.TopicName, r.TopicName)
|
||||
require.Equal(t, pk.Payload, r.Payload)
|
||||
|
||||
h.OnRetainMessage(client, pk, -1)
|
||||
err = h.db.One("ID", retainedKey(pk.TopicName), r)
|
||||
err = h.getKv(retainedKey(pk.TopicName), r)
|
||||
require.Error(t, err)
|
||||
require.Equal(t, storm.ErrNotFound, err)
|
||||
require.Equal(t, ErrKeyNotFound, err)
|
||||
|
||||
// coverage: delete deleted
|
||||
h.OnRetainMessage(client, pk, -1)
|
||||
err = h.db.One("ID", retainedKey(pk.TopicName), r)
|
||||
err = h.getKv(retainedKey(pk.TopicName), r)
|
||||
require.Error(t, err)
|
||||
require.Equal(t, storm.ErrNotFound, err)
|
||||
require.Equal(t, ErrKeyNotFound, err)
|
||||
}
|
||||
|
||||
func TestOnRetainedExpired(t *testing.T) {
|
||||
@@ -364,18 +364,18 @@ func TestOnRetainedExpired(t *testing.T) {
|
||||
TopicName: "a/b/c",
|
||||
}
|
||||
|
||||
err = h.db.Save(m)
|
||||
err = h.setKv(m.ID, m)
|
||||
require.NoError(t, err)
|
||||
|
||||
r := new(storage.Message)
|
||||
err = h.db.One("ID", m.ID, r)
|
||||
err = h.getKv(m.ID, r)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, m.TopicName, r.TopicName)
|
||||
|
||||
h.OnRetainedExpired(m.TopicName)
|
||||
err = h.db.One("ID", m.ID, r)
|
||||
err = h.getKv(m.ID, r)
|
||||
require.Error(t, err)
|
||||
require.Equal(t, storm.ErrNotFound, err)
|
||||
require.Equal(t, ErrKeyNotFound, err)
|
||||
}
|
||||
|
||||
func TestOnRetainedExpiredClosedDB(t *testing.T) {
|
||||
@@ -427,7 +427,7 @@ func TestOnQosPublishThenQOSComplete(t *testing.T) {
|
||||
h.OnQosPublish(client, pk, time.Now().Unix(), 0)
|
||||
|
||||
r := new(storage.Message)
|
||||
err = h.db.One("ID", inflightKey(client, pk), r)
|
||||
err = h.getKv(inflightKey(client, pk), r)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, pk.TopicName, r.TopicName)
|
||||
require.Equal(t, pk.Payload, r.Payload)
|
||||
@@ -438,9 +438,9 @@ func TestOnQosPublishThenQOSComplete(t *testing.T) {
|
||||
|
||||
// OnQosDropped is a passthrough to OnQosComplete here
|
||||
h.OnQosDropped(client, pk)
|
||||
err = h.db.One("ID", inflightKey(client, pk), r)
|
||||
err = h.getKv(inflightKey(client, pk), r)
|
||||
require.Error(t, err)
|
||||
require.Equal(t, storm.ErrNotFound, err)
|
||||
require.Equal(t, ErrKeyNotFound, err)
|
||||
}
|
||||
|
||||
func TestOnQosPublishNoDB(t *testing.T) {
|
||||
@@ -494,7 +494,7 @@ func TestOnSysInfoTick(t *testing.T) {
|
||||
h.OnSysInfoTick(info)
|
||||
|
||||
r := new(storage.SystemInfo)
|
||||
err = h.db.One("ID", storage.SysInfoKey, r)
|
||||
err = h.getKv(storage.SysInfoKey, r)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, info.Version, r.Version)
|
||||
require.Equal(t, info.BytesReceived, r.BytesReceived)
|
||||
@@ -524,13 +524,13 @@ func TestStoredClients(t *testing.T) {
|
||||
defer teardown(t, h.config.Path, h)
|
||||
|
||||
// populate with clients
|
||||
err = h.db.Save(&storage.Client{ID: "cl1", T: storage.ClientKey})
|
||||
err = h.setKv(storage.ClientKey+"_"+"cl1", &storage.Client{ID: "cl1"})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = h.db.Save(&storage.Client{ID: "cl2", T: storage.ClientKey})
|
||||
err = h.setKv(storage.ClientKey+"_"+"cl2", &storage.Client{ID: "cl2"})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = h.db.Save(&storage.Client{ID: "cl3", T: storage.ClientKey})
|
||||
err = h.setKv(storage.ClientKey+"_"+"cl3", &storage.Client{ID: "cl3"})
|
||||
require.NoError(t, err)
|
||||
|
||||
r, err := h.StoredClients()
|
||||
@@ -546,7 +546,7 @@ func TestStoredClientsNoDB(t *testing.T) {
|
||||
h.SetOpts(logger, nil)
|
||||
v, err := h.StoredClients()
|
||||
require.Empty(t, v)
|
||||
require.NoError(t, err)
|
||||
require.ErrorIs(t, storage.ErrDBFileNotOpen, err)
|
||||
}
|
||||
|
||||
func TestStoredClientsClosedDB(t *testing.T) {
|
||||
@@ -557,7 +557,7 @@ func TestStoredClientsClosedDB(t *testing.T) {
|
||||
teardown(t, h.config.Path, h)
|
||||
v, err := h.StoredClients()
|
||||
require.Empty(t, v)
|
||||
require.Error(t, err)
|
||||
require.ErrorIs(t, storage.ErrDBFileNotOpen, err)
|
||||
}
|
||||
|
||||
func TestStoredSubscriptions(t *testing.T) {
|
||||
@@ -568,13 +568,13 @@ func TestStoredSubscriptions(t *testing.T) {
|
||||
defer teardown(t, h.config.Path, h)
|
||||
|
||||
// populate with subscriptions
|
||||
err = h.db.Save(&storage.Subscription{ID: "sub1", T: storage.SubscriptionKey})
|
||||
err = h.setKv(storage.SubscriptionKey+"_"+"sub1", &storage.Subscription{ID: "sub1"})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = h.db.Save(&storage.Subscription{ID: "sub2", T: storage.SubscriptionKey})
|
||||
err = h.setKv(storage.SubscriptionKey+"_"+"sub2", &storage.Subscription{ID: "sub2"})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = h.db.Save(&storage.Subscription{ID: "sub3", T: storage.SubscriptionKey})
|
||||
err = h.setKv(storage.SubscriptionKey+"_"+"sub3", &storage.Subscription{ID: "sub3"})
|
||||
require.NoError(t, err)
|
||||
|
||||
r, err := h.StoredSubscriptions()
|
||||
@@ -590,7 +590,7 @@ func TestStoredSubscriptionsNoDB(t *testing.T) {
|
||||
h.SetOpts(logger, nil)
|
||||
v, err := h.StoredSubscriptions()
|
||||
require.Empty(t, v)
|
||||
require.NoError(t, err)
|
||||
require.ErrorIs(t, storage.ErrDBFileNotOpen, err)
|
||||
}
|
||||
|
||||
func TestStoredSubscriptionsClosedDB(t *testing.T) {
|
||||
@@ -601,7 +601,7 @@ func TestStoredSubscriptionsClosedDB(t *testing.T) {
|
||||
teardown(t, h.config.Path, h)
|
||||
v, err := h.StoredSubscriptions()
|
||||
require.Empty(t, v)
|
||||
require.Error(t, err)
|
||||
require.ErrorIs(t, storage.ErrDBFileNotOpen, err)
|
||||
}
|
||||
|
||||
func TestStoredRetainedMessages(t *testing.T) {
|
||||
@@ -612,16 +612,16 @@ func TestStoredRetainedMessages(t *testing.T) {
|
||||
defer teardown(t, h.config.Path, h)
|
||||
|
||||
// populate with messages
|
||||
err = h.db.Save(&storage.Message{ID: "m1", T: storage.RetainedKey})
|
||||
err = h.setKv(storage.RetainedKey+"_"+"m1", &storage.Message{ID: "m1"})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = h.db.Save(&storage.Message{ID: "m2", T: storage.RetainedKey})
|
||||
err = h.setKv(storage.RetainedKey+"_"+"m2", &storage.Message{ID: "m2"})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = h.db.Save(&storage.Message{ID: "m3", T: storage.RetainedKey})
|
||||
err = h.setKv(storage.RetainedKey+"_"+"m3", &storage.Message{ID: "m3"})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = h.db.Save(&storage.Message{ID: "i3", T: storage.InflightKey})
|
||||
err = h.setKv(storage.InflightKey+"_"+"i3", &storage.Message{ID: "i3"})
|
||||
require.NoError(t, err)
|
||||
|
||||
r, err := h.StoredRetainedMessages()
|
||||
@@ -637,7 +637,7 @@ func TestStoredRetainedMessagesNoDB(t *testing.T) {
|
||||
h.SetOpts(logger, nil)
|
||||
v, err := h.StoredRetainedMessages()
|
||||
require.Empty(t, v)
|
||||
require.NoError(t, err)
|
||||
require.ErrorIs(t, storage.ErrDBFileNotOpen, err)
|
||||
}
|
||||
|
||||
func TestStoredRetainedMessagesClosedDB(t *testing.T) {
|
||||
@@ -659,16 +659,16 @@ func TestStoredInflightMessages(t *testing.T) {
|
||||
defer teardown(t, h.config.Path, h)
|
||||
|
||||
// populate with messages
|
||||
err = h.db.Save(&storage.Message{ID: "i1", T: storage.InflightKey})
|
||||
err = h.setKv(storage.InflightKey+"_"+"i1", &storage.Message{ID: "i1"})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = h.db.Save(&storage.Message{ID: "i2", T: storage.InflightKey})
|
||||
err = h.setKv(storage.InflightKey+"_"+"i2", &storage.Message{ID: "i2"})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = h.db.Save(&storage.Message{ID: "i3", T: storage.InflightKey})
|
||||
err = h.setKv(storage.InflightKey+"_"+"i3", &storage.Message{ID: "i3"})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = h.db.Save(&storage.Message{ID: "m1", T: storage.RetainedKey})
|
||||
err = h.setKv(storage.RetainedKey+"_"+"m1", &storage.Message{ID: "m1"})
|
||||
require.NoError(t, err)
|
||||
|
||||
r, err := h.StoredInflightMessages()
|
||||
@@ -684,7 +684,7 @@ func TestStoredInflightMessagesNoDB(t *testing.T) {
|
||||
h.SetOpts(logger, nil)
|
||||
v, err := h.StoredInflightMessages()
|
||||
require.Empty(t, v)
|
||||
require.NoError(t, err)
|
||||
require.ErrorIs(t, storage.ErrDBFileNotOpen, err)
|
||||
}
|
||||
|
||||
func TestStoredInflightMessagesClosedDB(t *testing.T) {
|
||||
@@ -695,7 +695,7 @@ func TestStoredInflightMessagesClosedDB(t *testing.T) {
|
||||
teardown(t, h.config.Path, h)
|
||||
v, err := h.StoredInflightMessages()
|
||||
require.Empty(t, v)
|
||||
require.Error(t, err)
|
||||
require.ErrorIs(t, storage.ErrDBFileNotOpen, err)
|
||||
}
|
||||
|
||||
func TestStoredSysInfo(t *testing.T) {
|
||||
@@ -706,7 +706,7 @@ func TestStoredSysInfo(t *testing.T) {
|
||||
defer teardown(t, h.config.Path, h)
|
||||
|
||||
// populate with sys info
|
||||
err = h.db.Save(&storage.SystemInfo{
|
||||
err = h.setKv(storage.SysInfoKey, &storage.SystemInfo{
|
||||
ID: storage.SysInfoKey,
|
||||
Info: system.Info{
|
||||
Version: "2.0.0",
|
||||
@@ -725,7 +725,7 @@ func TestStoredSysInfoNoDB(t *testing.T) {
|
||||
h.SetOpts(logger, nil)
|
||||
v, err := h.StoredSysInfo()
|
||||
require.Empty(t, v)
|
||||
require.NoError(t, err)
|
||||
require.ErrorIs(t, storage.ErrDBFileNotOpen, err)
|
||||
}
|
||||
|
||||
func TestStoredSysInfoClosedDB(t *testing.T) {
|
||||
@@ -738,3 +738,54 @@ func TestStoredSysInfoClosedDB(t *testing.T) {
|
||||
require.Empty(t, v)
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
func TestGetSetDelKv(t *testing.T) {
|
||||
h := new(Hook)
|
||||
h.SetOpts(logger, nil)
|
||||
err := h.Init(nil)
|
||||
defer teardown(t, h.config.Path, h)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = h.setKv("testId", &storage.Client{ID: "testId"})
|
||||
require.NoError(t, err)
|
||||
|
||||
var obj storage.Client
|
||||
err = h.getKv("testId", &obj)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = h.delKv("testId")
|
||||
require.NoError(t, err)
|
||||
|
||||
err = h.getKv("testId", &obj)
|
||||
require.Error(t, err)
|
||||
require.ErrorIs(t, ErrKeyNotFound, err)
|
||||
}
|
||||
|
||||
func TestIterKv(t *testing.T) {
|
||||
h := new(Hook)
|
||||
h.SetOpts(logger, nil)
|
||||
|
||||
err := h.Init(nil)
|
||||
defer teardown(t, h.config.Path, h)
|
||||
require.NoError(t, err)
|
||||
|
||||
h.setKv("prefix_a_1", &storage.Client{ID: "1"})
|
||||
h.setKv("prefix_a_2", &storage.Client{ID: "2"})
|
||||
h.setKv("prefix_b_2", &storage.Client{ID: "3"})
|
||||
|
||||
var clients []storage.Client
|
||||
err = h.iterKv("prefix_a", func(data []byte) error {
|
||||
var item storage.Client
|
||||
item.UnmarshalBinary(data)
|
||||
clients = append(clients, item)
|
||||
return nil
|
||||
})
|
||||
require.Equal(t, 2, len(clients))
|
||||
require.NoError(t, err)
|
||||
|
||||
visitErr := errors.New("iter visit error")
|
||||
err = h.iterKv("prefix_b", func(data []byte) error {
|
||||
return visitErr
|
||||
})
|
||||
require.ErrorIs(t, visitErr, err)
|
||||
}
|
||||
|
@@ -1,6 +1,6 @@
|
||||
// SPDX-License-Identifier: MIT
|
||||
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
|
||||
// SPDX-FileContributor: mochi-co, gsagula
|
||||
// SPDX-FileContributor: werbenhu
|
||||
|
||||
package pebble
|
||||
|
||||
@@ -467,23 +467,23 @@ func (h *Hook) StoredSysInfo() (v storage.SystemInfo, err error) {
|
||||
}
|
||||
|
||||
// Errorf satisfies the pebble interface for an error logger.
|
||||
func (h *Hook) Errorf(m string, v ...interface{}) {
|
||||
func (h *Hook) Errorf(m string, v ...any) {
|
||||
h.Log.Error(fmt.Sprintf(strings.ToLower(strings.Trim(m, "\n")), v...), "v", v)
|
||||
|
||||
}
|
||||
|
||||
// Warningf satisfies the pebble interface for a warning logger.
|
||||
func (h *Hook) Warningf(m string, v ...interface{}) {
|
||||
func (h *Hook) Warningf(m string, v ...any) {
|
||||
h.Log.Warn(fmt.Sprintf(strings.ToLower(strings.Trim(m, "\n")), v...), "v", v)
|
||||
}
|
||||
|
||||
// Infof satisfies the pebble interface for an info logger.
|
||||
func (h *Hook) Infof(m string, v ...interface{}) {
|
||||
func (h *Hook) Infof(m string, v ...any) {
|
||||
h.Log.Info(fmt.Sprintf(strings.ToLower(strings.Trim(m, "\n")), v...), "v", v)
|
||||
}
|
||||
|
||||
// Debugf satisfies the pebble interface for a debug logger.
|
||||
func (h *Hook) Debugf(m string, v ...interface{}) {
|
||||
func (h *Hook) Debugf(m string, v ...any) {
|
||||
h.Log.Debug(fmt.Sprintf(strings.ToLower(strings.Trim(m, "\n")), v...), "v", v)
|
||||
}
|
||||
|
||||
|
@@ -1,6 +1,6 @@
|
||||
// SPDX-License-Identifier: MIT
|
||||
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
|
||||
// SPDX-FileContributor: mochi-co
|
||||
// SPDX-FileContributor: werbenhu
|
||||
|
||||
package pebble
|
||||
|
||||
|
@@ -201,6 +201,7 @@ const (
|
||||
TDisconnect
|
||||
TDisconnectTakeover
|
||||
TDisconnectMqtt5
|
||||
TDisconnectMqtt5DisconnectWithWillMessage
|
||||
TDisconnectSecondConnect
|
||||
TDisconnectReceiveMaximum
|
||||
TDisconnectDropProperties
|
||||
@@ -3781,6 +3782,31 @@ var TPacketData = map[byte]TPacketCases{
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Case: TDisconnectMqtt5DisconnectWithWillMessage,
|
||||
Desc: "mqtt5 disconnect with will message",
|
||||
Primary: true,
|
||||
RawBytes: append([]byte{
|
||||
Disconnect << 4, 38, // fixed header
|
||||
CodeDisconnectWillMessage.Code, // Reason Code
|
||||
36, // Properties Length
|
||||
17, 0, 0, 0, 120, // Session Expiry Interval (17)
|
||||
31, 0, 28, // Reason String (31)
|
||||
}, []byte(CodeDisconnectWillMessage.Reason)...),
|
||||
Packet: &Packet{
|
||||
ProtocolVersion: 5,
|
||||
FixedHeader: FixedHeader{
|
||||
Type: Disconnect,
|
||||
Remaining: 22,
|
||||
},
|
||||
ReasonCode: CodeDisconnectWillMessage.Code,
|
||||
Properties: Properties{
|
||||
ReasonString: CodeDisconnectWillMessage.Reason,
|
||||
SessionExpiryInterval: 120,
|
||||
SessionExpiryIntervalFlag: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Case: TDisconnectSecondConnect,
|
||||
Desc: "second connect packet mqtt5",
|
||||
|
20
server.go
20
server.go
@@ -27,7 +27,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
Version = "2.6.1" // the current server version.
|
||||
Version = "2.6.3" // the current server version.
|
||||
defaultSysTopicInterval int64 = 1 // the interval between $SYS topic publishes
|
||||
LocalListener = "local"
|
||||
InlineClientId = "inline"
|
||||
@@ -45,6 +45,7 @@ var (
|
||||
|
||||
// Capabilities indicates the capabilities and features provided by the server.
|
||||
type Capabilities struct {
|
||||
MaximumClients int64 `yaml:"maximum_clients" json:"maximum_clients"` // maximum number of connected clients
|
||||
MaximumMessageExpiryInterval int64 `yaml:"maximum_message_expiry_interval" json:"maximum_message_expiry_interval"` // maximum message expiry if message expiry is 0 or over
|
||||
MaximumClientWritesPending int32 `yaml:"maximum_client_writes_pending" json:"maximum_client_writes_pending"` // maximum number of pending message writes for a client
|
||||
MaximumSessionExpiryInterval uint32 `yaml:"maximum_session_expiry_interval" json:"maximum_session_expiry_interval"` // maximum number of seconds to keep disconnected sessions
|
||||
@@ -65,6 +66,7 @@ type Capabilities struct {
|
||||
// NewDefaultServerCapabilities defines the default features and capabilities provided by the server.
|
||||
func NewDefaultServerCapabilities() *Capabilities {
|
||||
return &Capabilities{
|
||||
MaximumClients: math.MaxInt64, // maximum number of connected clients
|
||||
MaximumMessageExpiryInterval: 60 * 60 * 24, // maximum message expiry if message expiry is 0 or over
|
||||
MaximumClientWritesPending: 1024 * 8, // maximum number of pending message writes for a client
|
||||
MaximumSessionExpiryInterval: math.MaxUint32, // maximum number of seconds to keep disconnected sessions
|
||||
@@ -110,7 +112,7 @@ type Options struct {
|
||||
// ClientNetReadBufferSize specifies the size of the client *bufio.Reader read buffer.
|
||||
ClientNetReadBufferSize int `yaml:"client_net_read_buffer_size" json:"client_net_read_buffer_size"`
|
||||
|
||||
// Logger specifies a custom configured implementation of zerolog to override
|
||||
// Logger specifies a custom configured implementation of log/slog to override
|
||||
// the servers default logger configuration. If you wish to change the log level,
|
||||
// of the default logger, you can do so by setting:
|
||||
// server := mqtt.New(nil)
|
||||
@@ -414,6 +416,16 @@ func (s *Server) attachClient(cl *Client, listener string) error {
|
||||
}
|
||||
|
||||
cl.ParseConnect(listener, pk)
|
||||
if atomic.LoadInt64(&s.Info.ClientsConnected) >= s.Options.Capabilities.MaximumClients {
|
||||
if cl.Properties.ProtocolVersion < 5 {
|
||||
s.SendConnack(cl, packets.ErrServerUnavailable, false, nil)
|
||||
} else {
|
||||
s.SendConnack(cl, packets.ErrServerBusy, false, nil)
|
||||
}
|
||||
|
||||
return packets.ErrServerBusy
|
||||
}
|
||||
|
||||
code := s.validateConnect(cl, pk) // [MQTT-3.1.4-1] [MQTT-3.1.4-2]
|
||||
if code != packets.CodeSuccess {
|
||||
if err := s.SendConnack(cl, code, false, nil); err != nil {
|
||||
@@ -1381,6 +1393,10 @@ func (s *Server) processDisconnect(cl *Client, pk packets.Packet) error {
|
||||
cl.Properties.Props.SessionExpiryIntervalFlag = true
|
||||
}
|
||||
|
||||
if pk.ReasonCode == packets.CodeDisconnectWillMessage.Code { // [MQTT-3.1.2.5] Non-normative comment
|
||||
return packets.CodeDisconnectWillMessage
|
||||
}
|
||||
|
||||
s.loop.willDelayed.Delete(cl.ID) // [MQTT-3.1.3-9] [MQTT-3.1.2-8]
|
||||
cl.Stop(packets.CodeDisconnect) // [MQTT-3.14.4-2]
|
||||
|
||||
|
@@ -944,6 +944,41 @@ func TestServerEstablishConnectionInvalidConnect(t *testing.T) {
|
||||
_ = r.Close()
|
||||
}
|
||||
|
||||
func TestEstablishConnectionMaximumClientsReached(t *testing.T) {
|
||||
cc := NewDefaultServerCapabilities()
|
||||
cc.MaximumClients = 0
|
||||
s := New(&Options{
|
||||
Logger: logger,
|
||||
Capabilities: cc,
|
||||
})
|
||||
_ = s.AddHook(new(AllowHook), nil)
|
||||
defer s.Close()
|
||||
|
||||
r, w := net.Pipe()
|
||||
o := make(chan error)
|
||||
go func() {
|
||||
o <- s.EstablishConnection("tcp", r)
|
||||
}()
|
||||
|
||||
go func() {
|
||||
_, _ = w.Write(packets.TPacketData[packets.Connect].Get(packets.TConnectClean).RawBytes)
|
||||
}()
|
||||
|
||||
// receive the connack
|
||||
recv := make(chan []byte)
|
||||
go func() {
|
||||
buf, err := io.ReadAll(w)
|
||||
require.NoError(t, err)
|
||||
recv <- buf
|
||||
}()
|
||||
|
||||
err := <-o
|
||||
require.Error(t, err)
|
||||
require.ErrorIs(t, err, packets.ErrServerBusy)
|
||||
|
||||
_ = r.Close()
|
||||
}
|
||||
|
||||
// See https://github.com/mochi-mqtt/server/issues/178
|
||||
func TestServerEstablishConnectionZeroByteUsernameIsValid(t *testing.T) {
|
||||
s := newServer()
|
||||
@@ -2223,7 +2258,7 @@ func TestPublishToSubscribersExhaustedSendQuota(t *testing.T) {
|
||||
require.True(t, subbed)
|
||||
|
||||
// coverage: subscriber publish errors are non-returnable
|
||||
// can we hook into zerolog ?
|
||||
// can we hook into log/slog ?
|
||||
_ = r.Close()
|
||||
pkx := *packets.TPacketData[packets.Publish].Get(packets.TPublishQos1).Packet
|
||||
pkx.PacketID = 0
|
||||
@@ -2244,7 +2279,7 @@ func TestPublishToSubscribersExhaustedPacketIDs(t *testing.T) {
|
||||
require.True(t, subbed)
|
||||
|
||||
// coverage: subscriber publish errors are non-returnable
|
||||
// can we hook into zerolog ?
|
||||
// can we hook into log/slog ?
|
||||
_ = r.Close()
|
||||
pkx := *packets.TPacketData[packets.Publish].Get(packets.TPublishQos1).Packet
|
||||
pkx.PacketID = 0
|
||||
@@ -2261,7 +2296,7 @@ func TestPublishToSubscribersNoConnection(t *testing.T) {
|
||||
require.True(t, subbed)
|
||||
|
||||
// coverage: subscriber publish errors are non-returnable
|
||||
// can we hook into zerolog ?
|
||||
// can we hook into log/slog ?
|
||||
_ = r.Close()
|
||||
s.publishToSubscribers(*packets.TPacketData[packets.Publish].Get(packets.TPublishBasic).Packet)
|
||||
time.Sleep(time.Millisecond)
|
||||
@@ -3103,6 +3138,22 @@ func TestServerProcessPacketDisconnectNonZeroExpiryViolation(t *testing.T) {
|
||||
require.ErrorIs(t, err, packets.ErrProtocolViolationZeroNonZeroExpiry)
|
||||
}
|
||||
|
||||
func TestServerProcessPacketDisconnectDisconnectWithWillMessage(t *testing.T) {
|
||||
s := newServer()
|
||||
cl, _, _ := newTestClient()
|
||||
cl.Properties.Props.SessionExpiryInterval = 30
|
||||
cl.Properties.ProtocolVersion = 5
|
||||
|
||||
s.loop.willDelayed.Add(cl.ID, packets.Packet{TopicName: "a/b/c", Payload: []byte("hello")})
|
||||
require.Equal(t, 1, s.loop.willDelayed.Len())
|
||||
|
||||
err := s.processPacket(cl, *packets.TPacketData[packets.Disconnect].Get(packets.TDisconnectMqtt5DisconnectWithWillMessage).Packet)
|
||||
require.Error(t, err)
|
||||
|
||||
require.Equal(t, 1, s.loop.willDelayed.Len())
|
||||
require.False(t, cl.Closed())
|
||||
}
|
||||
|
||||
func TestServerProcessPacketAuth(t *testing.T) {
|
||||
s := newServer()
|
||||
cl, r, w := newTestClient()
|
||||
|
@@ -514,7 +514,7 @@ func (x *TopicsIndex) seek(filter string, d int) *particle {
|
||||
|
||||
// trim removes empty filter particles from the index.
|
||||
func (x *TopicsIndex) trim(n *particle) {
|
||||
for n.parent != nil && n.retainPath == "" && n.particles.len()+n.subscriptions.Len()+n.shared.Len() == 0 {
|
||||
for n.parent != nil && n.retainPath == "" && n.particles.len()+n.subscriptions.Len()+n.shared.Len()+n.inlineSubscriptions.Len() == 0 {
|
||||
key := n.key
|
||||
n = n.parent
|
||||
n.particles.delete(key)
|
||||
|
Reference in New Issue
Block a user