Bypassing asdine/storm and directly using bbolt. (#392)

* Fix the bug where inline subscribers do not receive messages after all non-inline clients unsubscribe.

* Bypassing asdine/storm and directly using bbolt.

* Fixed erroneous removal of FileContributor.

---------

Co-authored-by: JB <28275108+mochi-co@users.noreply.github.com>
This commit is contained in:
werben
2024-04-30 17:03:22 +08:00
committed by GitHub
parent 6fc4027a78
commit b26e03a433
15 changed files with 284 additions and 179 deletions

View File

@@ -414,7 +414,7 @@ if err != nil {
### 内联客户端 (Inline Client v2.4.0+支持)
现在可以通过使用内联客户端功能直接在服务端上订阅主题和发布消息。内联客户端是内置在服务端中的特殊的客户端,可以在服务端的配置中启用:
现在可以通过使用内联客户端功能直接在服务端上订阅主题和发布消息。目前,内联客户端暂时还不支持共享订阅。内联客户端是内置在服务端中的特殊的客户端,可以在服务端的配置中启用:
```go
server := mqtt.New(&mqtt.Options{

View File

@@ -399,7 +399,7 @@ The function signatures for all the hooks and `mqtt.Hook` interface can be found
If you are building a persistent storage hook, see the existing persistent hooks for inspiration and patterns. If you are building an auth hook, you will need `OnACLCheck` and `OnConnectAuthenticate`.
### Inline Client (v2.4.0+)
It's now possible to subscribe and publish to topics directly from the embedding code, by using the `inline client` feature. The Inline Client is an embedded client which operates as part of the server, and can be enabled in the server options:
It's now possible to subscribe and publish to topics directly from the embedding code, by using the `inline client` feature. Currently, the inline client does not support shared subscriptions. The Inline Client is an embedded client which operates as part of the server, and can be enabled in the server options:
```go
server := mqtt.New(&mqtt.Options{
InlineClient: true,

View File

@@ -180,7 +180,8 @@ func TestToHooksStorageBolt(t *testing.T) {
hc := HookConfigs{
Storage: &HookStorageConfig{
Bolt: &bolt.Options{
Path: "bolt",
Path: "bolt",
Bucket: "mochi",
},
},
}

View File

@@ -31,7 +31,8 @@
"gc_discard_ratio": 0.5
},
"bolt": {
"path": "bolt.db"
"path": "bolt.db",
"bucket": "mochi"
},
"redis": {
"h_prefix": "mc",

View File

@@ -21,6 +21,7 @@ hooks:
mode: "NoSync"
bolt:
path: bolt.db
bucket: "mochi"
redis:
h_prefix: "mc"
username: "mochi"

View File

@@ -1,6 +1,6 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileContributor: mochi-co
// SPDX-FileContributor: mochi-co, werbenhu
package main

View File

@@ -1,6 +1,6 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileContributor: mochi-co
// SPDX-FileContributor: mochi-co, werbenhu
package main
@@ -19,6 +19,9 @@ import (
)
func main() {
boltPath := ".bolt"
defer os.RemoveAll(boltPath) // remove the example db files at the end
sigs := make(chan os.Signal, 1)
done := make(chan bool, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
@@ -31,7 +34,7 @@ func main() {
_ = server.AddHook(new(auth.AllowHook), nil)
err := server.AddHook(new(bolt.Hook), &bolt.Options{
Path: "bolt.db",
Path: boltPath,
Options: &bbolt.Options{
Timeout: 500 * time.Millisecond,
},

2
go.mod
View File

@@ -4,8 +4,6 @@ go 1.21
require (
github.com/alicebob/miniredis/v2 v2.23.0
github.com/asdine/storm v2.1.2+incompatible
github.com/asdine/storm/v3 v3.2.1
github.com/cockroachdb/pebble v1.1.0
github.com/dgraph-io/badger/v4 v4.2.0
github.com/go-redis/redis/v8 v8.11.5

13
go.sum
View File

@@ -33,11 +33,8 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DataDog/zstd v1.4.1/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ=
github.com/DataDog/zstd v1.4.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
github.com/Sereal/Sereal v0.0.0-20190618215532-0b8ac451a863 h1:BRrxwOZBolJN4gIwvZMJY1tzqBvQgpaZiQRuIDD40jM=
github.com/Sereal/Sereal v0.0.0-20190618215532-0b8ac451a863/go.mod h1:D0JMgToj/WdxCgd30Kc1UcA9E+WdZoJqeVOuYW7iTBM=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
@@ -47,10 +44,6 @@ github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZp
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
github.com/alicebob/miniredis/v2 v2.23.0 h1:+lwAJYjvvdIVg6doFHuotFjueJ/7KY10xo/vm3X3Scw=
github.com/alicebob/miniredis/v2 v2.23.0/go.mod h1:XNqvJdQJv5mSuVMc0ynneafpnL/zv52acZ6kqeS0t88=
github.com/asdine/storm v2.1.2+incompatible h1:dczuIkyqwY2LrtXPz8ixMrU/OFgZp71kbKTHGrXYt/Q=
github.com/asdine/storm v2.1.2+incompatible/go.mod h1:RarYDc9hq1UPLImuiXK3BIWPJLdIygvV3PsInK0FbVQ=
github.com/asdine/storm/v3 v3.2.1 h1:I5AqhkPK6nBZ/qJXySdI7ot5BlXSZ7qvDY1zAn5ZJac=
github.com/asdine/storm/v3 v3.2.1/go.mod h1:LEpXwGt4pIqrE/XcTvCnZHT5MgZCV6Ub9q7yQzOFWr0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
@@ -147,7 +140,6 @@ github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
@@ -275,15 +267,12 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/vmihailenco/msgpack v4.0.4+incompatible h1:dSLoQfGFAo3F6OoNhwUmLwVgaUXK79GlxNBwueZn0xI=
github.com/vmihailenco/msgpack v4.0.4+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6AcGMlsjHatGryHQYUTf1ShIgkk=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 h1:k/gmLsJDWwWqbLCur2yWnJzwQEKRcAHXo6seXGuSwWw=
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9/go.mod h1:E1AXubJBdNmFERAOucpDIxNzeGfLzg0mYh+UfMWdChA=
go.etcd.io/bbolt v1.3.4/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
go.etcd.io/bbolt v1.3.5 h1:XAzx9gjCb0Rxj7EoqcClPD1d5ZBxZJk0jbuoPHenBt0=
go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
@@ -345,7 +334,6 @@ golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191105084925-a882066a44e0/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
@@ -502,7 +490,6 @@ google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7
google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0=
google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
google.golang.org/appengine v1.6.6 h1:lMO5rYAqUxkmaj76jAkRUvt5JZgFymx/+Q5Mzfivuhc=
google.golang.org/appengine v1.6.6/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=

View File

@@ -1,6 +1,6 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileContributor: mochi-co, gsagula
// SPDX-FileContributor: mochi-co, gsagula, werbenhu
package badger
@@ -490,23 +490,23 @@ func (h *Hook) StoredSysInfo() (v storage.SystemInfo, err error) {
}
// Errorf satisfies the badger interface for an error logger.
func (h *Hook) Errorf(m string, v ...interface{}) {
func (h *Hook) Errorf(m string, v ...any) {
h.Log.Error(fmt.Sprintf(strings.ToLower(strings.Trim(m, "\n")), v...), "v", v)
}
// Warningf satisfies the badger interface for a warning logger.
func (h *Hook) Warningf(m string, v ...interface{}) {
func (h *Hook) Warningf(m string, v ...any) {
h.Log.Warn(fmt.Sprintf(strings.ToLower(strings.Trim(m, "\n")), v...), "v", v)
}
// Infof satisfies the badger interface for an info logger.
func (h *Hook) Infof(m string, v ...interface{}) {
func (h *Hook) Infof(m string, v ...any) {
h.Log.Info(fmt.Sprintf(strings.ToLower(strings.Trim(m, "\n")), v...), "v", v)
}
// Debugf satisfies the badger interface for a debug logger.
func (h *Hook) Debugf(m string, v ...interface{}) {
func (h *Hook) Debugf(m string, v ...any) {
h.Log.Debug(fmt.Sprintf(strings.ToLower(strings.Trim(m, "\n")), v...), "v", v)
}

View File

@@ -1,6 +1,6 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileContributor: mochi-co
// SPDX-FileContributor: mochi-co, werbenhu
package badger

View File

@@ -1,6 +1,6 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileContributor: mochi-co
// SPDX-FileContributor: mochi-co, werbenhu
// Package bolt is provided for historical compatibility and may not be actively updated, you should use the badger hook instead.
package bolt
@@ -14,23 +14,27 @@ import (
"github.com/mochi-mqtt/server/v2/hooks/storage"
"github.com/mochi-mqtt/server/v2/packets"
"github.com/mochi-mqtt/server/v2/system"
sgob "github.com/asdine/storm/codec/gob"
"github.com/asdine/storm/v3"
"go.etcd.io/bbolt"
)
var (
ErrBucketNotFound = errors.New("bucket not found")
ErrKeyNotFound = errors.New("key not found")
)
const (
// defaultDbFile is the default file path for the boltdb file.
defaultDbFile = "bolt.db"
defaultDbFile = ".bolt"
// defaultTimeout is the default time to hold a connection to the file.
defaultTimeout = 250 * time.Millisecond
defaultBucket = "mochi"
)
// clientKey returns a primary key for a client.
func clientKey(cl *mqtt.Client) string {
return cl.ID
return storage.ClientKey + "_" + cl.ID
}
// subscriptionKey returns a primary key for a subscription.
@@ -56,6 +60,7 @@ func sysInfoKey() string {
// Options contains configuration settings for the bolt instance.
type Options struct {
Options *bbolt.Options
Bucket string `yaml:"bucket" json:"bucket"`
Path string `yaml:"path" json:"path"`
}
@@ -63,7 +68,7 @@ type Options struct {
type Hook struct {
mqtt.HookBase
config *Options // options for configuring the boltdb instance.
db *storm.DB // the boltdb instance.
db *bbolt.DB // the boltdb instance.
}
// ID returns the id of the hook.
@@ -110,22 +115,32 @@ func (h *Hook) Init(config any) error {
Timeout: defaultTimeout,
}
}
if h.config.Path == "" {
if len(h.config.Path) == 0 {
h.config.Path = defaultDbFile
}
if len(h.config.Bucket) == 0 {
h.config.Bucket = defaultBucket
}
var err error
h.db, err = storm.Open(h.config.Path, storm.BoltOptions(0600, h.config.Options), storm.Codec(sgob.Codec))
h.db, err = bbolt.Open(h.config.Path, 0600, h.config.Options)
if err != nil {
return err
}
return nil
err = h.db.Update(func(tx *bbolt.Tx) error {
_, err := tx.CreateBucketIfNotExists([]byte(h.config.Bucket))
return err
})
return err
}
// Stop closes the boltdb instance.
func (h *Hook) Stop() error {
return h.db.Close()
err := h.db.Close()
h.db = nil
return err
}
// OnSessionEstablished adds a client to the store when their session is established.
@@ -147,7 +162,7 @@ func (h *Hook) updateClient(cl *mqtt.Client) {
props := cl.Properties.Props.Copy(false)
in := &storage.Client{
ID: clientKey(cl),
ID: cl.ID,
T: storage.ClientKey,
Remote: cl.Net.Remote,
Listener: cl.Net.Listener,
@@ -167,10 +182,8 @@ func (h *Hook) updateClient(cl *mqtt.Client) {
},
Will: storage.ClientWill(cl.Properties.Will),
}
err := h.db.Save(in)
if err != nil {
h.Log.Error("failed to save client data", "error", err, "data", in)
}
_ = h.setKv(clientKey(cl), in)
}
// OnDisconnect removes a client from the store if they were using a clean session.
@@ -188,10 +201,7 @@ func (h *Hook) OnDisconnect(cl *mqtt.Client, _ error, expire bool) {
return
}
err := h.db.DeleteStruct(&storage.Client{ID: clientKey(cl)})
if err != nil && !errors.Is(err, storm.ErrNotFound) {
h.Log.Error("failed to delete client", "error", err, "id", clientKey(cl))
}
_ = h.delKv(clientKey(cl))
}
// OnSubscribed adds one or more client subscriptions to the store.
@@ -214,11 +224,7 @@ func (h *Hook) OnSubscribed(cl *mqtt.Client, pk packets.Packet, reasonCodes []by
RetainHandling: pk.Filters[i].RetainHandling,
RetainAsPublished: pk.Filters[i].RetainAsPublished,
}
err := h.db.Save(in)
if err != nil {
h.Log.Error("failed to save subscription data", "error", err, "client", cl.ID, "data", in)
}
_ = h.setKv(in.ID, in)
}
}
@@ -230,12 +236,7 @@ func (h *Hook) OnUnsubscribed(cl *mqtt.Client, pk packets.Packet) {
}
for i := 0; i < len(pk.Filters); i++ {
err := h.db.DeleteStruct(&storage.Subscription{
ID: subscriptionKey(cl, pk.Filters[i].Filter),
})
if err != nil {
h.Log.Error("failed to delete client", "error", err, "id", subscriptionKey(cl, pk.Filters[i].Filter))
}
_ = h.delKv(subscriptionKey(cl, pk.Filters[i].Filter))
}
}
@@ -247,12 +248,7 @@ func (h *Hook) OnRetainMessage(cl *mqtt.Client, pk packets.Packet, r int64) {
}
if r == -1 {
err := h.db.DeleteStruct(&storage.Message{
ID: retainedKey(pk.TopicName),
})
if err != nil {
h.Log.Error("failed to delete retained publish", "error", err, "id", retainedKey(pk.TopicName))
}
_ = h.delKv(retainedKey(pk.TopicName))
return
}
@@ -276,10 +272,8 @@ func (h *Hook) OnRetainMessage(cl *mqtt.Client, pk packets.Packet, r int64) {
User: props.User,
},
}
err := h.db.Save(in)
if err != nil {
h.Log.Error("failed to save retained publish data", "error", err, "client", cl.ID, "data", in)
}
_ = h.setKv(in.ID, in)
}
// OnQosPublish adds or updates an inflight message in the store.
@@ -311,10 +305,7 @@ func (h *Hook) OnQosPublish(cl *mqtt.Client, pk packets.Packet, sent int64, rese
},
}
err := h.db.Save(in)
if err != nil {
h.Log.Error("failed to save qos inflight data", "error", err, "client", cl.ID, "data", in)
}
_ = h.setKv(in.ID, in)
}
// OnQosComplete removes a resolved inflight message from the store.
@@ -324,12 +315,7 @@ func (h *Hook) OnQosComplete(cl *mqtt.Client, pk packets.Packet) {
return
}
err := h.db.DeleteStruct(&storage.Message{
ID: inflightKey(cl, pk),
})
if err != nil {
h.Log.Error("failed to delete inflight data", "error", err, "id", inflightKey(cl, pk))
}
_ = h.delKv(inflightKey(cl, pk))
}
// OnQosDropped removes a dropped inflight message from the store.
@@ -354,10 +340,7 @@ func (h *Hook) OnSysInfoTick(sys *system.Info) {
Info: *sys,
}
err := h.db.Save(in)
if err != nil {
h.Log.Error("failed to save $SYS data", "error", err, "data", in)
}
_ = h.setKv(in.ID, in)
}
// OnRetainedExpired deletes expired retained messages from the store.
@@ -366,10 +349,7 @@ func (h *Hook) OnRetainedExpired(filter string) {
h.Log.Error("", "error", storage.ErrDBFileNotOpen)
return
}
if err := h.db.DeleteStruct(&storage.Message{ID: retainedKey(filter)}); err != nil {
h.Log.Error("failed to delete retained publish", "error", err, "id", retainedKey(filter))
}
_ = h.delKv(retainedKey(filter))
}
// OnClientExpired deleted expired clients from the store.
@@ -378,25 +358,24 @@ func (h *Hook) OnClientExpired(cl *mqtt.Client) {
h.Log.Error("", "error", storage.ErrDBFileNotOpen)
return
}
err := h.db.DeleteStruct(&storage.Client{ID: clientKey(cl)})
if err != nil && !errors.Is(err, storm.ErrNotFound) {
h.Log.Error("failed to delete expired client", "error", err, "id", clientKey(cl))
}
_ = h.delKv(clientKey(cl))
}
// StoredClients returns all stored clients from the store.
func (h *Hook) StoredClients() (v []storage.Client, err error) {
if h.db == nil {
h.Log.Error("", "error", storage.ErrDBFileNotOpen)
return
}
err = h.db.Find("T", storage.ClientKey, &v)
if err != nil && !errors.Is(err, storm.ErrNotFound) {
return
return v, storage.ErrDBFileNotOpen
}
err = h.iterKv(storage.ClientKey, func(value []byte) error {
obj := storage.Client{}
err = obj.UnmarshalBinary(value)
if err == nil {
v = append(v, obj)
}
return err
})
return v, nil
}
@@ -404,58 +383,142 @@ func (h *Hook) StoredClients() (v []storage.Client, err error) {
func (h *Hook) StoredSubscriptions() (v []storage.Subscription, err error) {
if h.db == nil {
h.Log.Error("", "error", storage.ErrDBFileNotOpen)
return
return v, storage.ErrDBFileNotOpen
}
err = h.db.Find("T", storage.SubscriptionKey, &v)
if err != nil && !errors.Is(err, storm.ErrNotFound) {
return
}
return v, nil
v = make([]storage.Subscription, 0)
err = h.iterKv(storage.SubscriptionKey, func(value []byte) error {
obj := storage.Subscription{}
err = obj.UnmarshalBinary(value)
if err == nil {
v = append(v, obj)
}
return err
})
return
}
// StoredRetainedMessages returns all stored retained messages from the store.
func (h *Hook) StoredRetainedMessages() (v []storage.Message, err error) {
if h.db == nil {
h.Log.Error("", "error", storage.ErrDBFileNotOpen)
return
return v, storage.ErrDBFileNotOpen
}
err = h.db.Find("T", storage.RetainedKey, &v)
if err != nil && !errors.Is(err, storm.ErrNotFound) {
return
}
return v, nil
v = make([]storage.Message, 0)
err = h.iterKv(storage.RetainedKey, func(value []byte) error {
obj := storage.Message{}
err = obj.UnmarshalBinary(value)
if err == nil {
v = append(v, obj)
}
return err
})
return
}
// StoredInflightMessages returns all stored inflight messages from the store.
func (h *Hook) StoredInflightMessages() (v []storage.Message, err error) {
if h.db == nil {
h.Log.Error("", "error", storage.ErrDBFileNotOpen)
return
return v, storage.ErrDBFileNotOpen
}
err = h.db.Find("T", storage.InflightKey, &v)
if err != nil && !errors.Is(err, storm.ErrNotFound) {
return
}
return v, nil
v = make([]storage.Message, 0)
err = h.iterKv(storage.InflightKey, func(value []byte) error {
obj := storage.Message{}
err = obj.UnmarshalBinary(value)
if err == nil {
v = append(v, obj)
}
return err
})
return
}
// StoredSysInfo returns the system info from the store.
func (h *Hook) StoredSysInfo() (v storage.SystemInfo, err error) {
if h.db == nil {
h.Log.Error("", "error", storage.ErrDBFileNotOpen)
return
return v, storage.ErrDBFileNotOpen
}
err = h.db.One("ID", storage.SysInfoKey, &v)
if err != nil && !errors.Is(err, storm.ErrNotFound) {
err = h.getKv(storage.SysInfoKey, &v)
if err != nil && !errors.Is(err, ErrKeyNotFound) {
return
}
return v, nil
}
// setKv stores a key-value pair in the database.
func (h *Hook) setKv(k string, v storage.Serializable) error {
err := h.db.Update(func(tx *bbolt.Tx) error {
bucket := tx.Bucket([]byte(h.config.Bucket))
data, _ := v.MarshalBinary()
err := bucket.Put([]byte(k), data)
if err != nil {
return err
}
return nil
})
if err != nil {
h.Log.Error("failed to upsert data", "error", err, "key", k)
}
return err
}
// delKv deletes a key-value pair from the database.
func (h *Hook) delKv(k string) error {
err := h.db.Update(func(tx *bbolt.Tx) error {
bucket := tx.Bucket([]byte(h.config.Bucket))
err := bucket.Delete([]byte(k))
if err != nil {
return err
}
return nil
})
if err != nil {
h.Log.Error("failed to delete data", "error", err, "key", k)
}
return err
}
// getKv retrieves the value associated with a key from the database.
func (h *Hook) getKv(k string, v storage.Serializable) error {
err := h.db.View(func(tx *bbolt.Tx) error {
bucket := tx.Bucket([]byte(h.config.Bucket))
value := bucket.Get([]byte(k))
if value == nil {
return ErrKeyNotFound
}
return v.UnmarshalBinary(value)
})
if err != nil {
h.Log.Error("failed to get data", "error", err, "key", k)
}
return err
}
// iterKv iterates over key-value pairs with keys having the specified prefix in the database.
func (h *Hook) iterKv(prefix string, visit func([]byte) error) error {
err := h.db.View(func(tx *bbolt.Tx) error {
bucket := tx.Bucket([]byte(h.config.Bucket))
c := bucket.Cursor()
for k, v := c.Seek([]byte(prefix)); k != nil && string(k[:len(prefix)]) == prefix; k, v = c.Next() {
if err := visit(v); err != nil {
return err
}
}
return nil
})
if err != nil {
h.Log.Error("failed to iter data", "error", err, "prefix", prefix)
}
return err
}

View File

@@ -1,10 +1,11 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileContributor: mochi-co
// SPDX-FileContributor: mochi-co, werbenhu
package bolt
import (
"errors"
"log/slog"
"os"
"testing"
@@ -15,7 +16,6 @@ import (
"github.com/mochi-mqtt/server/v2/packets"
"github.com/mochi-mqtt/server/v2/system"
"github.com/asdine/storm/v3"
"github.com/stretchr/testify/require"
)
@@ -45,7 +45,7 @@ func teardown(t *testing.T, path string, h *Hook) {
func TestClientKey(t *testing.T) {
k := clientKey(&mqtt.Client{ID: "cl1"})
require.Equal(t, "cl1", k)
require.Equal(t, "CL_cl1", k)
}
func TestSubscriptionKey(t *testing.T) {
@@ -130,7 +130,7 @@ func TestOnSessionEstablishedThenOnDisconnect(t *testing.T) {
h.OnSessionEstablished(client, packets.Packet{})
r := new(storage.Client)
err = h.db.One("ID", clientKey(client), r)
err = h.getKv(clientKey(client), r)
require.NoError(t, err)
require.Equal(t, client.ID, r.ID)
require.Equal(t, client.Net.Remote, r.Remote)
@@ -141,15 +141,15 @@ func TestOnSessionEstablishedThenOnDisconnect(t *testing.T) {
h.OnDisconnect(client, nil, false)
r2 := new(storage.Client)
err = h.db.One("ID", clientKey(client), r2)
err = h.getKv(clientKey(client), r2)
require.NoError(t, err)
require.Equal(t, client.ID, r.ID)
h.OnDisconnect(client, nil, true)
r3 := new(storage.Client)
err = h.db.One("ID", clientKey(client), r3)
err = h.getKv(clientKey(client), r3)
require.Error(t, err)
require.ErrorIs(t, storm.ErrNotFound, err)
require.ErrorIs(t, ErrKeyNotFound, err)
require.Empty(t, r3.ID)
}
@@ -180,7 +180,7 @@ func TestOnWillSent(t *testing.T) {
h.OnWillSent(c1, packets.Packet{})
r := new(storage.Client)
err = h.db.One("ID", clientKey(client), r)
err = h.getKv(clientKey(client), r)
require.NoError(t, err)
require.Equal(t, uint32(1), r.Will.Flag)
@@ -197,18 +197,18 @@ func TestOnClientExpired(t *testing.T) {
cl := &mqtt.Client{ID: "cl1"}
clientKey := clientKey(cl)
err = h.db.Save(&storage.Client{ID: cl.ID})
err = h.setKv(clientKey, &storage.Client{ID: cl.ID})
require.NoError(t, err)
r := new(storage.Client)
err = h.db.One("ID", clientKey, r)
err = h.getKv(clientKey, r)
require.NoError(t, err)
require.Equal(t, cl.ID, r.ID)
h.OnClientExpired(cl)
err = h.db.One("ID", clientKey, r)
err = h.getKv(clientKey, r)
require.Error(t, err)
require.ErrorIs(t, storm.ErrNotFound, err)
require.ErrorIs(t, ErrKeyNotFound, err)
}
func TestOnClientExpiredClosedDB(t *testing.T) {
@@ -274,16 +274,16 @@ func TestOnSubscribedThenOnUnsubscribed(t *testing.T) {
h.OnSubscribed(client, pkf, []byte{0})
r := new(storage.Subscription)
err = h.db.One("ID", subscriptionKey(client, pkf.Filters[0].Filter), r)
err = h.getKv(subscriptionKey(client, pkf.Filters[0].Filter), r)
require.NoError(t, err)
require.Equal(t, client.ID, r.Client)
require.Equal(t, pkf.Filters[0].Filter, r.Filter)
require.Equal(t, byte(0), r.Qos)
h.OnUnsubscribed(client, pkf)
err = h.db.One("ID", subscriptionKey(client, pkf.Filters[0].Filter), r)
err = h.getKv(subscriptionKey(client, pkf.Filters[0].Filter), r)
require.Error(t, err)
require.Equal(t, storm.ErrNotFound, err)
require.Equal(t, ErrKeyNotFound, err)
}
func TestOnSubscribedNoDB(t *testing.T) {
@@ -334,21 +334,21 @@ func TestOnRetainMessageThenUnset(t *testing.T) {
h.OnRetainMessage(client, pk, 1)
r := new(storage.Message)
err = h.db.One("ID", retainedKey(pk.TopicName), r)
err = h.getKv(retainedKey(pk.TopicName), r)
require.NoError(t, err)
require.Equal(t, pk.TopicName, r.TopicName)
require.Equal(t, pk.Payload, r.Payload)
h.OnRetainMessage(client, pk, -1)
err = h.db.One("ID", retainedKey(pk.TopicName), r)
err = h.getKv(retainedKey(pk.TopicName), r)
require.Error(t, err)
require.Equal(t, storm.ErrNotFound, err)
require.Equal(t, ErrKeyNotFound, err)
// coverage: delete deleted
h.OnRetainMessage(client, pk, -1)
err = h.db.One("ID", retainedKey(pk.TopicName), r)
err = h.getKv(retainedKey(pk.TopicName), r)
require.Error(t, err)
require.Equal(t, storm.ErrNotFound, err)
require.Equal(t, ErrKeyNotFound, err)
}
func TestOnRetainedExpired(t *testing.T) {
@@ -364,18 +364,18 @@ func TestOnRetainedExpired(t *testing.T) {
TopicName: "a/b/c",
}
err = h.db.Save(m)
err = h.setKv(m.ID, m)
require.NoError(t, err)
r := new(storage.Message)
err = h.db.One("ID", m.ID, r)
err = h.getKv(m.ID, r)
require.NoError(t, err)
require.Equal(t, m.TopicName, r.TopicName)
h.OnRetainedExpired(m.TopicName)
err = h.db.One("ID", m.ID, r)
err = h.getKv(m.ID, r)
require.Error(t, err)
require.Equal(t, storm.ErrNotFound, err)
require.Equal(t, ErrKeyNotFound, err)
}
func TestOnRetainedExpiredClosedDB(t *testing.T) {
@@ -427,7 +427,7 @@ func TestOnQosPublishThenQOSComplete(t *testing.T) {
h.OnQosPublish(client, pk, time.Now().Unix(), 0)
r := new(storage.Message)
err = h.db.One("ID", inflightKey(client, pk), r)
err = h.getKv(inflightKey(client, pk), r)
require.NoError(t, err)
require.Equal(t, pk.TopicName, r.TopicName)
require.Equal(t, pk.Payload, r.Payload)
@@ -438,9 +438,9 @@ func TestOnQosPublishThenQOSComplete(t *testing.T) {
// OnQosDropped is a passthrough to OnQosComplete here
h.OnQosDropped(client, pk)
err = h.db.One("ID", inflightKey(client, pk), r)
err = h.getKv(inflightKey(client, pk), r)
require.Error(t, err)
require.Equal(t, storm.ErrNotFound, err)
require.Equal(t, ErrKeyNotFound, err)
}
func TestOnQosPublishNoDB(t *testing.T) {
@@ -494,7 +494,7 @@ func TestOnSysInfoTick(t *testing.T) {
h.OnSysInfoTick(info)
r := new(storage.SystemInfo)
err = h.db.One("ID", storage.SysInfoKey, r)
err = h.getKv(storage.SysInfoKey, r)
require.NoError(t, err)
require.Equal(t, info.Version, r.Version)
require.Equal(t, info.BytesReceived, r.BytesReceived)
@@ -524,13 +524,13 @@ func TestStoredClients(t *testing.T) {
defer teardown(t, h.config.Path, h)
// populate with clients
err = h.db.Save(&storage.Client{ID: "cl1", T: storage.ClientKey})
err = h.setKv(storage.ClientKey+"_"+"cl1", &storage.Client{ID: "cl1"})
require.NoError(t, err)
err = h.db.Save(&storage.Client{ID: "cl2", T: storage.ClientKey})
err = h.setKv(storage.ClientKey+"_"+"cl2", &storage.Client{ID: "cl2"})
require.NoError(t, err)
err = h.db.Save(&storage.Client{ID: "cl3", T: storage.ClientKey})
err = h.setKv(storage.ClientKey+"_"+"cl3", &storage.Client{ID: "cl3"})
require.NoError(t, err)
r, err := h.StoredClients()
@@ -546,7 +546,7 @@ func TestStoredClientsNoDB(t *testing.T) {
h.SetOpts(logger, nil)
v, err := h.StoredClients()
require.Empty(t, v)
require.NoError(t, err)
require.ErrorIs(t, storage.ErrDBFileNotOpen, err)
}
func TestStoredClientsClosedDB(t *testing.T) {
@@ -557,7 +557,7 @@ func TestStoredClientsClosedDB(t *testing.T) {
teardown(t, h.config.Path, h)
v, err := h.StoredClients()
require.Empty(t, v)
require.Error(t, err)
require.ErrorIs(t, storage.ErrDBFileNotOpen, err)
}
func TestStoredSubscriptions(t *testing.T) {
@@ -568,13 +568,13 @@ func TestStoredSubscriptions(t *testing.T) {
defer teardown(t, h.config.Path, h)
// populate with subscriptions
err = h.db.Save(&storage.Subscription{ID: "sub1", T: storage.SubscriptionKey})
err = h.setKv(storage.SubscriptionKey+"_"+"sub1", &storage.Subscription{ID: "sub1"})
require.NoError(t, err)
err = h.db.Save(&storage.Subscription{ID: "sub2", T: storage.SubscriptionKey})
err = h.setKv(storage.SubscriptionKey+"_"+"sub2", &storage.Subscription{ID: "sub2"})
require.NoError(t, err)
err = h.db.Save(&storage.Subscription{ID: "sub3", T: storage.SubscriptionKey})
err = h.setKv(storage.SubscriptionKey+"_"+"sub3", &storage.Subscription{ID: "sub3"})
require.NoError(t, err)
r, err := h.StoredSubscriptions()
@@ -590,7 +590,7 @@ func TestStoredSubscriptionsNoDB(t *testing.T) {
h.SetOpts(logger, nil)
v, err := h.StoredSubscriptions()
require.Empty(t, v)
require.NoError(t, err)
require.ErrorIs(t, storage.ErrDBFileNotOpen, err)
}
func TestStoredSubscriptionsClosedDB(t *testing.T) {
@@ -601,7 +601,7 @@ func TestStoredSubscriptionsClosedDB(t *testing.T) {
teardown(t, h.config.Path, h)
v, err := h.StoredSubscriptions()
require.Empty(t, v)
require.Error(t, err)
require.ErrorIs(t, storage.ErrDBFileNotOpen, err)
}
func TestStoredRetainedMessages(t *testing.T) {
@@ -612,16 +612,16 @@ func TestStoredRetainedMessages(t *testing.T) {
defer teardown(t, h.config.Path, h)
// populate with messages
err = h.db.Save(&storage.Message{ID: "m1", T: storage.RetainedKey})
err = h.setKv(storage.RetainedKey+"_"+"m1", &storage.Message{ID: "m1"})
require.NoError(t, err)
err = h.db.Save(&storage.Message{ID: "m2", T: storage.RetainedKey})
err = h.setKv(storage.RetainedKey+"_"+"m2", &storage.Message{ID: "m2"})
require.NoError(t, err)
err = h.db.Save(&storage.Message{ID: "m3", T: storage.RetainedKey})
err = h.setKv(storage.RetainedKey+"_"+"m3", &storage.Message{ID: "m3"})
require.NoError(t, err)
err = h.db.Save(&storage.Message{ID: "i3", T: storage.InflightKey})
err = h.setKv(storage.InflightKey+"_"+"i3", &storage.Message{ID: "i3"})
require.NoError(t, err)
r, err := h.StoredRetainedMessages()
@@ -637,7 +637,7 @@ func TestStoredRetainedMessagesNoDB(t *testing.T) {
h.SetOpts(logger, nil)
v, err := h.StoredRetainedMessages()
require.Empty(t, v)
require.NoError(t, err)
require.ErrorIs(t, storage.ErrDBFileNotOpen, err)
}
func TestStoredRetainedMessagesClosedDB(t *testing.T) {
@@ -659,16 +659,16 @@ func TestStoredInflightMessages(t *testing.T) {
defer teardown(t, h.config.Path, h)
// populate with messages
err = h.db.Save(&storage.Message{ID: "i1", T: storage.InflightKey})
err = h.setKv(storage.InflightKey+"_"+"i1", &storage.Message{ID: "i1"})
require.NoError(t, err)
err = h.db.Save(&storage.Message{ID: "i2", T: storage.InflightKey})
err = h.setKv(storage.InflightKey+"_"+"i2", &storage.Message{ID: "i2"})
require.NoError(t, err)
err = h.db.Save(&storage.Message{ID: "i3", T: storage.InflightKey})
err = h.setKv(storage.InflightKey+"_"+"i3", &storage.Message{ID: "i3"})
require.NoError(t, err)
err = h.db.Save(&storage.Message{ID: "m1", T: storage.RetainedKey})
err = h.setKv(storage.RetainedKey+"_"+"m1", &storage.Message{ID: "m1"})
require.NoError(t, err)
r, err := h.StoredInflightMessages()
@@ -684,7 +684,7 @@ func TestStoredInflightMessagesNoDB(t *testing.T) {
h.SetOpts(logger, nil)
v, err := h.StoredInflightMessages()
require.Empty(t, v)
require.NoError(t, err)
require.ErrorIs(t, storage.ErrDBFileNotOpen, err)
}
func TestStoredInflightMessagesClosedDB(t *testing.T) {
@@ -695,7 +695,7 @@ func TestStoredInflightMessagesClosedDB(t *testing.T) {
teardown(t, h.config.Path, h)
v, err := h.StoredInflightMessages()
require.Empty(t, v)
require.Error(t, err)
require.ErrorIs(t, storage.ErrDBFileNotOpen, err)
}
func TestStoredSysInfo(t *testing.T) {
@@ -706,7 +706,7 @@ func TestStoredSysInfo(t *testing.T) {
defer teardown(t, h.config.Path, h)
// populate with sys info
err = h.db.Save(&storage.SystemInfo{
err = h.setKv(storage.SysInfoKey, &storage.SystemInfo{
ID: storage.SysInfoKey,
Info: system.Info{
Version: "2.0.0",
@@ -725,7 +725,7 @@ func TestStoredSysInfoNoDB(t *testing.T) {
h.SetOpts(logger, nil)
v, err := h.StoredSysInfo()
require.Empty(t, v)
require.NoError(t, err)
require.ErrorIs(t, storage.ErrDBFileNotOpen, err)
}
func TestStoredSysInfoClosedDB(t *testing.T) {
@@ -738,3 +738,54 @@ func TestStoredSysInfoClosedDB(t *testing.T) {
require.Empty(t, v)
require.Error(t, err)
}
func TestGetSetDelKv(t *testing.T) {
h := new(Hook)
h.SetOpts(logger, nil)
err := h.Init(nil)
defer teardown(t, h.config.Path, h)
require.NoError(t, err)
err = h.setKv("testId", &storage.Client{ID: "testId"})
require.NoError(t, err)
var obj storage.Client
err = h.getKv("testId", &obj)
require.NoError(t, err)
err = h.delKv("testId")
require.NoError(t, err)
err = h.getKv("testId", &obj)
require.Error(t, err)
require.ErrorIs(t, ErrKeyNotFound, err)
}
func TestIterKv(t *testing.T) {
h := new(Hook)
h.SetOpts(logger, nil)
err := h.Init(nil)
defer teardown(t, h.config.Path, h)
require.NoError(t, err)
h.setKv("prefix_a_1", &storage.Client{ID: "1"})
h.setKv("prefix_a_2", &storage.Client{ID: "2"})
h.setKv("prefix_b_2", &storage.Client{ID: "3"})
var clients []storage.Client
err = h.iterKv("prefix_a", func(data []byte) error {
var item storage.Client
item.UnmarshalBinary(data)
clients = append(clients, item)
return nil
})
require.Equal(t, 2, len(clients))
require.NoError(t, err)
visitErr := errors.New("iter visit error")
err = h.iterKv("prefix_b", func(data []byte) error {
return visitErr
})
require.ErrorIs(t, visitErr, err)
}

View File

@@ -1,6 +1,6 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileContributor: mochi-co, gsagula
// SPDX-FileContributor: werbenhu
package pebble
@@ -467,23 +467,23 @@ func (h *Hook) StoredSysInfo() (v storage.SystemInfo, err error) {
}
// Errorf satisfies the pebble interface for an error logger.
func (h *Hook) Errorf(m string, v ...interface{}) {
func (h *Hook) Errorf(m string, v ...any) {
h.Log.Error(fmt.Sprintf(strings.ToLower(strings.Trim(m, "\n")), v...), "v", v)
}
// Warningf satisfies the pebble interface for a warning logger.
func (h *Hook) Warningf(m string, v ...interface{}) {
func (h *Hook) Warningf(m string, v ...any) {
h.Log.Warn(fmt.Sprintf(strings.ToLower(strings.Trim(m, "\n")), v...), "v", v)
}
// Infof satisfies the pebble interface for an info logger.
func (h *Hook) Infof(m string, v ...interface{}) {
func (h *Hook) Infof(m string, v ...any) {
h.Log.Info(fmt.Sprintf(strings.ToLower(strings.Trim(m, "\n")), v...), "v", v)
}
// Debugf satisfies the pebble interface for a debug logger.
func (h *Hook) Debugf(m string, v ...interface{}) {
func (h *Hook) Debugf(m string, v ...any) {
h.Log.Debug(fmt.Sprintf(strings.ToLower(strings.Trim(m, "\n")), v...), "v", v)
}

View File

@@ -1,6 +1,6 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileContributor: mochi-co
// SPDX-FileContributor: werbenhu
package pebble