mirror of
https://github.com/mochi-mqtt/server.git
synced 2025-09-26 20:21:12 +08:00
Compare commits
20 Commits
v2.6.2
...
f52990691f
Author | SHA1 | Date | |
---|---|---|---|
![]() |
f52990691f | ||
![]() |
9441e92595 | ||
![]() |
bfaf78332b | ||
![]() |
043906876e | ||
![]() |
dcb814cedf | ||
![]() |
8f52b891d5 | ||
![]() |
47536b77f2 | ||
![]() |
00601ca982 | ||
![]() |
830de149dc | ||
![]() |
34f9370f8c | ||
![]() |
dc272d2c36 | ||
![]() |
82c96fa4e3 | ||
![]() |
01f81ebeee | ||
![]() |
cc3f827fc1 | ||
![]() |
5966c7fe0d | ||
![]() |
b26e03a433 | ||
![]() |
6fc4027a78 | ||
![]() |
b9d2dfb824 | ||
![]() |
d5d9b02b28 | ||
![]() |
64ea905c41 |
@@ -149,7 +149,7 @@ func main() {
|
||||
_ = server.AddHook(new(auth.AllowHook), nil)
|
||||
|
||||
// 在标1883端口上创建一个 TCP 服务端。
|
||||
tcp := listeners.NewTCP("t1", ":1883", nil)
|
||||
tcp := listeners.NewTCP(listeners.Config{ID: "t1", Address: ":1883"})
|
||||
err := server.AddListener(tcp)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
@@ -197,6 +197,7 @@ func main() {
|
||||
server := mqtt.New(&mqtt.Options{
|
||||
Capabilities: mqtt.Capabilities{
|
||||
MaximumSessionExpiryInterval: 3600,
|
||||
MaximumClientWritesPending: 3,
|
||||
Compatibilities: mqtt.Compatibilities{
|
||||
ObscureNotAuthorized: true,
|
||||
},
|
||||
@@ -207,7 +208,7 @@ server := mqtt.New(&mqtt.Options{
|
||||
InlineClient: false,
|
||||
})
|
||||
```
|
||||
请参考 mqtt.Options、mqtt.Capabilities 和 mqtt.Compatibilities 结构体,以查看完整的所有服务端选项。ClientNetWriteBufferSize 和 ClientNetReadBufferSize 可以根据你的需求配置调整每个客户端的内存使用状况。
|
||||
请参考 mqtt.Options、mqtt.Capabilities 和 mqtt.Compatibilities 结构体,以查看完整的所有服务端选项。 ClientNetWriteBufferSize 和 ClientNetReadBufferSize 可以根据你的需求配置调整每个客户端的内存使用状况。其中 Capabilities.MaximumClientWritesPending 的大小会影响服务器运行内存占用,如果 IoT 设备同时在线的数量比较多,设置的值很大,尽管没有收发数据,服务器运行内存占用也会增加很多,默认该数值为 1024*8 ,可以根据实际情况调整该参数。
|
||||
|
||||
### 默认配置说明(Default Configuration Notes)
|
||||
|
||||
@@ -414,7 +415,7 @@ if err != nil {
|
||||
|
||||
### 内联客户端 (Inline Client v2.4.0+支持)
|
||||
|
||||
现在可以通过使用内联客户端功能直接在服务端上订阅主题和发布消息。内联客户端是内置在服务端中的特殊的客户端,可以在服务端的配置中启用:
|
||||
现在可以通过使用内联客户端功能直接在服务端上订阅主题和发布消息。目前,内联客户端暂时还不支持共享订阅。内联客户端是内置在服务端中的特殊的客户端,可以在服务端的配置中启用:
|
||||
|
||||
```go
|
||||
server := mqtt.New(&mqtt.Options{
|
||||
|
@@ -119,7 +119,7 @@ func main() {
|
||||
_ = server.AddHook(new(auth.AllowHook), nil)
|
||||
|
||||
// Create a TCP listener on a standard port.
|
||||
tcp := listeners.NewTCP("t1", ":1883", nil)
|
||||
tcp := listeners.NewTCP(listeners.Config{ID: "t1", Address: ":1883"})
|
||||
err := server.AddListener(tcp)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
@@ -168,6 +168,7 @@ TLSを設定するには`*listeners.Config`を渡すことができます。
|
||||
server := mqtt.New(&mqtt.Options{
|
||||
Capabilities: mqtt.Capabilities{
|
||||
MaximumSessionExpiryInterval: 3600,
|
||||
MaximumClientWritesPending: 3,
|
||||
Compatibilities: mqtt.Compatibilities{
|
||||
ObscureNotAuthorized: true,
|
||||
},
|
||||
@@ -181,6 +182,7 @@ server := mqtt.New(&mqtt.Options{
|
||||
|
||||
mqtt.Options、mqtt.Capabilities、mqtt.Compatibilitiesの構造体はオプションの理解に役立ちます。
|
||||
必要に応じて`ClientNetWriteBufferSize`と`ClientNetReadBufferSize`はクライアントの使用するメモリに合わせて設定できます。
|
||||
`Capabilities.MaximumClientWritesPending`のサイズは、サーバーのメモリ使用量に影響を与えます。IoTデバイスが同時にオンラインで多数存在する場合、また設定値が非常に大きい場合、データの送受信がなくても、サーバーのメモリ使用量は大幅に増加します。デフォルト値は1024*8で、実際の状況に応じてこのパラメータを調整することができます。
|
||||
|
||||
### デフォルト設定に関する注意事項
|
||||
|
||||
|
@@ -149,7 +149,7 @@ func main() {
|
||||
_ = server.AddHook(new(auth.AllowHook), nil)
|
||||
|
||||
// Create a TCP listener on a standard port.
|
||||
tcp := listeners.NewTCP("t1", ":1883", nil)
|
||||
tcp := listeners.NewTCP(listeners.Config{ID: "t1", Address: ":1883"})
|
||||
err := server.AddListener(tcp)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
@@ -198,6 +198,7 @@ A number of configurable options are available which can be used to alter the be
|
||||
server := mqtt.New(&mqtt.Options{
|
||||
Capabilities: mqtt.Capabilities{
|
||||
MaximumSessionExpiryInterval: 3600,
|
||||
MaximumClientWritesPending: 3,
|
||||
Compatibilities: mqtt.Compatibilities{
|
||||
ObscureNotAuthorized: true,
|
||||
},
|
||||
@@ -209,7 +210,7 @@ server := mqtt.New(&mqtt.Options{
|
||||
})
|
||||
```
|
||||
|
||||
Review the mqtt.Options, mqtt.Capabilities, and mqtt.Compatibilities structs for a comprehensive list of options. `ClientNetWriteBufferSize` and `ClientNetReadBufferSize` can be configured to adjust memory usage per client, based on your needs.
|
||||
Review the mqtt.Options, mqtt.Capabilities, and mqtt.Compatibilities structs for a comprehensive list of options. `ClientNetWriteBufferSize` and `ClientNetReadBufferSize` can be configured to adjust memory usage per client, based on your needs. The size of `Capabilities.MaximumClientWritesPending` will affect the memory usage of the server. If the number of IoT devices online at the same time is large, and the set value is very large, even if there is no data transmission, the memory usage of the server will increase a lot. The default value is 1024*8, and this parameter can be adjusted according to the actual situation.
|
||||
|
||||
### Default Configuration Notes
|
||||
|
||||
@@ -399,7 +400,7 @@ The function signatures for all the hooks and `mqtt.Hook` interface can be found
|
||||
If you are building a persistent storage hook, see the existing persistent hooks for inspiration and patterns. If you are building an auth hook, you will need `OnACLCheck` and `OnConnectAuthenticate`.
|
||||
|
||||
### Inline Client (v2.4.0+)
|
||||
It's now possible to subscribe and publish to topics directly from the embedding code, by using the `inline client` feature. The Inline Client is an embedded client which operates as part of the server, and can be enabled in the server options:
|
||||
It's now possible to subscribe and publish to topics directly from the embedding code, by using the `inline client` feature. Currently, the inline client does not support shared subscriptions. The Inline Client is an embedded client which operates as part of the server, and can be enabled in the server options:
|
||||
```go
|
||||
server := mqtt.New(&mqtt.Options{
|
||||
InlineClient: true,
|
||||
|
11
clients.go
11
clients.go
@@ -150,7 +150,7 @@ type ClientState struct {
|
||||
disconnected int64 // the time the client disconnected in unix time, for calculating expiry
|
||||
outbound chan *packets.Packet // queue for pending outbound packets
|
||||
endOnce sync.Once // only end once
|
||||
isTakenOver uint32 // used to identify orphaned clients
|
||||
isTakenOver atomic.Bool // used to identify orphaned clients
|
||||
packetID uint32 // the current highest packetID
|
||||
open context.Context // indicate that the client is open for packet exchange
|
||||
cancelOpen context.CancelFunc // cancel function for open context
|
||||
@@ -417,11 +417,20 @@ func (cl *Client) StopCause() error {
|
||||
return cl.State.stopCause.Load().(error)
|
||||
}
|
||||
|
||||
// StopTime returns the the time the client disconnected in unix time, else zero.
|
||||
func (cl *Client) StopTime() int64 {
|
||||
return atomic.LoadInt64(&cl.State.disconnected)
|
||||
}
|
||||
|
||||
// Closed returns true if client connection is closed.
|
||||
func (cl *Client) Closed() bool {
|
||||
return cl.State.open == nil || cl.State.open.Err() != nil
|
||||
}
|
||||
|
||||
func (cl *Client) IsTakenOver() bool {
|
||||
return cl.State.isTakenOver.Load()
|
||||
}
|
||||
|
||||
// ReadFixedHeader reads in the values of the next packet's fixed header.
|
||||
func (cl *Client) ReadFixedHeader(fh *packets.FixedHeader) error {
|
||||
if cl.Net.bconn == nil {
|
||||
|
@@ -583,9 +583,11 @@ func TestClientReadDone(t *testing.T) {
|
||||
|
||||
func TestClientStop(t *testing.T) {
|
||||
cl, _, _ := newTestClient()
|
||||
require.Equal(t, int64(0), cl.StopTime())
|
||||
cl.Stop(nil)
|
||||
require.Equal(t, nil, cl.State.stopCause.Load())
|
||||
require.Equal(t, time.Now().Unix(), cl.State.disconnected)
|
||||
require.InDelta(t, time.Now().Unix(), cl.State.disconnected, 1.0)
|
||||
require.Equal(t, cl.State.disconnected, cl.StopTime())
|
||||
require.True(t, cl.Closed())
|
||||
require.Equal(t, nil, cl.StopCause())
|
||||
}
|
||||
@@ -597,6 +599,13 @@ func TestClientClosed(t *testing.T) {
|
||||
require.True(t, cl.Closed())
|
||||
}
|
||||
|
||||
func TestClientIsTakenOver(t *testing.T) {
|
||||
cl, _, _ := newTestClient()
|
||||
require.False(t, cl.IsTakenOver())
|
||||
cl.State.isTakenOver.Store(true)
|
||||
require.True(t, cl.IsTakenOver())
|
||||
}
|
||||
|
||||
func TestClientReadFixedHeaderError(t *testing.T) {
|
||||
cl, r, _ := newTestClient()
|
||||
defer cl.Stop(errClientStop)
|
||||
|
20
cmd/main.go
20
cmd/main.go
@@ -5,6 +5,7 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"flag"
|
||||
"log"
|
||||
"os"
|
||||
@@ -20,6 +21,8 @@ func main() {
|
||||
tcpAddr := flag.String("tcp", ":1883", "network address for TCP listener")
|
||||
wsAddr := flag.String("ws", ":1882", "network address for Websocket listener")
|
||||
infoAddr := flag.String("info", ":8080", "network address for web info dashboard listener")
|
||||
tlsCertFile := flag.String("tls-cert-file", "", "TLS certificate file")
|
||||
tlsKeyFile := flag.String("tls-key-file", "", "TLS key file")
|
||||
flag.Parse()
|
||||
|
||||
sigs := make(chan os.Signal, 1)
|
||||
@@ -30,12 +33,25 @@ func main() {
|
||||
done <- true
|
||||
}()
|
||||
|
||||
var tlsConfig *tls.Config
|
||||
|
||||
if tlsCertFile != nil && tlsKeyFile != nil && *tlsCertFile != "" && *tlsKeyFile != "" {
|
||||
cert, err := tls.LoadX509KeyPair(*tlsCertFile, *tlsKeyFile)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
tlsConfig = &tls.Config{
|
||||
Certificates: []tls.Certificate{cert},
|
||||
}
|
||||
}
|
||||
|
||||
server := mqtt.New(nil)
|
||||
_ = server.AddHook(new(auth.AllowHook), nil)
|
||||
|
||||
tcp := listeners.NewTCP(listeners.Config{
|
||||
ID: "t1",
|
||||
Address: *tcpAddr,
|
||||
ID: "t1",
|
||||
Address: *tcpAddr,
|
||||
TLSConfig: tlsConfig,
|
||||
})
|
||||
err := server.AddListener(tcp)
|
||||
if err != nil {
|
||||
|
@@ -6,6 +6,8 @@ package config
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"log/slog"
|
||||
"os"
|
||||
|
||||
"github.com/mochi-mqtt/server/v2/hooks/auth"
|
||||
"github.com/mochi-mqtt/server/v2/hooks/debug"
|
||||
@@ -21,9 +23,27 @@ import (
|
||||
|
||||
// config defines the structure of configuration data to be parsed from a config source.
|
||||
type config struct {
|
||||
Options mqtt.Options
|
||||
Listeners []listeners.Config `yaml:"listeners" json:"listeners"`
|
||||
HookConfigs HookConfigs `yaml:"hooks" json:"hooks"`
|
||||
Options mqtt.Options
|
||||
Listeners []listeners.Config `yaml:"listeners" json:"listeners"`
|
||||
HookConfigs HookConfigs `yaml:"hooks" json:"hooks"`
|
||||
LoggingConfig LoggingConfig `yaml:"logging" json:"logging"`
|
||||
}
|
||||
|
||||
type LoggingConfig struct {
|
||||
Level string
|
||||
}
|
||||
|
||||
func (lc LoggingConfig) ToLogger() *slog.Logger {
|
||||
var level slog.Level
|
||||
if err := level.UnmarshalText([]byte(lc.Level)); err != nil {
|
||||
level = slog.LevelInfo
|
||||
}
|
||||
|
||||
leveler := new(slog.LevelVar)
|
||||
leveler.Set(level)
|
||||
return slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
|
||||
Level: leveler,
|
||||
}))
|
||||
}
|
||||
|
||||
// HookConfigs contains configurations to enable individual hooks.
|
||||
@@ -149,6 +169,7 @@ func FromBytes(b []byte) (*mqtt.Options, error) {
|
||||
o = c.Options
|
||||
o.Hooks = c.HookConfigs.ToHooks()
|
||||
o.Listeners = c.Listeners
|
||||
o.Logger = c.LoggingConfig.ToLogger()
|
||||
|
||||
return &o, nil
|
||||
}
|
||||
|
@@ -5,6 +5,8 @@
|
||||
package config
|
||||
|
||||
import (
|
||||
"log/slog"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
@@ -60,7 +62,6 @@ options:
|
||||
}
|
||||
}
|
||||
`)
|
||||
|
||||
parsedOptions = mqtt.Options{
|
||||
Listeners: []listeners.Config{
|
||||
{
|
||||
@@ -81,6 +82,9 @@ options:
|
||||
RestoreSysInfoOnRestart: true,
|
||||
},
|
||||
},
|
||||
Logger: slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
|
||||
Level: new(slog.LevelVar),
|
||||
})),
|
||||
}
|
||||
)
|
||||
|
||||
@@ -176,7 +180,8 @@ func TestToHooksStorageBolt(t *testing.T) {
|
||||
hc := HookConfigs{
|
||||
Storage: &HookStorageConfig{
|
||||
Bolt: &bolt.Options{
|
||||
Path: "bolt",
|
||||
Path: "bolt",
|
||||
Bucket: "mochi",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
@@ -31,7 +31,8 @@
|
||||
"gc_discard_ratio": 0.5
|
||||
},
|
||||
"bolt": {
|
||||
"path": "bolt.db"
|
||||
"path": "bolt.db",
|
||||
"bucket": "mochi"
|
||||
},
|
||||
"redis": {
|
||||
"h_prefix": "mc",
|
||||
|
@@ -21,6 +21,7 @@ hooks:
|
||||
mode: "NoSync"
|
||||
bolt:
|
||||
path: bolt.db
|
||||
bucket: "mochi"
|
||||
redis:
|
||||
h_prefix: "mc"
|
||||
username: "mochi"
|
||||
@@ -65,3 +66,5 @@ options:
|
||||
always_return_response_info: false
|
||||
restore_sys_info_on_restart: false
|
||||
no_inherited_properties_on_ack: false
|
||||
logging:
|
||||
level: INFO
|
||||
|
@@ -1,6 +1,6 @@
|
||||
// SPDX-License-Identifier: MIT
|
||||
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
|
||||
// SPDX-FileContributor: mochi-co
|
||||
// SPDX-FileContributor: mochi-co, werbenhu
|
||||
|
||||
package main
|
||||
|
||||
|
@@ -1,6 +1,6 @@
|
||||
// SPDX-License-Identifier: MIT
|
||||
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
|
||||
// SPDX-FileContributor: mochi-co
|
||||
// SPDX-FileContributor: mochi-co, werbenhu
|
||||
|
||||
package main
|
||||
|
||||
@@ -19,6 +19,9 @@ import (
|
||||
)
|
||||
|
||||
func main() {
|
||||
boltPath := ".bolt"
|
||||
defer os.RemoveAll(boltPath) // remove the example db files at the end
|
||||
|
||||
sigs := make(chan os.Signal, 1)
|
||||
done := make(chan bool, 1)
|
||||
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
|
||||
@@ -31,7 +34,7 @@ func main() {
|
||||
_ = server.AddHook(new(auth.AllowHook), nil)
|
||||
|
||||
err := server.AddHook(new(bolt.Hook), &bolt.Options{
|
||||
Path: "bolt.db",
|
||||
Path: boltPath,
|
||||
Options: &bbolt.Options{
|
||||
Timeout: 500 * time.Millisecond,
|
||||
},
|
||||
|
@@ -57,7 +57,10 @@ func main() {
|
||||
done <- true
|
||||
}()
|
||||
|
||||
cert, err := tls.X509KeyPair(testCertificate, testPrivateKey)
|
||||
// Load tls cert from your cert file
|
||||
cert, err := tls.LoadX509KeyPair("replace_your_cert.pem", "replace_your_cert.key")
|
||||
|
||||
//cert, err := tls.X509KeyPair(testCertificate, testPrivateKey)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
10
go.mod
10
go.mod
@@ -4,8 +4,6 @@ go 1.21
|
||||
|
||||
require (
|
||||
github.com/alicebob/miniredis/v2 v2.23.0
|
||||
github.com/asdine/storm v2.1.2+incompatible
|
||||
github.com/asdine/storm/v3 v3.2.1
|
||||
github.com/cockroachdb/pebble v1.1.0
|
||||
github.com/dgraph-io/badger/v4 v4.2.0
|
||||
github.com/go-redis/redis/v8 v8.11.5
|
||||
@@ -32,7 +30,7 @@ require (
|
||||
github.com/dustin/go-humanize v1.0.0 // indirect
|
||||
github.com/getsentry/sentry-go v0.18.0 // indirect
|
||||
github.com/gogo/protobuf v1.3.2 // indirect
|
||||
github.com/golang/glog v1.0.0 // indirect
|
||||
github.com/golang/glog v1.2.4 // indirect
|
||||
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
|
||||
github.com/golang/protobuf v1.5.2 // indirect
|
||||
github.com/golang/snappy v0.0.4 // indirect
|
||||
@@ -51,8 +49,8 @@ require (
|
||||
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 // indirect
|
||||
go.opencensus.io v0.22.5 // indirect
|
||||
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df // indirect
|
||||
golang.org/x/net v0.17.0 // indirect
|
||||
golang.org/x/sys v0.13.0 // indirect
|
||||
golang.org/x/text v0.13.0 // indirect
|
||||
golang.org/x/net v0.33.0 // indirect
|
||||
golang.org/x/sys v0.28.0 // indirect
|
||||
golang.org/x/text v0.21.0 // indirect
|
||||
google.golang.org/protobuf v1.33.0 // indirect
|
||||
)
|
||||
|
37
go.sum
37
go.sum
@@ -33,11 +33,8 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9
|
||||
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
|
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
|
||||
github.com/DataDog/zstd v1.4.1/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
|
||||
github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ=
|
||||
github.com/DataDog/zstd v1.4.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
|
||||
github.com/Sereal/Sereal v0.0.0-20190618215532-0b8ac451a863 h1:BRrxwOZBolJN4gIwvZMJY1tzqBvQgpaZiQRuIDD40jM=
|
||||
github.com/Sereal/Sereal v0.0.0-20190618215532-0b8ac451a863/go.mod h1:D0JMgToj/WdxCgd30Kc1UcA9E+WdZoJqeVOuYW7iTBM=
|
||||
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
|
||||
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
|
||||
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
|
||||
@@ -47,10 +44,6 @@ github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZp
|
||||
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
|
||||
github.com/alicebob/miniredis/v2 v2.23.0 h1:+lwAJYjvvdIVg6doFHuotFjueJ/7KY10xo/vm3X3Scw=
|
||||
github.com/alicebob/miniredis/v2 v2.23.0/go.mod h1:XNqvJdQJv5mSuVMc0ynneafpnL/zv52acZ6kqeS0t88=
|
||||
github.com/asdine/storm v2.1.2+incompatible h1:dczuIkyqwY2LrtXPz8ixMrU/OFgZp71kbKTHGrXYt/Q=
|
||||
github.com/asdine/storm v2.1.2+incompatible/go.mod h1:RarYDc9hq1UPLImuiXK3BIWPJLdIygvV3PsInK0FbVQ=
|
||||
github.com/asdine/storm/v3 v3.2.1 h1:I5AqhkPK6nBZ/qJXySdI7ot5BlXSZ7qvDY1zAn5ZJac=
|
||||
github.com/asdine/storm/v3 v3.2.1/go.mod h1:LEpXwGt4pIqrE/XcTvCnZHT5MgZCV6Ub9q7yQzOFWr0=
|
||||
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
|
||||
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
|
||||
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
|
||||
@@ -117,8 +110,8 @@ github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a
|
||||
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
|
||||
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
|
||||
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
|
||||
github.com/golang/glog v1.0.0 h1:nfP3RFugxnNRyKgeWd4oI1nYvXpxrx8ck8ZrcizshdQ=
|
||||
github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4=
|
||||
github.com/golang/glog v1.2.4 h1:CNNw5U8lSiiBk7druxtSHHTsRWcxKoac6kZKm2peBBc=
|
||||
github.com/golang/glog v1.2.4/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w=
|
||||
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
|
||||
github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
|
||||
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e h1:1r7pUrabqp18hOBcwBwiTsbnFeTZHV9eER/QT5JVZxY=
|
||||
@@ -147,7 +140,6 @@ github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw
|
||||
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
|
||||
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
|
||||
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
|
||||
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
||||
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
|
||||
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
||||
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
|
||||
@@ -163,8 +155,8 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
|
||||
github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
|
||||
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
||||
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
|
||||
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
|
||||
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
|
||||
@@ -275,15 +267,12 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
|
||||
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
|
||||
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
|
||||
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
|
||||
github.com/vmihailenco/msgpack v4.0.4+incompatible h1:dSLoQfGFAo3F6OoNhwUmLwVgaUXK79GlxNBwueZn0xI=
|
||||
github.com/vmihailenco/msgpack v4.0.4+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6AcGMlsjHatGryHQYUTf1ShIgkk=
|
||||
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 h1:k/gmLsJDWwWqbLCur2yWnJzwQEKRcAHXo6seXGuSwWw=
|
||||
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9/go.mod h1:E1AXubJBdNmFERAOucpDIxNzeGfLzg0mYh+UfMWdChA=
|
||||
go.etcd.io/bbolt v1.3.4/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
|
||||
go.etcd.io/bbolt v1.3.5 h1:XAzx9gjCb0Rxj7EoqcClPD1d5ZBxZJk0jbuoPHenBt0=
|
||||
go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
|
||||
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
|
||||
@@ -345,7 +334,6 @@ golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLL
|
||||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20191105084925-a882066a44e0/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
@@ -362,8 +350,8 @@ golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81R
|
||||
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
|
||||
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
|
||||
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
||||
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
|
||||
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
|
||||
golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I=
|
||||
golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
|
||||
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
|
||||
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
|
||||
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
|
||||
@@ -380,8 +368,8 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
|
||||
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 h1:uVc8UZUe6tr40fFVnUP5Oj+veunVezqYl9z7DYw9xzw=
|
||||
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
|
||||
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
|
||||
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
@@ -421,8 +409,8 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w
|
||||
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
|
||||
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
|
||||
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
@@ -430,8 +418,8 @@ golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3
|
||||
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
|
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
|
||||
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
|
||||
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
|
||||
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
|
||||
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
@@ -502,7 +490,6 @@ google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7
|
||||
google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
|
||||
google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0=
|
||||
google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
|
||||
google.golang.org/appengine v1.6.6 h1:lMO5rYAqUxkmaj76jAkRUvt5JZgFymx/+Q5Mzfivuhc=
|
||||
google.golang.org/appengine v1.6.6/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
|
||||
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
|
||||
google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
|
||||
|
2
hooks.go
2
hooks.go
@@ -405,6 +405,8 @@ func (h *Hooks) OnPublish(cl *Client, pk packets.Packet) (pkx packets.Packet, er
|
||||
"hook", hook.ID(),
|
||||
"packet", pkx)
|
||||
return pk, err
|
||||
} else if errors.Is(err, packets.CodeSuccessIgnore) {
|
||||
return pk, err
|
||||
}
|
||||
h.Log.Error("publish packet error",
|
||||
"error", err,
|
||||
|
@@ -1,6 +1,6 @@
|
||||
// SPDX-License-Identifier: MIT
|
||||
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
|
||||
// SPDX-FileContributor: mochi-co, gsagula
|
||||
// SPDX-FileContributor: mochi-co, gsagula, werbenhu
|
||||
|
||||
package badger
|
||||
|
||||
@@ -292,6 +292,7 @@ func (h *Hook) OnRetainMessage(cl *mqtt.Client, pk packets.Packet, r int64) {
|
||||
TopicName: pk.TopicName,
|
||||
Payload: pk.Payload,
|
||||
Created: pk.Created,
|
||||
Client: cl.ID,
|
||||
Origin: pk.Origin,
|
||||
Properties: storage.MessageProperties{
|
||||
PayloadFormat: props.PayloadFormat,
|
||||
@@ -319,6 +320,7 @@ func (h *Hook) OnQosPublish(cl *mqtt.Client, pk packets.Packet, sent int64, rese
|
||||
in := &storage.Message{
|
||||
ID: inflightKey(cl, pk),
|
||||
T: storage.InflightKey,
|
||||
Client: cl.ID,
|
||||
Origin: pk.Origin,
|
||||
PacketID: pk.PacketID,
|
||||
FixedHeader: pk.FixedHeader,
|
||||
@@ -490,23 +492,23 @@ func (h *Hook) StoredSysInfo() (v storage.SystemInfo, err error) {
|
||||
}
|
||||
|
||||
// Errorf satisfies the badger interface for an error logger.
|
||||
func (h *Hook) Errorf(m string, v ...interface{}) {
|
||||
func (h *Hook) Errorf(m string, v ...any) {
|
||||
h.Log.Error(fmt.Sprintf(strings.ToLower(strings.Trim(m, "\n")), v...), "v", v)
|
||||
|
||||
}
|
||||
|
||||
// Warningf satisfies the badger interface for a warning logger.
|
||||
func (h *Hook) Warningf(m string, v ...interface{}) {
|
||||
func (h *Hook) Warningf(m string, v ...any) {
|
||||
h.Log.Warn(fmt.Sprintf(strings.ToLower(strings.Trim(m, "\n")), v...), "v", v)
|
||||
}
|
||||
|
||||
// Infof satisfies the badger interface for an info logger.
|
||||
func (h *Hook) Infof(m string, v ...interface{}) {
|
||||
func (h *Hook) Infof(m string, v ...any) {
|
||||
h.Log.Info(fmt.Sprintf(strings.ToLower(strings.Trim(m, "\n")), v...), "v", v)
|
||||
}
|
||||
|
||||
// Debugf satisfies the badger interface for a debug logger.
|
||||
func (h *Hook) Debugf(m string, v ...interface{}) {
|
||||
func (h *Hook) Debugf(m string, v ...any) {
|
||||
h.Log.Debug(fmt.Sprintf(strings.ToLower(strings.Trim(m, "\n")), v...), "v", v)
|
||||
}
|
||||
|
||||
|
@@ -1,6 +1,6 @@
|
||||
// SPDX-License-Identifier: MIT
|
||||
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
|
||||
// SPDX-FileContributor: mochi-co
|
||||
// SPDX-FileContributor: mochi-co, werbenhu
|
||||
|
||||
package badger
|
||||
|
||||
|
@@ -1,6 +1,6 @@
|
||||
// SPDX-License-Identifier: MIT
|
||||
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
|
||||
// SPDX-FileContributor: mochi-co
|
||||
// SPDX-FileContributor: mochi-co, werbenhu
|
||||
|
||||
// Package bolt is provided for historical compatibility and may not be actively updated, you should use the badger hook instead.
|
||||
package bolt
|
||||
@@ -14,23 +14,27 @@ import (
|
||||
"github.com/mochi-mqtt/server/v2/hooks/storage"
|
||||
"github.com/mochi-mqtt/server/v2/packets"
|
||||
"github.com/mochi-mqtt/server/v2/system"
|
||||
|
||||
sgob "github.com/asdine/storm/codec/gob"
|
||||
"github.com/asdine/storm/v3"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrBucketNotFound = errors.New("bucket not found")
|
||||
ErrKeyNotFound = errors.New("key not found")
|
||||
)
|
||||
|
||||
const (
|
||||
// defaultDbFile is the default file path for the boltdb file.
|
||||
defaultDbFile = "bolt.db"
|
||||
defaultDbFile = ".bolt"
|
||||
|
||||
// defaultTimeout is the default time to hold a connection to the file.
|
||||
defaultTimeout = 250 * time.Millisecond
|
||||
|
||||
defaultBucket = "mochi"
|
||||
)
|
||||
|
||||
// clientKey returns a primary key for a client.
|
||||
func clientKey(cl *mqtt.Client) string {
|
||||
return cl.ID
|
||||
return storage.ClientKey + "_" + cl.ID
|
||||
}
|
||||
|
||||
// subscriptionKey returns a primary key for a subscription.
|
||||
@@ -56,6 +60,7 @@ func sysInfoKey() string {
|
||||
// Options contains configuration settings for the bolt instance.
|
||||
type Options struct {
|
||||
Options *bbolt.Options
|
||||
Bucket string `yaml:"bucket" json:"bucket"`
|
||||
Path string `yaml:"path" json:"path"`
|
||||
}
|
||||
|
||||
@@ -63,7 +68,7 @@ type Options struct {
|
||||
type Hook struct {
|
||||
mqtt.HookBase
|
||||
config *Options // options for configuring the boltdb instance.
|
||||
db *storm.DB // the boltdb instance.
|
||||
db *bbolt.DB // the boltdb instance.
|
||||
}
|
||||
|
||||
// ID returns the id of the hook.
|
||||
@@ -110,22 +115,32 @@ func (h *Hook) Init(config any) error {
|
||||
Timeout: defaultTimeout,
|
||||
}
|
||||
}
|
||||
if h.config.Path == "" {
|
||||
if len(h.config.Path) == 0 {
|
||||
h.config.Path = defaultDbFile
|
||||
}
|
||||
|
||||
if len(h.config.Bucket) == 0 {
|
||||
h.config.Bucket = defaultBucket
|
||||
}
|
||||
|
||||
var err error
|
||||
h.db, err = storm.Open(h.config.Path, storm.BoltOptions(0600, h.config.Options), storm.Codec(sgob.Codec))
|
||||
h.db, err = bbolt.Open(h.config.Path, 0600, h.config.Options)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
err = h.db.Update(func(tx *bbolt.Tx) error {
|
||||
_, err := tx.CreateBucketIfNotExists([]byte(h.config.Bucket))
|
||||
return err
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
// Stop closes the boltdb instance.
|
||||
func (h *Hook) Stop() error {
|
||||
return h.db.Close()
|
||||
err := h.db.Close()
|
||||
h.db = nil
|
||||
return err
|
||||
}
|
||||
|
||||
// OnSessionEstablished adds a client to the store when their session is established.
|
||||
@@ -147,7 +162,7 @@ func (h *Hook) updateClient(cl *mqtt.Client) {
|
||||
|
||||
props := cl.Properties.Props.Copy(false)
|
||||
in := &storage.Client{
|
||||
ID: clientKey(cl),
|
||||
ID: cl.ID,
|
||||
T: storage.ClientKey,
|
||||
Remote: cl.Net.Remote,
|
||||
Listener: cl.Net.Listener,
|
||||
@@ -167,10 +182,8 @@ func (h *Hook) updateClient(cl *mqtt.Client) {
|
||||
},
|
||||
Will: storage.ClientWill(cl.Properties.Will),
|
||||
}
|
||||
err := h.db.Save(in)
|
||||
if err != nil {
|
||||
h.Log.Error("failed to save client data", "error", err, "data", in)
|
||||
}
|
||||
|
||||
_ = h.setKv(clientKey(cl), in)
|
||||
}
|
||||
|
||||
// OnDisconnect removes a client from the store if they were using a clean session.
|
||||
@@ -188,10 +201,7 @@ func (h *Hook) OnDisconnect(cl *mqtt.Client, _ error, expire bool) {
|
||||
return
|
||||
}
|
||||
|
||||
err := h.db.DeleteStruct(&storage.Client{ID: clientKey(cl)})
|
||||
if err != nil && !errors.Is(err, storm.ErrNotFound) {
|
||||
h.Log.Error("failed to delete client", "error", err, "id", clientKey(cl))
|
||||
}
|
||||
_ = h.delKv(clientKey(cl))
|
||||
}
|
||||
|
||||
// OnSubscribed adds one or more client subscriptions to the store.
|
||||
@@ -214,11 +224,7 @@ func (h *Hook) OnSubscribed(cl *mqtt.Client, pk packets.Packet, reasonCodes []by
|
||||
RetainHandling: pk.Filters[i].RetainHandling,
|
||||
RetainAsPublished: pk.Filters[i].RetainAsPublished,
|
||||
}
|
||||
|
||||
err := h.db.Save(in)
|
||||
if err != nil {
|
||||
h.Log.Error("failed to save subscription data", "error", err, "client", cl.ID, "data", in)
|
||||
}
|
||||
_ = h.setKv(in.ID, in)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -230,12 +236,7 @@ func (h *Hook) OnUnsubscribed(cl *mqtt.Client, pk packets.Packet) {
|
||||
}
|
||||
|
||||
for i := 0; i < len(pk.Filters); i++ {
|
||||
err := h.db.DeleteStruct(&storage.Subscription{
|
||||
ID: subscriptionKey(cl, pk.Filters[i].Filter),
|
||||
})
|
||||
if err != nil {
|
||||
h.Log.Error("failed to delete client", "error", err, "id", subscriptionKey(cl, pk.Filters[i].Filter))
|
||||
}
|
||||
_ = h.delKv(subscriptionKey(cl, pk.Filters[i].Filter))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -247,12 +248,7 @@ func (h *Hook) OnRetainMessage(cl *mqtt.Client, pk packets.Packet, r int64) {
|
||||
}
|
||||
|
||||
if r == -1 {
|
||||
err := h.db.DeleteStruct(&storage.Message{
|
||||
ID: retainedKey(pk.TopicName),
|
||||
})
|
||||
if err != nil {
|
||||
h.Log.Error("failed to delete retained publish", "error", err, "id", retainedKey(pk.TopicName))
|
||||
}
|
||||
_ = h.delKv(retainedKey(pk.TopicName))
|
||||
return
|
||||
}
|
||||
|
||||
@@ -264,6 +260,7 @@ func (h *Hook) OnRetainMessage(cl *mqtt.Client, pk packets.Packet, r int64) {
|
||||
TopicName: pk.TopicName,
|
||||
Payload: pk.Payload,
|
||||
Created: pk.Created,
|
||||
Client: cl.ID,
|
||||
Origin: pk.Origin,
|
||||
Properties: storage.MessageProperties{
|
||||
PayloadFormat: props.PayloadFormat,
|
||||
@@ -276,10 +273,8 @@ func (h *Hook) OnRetainMessage(cl *mqtt.Client, pk packets.Packet, r int64) {
|
||||
User: props.User,
|
||||
},
|
||||
}
|
||||
err := h.db.Save(in)
|
||||
if err != nil {
|
||||
h.Log.Error("failed to save retained publish data", "error", err, "client", cl.ID, "data", in)
|
||||
}
|
||||
|
||||
_ = h.setKv(in.ID, in)
|
||||
}
|
||||
|
||||
// OnQosPublish adds or updates an inflight message in the store.
|
||||
@@ -293,6 +288,7 @@ func (h *Hook) OnQosPublish(cl *mqtt.Client, pk packets.Packet, sent int64, rese
|
||||
in := &storage.Message{
|
||||
ID: inflightKey(cl, pk),
|
||||
T: storage.InflightKey,
|
||||
Client: cl.ID,
|
||||
Origin: pk.Origin,
|
||||
FixedHeader: pk.FixedHeader,
|
||||
TopicName: pk.TopicName,
|
||||
@@ -311,10 +307,7 @@ func (h *Hook) OnQosPublish(cl *mqtt.Client, pk packets.Packet, sent int64, rese
|
||||
},
|
||||
}
|
||||
|
||||
err := h.db.Save(in)
|
||||
if err != nil {
|
||||
h.Log.Error("failed to save qos inflight data", "error", err, "client", cl.ID, "data", in)
|
||||
}
|
||||
_ = h.setKv(in.ID, in)
|
||||
}
|
||||
|
||||
// OnQosComplete removes a resolved inflight message from the store.
|
||||
@@ -324,12 +317,7 @@ func (h *Hook) OnQosComplete(cl *mqtt.Client, pk packets.Packet) {
|
||||
return
|
||||
}
|
||||
|
||||
err := h.db.DeleteStruct(&storage.Message{
|
||||
ID: inflightKey(cl, pk),
|
||||
})
|
||||
if err != nil {
|
||||
h.Log.Error("failed to delete inflight data", "error", err, "id", inflightKey(cl, pk))
|
||||
}
|
||||
_ = h.delKv(inflightKey(cl, pk))
|
||||
}
|
||||
|
||||
// OnQosDropped removes a dropped inflight message from the store.
|
||||
@@ -354,10 +342,7 @@ func (h *Hook) OnSysInfoTick(sys *system.Info) {
|
||||
Info: *sys,
|
||||
}
|
||||
|
||||
err := h.db.Save(in)
|
||||
if err != nil {
|
||||
h.Log.Error("failed to save $SYS data", "error", err, "data", in)
|
||||
}
|
||||
_ = h.setKv(in.ID, in)
|
||||
}
|
||||
|
||||
// OnRetainedExpired deletes expired retained messages from the store.
|
||||
@@ -366,10 +351,7 @@ func (h *Hook) OnRetainedExpired(filter string) {
|
||||
h.Log.Error("", "error", storage.ErrDBFileNotOpen)
|
||||
return
|
||||
}
|
||||
|
||||
if err := h.db.DeleteStruct(&storage.Message{ID: retainedKey(filter)}); err != nil {
|
||||
h.Log.Error("failed to delete retained publish", "error", err, "id", retainedKey(filter))
|
||||
}
|
||||
_ = h.delKv(retainedKey(filter))
|
||||
}
|
||||
|
||||
// OnClientExpired deleted expired clients from the store.
|
||||
@@ -378,25 +360,24 @@ func (h *Hook) OnClientExpired(cl *mqtt.Client) {
|
||||
h.Log.Error("", "error", storage.ErrDBFileNotOpen)
|
||||
return
|
||||
}
|
||||
|
||||
err := h.db.DeleteStruct(&storage.Client{ID: clientKey(cl)})
|
||||
if err != nil && !errors.Is(err, storm.ErrNotFound) {
|
||||
h.Log.Error("failed to delete expired client", "error", err, "id", clientKey(cl))
|
||||
}
|
||||
_ = h.delKv(clientKey(cl))
|
||||
}
|
||||
|
||||
// StoredClients returns all stored clients from the store.
|
||||
func (h *Hook) StoredClients() (v []storage.Client, err error) {
|
||||
if h.db == nil {
|
||||
h.Log.Error("", "error", storage.ErrDBFileNotOpen)
|
||||
return
|
||||
}
|
||||
|
||||
err = h.db.Find("T", storage.ClientKey, &v)
|
||||
if err != nil && !errors.Is(err, storm.ErrNotFound) {
|
||||
return
|
||||
return v, storage.ErrDBFileNotOpen
|
||||
}
|
||||
|
||||
err = h.iterKv(storage.ClientKey, func(value []byte) error {
|
||||
obj := storage.Client{}
|
||||
err = obj.UnmarshalBinary(value)
|
||||
if err == nil {
|
||||
v = append(v, obj)
|
||||
}
|
||||
return err
|
||||
})
|
||||
return v, nil
|
||||
}
|
||||
|
||||
@@ -404,58 +385,142 @@ func (h *Hook) StoredClients() (v []storage.Client, err error) {
|
||||
func (h *Hook) StoredSubscriptions() (v []storage.Subscription, err error) {
|
||||
if h.db == nil {
|
||||
h.Log.Error("", "error", storage.ErrDBFileNotOpen)
|
||||
return
|
||||
return v, storage.ErrDBFileNotOpen
|
||||
}
|
||||
|
||||
err = h.db.Find("T", storage.SubscriptionKey, &v)
|
||||
if err != nil && !errors.Is(err, storm.ErrNotFound) {
|
||||
return
|
||||
}
|
||||
|
||||
return v, nil
|
||||
v = make([]storage.Subscription, 0)
|
||||
err = h.iterKv(storage.SubscriptionKey, func(value []byte) error {
|
||||
obj := storage.Subscription{}
|
||||
err = obj.UnmarshalBinary(value)
|
||||
if err == nil {
|
||||
v = append(v, obj)
|
||||
}
|
||||
return err
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// StoredRetainedMessages returns all stored retained messages from the store.
|
||||
func (h *Hook) StoredRetainedMessages() (v []storage.Message, err error) {
|
||||
if h.db == nil {
|
||||
h.Log.Error("", "error", storage.ErrDBFileNotOpen)
|
||||
return
|
||||
return v, storage.ErrDBFileNotOpen
|
||||
}
|
||||
|
||||
err = h.db.Find("T", storage.RetainedKey, &v)
|
||||
if err != nil && !errors.Is(err, storm.ErrNotFound) {
|
||||
return
|
||||
}
|
||||
|
||||
return v, nil
|
||||
v = make([]storage.Message, 0)
|
||||
err = h.iterKv(storage.RetainedKey, func(value []byte) error {
|
||||
obj := storage.Message{}
|
||||
err = obj.UnmarshalBinary(value)
|
||||
if err == nil {
|
||||
v = append(v, obj)
|
||||
}
|
||||
return err
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// StoredInflightMessages returns all stored inflight messages from the store.
|
||||
func (h *Hook) StoredInflightMessages() (v []storage.Message, err error) {
|
||||
if h.db == nil {
|
||||
h.Log.Error("", "error", storage.ErrDBFileNotOpen)
|
||||
return
|
||||
return v, storage.ErrDBFileNotOpen
|
||||
}
|
||||
|
||||
err = h.db.Find("T", storage.InflightKey, &v)
|
||||
if err != nil && !errors.Is(err, storm.ErrNotFound) {
|
||||
return
|
||||
}
|
||||
|
||||
return v, nil
|
||||
v = make([]storage.Message, 0)
|
||||
err = h.iterKv(storage.InflightKey, func(value []byte) error {
|
||||
obj := storage.Message{}
|
||||
err = obj.UnmarshalBinary(value)
|
||||
if err == nil {
|
||||
v = append(v, obj)
|
||||
}
|
||||
return err
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// StoredSysInfo returns the system info from the store.
|
||||
func (h *Hook) StoredSysInfo() (v storage.SystemInfo, err error) {
|
||||
if h.db == nil {
|
||||
h.Log.Error("", "error", storage.ErrDBFileNotOpen)
|
||||
return
|
||||
return v, storage.ErrDBFileNotOpen
|
||||
}
|
||||
|
||||
err = h.db.One("ID", storage.SysInfoKey, &v)
|
||||
if err != nil && !errors.Is(err, storm.ErrNotFound) {
|
||||
err = h.getKv(storage.SysInfoKey, &v)
|
||||
if err != nil && !errors.Is(err, ErrKeyNotFound) {
|
||||
return
|
||||
}
|
||||
|
||||
return v, nil
|
||||
}
|
||||
|
||||
// setKv stores a key-value pair in the database.
|
||||
func (h *Hook) setKv(k string, v storage.Serializable) error {
|
||||
err := h.db.Update(func(tx *bbolt.Tx) error {
|
||||
|
||||
bucket := tx.Bucket([]byte(h.config.Bucket))
|
||||
data, _ := v.MarshalBinary()
|
||||
err := bucket.Put([]byte(k), data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
h.Log.Error("failed to upsert data", "error", err, "key", k)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// delKv deletes a key-value pair from the database.
|
||||
func (h *Hook) delKv(k string) error {
|
||||
err := h.db.Update(func(tx *bbolt.Tx) error {
|
||||
bucket := tx.Bucket([]byte(h.config.Bucket))
|
||||
err := bucket.Delete([]byte(k))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
h.Log.Error("failed to delete data", "error", err, "key", k)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// getKv retrieves the value associated with a key from the database.
|
||||
func (h *Hook) getKv(k string, v storage.Serializable) error {
|
||||
err := h.db.View(func(tx *bbolt.Tx) error {
|
||||
bucket := tx.Bucket([]byte(h.config.Bucket))
|
||||
|
||||
value := bucket.Get([]byte(k))
|
||||
if value == nil {
|
||||
return ErrKeyNotFound
|
||||
}
|
||||
|
||||
return v.UnmarshalBinary(value)
|
||||
})
|
||||
if err != nil {
|
||||
h.Log.Error("failed to get data", "error", err, "key", k)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// iterKv iterates over key-value pairs with keys having the specified prefix in the database.
|
||||
func (h *Hook) iterKv(prefix string, visit func([]byte) error) error {
|
||||
err := h.db.View(func(tx *bbolt.Tx) error {
|
||||
bucket := tx.Bucket([]byte(h.config.Bucket))
|
||||
|
||||
c := bucket.Cursor()
|
||||
for k, v := c.Seek([]byte(prefix)); k != nil && string(k[:len(prefix)]) == prefix; k, v = c.Next() {
|
||||
if err := visit(v); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
h.Log.Error("failed to iter data", "error", err, "prefix", prefix)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
@@ -1,10 +1,11 @@
|
||||
// SPDX-License-Identifier: MIT
|
||||
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
|
||||
// SPDX-FileContributor: mochi-co
|
||||
// SPDX-FileContributor: mochi-co, werbenhu
|
||||
|
||||
package bolt
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"log/slog"
|
||||
"os"
|
||||
"testing"
|
||||
@@ -15,7 +16,6 @@ import (
|
||||
"github.com/mochi-mqtt/server/v2/packets"
|
||||
"github.com/mochi-mqtt/server/v2/system"
|
||||
|
||||
"github.com/asdine/storm/v3"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
@@ -45,7 +45,7 @@ func teardown(t *testing.T, path string, h *Hook) {
|
||||
|
||||
func TestClientKey(t *testing.T) {
|
||||
k := clientKey(&mqtt.Client{ID: "cl1"})
|
||||
require.Equal(t, "cl1", k)
|
||||
require.Equal(t, "CL_cl1", k)
|
||||
}
|
||||
|
||||
func TestSubscriptionKey(t *testing.T) {
|
||||
@@ -130,7 +130,7 @@ func TestOnSessionEstablishedThenOnDisconnect(t *testing.T) {
|
||||
h.OnSessionEstablished(client, packets.Packet{})
|
||||
|
||||
r := new(storage.Client)
|
||||
err = h.db.One("ID", clientKey(client), r)
|
||||
err = h.getKv(clientKey(client), r)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, client.ID, r.ID)
|
||||
require.Equal(t, client.Net.Remote, r.Remote)
|
||||
@@ -141,15 +141,15 @@ func TestOnSessionEstablishedThenOnDisconnect(t *testing.T) {
|
||||
|
||||
h.OnDisconnect(client, nil, false)
|
||||
r2 := new(storage.Client)
|
||||
err = h.db.One("ID", clientKey(client), r2)
|
||||
err = h.getKv(clientKey(client), r2)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, client.ID, r.ID)
|
||||
|
||||
h.OnDisconnect(client, nil, true)
|
||||
r3 := new(storage.Client)
|
||||
err = h.db.One("ID", clientKey(client), r3)
|
||||
err = h.getKv(clientKey(client), r3)
|
||||
require.Error(t, err)
|
||||
require.ErrorIs(t, storm.ErrNotFound, err)
|
||||
require.ErrorIs(t, ErrKeyNotFound, err)
|
||||
require.Empty(t, r3.ID)
|
||||
}
|
||||
|
||||
@@ -180,7 +180,7 @@ func TestOnWillSent(t *testing.T) {
|
||||
h.OnWillSent(c1, packets.Packet{})
|
||||
|
||||
r := new(storage.Client)
|
||||
err = h.db.One("ID", clientKey(client), r)
|
||||
err = h.getKv(clientKey(client), r)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, uint32(1), r.Will.Flag)
|
||||
@@ -197,18 +197,18 @@ func TestOnClientExpired(t *testing.T) {
|
||||
cl := &mqtt.Client{ID: "cl1"}
|
||||
clientKey := clientKey(cl)
|
||||
|
||||
err = h.db.Save(&storage.Client{ID: cl.ID})
|
||||
err = h.setKv(clientKey, &storage.Client{ID: cl.ID})
|
||||
require.NoError(t, err)
|
||||
|
||||
r := new(storage.Client)
|
||||
err = h.db.One("ID", clientKey, r)
|
||||
err = h.getKv(clientKey, r)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, cl.ID, r.ID)
|
||||
|
||||
h.OnClientExpired(cl)
|
||||
err = h.db.One("ID", clientKey, r)
|
||||
err = h.getKv(clientKey, r)
|
||||
require.Error(t, err)
|
||||
require.ErrorIs(t, storm.ErrNotFound, err)
|
||||
require.ErrorIs(t, ErrKeyNotFound, err)
|
||||
}
|
||||
|
||||
func TestOnClientExpiredClosedDB(t *testing.T) {
|
||||
@@ -274,16 +274,16 @@ func TestOnSubscribedThenOnUnsubscribed(t *testing.T) {
|
||||
h.OnSubscribed(client, pkf, []byte{0})
|
||||
r := new(storage.Subscription)
|
||||
|
||||
err = h.db.One("ID", subscriptionKey(client, pkf.Filters[0].Filter), r)
|
||||
err = h.getKv(subscriptionKey(client, pkf.Filters[0].Filter), r)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, client.ID, r.Client)
|
||||
require.Equal(t, pkf.Filters[0].Filter, r.Filter)
|
||||
require.Equal(t, byte(0), r.Qos)
|
||||
|
||||
h.OnUnsubscribed(client, pkf)
|
||||
err = h.db.One("ID", subscriptionKey(client, pkf.Filters[0].Filter), r)
|
||||
err = h.getKv(subscriptionKey(client, pkf.Filters[0].Filter), r)
|
||||
require.Error(t, err)
|
||||
require.Equal(t, storm.ErrNotFound, err)
|
||||
require.Equal(t, ErrKeyNotFound, err)
|
||||
}
|
||||
|
||||
func TestOnSubscribedNoDB(t *testing.T) {
|
||||
@@ -334,21 +334,21 @@ func TestOnRetainMessageThenUnset(t *testing.T) {
|
||||
h.OnRetainMessage(client, pk, 1)
|
||||
|
||||
r := new(storage.Message)
|
||||
err = h.db.One("ID", retainedKey(pk.TopicName), r)
|
||||
err = h.getKv(retainedKey(pk.TopicName), r)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, pk.TopicName, r.TopicName)
|
||||
require.Equal(t, pk.Payload, r.Payload)
|
||||
|
||||
h.OnRetainMessage(client, pk, -1)
|
||||
err = h.db.One("ID", retainedKey(pk.TopicName), r)
|
||||
err = h.getKv(retainedKey(pk.TopicName), r)
|
||||
require.Error(t, err)
|
||||
require.Equal(t, storm.ErrNotFound, err)
|
||||
require.Equal(t, ErrKeyNotFound, err)
|
||||
|
||||
// coverage: delete deleted
|
||||
h.OnRetainMessage(client, pk, -1)
|
||||
err = h.db.One("ID", retainedKey(pk.TopicName), r)
|
||||
err = h.getKv(retainedKey(pk.TopicName), r)
|
||||
require.Error(t, err)
|
||||
require.Equal(t, storm.ErrNotFound, err)
|
||||
require.Equal(t, ErrKeyNotFound, err)
|
||||
}
|
||||
|
||||
func TestOnRetainedExpired(t *testing.T) {
|
||||
@@ -364,18 +364,18 @@ func TestOnRetainedExpired(t *testing.T) {
|
||||
TopicName: "a/b/c",
|
||||
}
|
||||
|
||||
err = h.db.Save(m)
|
||||
err = h.setKv(m.ID, m)
|
||||
require.NoError(t, err)
|
||||
|
||||
r := new(storage.Message)
|
||||
err = h.db.One("ID", m.ID, r)
|
||||
err = h.getKv(m.ID, r)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, m.TopicName, r.TopicName)
|
||||
|
||||
h.OnRetainedExpired(m.TopicName)
|
||||
err = h.db.One("ID", m.ID, r)
|
||||
err = h.getKv(m.ID, r)
|
||||
require.Error(t, err)
|
||||
require.Equal(t, storm.ErrNotFound, err)
|
||||
require.Equal(t, ErrKeyNotFound, err)
|
||||
}
|
||||
|
||||
func TestOnRetainedExpiredClosedDB(t *testing.T) {
|
||||
@@ -427,7 +427,7 @@ func TestOnQosPublishThenQOSComplete(t *testing.T) {
|
||||
h.OnQosPublish(client, pk, time.Now().Unix(), 0)
|
||||
|
||||
r := new(storage.Message)
|
||||
err = h.db.One("ID", inflightKey(client, pk), r)
|
||||
err = h.getKv(inflightKey(client, pk), r)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, pk.TopicName, r.TopicName)
|
||||
require.Equal(t, pk.Payload, r.Payload)
|
||||
@@ -438,9 +438,9 @@ func TestOnQosPublishThenQOSComplete(t *testing.T) {
|
||||
|
||||
// OnQosDropped is a passthrough to OnQosComplete here
|
||||
h.OnQosDropped(client, pk)
|
||||
err = h.db.One("ID", inflightKey(client, pk), r)
|
||||
err = h.getKv(inflightKey(client, pk), r)
|
||||
require.Error(t, err)
|
||||
require.Equal(t, storm.ErrNotFound, err)
|
||||
require.Equal(t, ErrKeyNotFound, err)
|
||||
}
|
||||
|
||||
func TestOnQosPublishNoDB(t *testing.T) {
|
||||
@@ -494,7 +494,7 @@ func TestOnSysInfoTick(t *testing.T) {
|
||||
h.OnSysInfoTick(info)
|
||||
|
||||
r := new(storage.SystemInfo)
|
||||
err = h.db.One("ID", storage.SysInfoKey, r)
|
||||
err = h.getKv(storage.SysInfoKey, r)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, info.Version, r.Version)
|
||||
require.Equal(t, info.BytesReceived, r.BytesReceived)
|
||||
@@ -524,13 +524,13 @@ func TestStoredClients(t *testing.T) {
|
||||
defer teardown(t, h.config.Path, h)
|
||||
|
||||
// populate with clients
|
||||
err = h.db.Save(&storage.Client{ID: "cl1", T: storage.ClientKey})
|
||||
err = h.setKv(storage.ClientKey+"_"+"cl1", &storage.Client{ID: "cl1"})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = h.db.Save(&storage.Client{ID: "cl2", T: storage.ClientKey})
|
||||
err = h.setKv(storage.ClientKey+"_"+"cl2", &storage.Client{ID: "cl2"})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = h.db.Save(&storage.Client{ID: "cl3", T: storage.ClientKey})
|
||||
err = h.setKv(storage.ClientKey+"_"+"cl3", &storage.Client{ID: "cl3"})
|
||||
require.NoError(t, err)
|
||||
|
||||
r, err := h.StoredClients()
|
||||
@@ -546,7 +546,7 @@ func TestStoredClientsNoDB(t *testing.T) {
|
||||
h.SetOpts(logger, nil)
|
||||
v, err := h.StoredClients()
|
||||
require.Empty(t, v)
|
||||
require.NoError(t, err)
|
||||
require.ErrorIs(t, storage.ErrDBFileNotOpen, err)
|
||||
}
|
||||
|
||||
func TestStoredClientsClosedDB(t *testing.T) {
|
||||
@@ -557,7 +557,7 @@ func TestStoredClientsClosedDB(t *testing.T) {
|
||||
teardown(t, h.config.Path, h)
|
||||
v, err := h.StoredClients()
|
||||
require.Empty(t, v)
|
||||
require.Error(t, err)
|
||||
require.ErrorIs(t, storage.ErrDBFileNotOpen, err)
|
||||
}
|
||||
|
||||
func TestStoredSubscriptions(t *testing.T) {
|
||||
@@ -568,13 +568,13 @@ func TestStoredSubscriptions(t *testing.T) {
|
||||
defer teardown(t, h.config.Path, h)
|
||||
|
||||
// populate with subscriptions
|
||||
err = h.db.Save(&storage.Subscription{ID: "sub1", T: storage.SubscriptionKey})
|
||||
err = h.setKv(storage.SubscriptionKey+"_"+"sub1", &storage.Subscription{ID: "sub1"})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = h.db.Save(&storage.Subscription{ID: "sub2", T: storage.SubscriptionKey})
|
||||
err = h.setKv(storage.SubscriptionKey+"_"+"sub2", &storage.Subscription{ID: "sub2"})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = h.db.Save(&storage.Subscription{ID: "sub3", T: storage.SubscriptionKey})
|
||||
err = h.setKv(storage.SubscriptionKey+"_"+"sub3", &storage.Subscription{ID: "sub3"})
|
||||
require.NoError(t, err)
|
||||
|
||||
r, err := h.StoredSubscriptions()
|
||||
@@ -590,7 +590,7 @@ func TestStoredSubscriptionsNoDB(t *testing.T) {
|
||||
h.SetOpts(logger, nil)
|
||||
v, err := h.StoredSubscriptions()
|
||||
require.Empty(t, v)
|
||||
require.NoError(t, err)
|
||||
require.ErrorIs(t, storage.ErrDBFileNotOpen, err)
|
||||
}
|
||||
|
||||
func TestStoredSubscriptionsClosedDB(t *testing.T) {
|
||||
@@ -601,7 +601,7 @@ func TestStoredSubscriptionsClosedDB(t *testing.T) {
|
||||
teardown(t, h.config.Path, h)
|
||||
v, err := h.StoredSubscriptions()
|
||||
require.Empty(t, v)
|
||||
require.Error(t, err)
|
||||
require.ErrorIs(t, storage.ErrDBFileNotOpen, err)
|
||||
}
|
||||
|
||||
func TestStoredRetainedMessages(t *testing.T) {
|
||||
@@ -612,16 +612,16 @@ func TestStoredRetainedMessages(t *testing.T) {
|
||||
defer teardown(t, h.config.Path, h)
|
||||
|
||||
// populate with messages
|
||||
err = h.db.Save(&storage.Message{ID: "m1", T: storage.RetainedKey})
|
||||
err = h.setKv(storage.RetainedKey+"_"+"m1", &storage.Message{ID: "m1"})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = h.db.Save(&storage.Message{ID: "m2", T: storage.RetainedKey})
|
||||
err = h.setKv(storage.RetainedKey+"_"+"m2", &storage.Message{ID: "m2"})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = h.db.Save(&storage.Message{ID: "m3", T: storage.RetainedKey})
|
||||
err = h.setKv(storage.RetainedKey+"_"+"m3", &storage.Message{ID: "m3"})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = h.db.Save(&storage.Message{ID: "i3", T: storage.InflightKey})
|
||||
err = h.setKv(storage.InflightKey+"_"+"i3", &storage.Message{ID: "i3"})
|
||||
require.NoError(t, err)
|
||||
|
||||
r, err := h.StoredRetainedMessages()
|
||||
@@ -637,7 +637,7 @@ func TestStoredRetainedMessagesNoDB(t *testing.T) {
|
||||
h.SetOpts(logger, nil)
|
||||
v, err := h.StoredRetainedMessages()
|
||||
require.Empty(t, v)
|
||||
require.NoError(t, err)
|
||||
require.ErrorIs(t, storage.ErrDBFileNotOpen, err)
|
||||
}
|
||||
|
||||
func TestStoredRetainedMessagesClosedDB(t *testing.T) {
|
||||
@@ -659,16 +659,16 @@ func TestStoredInflightMessages(t *testing.T) {
|
||||
defer teardown(t, h.config.Path, h)
|
||||
|
||||
// populate with messages
|
||||
err = h.db.Save(&storage.Message{ID: "i1", T: storage.InflightKey})
|
||||
err = h.setKv(storage.InflightKey+"_"+"i1", &storage.Message{ID: "i1"})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = h.db.Save(&storage.Message{ID: "i2", T: storage.InflightKey})
|
||||
err = h.setKv(storage.InflightKey+"_"+"i2", &storage.Message{ID: "i2"})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = h.db.Save(&storage.Message{ID: "i3", T: storage.InflightKey})
|
||||
err = h.setKv(storage.InflightKey+"_"+"i3", &storage.Message{ID: "i3"})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = h.db.Save(&storage.Message{ID: "m1", T: storage.RetainedKey})
|
||||
err = h.setKv(storage.RetainedKey+"_"+"m1", &storage.Message{ID: "m1"})
|
||||
require.NoError(t, err)
|
||||
|
||||
r, err := h.StoredInflightMessages()
|
||||
@@ -684,7 +684,7 @@ func TestStoredInflightMessagesNoDB(t *testing.T) {
|
||||
h.SetOpts(logger, nil)
|
||||
v, err := h.StoredInflightMessages()
|
||||
require.Empty(t, v)
|
||||
require.NoError(t, err)
|
||||
require.ErrorIs(t, storage.ErrDBFileNotOpen, err)
|
||||
}
|
||||
|
||||
func TestStoredInflightMessagesClosedDB(t *testing.T) {
|
||||
@@ -695,7 +695,7 @@ func TestStoredInflightMessagesClosedDB(t *testing.T) {
|
||||
teardown(t, h.config.Path, h)
|
||||
v, err := h.StoredInflightMessages()
|
||||
require.Empty(t, v)
|
||||
require.Error(t, err)
|
||||
require.ErrorIs(t, storage.ErrDBFileNotOpen, err)
|
||||
}
|
||||
|
||||
func TestStoredSysInfo(t *testing.T) {
|
||||
@@ -706,7 +706,7 @@ func TestStoredSysInfo(t *testing.T) {
|
||||
defer teardown(t, h.config.Path, h)
|
||||
|
||||
// populate with sys info
|
||||
err = h.db.Save(&storage.SystemInfo{
|
||||
err = h.setKv(storage.SysInfoKey, &storage.SystemInfo{
|
||||
ID: storage.SysInfoKey,
|
||||
Info: system.Info{
|
||||
Version: "2.0.0",
|
||||
@@ -725,7 +725,7 @@ func TestStoredSysInfoNoDB(t *testing.T) {
|
||||
h.SetOpts(logger, nil)
|
||||
v, err := h.StoredSysInfo()
|
||||
require.Empty(t, v)
|
||||
require.NoError(t, err)
|
||||
require.ErrorIs(t, storage.ErrDBFileNotOpen, err)
|
||||
}
|
||||
|
||||
func TestStoredSysInfoClosedDB(t *testing.T) {
|
||||
@@ -738,3 +738,54 @@ func TestStoredSysInfoClosedDB(t *testing.T) {
|
||||
require.Empty(t, v)
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
func TestGetSetDelKv(t *testing.T) {
|
||||
h := new(Hook)
|
||||
h.SetOpts(logger, nil)
|
||||
err := h.Init(nil)
|
||||
defer teardown(t, h.config.Path, h)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = h.setKv("testId", &storage.Client{ID: "testId"})
|
||||
require.NoError(t, err)
|
||||
|
||||
var obj storage.Client
|
||||
err = h.getKv("testId", &obj)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = h.delKv("testId")
|
||||
require.NoError(t, err)
|
||||
|
||||
err = h.getKv("testId", &obj)
|
||||
require.Error(t, err)
|
||||
require.ErrorIs(t, ErrKeyNotFound, err)
|
||||
}
|
||||
|
||||
func TestIterKv(t *testing.T) {
|
||||
h := new(Hook)
|
||||
h.SetOpts(logger, nil)
|
||||
|
||||
err := h.Init(nil)
|
||||
defer teardown(t, h.config.Path, h)
|
||||
require.NoError(t, err)
|
||||
|
||||
h.setKv("prefix_a_1", &storage.Client{ID: "1"})
|
||||
h.setKv("prefix_a_2", &storage.Client{ID: "2"})
|
||||
h.setKv("prefix_b_2", &storage.Client{ID: "3"})
|
||||
|
||||
var clients []storage.Client
|
||||
err = h.iterKv("prefix_a", func(data []byte) error {
|
||||
var item storage.Client
|
||||
item.UnmarshalBinary(data)
|
||||
clients = append(clients, item)
|
||||
return nil
|
||||
})
|
||||
require.Equal(t, 2, len(clients))
|
||||
require.NoError(t, err)
|
||||
|
||||
visitErr := errors.New("iter visit error")
|
||||
err = h.iterKv("prefix_b", func(data []byte) error {
|
||||
return visitErr
|
||||
})
|
||||
require.ErrorIs(t, visitErr, err)
|
||||
}
|
||||
|
@@ -1,6 +1,6 @@
|
||||
// SPDX-License-Identifier: MIT
|
||||
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
|
||||
// SPDX-FileContributor: mochi-co, gsagula
|
||||
// SPDX-FileContributor: werbenhu
|
||||
|
||||
package pebble
|
||||
|
||||
@@ -268,6 +268,7 @@ func (h *Hook) OnRetainMessage(cl *mqtt.Client, pk packets.Packet, r int64) {
|
||||
TopicName: pk.TopicName,
|
||||
Payload: pk.Payload,
|
||||
Created: pk.Created,
|
||||
Client: cl.ID,
|
||||
Origin: pk.Origin,
|
||||
Properties: storage.MessageProperties{
|
||||
PayloadFormat: props.PayloadFormat,
|
||||
@@ -295,6 +296,7 @@ func (h *Hook) OnQosPublish(cl *mqtt.Client, pk packets.Packet, sent int64, rese
|
||||
in := &storage.Message{
|
||||
ID: inflightKey(cl, pk),
|
||||
T: storage.InflightKey,
|
||||
Client: cl.ID,
|
||||
Origin: pk.Origin,
|
||||
PacketID: pk.PacketID,
|
||||
FixedHeader: pk.FixedHeader,
|
||||
@@ -467,23 +469,23 @@ func (h *Hook) StoredSysInfo() (v storage.SystemInfo, err error) {
|
||||
}
|
||||
|
||||
// Errorf satisfies the pebble interface for an error logger.
|
||||
func (h *Hook) Errorf(m string, v ...interface{}) {
|
||||
func (h *Hook) Errorf(m string, v ...any) {
|
||||
h.Log.Error(fmt.Sprintf(strings.ToLower(strings.Trim(m, "\n")), v...), "v", v)
|
||||
|
||||
}
|
||||
|
||||
// Warningf satisfies the pebble interface for a warning logger.
|
||||
func (h *Hook) Warningf(m string, v ...interface{}) {
|
||||
func (h *Hook) Warningf(m string, v ...any) {
|
||||
h.Log.Warn(fmt.Sprintf(strings.ToLower(strings.Trim(m, "\n")), v...), "v", v)
|
||||
}
|
||||
|
||||
// Infof satisfies the pebble interface for an info logger.
|
||||
func (h *Hook) Infof(m string, v ...interface{}) {
|
||||
func (h *Hook) Infof(m string, v ...any) {
|
||||
h.Log.Info(fmt.Sprintf(strings.ToLower(strings.Trim(m, "\n")), v...), "v", v)
|
||||
}
|
||||
|
||||
// Debugf satisfies the pebble interface for a debug logger.
|
||||
func (h *Hook) Debugf(m string, v ...interface{}) {
|
||||
func (h *Hook) Debugf(m string, v ...any) {
|
||||
h.Log.Debug(fmt.Sprintf(strings.ToLower(strings.Trim(m, "\n")), v...), "v", v)
|
||||
}
|
||||
|
||||
|
@@ -1,6 +1,6 @@
|
||||
// SPDX-License-Identifier: MIT
|
||||
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
|
||||
// SPDX-FileContributor: mochi-co
|
||||
// SPDX-FileContributor: werbenhu
|
||||
|
||||
package pebble
|
||||
|
||||
|
@@ -287,6 +287,7 @@ func (h *Hook) OnRetainMessage(cl *mqtt.Client, pk packets.Packet, r int64) {
|
||||
TopicName: pk.TopicName,
|
||||
Payload: pk.Payload,
|
||||
Created: pk.Created,
|
||||
Client: cl.ID,
|
||||
Origin: pk.Origin,
|
||||
Properties: storage.MessageProperties{
|
||||
PayloadFormat: props.PayloadFormat,
|
||||
@@ -317,6 +318,7 @@ func (h *Hook) OnQosPublish(cl *mqtt.Client, pk packets.Packet, sent int64, rese
|
||||
in := &storage.Message{
|
||||
ID: inflightKey(cl, pk),
|
||||
T: storage.InflightKey,
|
||||
Client: cl.ID,
|
||||
Origin: pk.Origin,
|
||||
FixedHeader: pk.FixedHeader,
|
||||
TopicName: pk.TopicName,
|
||||
|
@@ -46,28 +46,28 @@ type Client struct {
|
||||
|
||||
// ClientProperties contains a limited set of the mqtt v5 properties specific to a client connection.
|
||||
type ClientProperties struct {
|
||||
AuthenticationData []byte `json:"authenticationData"`
|
||||
User []packets.UserProperty `json:"user"`
|
||||
AuthenticationMethod string `json:"authenticationMethod"`
|
||||
SessionExpiryInterval uint32 `json:"sessionExpiryInterval"`
|
||||
MaximumPacketSize uint32 `json:"maximumPacketSize"`
|
||||
ReceiveMaximum uint16 `json:"receiveMaximum"`
|
||||
TopicAliasMaximum uint16 `json:"topicAliasMaximum"`
|
||||
SessionExpiryIntervalFlag bool `json:"sessionExpiryIntervalFlag"`
|
||||
RequestProblemInfo byte `json:"requestProblemInfo"`
|
||||
RequestProblemInfoFlag bool `json:"requestProblemInfoFlag"`
|
||||
RequestResponseInfo byte `json:"requestResponseInfo"`
|
||||
AuthenticationData []byte `json:"authenticationData,omitempty"`
|
||||
User []packets.UserProperty `json:"user,omitempty"`
|
||||
AuthenticationMethod string `json:"authenticationMethod,omitempty"`
|
||||
SessionExpiryInterval uint32 `json:"sessionExpiryInterval,omitempty"`
|
||||
MaximumPacketSize uint32 `json:"maximumPacketSize,omitempty"`
|
||||
ReceiveMaximum uint16 `json:"receiveMaximum,omitempty"`
|
||||
TopicAliasMaximum uint16 `json:"topicAliasMaximum,omitempty"`
|
||||
SessionExpiryIntervalFlag bool `json:"sessionExpiryIntervalFlag,omitempty"`
|
||||
RequestProblemInfo byte `json:"requestProblemInfo,omitempty"`
|
||||
RequestProblemInfoFlag bool `json:"requestProblemInfoFlag,omitempty"`
|
||||
RequestResponseInfo byte `json:"requestResponseInfo,omitempty"`
|
||||
}
|
||||
|
||||
// ClientWill contains a will message for a client, and limited mqtt v5 properties.
|
||||
type ClientWill struct {
|
||||
Payload []byte `json:"payload"`
|
||||
User []packets.UserProperty `json:"user"`
|
||||
TopicName string `json:"topicName"`
|
||||
Flag uint32 `json:"flag"`
|
||||
WillDelayInterval uint32 `json:"willDelayInterval"`
|
||||
Qos byte `json:"qos"`
|
||||
Retain bool `json:"retain"`
|
||||
Payload []byte `json:"payload,omitempty"`
|
||||
User []packets.UserProperty `json:"user,omitempty"`
|
||||
TopicName string `json:"topicName,omitempty"`
|
||||
Flag uint32 `json:"flag,omitempty"`
|
||||
WillDelayInterval uint32 `json:"willDelayInterval,omitempty"`
|
||||
Qos byte `json:"qos,omitempty"`
|
||||
Retain bool `json:"retain,omitempty"`
|
||||
}
|
||||
|
||||
// MarshalBinary encodes the values into a json string.
|
||||
@@ -85,29 +85,30 @@ func (d *Client) UnmarshalBinary(data []byte) error {
|
||||
|
||||
// Message is a storable representation of an MQTT message (specifically publish).
|
||||
type Message struct {
|
||||
Properties MessageProperties `json:"properties"` // -
|
||||
Payload []byte `json:"payload"` // the message payload (if retained)
|
||||
T string `json:"t"` // the data type
|
||||
ID string `json:"id" storm:"id"` // the storage key
|
||||
Origin string `json:"origin"` // the id of the client who sent the message
|
||||
TopicName string `json:"topic_name"` // the topic the message was sent to (if retained)
|
||||
FixedHeader packets.FixedHeader `json:"fixedheader"` // the header properties of the message
|
||||
Created int64 `json:"created"` // the time the message was created in unixtime
|
||||
Sent int64 `json:"sent"` // the last time the message was sent (for retries) in unixtime (if inflight)
|
||||
PacketID uint16 `json:"packet_id"` // the unique id of the packet (if inflight)
|
||||
Properties MessageProperties `json:"properties"` // -
|
||||
Payload []byte `json:"payload"` // the message payload (if retained)
|
||||
T string `json:"t,omitempty"` // the data type
|
||||
ID string `json:"id,omitempty" storm:"id"` // the storage key
|
||||
Client string `json:"client,omitempty"` // the client id the message is for
|
||||
Origin string `json:"origin,omitempty"` // the id of the client who sent the message
|
||||
TopicName string `json:"topic_name,omitempty"` // the topic the message was sent to (if retained)
|
||||
FixedHeader packets.FixedHeader `json:"fixedheader"` // the header properties of the message
|
||||
Created int64 `json:"created,omitempty"` // the time the message was created in unixtime
|
||||
Sent int64 `json:"sent,omitempty"` // the last time the message was sent (for retries) in unixtime (if inflight)
|
||||
PacketID uint16 `json:"packet_id,omitempty"` // the unique id of the packet (if inflight)
|
||||
}
|
||||
|
||||
// MessageProperties contains a limited subset of mqtt v5 properties specific to publish messages.
|
||||
type MessageProperties struct {
|
||||
CorrelationData []byte `json:"correlationData"`
|
||||
SubscriptionIdentifier []int `json:"subscriptionIdentifier"`
|
||||
User []packets.UserProperty `json:"user"`
|
||||
ContentType string `json:"contentType"`
|
||||
ResponseTopic string `json:"responseTopic"`
|
||||
MessageExpiryInterval uint32 `json:"messageExpiry"`
|
||||
TopicAlias uint16 `json:"topicAlias"`
|
||||
PayloadFormat byte `json:"payloadFormat"`
|
||||
PayloadFormatFlag bool `json:"payloadFormatFlag"`
|
||||
CorrelationData []byte `json:"correlationData,omitempty"`
|
||||
SubscriptionIdentifier []int `json:"subscriptionIdentifier,omitempty"`
|
||||
User []packets.UserProperty `json:"user,omitempty"`
|
||||
ContentType string `json:"contentType,omitempty"`
|
||||
ResponseTopic string `json:"responseTopic,omitempty"`
|
||||
MessageExpiryInterval uint32 `json:"messageExpiry,omitempty"`
|
||||
TopicAlias uint16 `json:"topicAlias,omitempty"`
|
||||
PayloadFormat byte `json:"payloadFormat,omitempty"`
|
||||
PayloadFormatFlag bool `json:"payloadFormatFlag,omitempty"`
|
||||
}
|
||||
|
||||
// MarshalBinary encodes the values into a json string.
|
||||
@@ -155,15 +156,15 @@ func (d *Message) ToPacket() packets.Packet {
|
||||
|
||||
// Subscription is a storable representation of an MQTT subscription.
|
||||
type Subscription struct {
|
||||
T string `json:"t"`
|
||||
ID string `json:"id" storm:"id"`
|
||||
Client string `json:"client"`
|
||||
T string `json:"t,omitempty"`
|
||||
ID string `json:"id,omitempty" storm:"id"`
|
||||
Client string `json:"client,omitempty"`
|
||||
Filter string `json:"filter"`
|
||||
Identifier int `json:"identifier"`
|
||||
RetainHandling byte `json:"retain_handling"`
|
||||
Identifier int `json:"identifier,omitempty"`
|
||||
RetainHandling byte `json:"retain_handling,omitempty"`
|
||||
Qos byte `json:"qos"`
|
||||
RetainAsPublished bool `json:"retain_as_pub"`
|
||||
NoLocal bool `json:"no_local"`
|
||||
RetainAsPublished bool `json:"retain_as_pub,omitempty"`
|
||||
NoLocal bool `json:"no_local,omitempty"`
|
||||
}
|
||||
|
||||
// MarshalBinary encodes the values into a json string.
|
||||
|
@@ -89,7 +89,7 @@ var (
|
||||
Filter: "a/b/c",
|
||||
Qos: 1,
|
||||
}
|
||||
subscriptionJSON = []byte(`{"t":"subscription","id":"id","client":"mochi","filter":"a/b/c","identifier":0,"retain_handling":0,"qos":1,"retain_as_pub":false,"no_local":false}`)
|
||||
subscriptionJSON = []byte(`{"t":"subscription","id":"id","client":"mochi","filter":"a/b/c","qos":1}`)
|
||||
|
||||
sysInfoStruct = SystemInfo{
|
||||
T: "info",
|
||||
|
@@ -201,6 +201,7 @@ const (
|
||||
TDisconnect
|
||||
TDisconnectTakeover
|
||||
TDisconnectMqtt5
|
||||
TDisconnectMqtt5DisconnectWithWillMessage
|
||||
TDisconnectSecondConnect
|
||||
TDisconnectReceiveMaximum
|
||||
TDisconnectDropProperties
|
||||
@@ -3781,6 +3782,31 @@ var TPacketData = map[byte]TPacketCases{
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Case: TDisconnectMqtt5DisconnectWithWillMessage,
|
||||
Desc: "mqtt5 disconnect with will message",
|
||||
Primary: true,
|
||||
RawBytes: append([]byte{
|
||||
Disconnect << 4, 38, // fixed header
|
||||
CodeDisconnectWillMessage.Code, // Reason Code
|
||||
36, // Properties Length
|
||||
17, 0, 0, 0, 120, // Session Expiry Interval (17)
|
||||
31, 0, 28, // Reason String (31)
|
||||
}, []byte(CodeDisconnectWillMessage.Reason)...),
|
||||
Packet: &Packet{
|
||||
ProtocolVersion: 5,
|
||||
FixedHeader: FixedHeader{
|
||||
Type: Disconnect,
|
||||
Remaining: 22,
|
||||
},
|
||||
ReasonCode: CodeDisconnectWillMessage.Code,
|
||||
Properties: Properties{
|
||||
ReasonString: CodeDisconnectWillMessage.Reason,
|
||||
SessionExpiryInterval: 120,
|
||||
SessionExpiryIntervalFlag: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Case: TDisconnectSecondConnect,
|
||||
Desc: "second connect packet mqtt5",
|
||||
|
24
server.go
24
server.go
@@ -27,7 +27,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
Version = "2.6.2" // the current server version.
|
||||
Version = "2.6.6" // the current server version.
|
||||
defaultSysTopicInterval int64 = 1 // the interval between $SYS topic publishes
|
||||
LocalListener = "local"
|
||||
InlineClientId = "inline"
|
||||
@@ -112,7 +112,7 @@ type Options struct {
|
||||
// ClientNetReadBufferSize specifies the size of the client *bufio.Reader read buffer.
|
||||
ClientNetReadBufferSize int `yaml:"client_net_read_buffer_size" json:"client_net_read_buffer_size"`
|
||||
|
||||
// Logger specifies a custom configured implementation of zerolog to override
|
||||
// Logger specifies a custom configured implementation of log/slog to override
|
||||
// the servers default logger configuration. If you wish to change the log level,
|
||||
// of the default logger, you can do so by setting:
|
||||
// server := mqtt.New(nil)
|
||||
@@ -485,7 +485,7 @@ func (s *Server) attachClient(cl *Client, listener string) error {
|
||||
expire := (cl.Properties.ProtocolVersion == 5 && cl.Properties.Props.SessionExpiryInterval == 0) || (cl.Properties.ProtocolVersion < 5 && cl.Properties.Clean)
|
||||
s.hooks.OnDisconnect(cl, err, expire)
|
||||
|
||||
if expire && atomic.LoadUint32(&cl.State.isTakenOver) == 0 {
|
||||
if expire && !cl.IsTakenOver() {
|
||||
cl.ClearInflights()
|
||||
s.UnsubscribeClient(cl)
|
||||
s.Clients.Delete(cl.ID) // [MQTT-4.1.0-2] ![MQTT-3.1.2-23]
|
||||
@@ -560,16 +560,16 @@ func (s *Server) validateConnect(cl *Client, pk packets.Packet) packets.Code {
|
||||
// connection ID. If clean is true, the state of any previously existing client
|
||||
// session is abandoned.
|
||||
func (s *Server) inheritClientSession(pk packets.Packet, cl *Client) bool {
|
||||
if existing, ok := s.Clients.Get(pk.Connect.ClientIdentifier); ok {
|
||||
if existing, ok := s.Clients.Get(cl.ID); ok {
|
||||
_ = s.DisconnectClient(existing, packets.ErrSessionTakenOver) // [MQTT-3.1.4-3]
|
||||
if pk.Connect.Clean || (existing.Properties.Clean && existing.Properties.ProtocolVersion < 5) { // [MQTT-3.1.2-4] [MQTT-3.1.4-4]
|
||||
s.UnsubscribeClient(existing)
|
||||
existing.ClearInflights()
|
||||
atomic.StoreUint32(&existing.State.isTakenOver, 1) // only set isTakenOver after unsubscribe has occurred
|
||||
return false // [MQTT-3.2.2-3]
|
||||
existing.State.isTakenOver.Store(true) // only set isTakenOver after unsubscribe has occurred
|
||||
return false // [MQTT-3.2.2-3]
|
||||
}
|
||||
|
||||
atomic.StoreUint32(&existing.State.isTakenOver, 1)
|
||||
existing.State.isTakenOver.Store(true)
|
||||
if existing.State.Inflight.Len() > 0 {
|
||||
cl.State.Inflight = existing.State.Inflight.Clone() // [MQTT-3.1.2-5]
|
||||
if cl.State.Inflight.maximumReceiveQuota == 0 && cl.ops.options.Capabilities.ReceiveMaximum != 0 {
|
||||
@@ -1358,7 +1358,7 @@ func (s *Server) UnsubscribeClient(cl *Client) {
|
||||
cl.State.Subscriptions.Delete(k)
|
||||
}
|
||||
|
||||
if atomic.LoadUint32(&cl.State.isTakenOver) == 1 {
|
||||
if cl.IsTakenOver() {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -1393,6 +1393,10 @@ func (s *Server) processDisconnect(cl *Client, pk packets.Packet) error {
|
||||
cl.Properties.Props.SessionExpiryIntervalFlag = true
|
||||
}
|
||||
|
||||
if pk.ReasonCode == packets.CodeDisconnectWillMessage.Code { // [MQTT-3.1.2.5] Non-normative comment
|
||||
return packets.CodeDisconnectWillMessage
|
||||
}
|
||||
|
||||
s.loop.willDelayed.Delete(cl.ID) // [MQTT-3.1.3-9] [MQTT-3.1.2-8]
|
||||
cl.Stop(packets.CodeDisconnect) // [MQTT-3.14.4-2]
|
||||
|
||||
@@ -1668,7 +1672,7 @@ func (s *Server) loadClients(v []storage.Client) {
|
||||
// loadInflight restores inflight messages from the datastore.
|
||||
func (s *Server) loadInflight(v []storage.Message) {
|
||||
for _, msg := range v {
|
||||
if client, ok := s.Clients.Get(msg.Origin); ok {
|
||||
if client, ok := s.Clients.Get(msg.Client); ok {
|
||||
client.State.Inflight.Set(msg.ToPacket())
|
||||
}
|
||||
}
|
||||
@@ -1685,7 +1689,7 @@ func (s *Server) loadRetained(v []storage.Message) {
|
||||
// than their given expiry intervals.
|
||||
func (s *Server) clearExpiredClients(dt int64) {
|
||||
for id, client := range s.Clients.GetAll() {
|
||||
disconnected := atomic.LoadInt64(&client.State.disconnected)
|
||||
disconnected := client.StopTime()
|
||||
if disconnected == 0 {
|
||||
continue
|
||||
}
|
||||
|
@@ -667,6 +667,7 @@ func TestEstablishConnectionInheritExisting(t *testing.T) {
|
||||
clw, ok := s.Clients.Get(packets.TPacketData[packets.Connect].Get(packets.TConnectMqtt311).Packet.Connect.ClientIdentifier)
|
||||
require.True(t, ok)
|
||||
require.NotEmpty(t, clw.State.Subscriptions)
|
||||
require.True(t, cl.IsTakenOver())
|
||||
|
||||
// Prevent sequential takeover memory-bloom.
|
||||
require.Empty(t, cl.State.Subscriptions.GetAll())
|
||||
@@ -761,6 +762,9 @@ func TestEstablishConnectionInheritExistingTrueTakeover(t *testing.T) {
|
||||
|
||||
_, _ = w2.Write(packets.TPacketData[packets.Disconnect].Get(packets.TDisconnect).RawBytes)
|
||||
require.NoError(t, <-o2)
|
||||
|
||||
require.True(t, clp1.IsTakenOver())
|
||||
require.False(t, clp2.IsTakenOver())
|
||||
}
|
||||
|
||||
func TestEstablishConnectionResentPendingInflightsError(t *testing.T) {
|
||||
@@ -848,12 +852,15 @@ func TestEstablishConnectionInheritExistingClean(t *testing.T) {
|
||||
require.Equal(t, packets.TPacketData[packets.Connack].Get(packets.TConnackAcceptedNoSession).RawBytes, <-recv)
|
||||
require.Equal(t, packets.TPacketData[packets.Disconnect].Get(packets.TDisconnect).RawBytes, <-takeover)
|
||||
|
||||
require.True(t, cl.IsTakenOver())
|
||||
|
||||
_ = w.Close()
|
||||
_ = r.Close()
|
||||
|
||||
clw, ok := s.Clients.Get(packets.TPacketData[packets.Connect].Get(packets.TConnectMqtt311).Packet.Connect.ClientIdentifier)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, 0, clw.State.Subscriptions.Len())
|
||||
|
||||
}
|
||||
|
||||
func TestEstablishConnectionBadAuthentication(t *testing.T) {
|
||||
@@ -2258,7 +2265,7 @@ func TestPublishToSubscribersExhaustedSendQuota(t *testing.T) {
|
||||
require.True(t, subbed)
|
||||
|
||||
// coverage: subscriber publish errors are non-returnable
|
||||
// can we hook into zerolog ?
|
||||
// can we hook into log/slog ?
|
||||
_ = r.Close()
|
||||
pkx := *packets.TPacketData[packets.Publish].Get(packets.TPublishQos1).Packet
|
||||
pkx.PacketID = 0
|
||||
@@ -2279,7 +2286,7 @@ func TestPublishToSubscribersExhaustedPacketIDs(t *testing.T) {
|
||||
require.True(t, subbed)
|
||||
|
||||
// coverage: subscriber publish errors are non-returnable
|
||||
// can we hook into zerolog ?
|
||||
// can we hook into log/slog ?
|
||||
_ = r.Close()
|
||||
pkx := *packets.TPacketData[packets.Publish].Get(packets.TPublishQos1).Packet
|
||||
pkx.PacketID = 0
|
||||
@@ -2296,7 +2303,7 @@ func TestPublishToSubscribersNoConnection(t *testing.T) {
|
||||
require.True(t, subbed)
|
||||
|
||||
// coverage: subscriber publish errors are non-returnable
|
||||
// can we hook into zerolog ?
|
||||
// can we hook into log/slog ?
|
||||
_ = r.Close()
|
||||
s.publishToSubscribers(*packets.TPacketData[packets.Publish].Get(packets.TPublishBasic).Packet)
|
||||
time.Sleep(time.Millisecond)
|
||||
@@ -3138,6 +3145,22 @@ func TestServerProcessPacketDisconnectNonZeroExpiryViolation(t *testing.T) {
|
||||
require.ErrorIs(t, err, packets.ErrProtocolViolationZeroNonZeroExpiry)
|
||||
}
|
||||
|
||||
func TestServerProcessPacketDisconnectDisconnectWithWillMessage(t *testing.T) {
|
||||
s := newServer()
|
||||
cl, _, _ := newTestClient()
|
||||
cl.Properties.Props.SessionExpiryInterval = 30
|
||||
cl.Properties.ProtocolVersion = 5
|
||||
|
||||
s.loop.willDelayed.Add(cl.ID, packets.Packet{TopicName: "a/b/c", Payload: []byte("hello")})
|
||||
require.Equal(t, 1, s.loop.willDelayed.Len())
|
||||
|
||||
err := s.processPacket(cl, *packets.TPacketData[packets.Disconnect].Get(packets.TDisconnectMqtt5DisconnectWithWillMessage).Packet)
|
||||
require.Error(t, err)
|
||||
|
||||
require.Equal(t, 1, s.loop.willDelayed.Len())
|
||||
require.False(t, cl.Closed())
|
||||
}
|
||||
|
||||
func TestServerProcessPacketAuth(t *testing.T) {
|
||||
s := newServer()
|
||||
cl, r, w := newTestClient()
|
||||
@@ -3400,10 +3423,10 @@ func TestServerLoadInflightMessages(t *testing.T) {
|
||||
require.Equal(t, 3, s.Clients.Len())
|
||||
|
||||
v := []storage.Message{
|
||||
{Origin: "mochi", PacketID: 1, Payload: []byte("hello world"), TopicName: "a/b/c"},
|
||||
{Origin: "mochi", PacketID: 2, Payload: []byte("yes"), TopicName: "a/b/c"},
|
||||
{Origin: "zen", PacketID: 3, Payload: []byte("hello world"), TopicName: "a/b/c"},
|
||||
{Origin: "mochi-co", PacketID: 4, Payload: []byte("hello world"), TopicName: "a/b/c"},
|
||||
{Client: "mochi", Origin: "mochi", PacketID: 1, Payload: []byte("hello world"), TopicName: "a/b/c"},
|
||||
{Client: "mochi", Origin: "mochi", PacketID: 2, Payload: []byte("yes"), TopicName: "a/b/c"},
|
||||
{Client: "zen", Origin: "zen", PacketID: 3, Payload: []byte("hello world"), TopicName: "a/b/c"},
|
||||
{Client: "mochi-co", Origin: "mochi-co", PacketID: 4, Payload: []byte("hello world"), TopicName: "a/b/c"},
|
||||
}
|
||||
s.loadInflight(v)
|
||||
|
||||
|
Reference in New Issue
Block a user