Compare commits

...

20 Commits

Author SHA1 Message Date
JB
f52990691f Merge branch 'main' into main 2025-01-30 18:06:57 +00:00
dependabot[bot]
9441e92595 Bump golang.org/x/net from 0.23.0 to 0.33.0 (#450)
Bumps [golang.org/x/net](https://github.com/golang/net) from 0.23.0 to 0.33.0.
- [Commits](https://github.com/golang/net/compare/v0.23.0...v0.33.0)

---
updated-dependencies:
- dependency-name: golang.org/x/net
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: JB <28275108+mochi-co@users.noreply.github.com>
2025-01-30 18:05:35 +00:00
thedevop
bfaf78332b Suppress OnPublish CodeSuccessIgnore error log (#438)
* OnPublish CodeSuccessIgnore, use debug instead of error log

* Suppress OnPublish CodeSuccessIgnore error log

---------

Co-authored-by: JB <28275108+mochi-co@users.noreply.github.com>
2025-01-30 18:02:25 +00:00
thedevop
043906876e Add cl.IsTakenOver and switch cl.isTakenOver to atomic.Bool (#446)
* OnPublish CodeSuccessIgnore, use debug instead of error log

* Suppress OnPublish CodeSuccessIgnore error log

* Add cl.IsTakenOver and switch to use atomic.Bool

---------

Co-authored-by: JB <28275108+mochi-co@users.noreply.github.com>
2025-01-30 17:58:03 +00:00
dependabot[bot]
dcb814cedf Bump github.com/golang/glog from 1.0.0 to 1.2.4 (#449)
Bumps [github.com/golang/glog](https://github.com/golang/glog) from 1.0.0 to 1.2.4.
- [Release notes](https://github.com/golang/glog/releases)
- [Commits](https://github.com/golang/glog/compare/v1.0.0...v1.2.4)

---
updated-dependencies:
- dependency-name: github.com/golang/glog
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-01-30 17:54:44 +00:00
mochi-co
8f52b891d5 Update server version 2024-10-23 21:06:28 +01:00
s9-hfe
47536b77f2 Fix QoS 1 message delivery after server restart (#427)
Resolved an issue where persisted QoS 1 messages were not correctly loaded
into the appropriate client instance during server startup.
2024-10-23 21:02:45 +01:00
dagehuifei
00601ca982 feat: Add TLS Cert File flag 2024-10-12 10:14:38 +08:00
mochi-co
830de149dc Update server version 2024-07-30 20:24:12 +01:00
Simon
34f9370f8c use cl.ID instead of pk.Connect.ClientIdentifier when looking for existing clients in inheritClientSession (#417)
* use cl.id instead of pk.Connect.ClientIdentifier which is client-supplied, when looking for existing clients in inheritClientSession

* look for assigned client id explicitly

* simpler implementation using just cl.ID

---------

Co-authored-by: JB <28275108+mochi-co@users.noreply.github.com>
2024-07-30 19:09:16 +01:00
Cendeal
dc272d2c36 add description for MaximumClientWritesPending(#410) (#411) 2024-07-30 19:02:32 +01:00
mochi-co
82c96fa4e3 Update server version 2024-05-19 19:15:53 +01:00
Jens Alfke
01f81ebeee Added omitempty to storage structs (#401)
This significantly cuts down the size of the marshaled JSON.

I've left it out on fields that, in my experience, never have the
default value.
2024-05-19 19:12:57 +01:00
Jens Alfke
cc3f827fc1 Added client.StopTime(), to expose disconnected (#400)
* Added client.StopTime() to expose `disconnected`

* Added a test of Client.StopTime()

I added a check to TestClientStop(), both before and after stopping.

I also noticed a race condition in the test (comparing a time against
time.Now) and fixed it to allow a one-second discrepancy.
2024-05-19 19:11:24 +01:00
mochi-co
5966c7fe0d Update server version 2024-04-30 10:04:01 +01:00
werben
b26e03a433 Bypassing asdine/storm and directly using bbolt. (#392)
* Fix the bug where inline subscribers do not receive messages after all non-inline clients unsubscribe.

* Bypassing asdine/storm and directly using bbolt.

* Fixed erroneous removal of FileContributor.

---------

Co-authored-by: JB <28275108+mochi-co@users.noreply.github.com>
2024-04-30 10:03:22 +01:00
dependabot[bot]
6fc4027a78 Bump golang.org/x/net from 0.17.0 to 0.23.0 (#388)
Bumps [golang.org/x/net](https://github.com/golang/net) from 0.17.0 to 0.23.0.
- [Commits](https://github.com/golang/net/compare/v0.17.0...v0.23.0)

---
updated-dependencies:
- dependency-name: golang.org/x/net
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: JB <28275108+mochi-co@users.noreply.github.com>
2024-04-30 09:21:53 +01:00
Derek Duncan
b9d2dfb824 Add logging level file config (#396)
* add logging level to file config

* remove last zerolog references

* add levelvar to unit test

---------

Co-authored-by: Derek Duncan <derekduncan@gmail.com>
Co-authored-by: JB <28275108+mochi-co@users.noreply.github.com>
2024-04-30 09:08:45 +01:00
thedevop
d5d9b02b28 Update README example to reflect new listeners API (#389)
* Update README example to reflect new listeners API

* Update README example to reflect new listeners API

---------

Co-authored-by: JB <28275108+mochi-co@users.noreply.github.com>
2024-04-30 09:03:34 +01:00
Derek Duncan
64ea905c41 add 0x04 Reason Code functionality (#395)
* add 0x04 Reason Code functionality

* fix tpackets test

* simplify logic

---------

Co-authored-by: Derek Duncan <derekduncan@gmail.com>
2024-04-30 08:57:16 +01:00
28 changed files with 508 additions and 271 deletions

View File

@@ -149,7 +149,7 @@ func main() {
_ = server.AddHook(new(auth.AllowHook), nil)
// 在标1883端口上创建一个 TCP 服务端。
tcp := listeners.NewTCP("t1", ":1883", nil)
tcp := listeners.NewTCP(listeners.Config{ID: "t1", Address: ":1883"})
err := server.AddListener(tcp)
if err != nil {
log.Fatal(err)
@@ -197,6 +197,7 @@ func main() {
server := mqtt.New(&mqtt.Options{
Capabilities: mqtt.Capabilities{
MaximumSessionExpiryInterval: 3600,
MaximumClientWritesPending: 3,
Compatibilities: mqtt.Compatibilities{
ObscureNotAuthorized: true,
},
@@ -207,7 +208,7 @@ server := mqtt.New(&mqtt.Options{
InlineClient: false,
})
```
请参考 mqtt.Options、mqtt.Capabilities 和 mqtt.Compatibilities 结构体以查看完整的所有服务端选项。ClientNetWriteBufferSize 和 ClientNetReadBufferSize 可以根据你的需求配置调整每个客户端的内存使用状况。
请参考 mqtt.Options、mqtt.Capabilities 和 mqtt.Compatibilities 结构体,以查看完整的所有服务端选项。 ClientNetWriteBufferSize 和 ClientNetReadBufferSize 可以根据你的需求配置调整每个客户端的内存使用状况。其中 Capabilities.MaximumClientWritesPending 的大小会影响服务器运行内存占用,如果 IoT 设备同时在线的数量比较多,设置的值很大,尽管没有收发数据,服务器运行内存占用也会增加很多,默认该数值为 1024*8 ,可以根据实际情况调整该参数。
### 默认配置说明(Default Configuration Notes)
@@ -414,7 +415,7 @@ if err != nil {
### 内联客户端 (Inline Client v2.4.0+支持)
现在可以通过使用内联客户端功能直接在服务端上订阅主题和发布消息。内联客户端是内置在服务端中的特殊的客户端,可以在服务端的配置中启用:
现在可以通过使用内联客户端功能直接在服务端上订阅主题和发布消息。目前,内联客户端暂时还不支持共享订阅。内联客户端是内置在服务端中的特殊的客户端,可以在服务端的配置中启用:
```go
server := mqtt.New(&mqtt.Options{

View File

@@ -119,7 +119,7 @@ func main() {
_ = server.AddHook(new(auth.AllowHook), nil)
// Create a TCP listener on a standard port.
tcp := listeners.NewTCP("t1", ":1883", nil)
tcp := listeners.NewTCP(listeners.Config{ID: "t1", Address: ":1883"})
err := server.AddListener(tcp)
if err != nil {
log.Fatal(err)
@@ -168,6 +168,7 @@ TLSを設定するには`*listeners.Config`を渡すことができます。
server := mqtt.New(&mqtt.Options{
Capabilities: mqtt.Capabilities{
MaximumSessionExpiryInterval: 3600,
MaximumClientWritesPending: 3,
Compatibilities: mqtt.Compatibilities{
ObscureNotAuthorized: true,
},
@@ -181,6 +182,7 @@ server := mqtt.New(&mqtt.Options{
mqtt.Options、mqtt.Capabilities、mqtt.Compatibilitiesの構造体はオプションの理解に役立ちます。
必要に応じて`ClientNetWriteBufferSize`と`ClientNetReadBufferSize`はクライアントの使用するメモリに合わせて設定できます。
`Capabilities.MaximumClientWritesPending`のサイズは、サーバーのメモリ使用量に影響を与えます。IoTデバイスが同時にオンラインで多数存在する場合、また設定値が非常に大きい場合、データの送受信がなくても、サーバーのメモリ使用量は大幅に増加します。デフォルト値は1024*8で、実際の状況に応じてこのパラメータを調整することができます。
### デフォルト設定に関する注意事項

View File

@@ -149,7 +149,7 @@ func main() {
_ = server.AddHook(new(auth.AllowHook), nil)
// Create a TCP listener on a standard port.
tcp := listeners.NewTCP("t1", ":1883", nil)
tcp := listeners.NewTCP(listeners.Config{ID: "t1", Address: ":1883"})
err := server.AddListener(tcp)
if err != nil {
log.Fatal(err)
@@ -198,6 +198,7 @@ A number of configurable options are available which can be used to alter the be
server := mqtt.New(&mqtt.Options{
Capabilities: mqtt.Capabilities{
MaximumSessionExpiryInterval: 3600,
MaximumClientWritesPending: 3,
Compatibilities: mqtt.Compatibilities{
ObscureNotAuthorized: true,
},
@@ -209,7 +210,7 @@ server := mqtt.New(&mqtt.Options{
})
```
Review the mqtt.Options, mqtt.Capabilities, and mqtt.Compatibilities structs for a comprehensive list of options. `ClientNetWriteBufferSize` and `ClientNetReadBufferSize` can be configured to adjust memory usage per client, based on your needs.
Review the mqtt.Options, mqtt.Capabilities, and mqtt.Compatibilities structs for a comprehensive list of options. `ClientNetWriteBufferSize` and `ClientNetReadBufferSize` can be configured to adjust memory usage per client, based on your needs. The size of `Capabilities.MaximumClientWritesPending` will affect the memory usage of the server. If the number of IoT devices online at the same time is large, and the set value is very large, even if there is no data transmission, the memory usage of the server will increase a lot. The default value is 1024*8, and this parameter can be adjusted according to the actual situation.
### Default Configuration Notes
@@ -399,7 +400,7 @@ The function signatures for all the hooks and `mqtt.Hook` interface can be found
If you are building a persistent storage hook, see the existing persistent hooks for inspiration and patterns. If you are building an auth hook, you will need `OnACLCheck` and `OnConnectAuthenticate`.
### Inline Client (v2.4.0+)
It's now possible to subscribe and publish to topics directly from the embedding code, by using the `inline client` feature. The Inline Client is an embedded client which operates as part of the server, and can be enabled in the server options:
It's now possible to subscribe and publish to topics directly from the embedding code, by using the `inline client` feature. Currently, the inline client does not support shared subscriptions. The Inline Client is an embedded client which operates as part of the server, and can be enabled in the server options:
```go
server := mqtt.New(&mqtt.Options{
InlineClient: true,

View File

@@ -150,7 +150,7 @@ type ClientState struct {
disconnected int64 // the time the client disconnected in unix time, for calculating expiry
outbound chan *packets.Packet // queue for pending outbound packets
endOnce sync.Once // only end once
isTakenOver uint32 // used to identify orphaned clients
isTakenOver atomic.Bool // used to identify orphaned clients
packetID uint32 // the current highest packetID
open context.Context // indicate that the client is open for packet exchange
cancelOpen context.CancelFunc // cancel function for open context
@@ -417,11 +417,20 @@ func (cl *Client) StopCause() error {
return cl.State.stopCause.Load().(error)
}
// StopTime returns the the time the client disconnected in unix time, else zero.
func (cl *Client) StopTime() int64 {
return atomic.LoadInt64(&cl.State.disconnected)
}
// Closed returns true if client connection is closed.
func (cl *Client) Closed() bool {
return cl.State.open == nil || cl.State.open.Err() != nil
}
func (cl *Client) IsTakenOver() bool {
return cl.State.isTakenOver.Load()
}
// ReadFixedHeader reads in the values of the next packet's fixed header.
func (cl *Client) ReadFixedHeader(fh *packets.FixedHeader) error {
if cl.Net.bconn == nil {

View File

@@ -583,9 +583,11 @@ func TestClientReadDone(t *testing.T) {
func TestClientStop(t *testing.T) {
cl, _, _ := newTestClient()
require.Equal(t, int64(0), cl.StopTime())
cl.Stop(nil)
require.Equal(t, nil, cl.State.stopCause.Load())
require.Equal(t, time.Now().Unix(), cl.State.disconnected)
require.InDelta(t, time.Now().Unix(), cl.State.disconnected, 1.0)
require.Equal(t, cl.State.disconnected, cl.StopTime())
require.True(t, cl.Closed())
require.Equal(t, nil, cl.StopCause())
}
@@ -597,6 +599,13 @@ func TestClientClosed(t *testing.T) {
require.True(t, cl.Closed())
}
func TestClientIsTakenOver(t *testing.T) {
cl, _, _ := newTestClient()
require.False(t, cl.IsTakenOver())
cl.State.isTakenOver.Store(true)
require.True(t, cl.IsTakenOver())
}
func TestClientReadFixedHeaderError(t *testing.T) {
cl, r, _ := newTestClient()
defer cl.Stop(errClientStop)

View File

@@ -5,6 +5,7 @@
package main
import (
"crypto/tls"
"flag"
"log"
"os"
@@ -20,6 +21,8 @@ func main() {
tcpAddr := flag.String("tcp", ":1883", "network address for TCP listener")
wsAddr := flag.String("ws", ":1882", "network address for Websocket listener")
infoAddr := flag.String("info", ":8080", "network address for web info dashboard listener")
tlsCertFile := flag.String("tls-cert-file", "", "TLS certificate file")
tlsKeyFile := flag.String("tls-key-file", "", "TLS key file")
flag.Parse()
sigs := make(chan os.Signal, 1)
@@ -30,12 +33,25 @@ func main() {
done <- true
}()
var tlsConfig *tls.Config
if tlsCertFile != nil && tlsKeyFile != nil && *tlsCertFile != "" && *tlsKeyFile != "" {
cert, err := tls.LoadX509KeyPair(*tlsCertFile, *tlsKeyFile)
if err != nil {
return
}
tlsConfig = &tls.Config{
Certificates: []tls.Certificate{cert},
}
}
server := mqtt.New(nil)
_ = server.AddHook(new(auth.AllowHook), nil)
tcp := listeners.NewTCP(listeners.Config{
ID: "t1",
Address: *tcpAddr,
ID: "t1",
Address: *tcpAddr,
TLSConfig: tlsConfig,
})
err := server.AddListener(tcp)
if err != nil {

View File

@@ -6,6 +6,8 @@ package config
import (
"encoding/json"
"log/slog"
"os"
"github.com/mochi-mqtt/server/v2/hooks/auth"
"github.com/mochi-mqtt/server/v2/hooks/debug"
@@ -21,9 +23,27 @@ import (
// config defines the structure of configuration data to be parsed from a config source.
type config struct {
Options mqtt.Options
Listeners []listeners.Config `yaml:"listeners" json:"listeners"`
HookConfigs HookConfigs `yaml:"hooks" json:"hooks"`
Options mqtt.Options
Listeners []listeners.Config `yaml:"listeners" json:"listeners"`
HookConfigs HookConfigs `yaml:"hooks" json:"hooks"`
LoggingConfig LoggingConfig `yaml:"logging" json:"logging"`
}
type LoggingConfig struct {
Level string
}
func (lc LoggingConfig) ToLogger() *slog.Logger {
var level slog.Level
if err := level.UnmarshalText([]byte(lc.Level)); err != nil {
level = slog.LevelInfo
}
leveler := new(slog.LevelVar)
leveler.Set(level)
return slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
Level: leveler,
}))
}
// HookConfigs contains configurations to enable individual hooks.
@@ -149,6 +169,7 @@ func FromBytes(b []byte) (*mqtt.Options, error) {
o = c.Options
o.Hooks = c.HookConfigs.ToHooks()
o.Listeners = c.Listeners
o.Logger = c.LoggingConfig.ToLogger()
return &o, nil
}

View File

@@ -5,6 +5,8 @@
package config
import (
"log/slog"
"os"
"testing"
"github.com/stretchr/testify/require"
@@ -60,7 +62,6 @@ options:
}
}
`)
parsedOptions = mqtt.Options{
Listeners: []listeners.Config{
{
@@ -81,6 +82,9 @@ options:
RestoreSysInfoOnRestart: true,
},
},
Logger: slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
Level: new(slog.LevelVar),
})),
}
)
@@ -176,7 +180,8 @@ func TestToHooksStorageBolt(t *testing.T) {
hc := HookConfigs{
Storage: &HookStorageConfig{
Bolt: &bolt.Options{
Path: "bolt",
Path: "bolt",
Bucket: "mochi",
},
},
}

View File

@@ -31,7 +31,8 @@
"gc_discard_ratio": 0.5
},
"bolt": {
"path": "bolt.db"
"path": "bolt.db",
"bucket": "mochi"
},
"redis": {
"h_prefix": "mc",

View File

@@ -21,6 +21,7 @@ hooks:
mode: "NoSync"
bolt:
path: bolt.db
bucket: "mochi"
redis:
h_prefix: "mc"
username: "mochi"
@@ -65,3 +66,5 @@ options:
always_return_response_info: false
restore_sys_info_on_restart: false
no_inherited_properties_on_ack: false
logging:
level: INFO

View File

@@ -1,6 +1,6 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileContributor: mochi-co
// SPDX-FileContributor: mochi-co, werbenhu
package main

View File

@@ -1,6 +1,6 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileContributor: mochi-co
// SPDX-FileContributor: mochi-co, werbenhu
package main
@@ -19,6 +19,9 @@ import (
)
func main() {
boltPath := ".bolt"
defer os.RemoveAll(boltPath) // remove the example db files at the end
sigs := make(chan os.Signal, 1)
done := make(chan bool, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
@@ -31,7 +34,7 @@ func main() {
_ = server.AddHook(new(auth.AllowHook), nil)
err := server.AddHook(new(bolt.Hook), &bolt.Options{
Path: "bolt.db",
Path: boltPath,
Options: &bbolt.Options{
Timeout: 500 * time.Millisecond,
},

View File

@@ -57,7 +57,10 @@ func main() {
done <- true
}()
cert, err := tls.X509KeyPair(testCertificate, testPrivateKey)
// Load tls cert from your cert file
cert, err := tls.LoadX509KeyPair("replace_your_cert.pem", "replace_your_cert.key")
//cert, err := tls.X509KeyPair(testCertificate, testPrivateKey)
if err != nil {
log.Fatal(err)
}

10
go.mod
View File

@@ -4,8 +4,6 @@ go 1.21
require (
github.com/alicebob/miniredis/v2 v2.23.0
github.com/asdine/storm v2.1.2+incompatible
github.com/asdine/storm/v3 v3.2.1
github.com/cockroachdb/pebble v1.1.0
github.com/dgraph-io/badger/v4 v4.2.0
github.com/go-redis/redis/v8 v8.11.5
@@ -32,7 +30,7 @@ require (
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/getsentry/sentry-go v0.18.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/glog v1.0.0 // indirect
github.com/golang/glog v1.2.4 // indirect
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
@@ -51,8 +49,8 @@ require (
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 // indirect
go.opencensus.io v0.22.5 // indirect
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/net v0.33.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/text v0.21.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
)

37
go.sum
View File

@@ -33,11 +33,8 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DataDog/zstd v1.4.1/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ=
github.com/DataDog/zstd v1.4.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
github.com/Sereal/Sereal v0.0.0-20190618215532-0b8ac451a863 h1:BRrxwOZBolJN4gIwvZMJY1tzqBvQgpaZiQRuIDD40jM=
github.com/Sereal/Sereal v0.0.0-20190618215532-0b8ac451a863/go.mod h1:D0JMgToj/WdxCgd30Kc1UcA9E+WdZoJqeVOuYW7iTBM=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
@@ -47,10 +44,6 @@ github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZp
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
github.com/alicebob/miniredis/v2 v2.23.0 h1:+lwAJYjvvdIVg6doFHuotFjueJ/7KY10xo/vm3X3Scw=
github.com/alicebob/miniredis/v2 v2.23.0/go.mod h1:XNqvJdQJv5mSuVMc0ynneafpnL/zv52acZ6kqeS0t88=
github.com/asdine/storm v2.1.2+incompatible h1:dczuIkyqwY2LrtXPz8ixMrU/OFgZp71kbKTHGrXYt/Q=
github.com/asdine/storm v2.1.2+incompatible/go.mod h1:RarYDc9hq1UPLImuiXK3BIWPJLdIygvV3PsInK0FbVQ=
github.com/asdine/storm/v3 v3.2.1 h1:I5AqhkPK6nBZ/qJXySdI7ot5BlXSZ7qvDY1zAn5ZJac=
github.com/asdine/storm/v3 v3.2.1/go.mod h1:LEpXwGt4pIqrE/XcTvCnZHT5MgZCV6Ub9q7yQzOFWr0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
@@ -117,8 +110,8 @@ github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/glog v1.0.0 h1:nfP3RFugxnNRyKgeWd4oI1nYvXpxrx8ck8ZrcizshdQ=
github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4=
github.com/golang/glog v1.2.4 h1:CNNw5U8lSiiBk7druxtSHHTsRWcxKoac6kZKm2peBBc=
github.com/golang/glog v1.2.4/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e h1:1r7pUrabqp18hOBcwBwiTsbnFeTZHV9eER/QT5JVZxY=
@@ -147,7 +140,6 @@ github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
@@ -163,8 +155,8 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
@@ -275,15 +267,12 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/vmihailenco/msgpack v4.0.4+incompatible h1:dSLoQfGFAo3F6OoNhwUmLwVgaUXK79GlxNBwueZn0xI=
github.com/vmihailenco/msgpack v4.0.4+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6AcGMlsjHatGryHQYUTf1ShIgkk=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 h1:k/gmLsJDWwWqbLCur2yWnJzwQEKRcAHXo6seXGuSwWw=
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9/go.mod h1:E1AXubJBdNmFERAOucpDIxNzeGfLzg0mYh+UfMWdChA=
go.etcd.io/bbolt v1.3.4/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
go.etcd.io/bbolt v1.3.5 h1:XAzx9gjCb0Rxj7EoqcClPD1d5ZBxZJk0jbuoPHenBt0=
go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
@@ -345,7 +334,6 @@ golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191105084925-a882066a44e0/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
@@ -362,8 +350,8 @@ golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81R
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I=
golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -380,8 +368,8 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 h1:uVc8UZUe6tr40fFVnUP5Oj+veunVezqYl9z7DYw9xzw=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -421,8 +409,8 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -430,8 +418,8 @@ golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
@@ -502,7 +490,6 @@ google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7
google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0=
google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
google.golang.org/appengine v1.6.6 h1:lMO5rYAqUxkmaj76jAkRUvt5JZgFymx/+Q5Mzfivuhc=
google.golang.org/appengine v1.6.6/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=

View File

@@ -405,6 +405,8 @@ func (h *Hooks) OnPublish(cl *Client, pk packets.Packet) (pkx packets.Packet, er
"hook", hook.ID(),
"packet", pkx)
return pk, err
} else if errors.Is(err, packets.CodeSuccessIgnore) {
return pk, err
}
h.Log.Error("publish packet error",
"error", err,

View File

@@ -1,6 +1,6 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileContributor: mochi-co, gsagula
// SPDX-FileContributor: mochi-co, gsagula, werbenhu
package badger
@@ -292,6 +292,7 @@ func (h *Hook) OnRetainMessage(cl *mqtt.Client, pk packets.Packet, r int64) {
TopicName: pk.TopicName,
Payload: pk.Payload,
Created: pk.Created,
Client: cl.ID,
Origin: pk.Origin,
Properties: storage.MessageProperties{
PayloadFormat: props.PayloadFormat,
@@ -319,6 +320,7 @@ func (h *Hook) OnQosPublish(cl *mqtt.Client, pk packets.Packet, sent int64, rese
in := &storage.Message{
ID: inflightKey(cl, pk),
T: storage.InflightKey,
Client: cl.ID,
Origin: pk.Origin,
PacketID: pk.PacketID,
FixedHeader: pk.FixedHeader,
@@ -490,23 +492,23 @@ func (h *Hook) StoredSysInfo() (v storage.SystemInfo, err error) {
}
// Errorf satisfies the badger interface for an error logger.
func (h *Hook) Errorf(m string, v ...interface{}) {
func (h *Hook) Errorf(m string, v ...any) {
h.Log.Error(fmt.Sprintf(strings.ToLower(strings.Trim(m, "\n")), v...), "v", v)
}
// Warningf satisfies the badger interface for a warning logger.
func (h *Hook) Warningf(m string, v ...interface{}) {
func (h *Hook) Warningf(m string, v ...any) {
h.Log.Warn(fmt.Sprintf(strings.ToLower(strings.Trim(m, "\n")), v...), "v", v)
}
// Infof satisfies the badger interface for an info logger.
func (h *Hook) Infof(m string, v ...interface{}) {
func (h *Hook) Infof(m string, v ...any) {
h.Log.Info(fmt.Sprintf(strings.ToLower(strings.Trim(m, "\n")), v...), "v", v)
}
// Debugf satisfies the badger interface for a debug logger.
func (h *Hook) Debugf(m string, v ...interface{}) {
func (h *Hook) Debugf(m string, v ...any) {
h.Log.Debug(fmt.Sprintf(strings.ToLower(strings.Trim(m, "\n")), v...), "v", v)
}

View File

@@ -1,6 +1,6 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileContributor: mochi-co
// SPDX-FileContributor: mochi-co, werbenhu
package badger

View File

@@ -1,6 +1,6 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileContributor: mochi-co
// SPDX-FileContributor: mochi-co, werbenhu
// Package bolt is provided for historical compatibility and may not be actively updated, you should use the badger hook instead.
package bolt
@@ -14,23 +14,27 @@ import (
"github.com/mochi-mqtt/server/v2/hooks/storage"
"github.com/mochi-mqtt/server/v2/packets"
"github.com/mochi-mqtt/server/v2/system"
sgob "github.com/asdine/storm/codec/gob"
"github.com/asdine/storm/v3"
"go.etcd.io/bbolt"
)
var (
ErrBucketNotFound = errors.New("bucket not found")
ErrKeyNotFound = errors.New("key not found")
)
const (
// defaultDbFile is the default file path for the boltdb file.
defaultDbFile = "bolt.db"
defaultDbFile = ".bolt"
// defaultTimeout is the default time to hold a connection to the file.
defaultTimeout = 250 * time.Millisecond
defaultBucket = "mochi"
)
// clientKey returns a primary key for a client.
func clientKey(cl *mqtt.Client) string {
return cl.ID
return storage.ClientKey + "_" + cl.ID
}
// subscriptionKey returns a primary key for a subscription.
@@ -56,6 +60,7 @@ func sysInfoKey() string {
// Options contains configuration settings for the bolt instance.
type Options struct {
Options *bbolt.Options
Bucket string `yaml:"bucket" json:"bucket"`
Path string `yaml:"path" json:"path"`
}
@@ -63,7 +68,7 @@ type Options struct {
type Hook struct {
mqtt.HookBase
config *Options // options for configuring the boltdb instance.
db *storm.DB // the boltdb instance.
db *bbolt.DB // the boltdb instance.
}
// ID returns the id of the hook.
@@ -110,22 +115,32 @@ func (h *Hook) Init(config any) error {
Timeout: defaultTimeout,
}
}
if h.config.Path == "" {
if len(h.config.Path) == 0 {
h.config.Path = defaultDbFile
}
if len(h.config.Bucket) == 0 {
h.config.Bucket = defaultBucket
}
var err error
h.db, err = storm.Open(h.config.Path, storm.BoltOptions(0600, h.config.Options), storm.Codec(sgob.Codec))
h.db, err = bbolt.Open(h.config.Path, 0600, h.config.Options)
if err != nil {
return err
}
return nil
err = h.db.Update(func(tx *bbolt.Tx) error {
_, err := tx.CreateBucketIfNotExists([]byte(h.config.Bucket))
return err
})
return err
}
// Stop closes the boltdb instance.
func (h *Hook) Stop() error {
return h.db.Close()
err := h.db.Close()
h.db = nil
return err
}
// OnSessionEstablished adds a client to the store when their session is established.
@@ -147,7 +162,7 @@ func (h *Hook) updateClient(cl *mqtt.Client) {
props := cl.Properties.Props.Copy(false)
in := &storage.Client{
ID: clientKey(cl),
ID: cl.ID,
T: storage.ClientKey,
Remote: cl.Net.Remote,
Listener: cl.Net.Listener,
@@ -167,10 +182,8 @@ func (h *Hook) updateClient(cl *mqtt.Client) {
},
Will: storage.ClientWill(cl.Properties.Will),
}
err := h.db.Save(in)
if err != nil {
h.Log.Error("failed to save client data", "error", err, "data", in)
}
_ = h.setKv(clientKey(cl), in)
}
// OnDisconnect removes a client from the store if they were using a clean session.
@@ -188,10 +201,7 @@ func (h *Hook) OnDisconnect(cl *mqtt.Client, _ error, expire bool) {
return
}
err := h.db.DeleteStruct(&storage.Client{ID: clientKey(cl)})
if err != nil && !errors.Is(err, storm.ErrNotFound) {
h.Log.Error("failed to delete client", "error", err, "id", clientKey(cl))
}
_ = h.delKv(clientKey(cl))
}
// OnSubscribed adds one or more client subscriptions to the store.
@@ -214,11 +224,7 @@ func (h *Hook) OnSubscribed(cl *mqtt.Client, pk packets.Packet, reasonCodes []by
RetainHandling: pk.Filters[i].RetainHandling,
RetainAsPublished: pk.Filters[i].RetainAsPublished,
}
err := h.db.Save(in)
if err != nil {
h.Log.Error("failed to save subscription data", "error", err, "client", cl.ID, "data", in)
}
_ = h.setKv(in.ID, in)
}
}
@@ -230,12 +236,7 @@ func (h *Hook) OnUnsubscribed(cl *mqtt.Client, pk packets.Packet) {
}
for i := 0; i < len(pk.Filters); i++ {
err := h.db.DeleteStruct(&storage.Subscription{
ID: subscriptionKey(cl, pk.Filters[i].Filter),
})
if err != nil {
h.Log.Error("failed to delete client", "error", err, "id", subscriptionKey(cl, pk.Filters[i].Filter))
}
_ = h.delKv(subscriptionKey(cl, pk.Filters[i].Filter))
}
}
@@ -247,12 +248,7 @@ func (h *Hook) OnRetainMessage(cl *mqtt.Client, pk packets.Packet, r int64) {
}
if r == -1 {
err := h.db.DeleteStruct(&storage.Message{
ID: retainedKey(pk.TopicName),
})
if err != nil {
h.Log.Error("failed to delete retained publish", "error", err, "id", retainedKey(pk.TopicName))
}
_ = h.delKv(retainedKey(pk.TopicName))
return
}
@@ -264,6 +260,7 @@ func (h *Hook) OnRetainMessage(cl *mqtt.Client, pk packets.Packet, r int64) {
TopicName: pk.TopicName,
Payload: pk.Payload,
Created: pk.Created,
Client: cl.ID,
Origin: pk.Origin,
Properties: storage.MessageProperties{
PayloadFormat: props.PayloadFormat,
@@ -276,10 +273,8 @@ func (h *Hook) OnRetainMessage(cl *mqtt.Client, pk packets.Packet, r int64) {
User: props.User,
},
}
err := h.db.Save(in)
if err != nil {
h.Log.Error("failed to save retained publish data", "error", err, "client", cl.ID, "data", in)
}
_ = h.setKv(in.ID, in)
}
// OnQosPublish adds or updates an inflight message in the store.
@@ -293,6 +288,7 @@ func (h *Hook) OnQosPublish(cl *mqtt.Client, pk packets.Packet, sent int64, rese
in := &storage.Message{
ID: inflightKey(cl, pk),
T: storage.InflightKey,
Client: cl.ID,
Origin: pk.Origin,
FixedHeader: pk.FixedHeader,
TopicName: pk.TopicName,
@@ -311,10 +307,7 @@ func (h *Hook) OnQosPublish(cl *mqtt.Client, pk packets.Packet, sent int64, rese
},
}
err := h.db.Save(in)
if err != nil {
h.Log.Error("failed to save qos inflight data", "error", err, "client", cl.ID, "data", in)
}
_ = h.setKv(in.ID, in)
}
// OnQosComplete removes a resolved inflight message from the store.
@@ -324,12 +317,7 @@ func (h *Hook) OnQosComplete(cl *mqtt.Client, pk packets.Packet) {
return
}
err := h.db.DeleteStruct(&storage.Message{
ID: inflightKey(cl, pk),
})
if err != nil {
h.Log.Error("failed to delete inflight data", "error", err, "id", inflightKey(cl, pk))
}
_ = h.delKv(inflightKey(cl, pk))
}
// OnQosDropped removes a dropped inflight message from the store.
@@ -354,10 +342,7 @@ func (h *Hook) OnSysInfoTick(sys *system.Info) {
Info: *sys,
}
err := h.db.Save(in)
if err != nil {
h.Log.Error("failed to save $SYS data", "error", err, "data", in)
}
_ = h.setKv(in.ID, in)
}
// OnRetainedExpired deletes expired retained messages from the store.
@@ -366,10 +351,7 @@ func (h *Hook) OnRetainedExpired(filter string) {
h.Log.Error("", "error", storage.ErrDBFileNotOpen)
return
}
if err := h.db.DeleteStruct(&storage.Message{ID: retainedKey(filter)}); err != nil {
h.Log.Error("failed to delete retained publish", "error", err, "id", retainedKey(filter))
}
_ = h.delKv(retainedKey(filter))
}
// OnClientExpired deleted expired clients from the store.
@@ -378,25 +360,24 @@ func (h *Hook) OnClientExpired(cl *mqtt.Client) {
h.Log.Error("", "error", storage.ErrDBFileNotOpen)
return
}
err := h.db.DeleteStruct(&storage.Client{ID: clientKey(cl)})
if err != nil && !errors.Is(err, storm.ErrNotFound) {
h.Log.Error("failed to delete expired client", "error", err, "id", clientKey(cl))
}
_ = h.delKv(clientKey(cl))
}
// StoredClients returns all stored clients from the store.
func (h *Hook) StoredClients() (v []storage.Client, err error) {
if h.db == nil {
h.Log.Error("", "error", storage.ErrDBFileNotOpen)
return
}
err = h.db.Find("T", storage.ClientKey, &v)
if err != nil && !errors.Is(err, storm.ErrNotFound) {
return
return v, storage.ErrDBFileNotOpen
}
err = h.iterKv(storage.ClientKey, func(value []byte) error {
obj := storage.Client{}
err = obj.UnmarshalBinary(value)
if err == nil {
v = append(v, obj)
}
return err
})
return v, nil
}
@@ -404,58 +385,142 @@ func (h *Hook) StoredClients() (v []storage.Client, err error) {
func (h *Hook) StoredSubscriptions() (v []storage.Subscription, err error) {
if h.db == nil {
h.Log.Error("", "error", storage.ErrDBFileNotOpen)
return
return v, storage.ErrDBFileNotOpen
}
err = h.db.Find("T", storage.SubscriptionKey, &v)
if err != nil && !errors.Is(err, storm.ErrNotFound) {
return
}
return v, nil
v = make([]storage.Subscription, 0)
err = h.iterKv(storage.SubscriptionKey, func(value []byte) error {
obj := storage.Subscription{}
err = obj.UnmarshalBinary(value)
if err == nil {
v = append(v, obj)
}
return err
})
return
}
// StoredRetainedMessages returns all stored retained messages from the store.
func (h *Hook) StoredRetainedMessages() (v []storage.Message, err error) {
if h.db == nil {
h.Log.Error("", "error", storage.ErrDBFileNotOpen)
return
return v, storage.ErrDBFileNotOpen
}
err = h.db.Find("T", storage.RetainedKey, &v)
if err != nil && !errors.Is(err, storm.ErrNotFound) {
return
}
return v, nil
v = make([]storage.Message, 0)
err = h.iterKv(storage.RetainedKey, func(value []byte) error {
obj := storage.Message{}
err = obj.UnmarshalBinary(value)
if err == nil {
v = append(v, obj)
}
return err
})
return
}
// StoredInflightMessages returns all stored inflight messages from the store.
func (h *Hook) StoredInflightMessages() (v []storage.Message, err error) {
if h.db == nil {
h.Log.Error("", "error", storage.ErrDBFileNotOpen)
return
return v, storage.ErrDBFileNotOpen
}
err = h.db.Find("T", storage.InflightKey, &v)
if err != nil && !errors.Is(err, storm.ErrNotFound) {
return
}
return v, nil
v = make([]storage.Message, 0)
err = h.iterKv(storage.InflightKey, func(value []byte) error {
obj := storage.Message{}
err = obj.UnmarshalBinary(value)
if err == nil {
v = append(v, obj)
}
return err
})
return
}
// StoredSysInfo returns the system info from the store.
func (h *Hook) StoredSysInfo() (v storage.SystemInfo, err error) {
if h.db == nil {
h.Log.Error("", "error", storage.ErrDBFileNotOpen)
return
return v, storage.ErrDBFileNotOpen
}
err = h.db.One("ID", storage.SysInfoKey, &v)
if err != nil && !errors.Is(err, storm.ErrNotFound) {
err = h.getKv(storage.SysInfoKey, &v)
if err != nil && !errors.Is(err, ErrKeyNotFound) {
return
}
return v, nil
}
// setKv stores a key-value pair in the database.
func (h *Hook) setKv(k string, v storage.Serializable) error {
err := h.db.Update(func(tx *bbolt.Tx) error {
bucket := tx.Bucket([]byte(h.config.Bucket))
data, _ := v.MarshalBinary()
err := bucket.Put([]byte(k), data)
if err != nil {
return err
}
return nil
})
if err != nil {
h.Log.Error("failed to upsert data", "error", err, "key", k)
}
return err
}
// delKv deletes a key-value pair from the database.
func (h *Hook) delKv(k string) error {
err := h.db.Update(func(tx *bbolt.Tx) error {
bucket := tx.Bucket([]byte(h.config.Bucket))
err := bucket.Delete([]byte(k))
if err != nil {
return err
}
return nil
})
if err != nil {
h.Log.Error("failed to delete data", "error", err, "key", k)
}
return err
}
// getKv retrieves the value associated with a key from the database.
func (h *Hook) getKv(k string, v storage.Serializable) error {
err := h.db.View(func(tx *bbolt.Tx) error {
bucket := tx.Bucket([]byte(h.config.Bucket))
value := bucket.Get([]byte(k))
if value == nil {
return ErrKeyNotFound
}
return v.UnmarshalBinary(value)
})
if err != nil {
h.Log.Error("failed to get data", "error", err, "key", k)
}
return err
}
// iterKv iterates over key-value pairs with keys having the specified prefix in the database.
func (h *Hook) iterKv(prefix string, visit func([]byte) error) error {
err := h.db.View(func(tx *bbolt.Tx) error {
bucket := tx.Bucket([]byte(h.config.Bucket))
c := bucket.Cursor()
for k, v := c.Seek([]byte(prefix)); k != nil && string(k[:len(prefix)]) == prefix; k, v = c.Next() {
if err := visit(v); err != nil {
return err
}
}
return nil
})
if err != nil {
h.Log.Error("failed to iter data", "error", err, "prefix", prefix)
}
return err
}

View File

@@ -1,10 +1,11 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileContributor: mochi-co
// SPDX-FileContributor: mochi-co, werbenhu
package bolt
import (
"errors"
"log/slog"
"os"
"testing"
@@ -15,7 +16,6 @@ import (
"github.com/mochi-mqtt/server/v2/packets"
"github.com/mochi-mqtt/server/v2/system"
"github.com/asdine/storm/v3"
"github.com/stretchr/testify/require"
)
@@ -45,7 +45,7 @@ func teardown(t *testing.T, path string, h *Hook) {
func TestClientKey(t *testing.T) {
k := clientKey(&mqtt.Client{ID: "cl1"})
require.Equal(t, "cl1", k)
require.Equal(t, "CL_cl1", k)
}
func TestSubscriptionKey(t *testing.T) {
@@ -130,7 +130,7 @@ func TestOnSessionEstablishedThenOnDisconnect(t *testing.T) {
h.OnSessionEstablished(client, packets.Packet{})
r := new(storage.Client)
err = h.db.One("ID", clientKey(client), r)
err = h.getKv(clientKey(client), r)
require.NoError(t, err)
require.Equal(t, client.ID, r.ID)
require.Equal(t, client.Net.Remote, r.Remote)
@@ -141,15 +141,15 @@ func TestOnSessionEstablishedThenOnDisconnect(t *testing.T) {
h.OnDisconnect(client, nil, false)
r2 := new(storage.Client)
err = h.db.One("ID", clientKey(client), r2)
err = h.getKv(clientKey(client), r2)
require.NoError(t, err)
require.Equal(t, client.ID, r.ID)
h.OnDisconnect(client, nil, true)
r3 := new(storage.Client)
err = h.db.One("ID", clientKey(client), r3)
err = h.getKv(clientKey(client), r3)
require.Error(t, err)
require.ErrorIs(t, storm.ErrNotFound, err)
require.ErrorIs(t, ErrKeyNotFound, err)
require.Empty(t, r3.ID)
}
@@ -180,7 +180,7 @@ func TestOnWillSent(t *testing.T) {
h.OnWillSent(c1, packets.Packet{})
r := new(storage.Client)
err = h.db.One("ID", clientKey(client), r)
err = h.getKv(clientKey(client), r)
require.NoError(t, err)
require.Equal(t, uint32(1), r.Will.Flag)
@@ -197,18 +197,18 @@ func TestOnClientExpired(t *testing.T) {
cl := &mqtt.Client{ID: "cl1"}
clientKey := clientKey(cl)
err = h.db.Save(&storage.Client{ID: cl.ID})
err = h.setKv(clientKey, &storage.Client{ID: cl.ID})
require.NoError(t, err)
r := new(storage.Client)
err = h.db.One("ID", clientKey, r)
err = h.getKv(clientKey, r)
require.NoError(t, err)
require.Equal(t, cl.ID, r.ID)
h.OnClientExpired(cl)
err = h.db.One("ID", clientKey, r)
err = h.getKv(clientKey, r)
require.Error(t, err)
require.ErrorIs(t, storm.ErrNotFound, err)
require.ErrorIs(t, ErrKeyNotFound, err)
}
func TestOnClientExpiredClosedDB(t *testing.T) {
@@ -274,16 +274,16 @@ func TestOnSubscribedThenOnUnsubscribed(t *testing.T) {
h.OnSubscribed(client, pkf, []byte{0})
r := new(storage.Subscription)
err = h.db.One("ID", subscriptionKey(client, pkf.Filters[0].Filter), r)
err = h.getKv(subscriptionKey(client, pkf.Filters[0].Filter), r)
require.NoError(t, err)
require.Equal(t, client.ID, r.Client)
require.Equal(t, pkf.Filters[0].Filter, r.Filter)
require.Equal(t, byte(0), r.Qos)
h.OnUnsubscribed(client, pkf)
err = h.db.One("ID", subscriptionKey(client, pkf.Filters[0].Filter), r)
err = h.getKv(subscriptionKey(client, pkf.Filters[0].Filter), r)
require.Error(t, err)
require.Equal(t, storm.ErrNotFound, err)
require.Equal(t, ErrKeyNotFound, err)
}
func TestOnSubscribedNoDB(t *testing.T) {
@@ -334,21 +334,21 @@ func TestOnRetainMessageThenUnset(t *testing.T) {
h.OnRetainMessage(client, pk, 1)
r := new(storage.Message)
err = h.db.One("ID", retainedKey(pk.TopicName), r)
err = h.getKv(retainedKey(pk.TopicName), r)
require.NoError(t, err)
require.Equal(t, pk.TopicName, r.TopicName)
require.Equal(t, pk.Payload, r.Payload)
h.OnRetainMessage(client, pk, -1)
err = h.db.One("ID", retainedKey(pk.TopicName), r)
err = h.getKv(retainedKey(pk.TopicName), r)
require.Error(t, err)
require.Equal(t, storm.ErrNotFound, err)
require.Equal(t, ErrKeyNotFound, err)
// coverage: delete deleted
h.OnRetainMessage(client, pk, -1)
err = h.db.One("ID", retainedKey(pk.TopicName), r)
err = h.getKv(retainedKey(pk.TopicName), r)
require.Error(t, err)
require.Equal(t, storm.ErrNotFound, err)
require.Equal(t, ErrKeyNotFound, err)
}
func TestOnRetainedExpired(t *testing.T) {
@@ -364,18 +364,18 @@ func TestOnRetainedExpired(t *testing.T) {
TopicName: "a/b/c",
}
err = h.db.Save(m)
err = h.setKv(m.ID, m)
require.NoError(t, err)
r := new(storage.Message)
err = h.db.One("ID", m.ID, r)
err = h.getKv(m.ID, r)
require.NoError(t, err)
require.Equal(t, m.TopicName, r.TopicName)
h.OnRetainedExpired(m.TopicName)
err = h.db.One("ID", m.ID, r)
err = h.getKv(m.ID, r)
require.Error(t, err)
require.Equal(t, storm.ErrNotFound, err)
require.Equal(t, ErrKeyNotFound, err)
}
func TestOnRetainedExpiredClosedDB(t *testing.T) {
@@ -427,7 +427,7 @@ func TestOnQosPublishThenQOSComplete(t *testing.T) {
h.OnQosPublish(client, pk, time.Now().Unix(), 0)
r := new(storage.Message)
err = h.db.One("ID", inflightKey(client, pk), r)
err = h.getKv(inflightKey(client, pk), r)
require.NoError(t, err)
require.Equal(t, pk.TopicName, r.TopicName)
require.Equal(t, pk.Payload, r.Payload)
@@ -438,9 +438,9 @@ func TestOnQosPublishThenQOSComplete(t *testing.T) {
// OnQosDropped is a passthrough to OnQosComplete here
h.OnQosDropped(client, pk)
err = h.db.One("ID", inflightKey(client, pk), r)
err = h.getKv(inflightKey(client, pk), r)
require.Error(t, err)
require.Equal(t, storm.ErrNotFound, err)
require.Equal(t, ErrKeyNotFound, err)
}
func TestOnQosPublishNoDB(t *testing.T) {
@@ -494,7 +494,7 @@ func TestOnSysInfoTick(t *testing.T) {
h.OnSysInfoTick(info)
r := new(storage.SystemInfo)
err = h.db.One("ID", storage.SysInfoKey, r)
err = h.getKv(storage.SysInfoKey, r)
require.NoError(t, err)
require.Equal(t, info.Version, r.Version)
require.Equal(t, info.BytesReceived, r.BytesReceived)
@@ -524,13 +524,13 @@ func TestStoredClients(t *testing.T) {
defer teardown(t, h.config.Path, h)
// populate with clients
err = h.db.Save(&storage.Client{ID: "cl1", T: storage.ClientKey})
err = h.setKv(storage.ClientKey+"_"+"cl1", &storage.Client{ID: "cl1"})
require.NoError(t, err)
err = h.db.Save(&storage.Client{ID: "cl2", T: storage.ClientKey})
err = h.setKv(storage.ClientKey+"_"+"cl2", &storage.Client{ID: "cl2"})
require.NoError(t, err)
err = h.db.Save(&storage.Client{ID: "cl3", T: storage.ClientKey})
err = h.setKv(storage.ClientKey+"_"+"cl3", &storage.Client{ID: "cl3"})
require.NoError(t, err)
r, err := h.StoredClients()
@@ -546,7 +546,7 @@ func TestStoredClientsNoDB(t *testing.T) {
h.SetOpts(logger, nil)
v, err := h.StoredClients()
require.Empty(t, v)
require.NoError(t, err)
require.ErrorIs(t, storage.ErrDBFileNotOpen, err)
}
func TestStoredClientsClosedDB(t *testing.T) {
@@ -557,7 +557,7 @@ func TestStoredClientsClosedDB(t *testing.T) {
teardown(t, h.config.Path, h)
v, err := h.StoredClients()
require.Empty(t, v)
require.Error(t, err)
require.ErrorIs(t, storage.ErrDBFileNotOpen, err)
}
func TestStoredSubscriptions(t *testing.T) {
@@ -568,13 +568,13 @@ func TestStoredSubscriptions(t *testing.T) {
defer teardown(t, h.config.Path, h)
// populate with subscriptions
err = h.db.Save(&storage.Subscription{ID: "sub1", T: storage.SubscriptionKey})
err = h.setKv(storage.SubscriptionKey+"_"+"sub1", &storage.Subscription{ID: "sub1"})
require.NoError(t, err)
err = h.db.Save(&storage.Subscription{ID: "sub2", T: storage.SubscriptionKey})
err = h.setKv(storage.SubscriptionKey+"_"+"sub2", &storage.Subscription{ID: "sub2"})
require.NoError(t, err)
err = h.db.Save(&storage.Subscription{ID: "sub3", T: storage.SubscriptionKey})
err = h.setKv(storage.SubscriptionKey+"_"+"sub3", &storage.Subscription{ID: "sub3"})
require.NoError(t, err)
r, err := h.StoredSubscriptions()
@@ -590,7 +590,7 @@ func TestStoredSubscriptionsNoDB(t *testing.T) {
h.SetOpts(logger, nil)
v, err := h.StoredSubscriptions()
require.Empty(t, v)
require.NoError(t, err)
require.ErrorIs(t, storage.ErrDBFileNotOpen, err)
}
func TestStoredSubscriptionsClosedDB(t *testing.T) {
@@ -601,7 +601,7 @@ func TestStoredSubscriptionsClosedDB(t *testing.T) {
teardown(t, h.config.Path, h)
v, err := h.StoredSubscriptions()
require.Empty(t, v)
require.Error(t, err)
require.ErrorIs(t, storage.ErrDBFileNotOpen, err)
}
func TestStoredRetainedMessages(t *testing.T) {
@@ -612,16 +612,16 @@ func TestStoredRetainedMessages(t *testing.T) {
defer teardown(t, h.config.Path, h)
// populate with messages
err = h.db.Save(&storage.Message{ID: "m1", T: storage.RetainedKey})
err = h.setKv(storage.RetainedKey+"_"+"m1", &storage.Message{ID: "m1"})
require.NoError(t, err)
err = h.db.Save(&storage.Message{ID: "m2", T: storage.RetainedKey})
err = h.setKv(storage.RetainedKey+"_"+"m2", &storage.Message{ID: "m2"})
require.NoError(t, err)
err = h.db.Save(&storage.Message{ID: "m3", T: storage.RetainedKey})
err = h.setKv(storage.RetainedKey+"_"+"m3", &storage.Message{ID: "m3"})
require.NoError(t, err)
err = h.db.Save(&storage.Message{ID: "i3", T: storage.InflightKey})
err = h.setKv(storage.InflightKey+"_"+"i3", &storage.Message{ID: "i3"})
require.NoError(t, err)
r, err := h.StoredRetainedMessages()
@@ -637,7 +637,7 @@ func TestStoredRetainedMessagesNoDB(t *testing.T) {
h.SetOpts(logger, nil)
v, err := h.StoredRetainedMessages()
require.Empty(t, v)
require.NoError(t, err)
require.ErrorIs(t, storage.ErrDBFileNotOpen, err)
}
func TestStoredRetainedMessagesClosedDB(t *testing.T) {
@@ -659,16 +659,16 @@ func TestStoredInflightMessages(t *testing.T) {
defer teardown(t, h.config.Path, h)
// populate with messages
err = h.db.Save(&storage.Message{ID: "i1", T: storage.InflightKey})
err = h.setKv(storage.InflightKey+"_"+"i1", &storage.Message{ID: "i1"})
require.NoError(t, err)
err = h.db.Save(&storage.Message{ID: "i2", T: storage.InflightKey})
err = h.setKv(storage.InflightKey+"_"+"i2", &storage.Message{ID: "i2"})
require.NoError(t, err)
err = h.db.Save(&storage.Message{ID: "i3", T: storage.InflightKey})
err = h.setKv(storage.InflightKey+"_"+"i3", &storage.Message{ID: "i3"})
require.NoError(t, err)
err = h.db.Save(&storage.Message{ID: "m1", T: storage.RetainedKey})
err = h.setKv(storage.RetainedKey+"_"+"m1", &storage.Message{ID: "m1"})
require.NoError(t, err)
r, err := h.StoredInflightMessages()
@@ -684,7 +684,7 @@ func TestStoredInflightMessagesNoDB(t *testing.T) {
h.SetOpts(logger, nil)
v, err := h.StoredInflightMessages()
require.Empty(t, v)
require.NoError(t, err)
require.ErrorIs(t, storage.ErrDBFileNotOpen, err)
}
func TestStoredInflightMessagesClosedDB(t *testing.T) {
@@ -695,7 +695,7 @@ func TestStoredInflightMessagesClosedDB(t *testing.T) {
teardown(t, h.config.Path, h)
v, err := h.StoredInflightMessages()
require.Empty(t, v)
require.Error(t, err)
require.ErrorIs(t, storage.ErrDBFileNotOpen, err)
}
func TestStoredSysInfo(t *testing.T) {
@@ -706,7 +706,7 @@ func TestStoredSysInfo(t *testing.T) {
defer teardown(t, h.config.Path, h)
// populate with sys info
err = h.db.Save(&storage.SystemInfo{
err = h.setKv(storage.SysInfoKey, &storage.SystemInfo{
ID: storage.SysInfoKey,
Info: system.Info{
Version: "2.0.0",
@@ -725,7 +725,7 @@ func TestStoredSysInfoNoDB(t *testing.T) {
h.SetOpts(logger, nil)
v, err := h.StoredSysInfo()
require.Empty(t, v)
require.NoError(t, err)
require.ErrorIs(t, storage.ErrDBFileNotOpen, err)
}
func TestStoredSysInfoClosedDB(t *testing.T) {
@@ -738,3 +738,54 @@ func TestStoredSysInfoClosedDB(t *testing.T) {
require.Empty(t, v)
require.Error(t, err)
}
func TestGetSetDelKv(t *testing.T) {
h := new(Hook)
h.SetOpts(logger, nil)
err := h.Init(nil)
defer teardown(t, h.config.Path, h)
require.NoError(t, err)
err = h.setKv("testId", &storage.Client{ID: "testId"})
require.NoError(t, err)
var obj storage.Client
err = h.getKv("testId", &obj)
require.NoError(t, err)
err = h.delKv("testId")
require.NoError(t, err)
err = h.getKv("testId", &obj)
require.Error(t, err)
require.ErrorIs(t, ErrKeyNotFound, err)
}
func TestIterKv(t *testing.T) {
h := new(Hook)
h.SetOpts(logger, nil)
err := h.Init(nil)
defer teardown(t, h.config.Path, h)
require.NoError(t, err)
h.setKv("prefix_a_1", &storage.Client{ID: "1"})
h.setKv("prefix_a_2", &storage.Client{ID: "2"})
h.setKv("prefix_b_2", &storage.Client{ID: "3"})
var clients []storage.Client
err = h.iterKv("prefix_a", func(data []byte) error {
var item storage.Client
item.UnmarshalBinary(data)
clients = append(clients, item)
return nil
})
require.Equal(t, 2, len(clients))
require.NoError(t, err)
visitErr := errors.New("iter visit error")
err = h.iterKv("prefix_b", func(data []byte) error {
return visitErr
})
require.ErrorIs(t, visitErr, err)
}

View File

@@ -1,6 +1,6 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileContributor: mochi-co, gsagula
// SPDX-FileContributor: werbenhu
package pebble
@@ -268,6 +268,7 @@ func (h *Hook) OnRetainMessage(cl *mqtt.Client, pk packets.Packet, r int64) {
TopicName: pk.TopicName,
Payload: pk.Payload,
Created: pk.Created,
Client: cl.ID,
Origin: pk.Origin,
Properties: storage.MessageProperties{
PayloadFormat: props.PayloadFormat,
@@ -295,6 +296,7 @@ func (h *Hook) OnQosPublish(cl *mqtt.Client, pk packets.Packet, sent int64, rese
in := &storage.Message{
ID: inflightKey(cl, pk),
T: storage.InflightKey,
Client: cl.ID,
Origin: pk.Origin,
PacketID: pk.PacketID,
FixedHeader: pk.FixedHeader,
@@ -467,23 +469,23 @@ func (h *Hook) StoredSysInfo() (v storage.SystemInfo, err error) {
}
// Errorf satisfies the pebble interface for an error logger.
func (h *Hook) Errorf(m string, v ...interface{}) {
func (h *Hook) Errorf(m string, v ...any) {
h.Log.Error(fmt.Sprintf(strings.ToLower(strings.Trim(m, "\n")), v...), "v", v)
}
// Warningf satisfies the pebble interface for a warning logger.
func (h *Hook) Warningf(m string, v ...interface{}) {
func (h *Hook) Warningf(m string, v ...any) {
h.Log.Warn(fmt.Sprintf(strings.ToLower(strings.Trim(m, "\n")), v...), "v", v)
}
// Infof satisfies the pebble interface for an info logger.
func (h *Hook) Infof(m string, v ...interface{}) {
func (h *Hook) Infof(m string, v ...any) {
h.Log.Info(fmt.Sprintf(strings.ToLower(strings.Trim(m, "\n")), v...), "v", v)
}
// Debugf satisfies the pebble interface for a debug logger.
func (h *Hook) Debugf(m string, v ...interface{}) {
func (h *Hook) Debugf(m string, v ...any) {
h.Log.Debug(fmt.Sprintf(strings.ToLower(strings.Trim(m, "\n")), v...), "v", v)
}

View File

@@ -1,6 +1,6 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileContributor: mochi-co
// SPDX-FileContributor: werbenhu
package pebble

View File

@@ -287,6 +287,7 @@ func (h *Hook) OnRetainMessage(cl *mqtt.Client, pk packets.Packet, r int64) {
TopicName: pk.TopicName,
Payload: pk.Payload,
Created: pk.Created,
Client: cl.ID,
Origin: pk.Origin,
Properties: storage.MessageProperties{
PayloadFormat: props.PayloadFormat,
@@ -317,6 +318,7 @@ func (h *Hook) OnQosPublish(cl *mqtt.Client, pk packets.Packet, sent int64, rese
in := &storage.Message{
ID: inflightKey(cl, pk),
T: storage.InflightKey,
Client: cl.ID,
Origin: pk.Origin,
FixedHeader: pk.FixedHeader,
TopicName: pk.TopicName,

View File

@@ -46,28 +46,28 @@ type Client struct {
// ClientProperties contains a limited set of the mqtt v5 properties specific to a client connection.
type ClientProperties struct {
AuthenticationData []byte `json:"authenticationData"`
User []packets.UserProperty `json:"user"`
AuthenticationMethod string `json:"authenticationMethod"`
SessionExpiryInterval uint32 `json:"sessionExpiryInterval"`
MaximumPacketSize uint32 `json:"maximumPacketSize"`
ReceiveMaximum uint16 `json:"receiveMaximum"`
TopicAliasMaximum uint16 `json:"topicAliasMaximum"`
SessionExpiryIntervalFlag bool `json:"sessionExpiryIntervalFlag"`
RequestProblemInfo byte `json:"requestProblemInfo"`
RequestProblemInfoFlag bool `json:"requestProblemInfoFlag"`
RequestResponseInfo byte `json:"requestResponseInfo"`
AuthenticationData []byte `json:"authenticationData,omitempty"`
User []packets.UserProperty `json:"user,omitempty"`
AuthenticationMethod string `json:"authenticationMethod,omitempty"`
SessionExpiryInterval uint32 `json:"sessionExpiryInterval,omitempty"`
MaximumPacketSize uint32 `json:"maximumPacketSize,omitempty"`
ReceiveMaximum uint16 `json:"receiveMaximum,omitempty"`
TopicAliasMaximum uint16 `json:"topicAliasMaximum,omitempty"`
SessionExpiryIntervalFlag bool `json:"sessionExpiryIntervalFlag,omitempty"`
RequestProblemInfo byte `json:"requestProblemInfo,omitempty"`
RequestProblemInfoFlag bool `json:"requestProblemInfoFlag,omitempty"`
RequestResponseInfo byte `json:"requestResponseInfo,omitempty"`
}
// ClientWill contains a will message for a client, and limited mqtt v5 properties.
type ClientWill struct {
Payload []byte `json:"payload"`
User []packets.UserProperty `json:"user"`
TopicName string `json:"topicName"`
Flag uint32 `json:"flag"`
WillDelayInterval uint32 `json:"willDelayInterval"`
Qos byte `json:"qos"`
Retain bool `json:"retain"`
Payload []byte `json:"payload,omitempty"`
User []packets.UserProperty `json:"user,omitempty"`
TopicName string `json:"topicName,omitempty"`
Flag uint32 `json:"flag,omitempty"`
WillDelayInterval uint32 `json:"willDelayInterval,omitempty"`
Qos byte `json:"qos,omitempty"`
Retain bool `json:"retain,omitempty"`
}
// MarshalBinary encodes the values into a json string.
@@ -85,29 +85,30 @@ func (d *Client) UnmarshalBinary(data []byte) error {
// Message is a storable representation of an MQTT message (specifically publish).
type Message struct {
Properties MessageProperties `json:"properties"` // -
Payload []byte `json:"payload"` // the message payload (if retained)
T string `json:"t"` // the data type
ID string `json:"id" storm:"id"` // the storage key
Origin string `json:"origin"` // the id of the client who sent the message
TopicName string `json:"topic_name"` // the topic the message was sent to (if retained)
FixedHeader packets.FixedHeader `json:"fixedheader"` // the header properties of the message
Created int64 `json:"created"` // the time the message was created in unixtime
Sent int64 `json:"sent"` // the last time the message was sent (for retries) in unixtime (if inflight)
PacketID uint16 `json:"packet_id"` // the unique id of the packet (if inflight)
Properties MessageProperties `json:"properties"` // -
Payload []byte `json:"payload"` // the message payload (if retained)
T string `json:"t,omitempty"` // the data type
ID string `json:"id,omitempty" storm:"id"` // the storage key
Client string `json:"client,omitempty"` // the client id the message is for
Origin string `json:"origin,omitempty"` // the id of the client who sent the message
TopicName string `json:"topic_name,omitempty"` // the topic the message was sent to (if retained)
FixedHeader packets.FixedHeader `json:"fixedheader"` // the header properties of the message
Created int64 `json:"created,omitempty"` // the time the message was created in unixtime
Sent int64 `json:"sent,omitempty"` // the last time the message was sent (for retries) in unixtime (if inflight)
PacketID uint16 `json:"packet_id,omitempty"` // the unique id of the packet (if inflight)
}
// MessageProperties contains a limited subset of mqtt v5 properties specific to publish messages.
type MessageProperties struct {
CorrelationData []byte `json:"correlationData"`
SubscriptionIdentifier []int `json:"subscriptionIdentifier"`
User []packets.UserProperty `json:"user"`
ContentType string `json:"contentType"`
ResponseTopic string `json:"responseTopic"`
MessageExpiryInterval uint32 `json:"messageExpiry"`
TopicAlias uint16 `json:"topicAlias"`
PayloadFormat byte `json:"payloadFormat"`
PayloadFormatFlag bool `json:"payloadFormatFlag"`
CorrelationData []byte `json:"correlationData,omitempty"`
SubscriptionIdentifier []int `json:"subscriptionIdentifier,omitempty"`
User []packets.UserProperty `json:"user,omitempty"`
ContentType string `json:"contentType,omitempty"`
ResponseTopic string `json:"responseTopic,omitempty"`
MessageExpiryInterval uint32 `json:"messageExpiry,omitempty"`
TopicAlias uint16 `json:"topicAlias,omitempty"`
PayloadFormat byte `json:"payloadFormat,omitempty"`
PayloadFormatFlag bool `json:"payloadFormatFlag,omitempty"`
}
// MarshalBinary encodes the values into a json string.
@@ -155,15 +156,15 @@ func (d *Message) ToPacket() packets.Packet {
// Subscription is a storable representation of an MQTT subscription.
type Subscription struct {
T string `json:"t"`
ID string `json:"id" storm:"id"`
Client string `json:"client"`
T string `json:"t,omitempty"`
ID string `json:"id,omitempty" storm:"id"`
Client string `json:"client,omitempty"`
Filter string `json:"filter"`
Identifier int `json:"identifier"`
RetainHandling byte `json:"retain_handling"`
Identifier int `json:"identifier,omitempty"`
RetainHandling byte `json:"retain_handling,omitempty"`
Qos byte `json:"qos"`
RetainAsPublished bool `json:"retain_as_pub"`
NoLocal bool `json:"no_local"`
RetainAsPublished bool `json:"retain_as_pub,omitempty"`
NoLocal bool `json:"no_local,omitempty"`
}
// MarshalBinary encodes the values into a json string.

View File

@@ -89,7 +89,7 @@ var (
Filter: "a/b/c",
Qos: 1,
}
subscriptionJSON = []byte(`{"t":"subscription","id":"id","client":"mochi","filter":"a/b/c","identifier":0,"retain_handling":0,"qos":1,"retain_as_pub":false,"no_local":false}`)
subscriptionJSON = []byte(`{"t":"subscription","id":"id","client":"mochi","filter":"a/b/c","qos":1}`)
sysInfoStruct = SystemInfo{
T: "info",

View File

@@ -201,6 +201,7 @@ const (
TDisconnect
TDisconnectTakeover
TDisconnectMqtt5
TDisconnectMqtt5DisconnectWithWillMessage
TDisconnectSecondConnect
TDisconnectReceiveMaximum
TDisconnectDropProperties
@@ -3781,6 +3782,31 @@ var TPacketData = map[byte]TPacketCases{
},
},
},
{
Case: TDisconnectMqtt5DisconnectWithWillMessage,
Desc: "mqtt5 disconnect with will message",
Primary: true,
RawBytes: append([]byte{
Disconnect << 4, 38, // fixed header
CodeDisconnectWillMessage.Code, // Reason Code
36, // Properties Length
17, 0, 0, 0, 120, // Session Expiry Interval (17)
31, 0, 28, // Reason String (31)
}, []byte(CodeDisconnectWillMessage.Reason)...),
Packet: &Packet{
ProtocolVersion: 5,
FixedHeader: FixedHeader{
Type: Disconnect,
Remaining: 22,
},
ReasonCode: CodeDisconnectWillMessage.Code,
Properties: Properties{
ReasonString: CodeDisconnectWillMessage.Reason,
SessionExpiryInterval: 120,
SessionExpiryIntervalFlag: true,
},
},
},
{
Case: TDisconnectSecondConnect,
Desc: "second connect packet mqtt5",

View File

@@ -27,7 +27,7 @@ import (
)
const (
Version = "2.6.2" // the current server version.
Version = "2.6.6" // the current server version.
defaultSysTopicInterval int64 = 1 // the interval between $SYS topic publishes
LocalListener = "local"
InlineClientId = "inline"
@@ -112,7 +112,7 @@ type Options struct {
// ClientNetReadBufferSize specifies the size of the client *bufio.Reader read buffer.
ClientNetReadBufferSize int `yaml:"client_net_read_buffer_size" json:"client_net_read_buffer_size"`
// Logger specifies a custom configured implementation of zerolog to override
// Logger specifies a custom configured implementation of log/slog to override
// the servers default logger configuration. If you wish to change the log level,
// of the default logger, you can do so by setting:
// server := mqtt.New(nil)
@@ -485,7 +485,7 @@ func (s *Server) attachClient(cl *Client, listener string) error {
expire := (cl.Properties.ProtocolVersion == 5 && cl.Properties.Props.SessionExpiryInterval == 0) || (cl.Properties.ProtocolVersion < 5 && cl.Properties.Clean)
s.hooks.OnDisconnect(cl, err, expire)
if expire && atomic.LoadUint32(&cl.State.isTakenOver) == 0 {
if expire && !cl.IsTakenOver() {
cl.ClearInflights()
s.UnsubscribeClient(cl)
s.Clients.Delete(cl.ID) // [MQTT-4.1.0-2] ![MQTT-3.1.2-23]
@@ -560,16 +560,16 @@ func (s *Server) validateConnect(cl *Client, pk packets.Packet) packets.Code {
// connection ID. If clean is true, the state of any previously existing client
// session is abandoned.
func (s *Server) inheritClientSession(pk packets.Packet, cl *Client) bool {
if existing, ok := s.Clients.Get(pk.Connect.ClientIdentifier); ok {
if existing, ok := s.Clients.Get(cl.ID); ok {
_ = s.DisconnectClient(existing, packets.ErrSessionTakenOver) // [MQTT-3.1.4-3]
if pk.Connect.Clean || (existing.Properties.Clean && existing.Properties.ProtocolVersion < 5) { // [MQTT-3.1.2-4] [MQTT-3.1.4-4]
s.UnsubscribeClient(existing)
existing.ClearInflights()
atomic.StoreUint32(&existing.State.isTakenOver, 1) // only set isTakenOver after unsubscribe has occurred
return false // [MQTT-3.2.2-3]
existing.State.isTakenOver.Store(true) // only set isTakenOver after unsubscribe has occurred
return false // [MQTT-3.2.2-3]
}
atomic.StoreUint32(&existing.State.isTakenOver, 1)
existing.State.isTakenOver.Store(true)
if existing.State.Inflight.Len() > 0 {
cl.State.Inflight = existing.State.Inflight.Clone() // [MQTT-3.1.2-5]
if cl.State.Inflight.maximumReceiveQuota == 0 && cl.ops.options.Capabilities.ReceiveMaximum != 0 {
@@ -1358,7 +1358,7 @@ func (s *Server) UnsubscribeClient(cl *Client) {
cl.State.Subscriptions.Delete(k)
}
if atomic.LoadUint32(&cl.State.isTakenOver) == 1 {
if cl.IsTakenOver() {
return
}
@@ -1393,6 +1393,10 @@ func (s *Server) processDisconnect(cl *Client, pk packets.Packet) error {
cl.Properties.Props.SessionExpiryIntervalFlag = true
}
if pk.ReasonCode == packets.CodeDisconnectWillMessage.Code { // [MQTT-3.1.2.5] Non-normative comment
return packets.CodeDisconnectWillMessage
}
s.loop.willDelayed.Delete(cl.ID) // [MQTT-3.1.3-9] [MQTT-3.1.2-8]
cl.Stop(packets.CodeDisconnect) // [MQTT-3.14.4-2]
@@ -1668,7 +1672,7 @@ func (s *Server) loadClients(v []storage.Client) {
// loadInflight restores inflight messages from the datastore.
func (s *Server) loadInflight(v []storage.Message) {
for _, msg := range v {
if client, ok := s.Clients.Get(msg.Origin); ok {
if client, ok := s.Clients.Get(msg.Client); ok {
client.State.Inflight.Set(msg.ToPacket())
}
}
@@ -1685,7 +1689,7 @@ func (s *Server) loadRetained(v []storage.Message) {
// than their given expiry intervals.
func (s *Server) clearExpiredClients(dt int64) {
for id, client := range s.Clients.GetAll() {
disconnected := atomic.LoadInt64(&client.State.disconnected)
disconnected := client.StopTime()
if disconnected == 0 {
continue
}

View File

@@ -667,6 +667,7 @@ func TestEstablishConnectionInheritExisting(t *testing.T) {
clw, ok := s.Clients.Get(packets.TPacketData[packets.Connect].Get(packets.TConnectMqtt311).Packet.Connect.ClientIdentifier)
require.True(t, ok)
require.NotEmpty(t, clw.State.Subscriptions)
require.True(t, cl.IsTakenOver())
// Prevent sequential takeover memory-bloom.
require.Empty(t, cl.State.Subscriptions.GetAll())
@@ -761,6 +762,9 @@ func TestEstablishConnectionInheritExistingTrueTakeover(t *testing.T) {
_, _ = w2.Write(packets.TPacketData[packets.Disconnect].Get(packets.TDisconnect).RawBytes)
require.NoError(t, <-o2)
require.True(t, clp1.IsTakenOver())
require.False(t, clp2.IsTakenOver())
}
func TestEstablishConnectionResentPendingInflightsError(t *testing.T) {
@@ -848,12 +852,15 @@ func TestEstablishConnectionInheritExistingClean(t *testing.T) {
require.Equal(t, packets.TPacketData[packets.Connack].Get(packets.TConnackAcceptedNoSession).RawBytes, <-recv)
require.Equal(t, packets.TPacketData[packets.Disconnect].Get(packets.TDisconnect).RawBytes, <-takeover)
require.True(t, cl.IsTakenOver())
_ = w.Close()
_ = r.Close()
clw, ok := s.Clients.Get(packets.TPacketData[packets.Connect].Get(packets.TConnectMqtt311).Packet.Connect.ClientIdentifier)
require.True(t, ok)
require.Equal(t, 0, clw.State.Subscriptions.Len())
}
func TestEstablishConnectionBadAuthentication(t *testing.T) {
@@ -2258,7 +2265,7 @@ func TestPublishToSubscribersExhaustedSendQuota(t *testing.T) {
require.True(t, subbed)
// coverage: subscriber publish errors are non-returnable
// can we hook into zerolog ?
// can we hook into log/slog ?
_ = r.Close()
pkx := *packets.TPacketData[packets.Publish].Get(packets.TPublishQos1).Packet
pkx.PacketID = 0
@@ -2279,7 +2286,7 @@ func TestPublishToSubscribersExhaustedPacketIDs(t *testing.T) {
require.True(t, subbed)
// coverage: subscriber publish errors are non-returnable
// can we hook into zerolog ?
// can we hook into log/slog ?
_ = r.Close()
pkx := *packets.TPacketData[packets.Publish].Get(packets.TPublishQos1).Packet
pkx.PacketID = 0
@@ -2296,7 +2303,7 @@ func TestPublishToSubscribersNoConnection(t *testing.T) {
require.True(t, subbed)
// coverage: subscriber publish errors are non-returnable
// can we hook into zerolog ?
// can we hook into log/slog ?
_ = r.Close()
s.publishToSubscribers(*packets.TPacketData[packets.Publish].Get(packets.TPublishBasic).Packet)
time.Sleep(time.Millisecond)
@@ -3138,6 +3145,22 @@ func TestServerProcessPacketDisconnectNonZeroExpiryViolation(t *testing.T) {
require.ErrorIs(t, err, packets.ErrProtocolViolationZeroNonZeroExpiry)
}
func TestServerProcessPacketDisconnectDisconnectWithWillMessage(t *testing.T) {
s := newServer()
cl, _, _ := newTestClient()
cl.Properties.Props.SessionExpiryInterval = 30
cl.Properties.ProtocolVersion = 5
s.loop.willDelayed.Add(cl.ID, packets.Packet{TopicName: "a/b/c", Payload: []byte("hello")})
require.Equal(t, 1, s.loop.willDelayed.Len())
err := s.processPacket(cl, *packets.TPacketData[packets.Disconnect].Get(packets.TDisconnectMqtt5DisconnectWithWillMessage).Packet)
require.Error(t, err)
require.Equal(t, 1, s.loop.willDelayed.Len())
require.False(t, cl.Closed())
}
func TestServerProcessPacketAuth(t *testing.T) {
s := newServer()
cl, r, w := newTestClient()
@@ -3400,10 +3423,10 @@ func TestServerLoadInflightMessages(t *testing.T) {
require.Equal(t, 3, s.Clients.Len())
v := []storage.Message{
{Origin: "mochi", PacketID: 1, Payload: []byte("hello world"), TopicName: "a/b/c"},
{Origin: "mochi", PacketID: 2, Payload: []byte("yes"), TopicName: "a/b/c"},
{Origin: "zen", PacketID: 3, Payload: []byte("hello world"), TopicName: "a/b/c"},
{Origin: "mochi-co", PacketID: 4, Payload: []byte("hello world"), TopicName: "a/b/c"},
{Client: "mochi", Origin: "mochi", PacketID: 1, Payload: []byte("hello world"), TopicName: "a/b/c"},
{Client: "mochi", Origin: "mochi", PacketID: 2, Payload: []byte("yes"), TopicName: "a/b/c"},
{Client: "zen", Origin: "zen", PacketID: 3, Payload: []byte("hello world"), TopicName: "a/b/c"},
{Client: "mochi-co", Origin: "mochi-co", PacketID: 4, Payload: []byte("hello world"), TopicName: "a/b/c"},
}
s.loadInflight(v)