diff --git a/README-CN.md b/README-CN.md index ab9e9d1..320ecd6 100644 --- a/README-CN.md +++ b/README-CN.md @@ -414,7 +414,7 @@ if err != nil { ### 内联客户端 (Inline Client v2.4.0+支持) -现在可以通过使用内联客户端功能直接在服务端上订阅主题和发布消息。内联客户端是内置在服务端中的特殊的客户端,可以在服务端的配置中启用: +现在可以通过使用内联客户端功能直接在服务端上订阅主题和发布消息。目前,内联客户端暂时还不支持共享订阅。内联客户端是内置在服务端中的特殊的客户端,可以在服务端的配置中启用: ```go server := mqtt.New(&mqtt.Options{ diff --git a/README.md b/README.md index 836a22f..b3ed1e9 100644 --- a/README.md +++ b/README.md @@ -399,7 +399,7 @@ The function signatures for all the hooks and `mqtt.Hook` interface can be found If you are building a persistent storage hook, see the existing persistent hooks for inspiration and patterns. If you are building an auth hook, you will need `OnACLCheck` and `OnConnectAuthenticate`. ### Inline Client (v2.4.0+) -It's now possible to subscribe and publish to topics directly from the embedding code, by using the `inline client` feature. The Inline Client is an embedded client which operates as part of the server, and can be enabled in the server options: +It's now possible to subscribe and publish to topics directly from the embedding code, by using the `inline client` feature. Currently, the inline client does not support shared subscriptions. The Inline Client is an embedded client which operates as part of the server, and can be enabled in the server options: ```go server := mqtt.New(&mqtt.Options{ InlineClient: true, diff --git a/config/config_test.go b/config/config_test.go index 53c4652..a20a197 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -180,7 +180,8 @@ func TestToHooksStorageBolt(t *testing.T) { hc := HookConfigs{ Storage: &HookStorageConfig{ Bolt: &bolt.Options{ - Path: "bolt", + Path: "bolt", + Bucket: "mochi", }, }, } diff --git a/examples/config/config.json b/examples/config/config.json index f95a67a..eca75e8 100644 --- a/examples/config/config.json +++ b/examples/config/config.json @@ -31,7 +31,8 @@ "gc_discard_ratio": 0.5 }, "bolt": { - "path": "bolt.db" + "path": "bolt.db", + "bucket": "mochi" }, "redis": { "h_prefix": "mc", diff --git a/examples/config/config.yaml b/examples/config/config.yaml index cd45d13..6691526 100644 --- a/examples/config/config.yaml +++ b/examples/config/config.yaml @@ -21,6 +21,7 @@ hooks: mode: "NoSync" bolt: path: bolt.db + bucket: "mochi" redis: h_prefix: "mc" username: "mochi" diff --git a/examples/persistence/badger/main.go b/examples/persistence/badger/main.go index 49f4f6b..3ff19a6 100644 --- a/examples/persistence/badger/main.go +++ b/examples/persistence/badger/main.go @@ -1,6 +1,6 @@ // SPDX-License-Identifier: MIT // SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co -// SPDX-FileContributor: mochi-co +// SPDX-FileContributor: mochi-co, werbenhu package main diff --git a/examples/persistence/bolt/main.go b/examples/persistence/bolt/main.go index a88c9cc..2a063e3 100644 --- a/examples/persistence/bolt/main.go +++ b/examples/persistence/bolt/main.go @@ -1,6 +1,6 @@ // SPDX-License-Identifier: MIT // SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co -// SPDX-FileContributor: mochi-co +// SPDX-FileContributor: mochi-co, werbenhu package main @@ -19,6 +19,9 @@ import ( ) func main() { + boltPath := ".bolt" + defer os.RemoveAll(boltPath) // remove the example db files at the end + sigs := make(chan os.Signal, 1) done := make(chan bool, 1) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) @@ -31,7 +34,7 @@ func main() { _ = server.AddHook(new(auth.AllowHook), nil) err := server.AddHook(new(bolt.Hook), &bolt.Options{ - Path: "bolt.db", + Path: boltPath, Options: &bbolt.Options{ Timeout: 500 * time.Millisecond, }, diff --git a/go.mod b/go.mod index 25a5631..2d90f77 100644 --- a/go.mod +++ b/go.mod @@ -4,8 +4,6 @@ go 1.21 require ( github.com/alicebob/miniredis/v2 v2.23.0 - github.com/asdine/storm v2.1.2+incompatible - github.com/asdine/storm/v3 v3.2.1 github.com/cockroachdb/pebble v1.1.0 github.com/dgraph-io/badger/v4 v4.2.0 github.com/go-redis/redis/v8 v8.11.5 diff --git a/go.sum b/go.sum index c4e04b9..6a835a5 100644 --- a/go.sum +++ b/go.sum @@ -33,11 +33,8 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= -github.com/DataDog/zstd v1.4.1/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ= github.com/DataDog/zstd v1.4.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= -github.com/Sereal/Sereal v0.0.0-20190618215532-0b8ac451a863 h1:BRrxwOZBolJN4gIwvZMJY1tzqBvQgpaZiQRuIDD40jM= -github.com/Sereal/Sereal v0.0.0-20190618215532-0b8ac451a863/go.mod h1:D0JMgToj/WdxCgd30Kc1UcA9E+WdZoJqeVOuYW7iTBM= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= @@ -47,10 +44,6 @@ github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZp github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc= github.com/alicebob/miniredis/v2 v2.23.0 h1:+lwAJYjvvdIVg6doFHuotFjueJ/7KY10xo/vm3X3Scw= github.com/alicebob/miniredis/v2 v2.23.0/go.mod h1:XNqvJdQJv5mSuVMc0ynneafpnL/zv52acZ6kqeS0t88= -github.com/asdine/storm v2.1.2+incompatible h1:dczuIkyqwY2LrtXPz8ixMrU/OFgZp71kbKTHGrXYt/Q= -github.com/asdine/storm v2.1.2+incompatible/go.mod h1:RarYDc9hq1UPLImuiXK3BIWPJLdIygvV3PsInK0FbVQ= -github.com/asdine/storm/v3 v3.2.1 h1:I5AqhkPK6nBZ/qJXySdI7ot5BlXSZ7qvDY1zAn5ZJac= -github.com/asdine/storm/v3 v3.2.1/go.mod h1:LEpXwGt4pIqrE/XcTvCnZHT5MgZCV6Ub9q7yQzOFWr0= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= @@ -147,7 +140,6 @@ github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= -github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= @@ -275,15 +267,12 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/vmihailenco/msgpack v4.0.4+incompatible h1:dSLoQfGFAo3F6OoNhwUmLwVgaUXK79GlxNBwueZn0xI= -github.com/vmihailenco/msgpack v4.0.4+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6AcGMlsjHatGryHQYUTf1ShIgkk= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 h1:k/gmLsJDWwWqbLCur2yWnJzwQEKRcAHXo6seXGuSwWw= github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9/go.mod h1:E1AXubJBdNmFERAOucpDIxNzeGfLzg0mYh+UfMWdChA= -go.etcd.io/bbolt v1.3.4/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ= go.etcd.io/bbolt v1.3.5 h1:XAzx9gjCb0Rxj7EoqcClPD1d5ZBxZJk0jbuoPHenBt0= go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= @@ -345,7 +334,6 @@ golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20191105084925-a882066a44e0/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -502,7 +490,6 @@ google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7 google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0= google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= -google.golang.org/appengine v1.6.6 h1:lMO5rYAqUxkmaj76jAkRUvt5JZgFymx/+Q5Mzfivuhc= google.golang.org/appengine v1.6.6/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= diff --git a/hooks/storage/badger/badger.go b/hooks/storage/badger/badger.go index 2d3a1be..b6f9564 100644 --- a/hooks/storage/badger/badger.go +++ b/hooks/storage/badger/badger.go @@ -1,6 +1,6 @@ // SPDX-License-Identifier: MIT // SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co -// SPDX-FileContributor: mochi-co, gsagula +// SPDX-FileContributor: mochi-co, gsagula, werbenhu package badger @@ -490,23 +490,23 @@ func (h *Hook) StoredSysInfo() (v storage.SystemInfo, err error) { } // Errorf satisfies the badger interface for an error logger. -func (h *Hook) Errorf(m string, v ...interface{}) { +func (h *Hook) Errorf(m string, v ...any) { h.Log.Error(fmt.Sprintf(strings.ToLower(strings.Trim(m, "\n")), v...), "v", v) } // Warningf satisfies the badger interface for a warning logger. -func (h *Hook) Warningf(m string, v ...interface{}) { +func (h *Hook) Warningf(m string, v ...any) { h.Log.Warn(fmt.Sprintf(strings.ToLower(strings.Trim(m, "\n")), v...), "v", v) } // Infof satisfies the badger interface for an info logger. -func (h *Hook) Infof(m string, v ...interface{}) { +func (h *Hook) Infof(m string, v ...any) { h.Log.Info(fmt.Sprintf(strings.ToLower(strings.Trim(m, "\n")), v...), "v", v) } // Debugf satisfies the badger interface for a debug logger. -func (h *Hook) Debugf(m string, v ...interface{}) { +func (h *Hook) Debugf(m string, v ...any) { h.Log.Debug(fmt.Sprintf(strings.ToLower(strings.Trim(m, "\n")), v...), "v", v) } diff --git a/hooks/storage/badger/badger_test.go b/hooks/storage/badger/badger_test.go index 028c00c..c3001c1 100644 --- a/hooks/storage/badger/badger_test.go +++ b/hooks/storage/badger/badger_test.go @@ -1,6 +1,6 @@ // SPDX-License-Identifier: MIT // SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co -// SPDX-FileContributor: mochi-co +// SPDX-FileContributor: mochi-co, werbenhu package badger diff --git a/hooks/storage/bolt/bolt.go b/hooks/storage/bolt/bolt.go index 2ce3061..cf9dae4 100644 --- a/hooks/storage/bolt/bolt.go +++ b/hooks/storage/bolt/bolt.go @@ -1,6 +1,6 @@ // SPDX-License-Identifier: MIT // SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co -// SPDX-FileContributor: mochi-co +// SPDX-FileContributor: mochi-co, werbenhu // Package bolt is provided for historical compatibility and may not be actively updated, you should use the badger hook instead. package bolt @@ -14,23 +14,27 @@ import ( "github.com/mochi-mqtt/server/v2/hooks/storage" "github.com/mochi-mqtt/server/v2/packets" "github.com/mochi-mqtt/server/v2/system" - - sgob "github.com/asdine/storm/codec/gob" - "github.com/asdine/storm/v3" "go.etcd.io/bbolt" ) +var ( + ErrBucketNotFound = errors.New("bucket not found") + ErrKeyNotFound = errors.New("key not found") +) + const ( // defaultDbFile is the default file path for the boltdb file. - defaultDbFile = "bolt.db" + defaultDbFile = ".bolt" // defaultTimeout is the default time to hold a connection to the file. defaultTimeout = 250 * time.Millisecond + + defaultBucket = "mochi" ) // clientKey returns a primary key for a client. func clientKey(cl *mqtt.Client) string { - return cl.ID + return storage.ClientKey + "_" + cl.ID } // subscriptionKey returns a primary key for a subscription. @@ -56,6 +60,7 @@ func sysInfoKey() string { // Options contains configuration settings for the bolt instance. type Options struct { Options *bbolt.Options + Bucket string `yaml:"bucket" json:"bucket"` Path string `yaml:"path" json:"path"` } @@ -63,7 +68,7 @@ type Options struct { type Hook struct { mqtt.HookBase config *Options // options for configuring the boltdb instance. - db *storm.DB // the boltdb instance. + db *bbolt.DB // the boltdb instance. } // ID returns the id of the hook. @@ -110,22 +115,32 @@ func (h *Hook) Init(config any) error { Timeout: defaultTimeout, } } - if h.config.Path == "" { + if len(h.config.Path) == 0 { h.config.Path = defaultDbFile } + if len(h.config.Bucket) == 0 { + h.config.Bucket = defaultBucket + } + var err error - h.db, err = storm.Open(h.config.Path, storm.BoltOptions(0600, h.config.Options), storm.Codec(sgob.Codec)) + h.db, err = bbolt.Open(h.config.Path, 0600, h.config.Options) if err != nil { return err } - return nil + err = h.db.Update(func(tx *bbolt.Tx) error { + _, err := tx.CreateBucketIfNotExists([]byte(h.config.Bucket)) + return err + }) + return err } // Stop closes the boltdb instance. func (h *Hook) Stop() error { - return h.db.Close() + err := h.db.Close() + h.db = nil + return err } // OnSessionEstablished adds a client to the store when their session is established. @@ -147,7 +162,7 @@ func (h *Hook) updateClient(cl *mqtt.Client) { props := cl.Properties.Props.Copy(false) in := &storage.Client{ - ID: clientKey(cl), + ID: cl.ID, T: storage.ClientKey, Remote: cl.Net.Remote, Listener: cl.Net.Listener, @@ -167,10 +182,8 @@ func (h *Hook) updateClient(cl *mqtt.Client) { }, Will: storage.ClientWill(cl.Properties.Will), } - err := h.db.Save(in) - if err != nil { - h.Log.Error("failed to save client data", "error", err, "data", in) - } + + _ = h.setKv(clientKey(cl), in) } // OnDisconnect removes a client from the store if they were using a clean session. @@ -188,10 +201,7 @@ func (h *Hook) OnDisconnect(cl *mqtt.Client, _ error, expire bool) { return } - err := h.db.DeleteStruct(&storage.Client{ID: clientKey(cl)}) - if err != nil && !errors.Is(err, storm.ErrNotFound) { - h.Log.Error("failed to delete client", "error", err, "id", clientKey(cl)) - } + _ = h.delKv(clientKey(cl)) } // OnSubscribed adds one or more client subscriptions to the store. @@ -214,11 +224,7 @@ func (h *Hook) OnSubscribed(cl *mqtt.Client, pk packets.Packet, reasonCodes []by RetainHandling: pk.Filters[i].RetainHandling, RetainAsPublished: pk.Filters[i].RetainAsPublished, } - - err := h.db.Save(in) - if err != nil { - h.Log.Error("failed to save subscription data", "error", err, "client", cl.ID, "data", in) - } + _ = h.setKv(in.ID, in) } } @@ -230,12 +236,7 @@ func (h *Hook) OnUnsubscribed(cl *mqtt.Client, pk packets.Packet) { } for i := 0; i < len(pk.Filters); i++ { - err := h.db.DeleteStruct(&storage.Subscription{ - ID: subscriptionKey(cl, pk.Filters[i].Filter), - }) - if err != nil { - h.Log.Error("failed to delete client", "error", err, "id", subscriptionKey(cl, pk.Filters[i].Filter)) - } + _ = h.delKv(subscriptionKey(cl, pk.Filters[i].Filter)) } } @@ -247,12 +248,7 @@ func (h *Hook) OnRetainMessage(cl *mqtt.Client, pk packets.Packet, r int64) { } if r == -1 { - err := h.db.DeleteStruct(&storage.Message{ - ID: retainedKey(pk.TopicName), - }) - if err != nil { - h.Log.Error("failed to delete retained publish", "error", err, "id", retainedKey(pk.TopicName)) - } + _ = h.delKv(retainedKey(pk.TopicName)) return } @@ -276,10 +272,8 @@ func (h *Hook) OnRetainMessage(cl *mqtt.Client, pk packets.Packet, r int64) { User: props.User, }, } - err := h.db.Save(in) - if err != nil { - h.Log.Error("failed to save retained publish data", "error", err, "client", cl.ID, "data", in) - } + + _ = h.setKv(in.ID, in) } // OnQosPublish adds or updates an inflight message in the store. @@ -311,10 +305,7 @@ func (h *Hook) OnQosPublish(cl *mqtt.Client, pk packets.Packet, sent int64, rese }, } - err := h.db.Save(in) - if err != nil { - h.Log.Error("failed to save qos inflight data", "error", err, "client", cl.ID, "data", in) - } + _ = h.setKv(in.ID, in) } // OnQosComplete removes a resolved inflight message from the store. @@ -324,12 +315,7 @@ func (h *Hook) OnQosComplete(cl *mqtt.Client, pk packets.Packet) { return } - err := h.db.DeleteStruct(&storage.Message{ - ID: inflightKey(cl, pk), - }) - if err != nil { - h.Log.Error("failed to delete inflight data", "error", err, "id", inflightKey(cl, pk)) - } + _ = h.delKv(inflightKey(cl, pk)) } // OnQosDropped removes a dropped inflight message from the store. @@ -354,10 +340,7 @@ func (h *Hook) OnSysInfoTick(sys *system.Info) { Info: *sys, } - err := h.db.Save(in) - if err != nil { - h.Log.Error("failed to save $SYS data", "error", err, "data", in) - } + _ = h.setKv(in.ID, in) } // OnRetainedExpired deletes expired retained messages from the store. @@ -366,10 +349,7 @@ func (h *Hook) OnRetainedExpired(filter string) { h.Log.Error("", "error", storage.ErrDBFileNotOpen) return } - - if err := h.db.DeleteStruct(&storage.Message{ID: retainedKey(filter)}); err != nil { - h.Log.Error("failed to delete retained publish", "error", err, "id", retainedKey(filter)) - } + _ = h.delKv(retainedKey(filter)) } // OnClientExpired deleted expired clients from the store. @@ -378,25 +358,24 @@ func (h *Hook) OnClientExpired(cl *mqtt.Client) { h.Log.Error("", "error", storage.ErrDBFileNotOpen) return } - - err := h.db.DeleteStruct(&storage.Client{ID: clientKey(cl)}) - if err != nil && !errors.Is(err, storm.ErrNotFound) { - h.Log.Error("failed to delete expired client", "error", err, "id", clientKey(cl)) - } + _ = h.delKv(clientKey(cl)) } // StoredClients returns all stored clients from the store. func (h *Hook) StoredClients() (v []storage.Client, err error) { if h.db == nil { h.Log.Error("", "error", storage.ErrDBFileNotOpen) - return - } - - err = h.db.Find("T", storage.ClientKey, &v) - if err != nil && !errors.Is(err, storm.ErrNotFound) { - return + return v, storage.ErrDBFileNotOpen } + err = h.iterKv(storage.ClientKey, func(value []byte) error { + obj := storage.Client{} + err = obj.UnmarshalBinary(value) + if err == nil { + v = append(v, obj) + } + return err + }) return v, nil } @@ -404,58 +383,142 @@ func (h *Hook) StoredClients() (v []storage.Client, err error) { func (h *Hook) StoredSubscriptions() (v []storage.Subscription, err error) { if h.db == nil { h.Log.Error("", "error", storage.ErrDBFileNotOpen) - return + return v, storage.ErrDBFileNotOpen } - err = h.db.Find("T", storage.SubscriptionKey, &v) - if err != nil && !errors.Is(err, storm.ErrNotFound) { - return - } - - return v, nil + v = make([]storage.Subscription, 0) + err = h.iterKv(storage.SubscriptionKey, func(value []byte) error { + obj := storage.Subscription{} + err = obj.UnmarshalBinary(value) + if err == nil { + v = append(v, obj) + } + return err + }) + return } // StoredRetainedMessages returns all stored retained messages from the store. func (h *Hook) StoredRetainedMessages() (v []storage.Message, err error) { if h.db == nil { h.Log.Error("", "error", storage.ErrDBFileNotOpen) - return + return v, storage.ErrDBFileNotOpen } - err = h.db.Find("T", storage.RetainedKey, &v) - if err != nil && !errors.Is(err, storm.ErrNotFound) { - return - } - - return v, nil + v = make([]storage.Message, 0) + err = h.iterKv(storage.RetainedKey, func(value []byte) error { + obj := storage.Message{} + err = obj.UnmarshalBinary(value) + if err == nil { + v = append(v, obj) + } + return err + }) + return } // StoredInflightMessages returns all stored inflight messages from the store. func (h *Hook) StoredInflightMessages() (v []storage.Message, err error) { if h.db == nil { h.Log.Error("", "error", storage.ErrDBFileNotOpen) - return + return v, storage.ErrDBFileNotOpen } - err = h.db.Find("T", storage.InflightKey, &v) - if err != nil && !errors.Is(err, storm.ErrNotFound) { - return - } - - return v, nil + v = make([]storage.Message, 0) + err = h.iterKv(storage.InflightKey, func(value []byte) error { + obj := storage.Message{} + err = obj.UnmarshalBinary(value) + if err == nil { + v = append(v, obj) + } + return err + }) + return } // StoredSysInfo returns the system info from the store. func (h *Hook) StoredSysInfo() (v storage.SystemInfo, err error) { if h.db == nil { h.Log.Error("", "error", storage.ErrDBFileNotOpen) - return + return v, storage.ErrDBFileNotOpen } - err = h.db.One("ID", storage.SysInfoKey, &v) - if err != nil && !errors.Is(err, storm.ErrNotFound) { + err = h.getKv(storage.SysInfoKey, &v) + if err != nil && !errors.Is(err, ErrKeyNotFound) { return } return v, nil } + +// setKv stores a key-value pair in the database. +func (h *Hook) setKv(k string, v storage.Serializable) error { + err := h.db.Update(func(tx *bbolt.Tx) error { + + bucket := tx.Bucket([]byte(h.config.Bucket)) + data, _ := v.MarshalBinary() + err := bucket.Put([]byte(k), data) + if err != nil { + return err + } + return nil + }) + if err != nil { + h.Log.Error("failed to upsert data", "error", err, "key", k) + } + return err +} + +// delKv deletes a key-value pair from the database. +func (h *Hook) delKv(k string) error { + err := h.db.Update(func(tx *bbolt.Tx) error { + bucket := tx.Bucket([]byte(h.config.Bucket)) + err := bucket.Delete([]byte(k)) + if err != nil { + return err + } + return nil + }) + if err != nil { + h.Log.Error("failed to delete data", "error", err, "key", k) + } + return err +} + +// getKv retrieves the value associated with a key from the database. +func (h *Hook) getKv(k string, v storage.Serializable) error { + err := h.db.View(func(tx *bbolt.Tx) error { + bucket := tx.Bucket([]byte(h.config.Bucket)) + + value := bucket.Get([]byte(k)) + if value == nil { + return ErrKeyNotFound + } + + return v.UnmarshalBinary(value) + }) + if err != nil { + h.Log.Error("failed to get data", "error", err, "key", k) + } + return err +} + +// iterKv iterates over key-value pairs with keys having the specified prefix in the database. +func (h *Hook) iterKv(prefix string, visit func([]byte) error) error { + err := h.db.View(func(tx *bbolt.Tx) error { + bucket := tx.Bucket([]byte(h.config.Bucket)) + + c := bucket.Cursor() + for k, v := c.Seek([]byte(prefix)); k != nil && string(k[:len(prefix)]) == prefix; k, v = c.Next() { + if err := visit(v); err != nil { + return err + } + } + return nil + }) + + if err != nil { + h.Log.Error("failed to iter data", "error", err, "prefix", prefix) + } + return err +} diff --git a/hooks/storage/bolt/bolt_test.go b/hooks/storage/bolt/bolt_test.go index 6c1a638..ab2fbc5 100644 --- a/hooks/storage/bolt/bolt_test.go +++ b/hooks/storage/bolt/bolt_test.go @@ -1,10 +1,11 @@ // SPDX-License-Identifier: MIT // SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co -// SPDX-FileContributor: mochi-co +// SPDX-FileContributor: mochi-co, werbenhu package bolt import ( + "errors" "log/slog" "os" "testing" @@ -15,7 +16,6 @@ import ( "github.com/mochi-mqtt/server/v2/packets" "github.com/mochi-mqtt/server/v2/system" - "github.com/asdine/storm/v3" "github.com/stretchr/testify/require" ) @@ -45,7 +45,7 @@ func teardown(t *testing.T, path string, h *Hook) { func TestClientKey(t *testing.T) { k := clientKey(&mqtt.Client{ID: "cl1"}) - require.Equal(t, "cl1", k) + require.Equal(t, "CL_cl1", k) } func TestSubscriptionKey(t *testing.T) { @@ -130,7 +130,7 @@ func TestOnSessionEstablishedThenOnDisconnect(t *testing.T) { h.OnSessionEstablished(client, packets.Packet{}) r := new(storage.Client) - err = h.db.One("ID", clientKey(client), r) + err = h.getKv(clientKey(client), r) require.NoError(t, err) require.Equal(t, client.ID, r.ID) require.Equal(t, client.Net.Remote, r.Remote) @@ -141,15 +141,15 @@ func TestOnSessionEstablishedThenOnDisconnect(t *testing.T) { h.OnDisconnect(client, nil, false) r2 := new(storage.Client) - err = h.db.One("ID", clientKey(client), r2) + err = h.getKv(clientKey(client), r2) require.NoError(t, err) require.Equal(t, client.ID, r.ID) h.OnDisconnect(client, nil, true) r3 := new(storage.Client) - err = h.db.One("ID", clientKey(client), r3) + err = h.getKv(clientKey(client), r3) require.Error(t, err) - require.ErrorIs(t, storm.ErrNotFound, err) + require.ErrorIs(t, ErrKeyNotFound, err) require.Empty(t, r3.ID) } @@ -180,7 +180,7 @@ func TestOnWillSent(t *testing.T) { h.OnWillSent(c1, packets.Packet{}) r := new(storage.Client) - err = h.db.One("ID", clientKey(client), r) + err = h.getKv(clientKey(client), r) require.NoError(t, err) require.Equal(t, uint32(1), r.Will.Flag) @@ -197,18 +197,18 @@ func TestOnClientExpired(t *testing.T) { cl := &mqtt.Client{ID: "cl1"} clientKey := clientKey(cl) - err = h.db.Save(&storage.Client{ID: cl.ID}) + err = h.setKv(clientKey, &storage.Client{ID: cl.ID}) require.NoError(t, err) r := new(storage.Client) - err = h.db.One("ID", clientKey, r) + err = h.getKv(clientKey, r) require.NoError(t, err) require.Equal(t, cl.ID, r.ID) h.OnClientExpired(cl) - err = h.db.One("ID", clientKey, r) + err = h.getKv(clientKey, r) require.Error(t, err) - require.ErrorIs(t, storm.ErrNotFound, err) + require.ErrorIs(t, ErrKeyNotFound, err) } func TestOnClientExpiredClosedDB(t *testing.T) { @@ -274,16 +274,16 @@ func TestOnSubscribedThenOnUnsubscribed(t *testing.T) { h.OnSubscribed(client, pkf, []byte{0}) r := new(storage.Subscription) - err = h.db.One("ID", subscriptionKey(client, pkf.Filters[0].Filter), r) + err = h.getKv(subscriptionKey(client, pkf.Filters[0].Filter), r) require.NoError(t, err) require.Equal(t, client.ID, r.Client) require.Equal(t, pkf.Filters[0].Filter, r.Filter) require.Equal(t, byte(0), r.Qos) h.OnUnsubscribed(client, pkf) - err = h.db.One("ID", subscriptionKey(client, pkf.Filters[0].Filter), r) + err = h.getKv(subscriptionKey(client, pkf.Filters[0].Filter), r) require.Error(t, err) - require.Equal(t, storm.ErrNotFound, err) + require.Equal(t, ErrKeyNotFound, err) } func TestOnSubscribedNoDB(t *testing.T) { @@ -334,21 +334,21 @@ func TestOnRetainMessageThenUnset(t *testing.T) { h.OnRetainMessage(client, pk, 1) r := new(storage.Message) - err = h.db.One("ID", retainedKey(pk.TopicName), r) + err = h.getKv(retainedKey(pk.TopicName), r) require.NoError(t, err) require.Equal(t, pk.TopicName, r.TopicName) require.Equal(t, pk.Payload, r.Payload) h.OnRetainMessage(client, pk, -1) - err = h.db.One("ID", retainedKey(pk.TopicName), r) + err = h.getKv(retainedKey(pk.TopicName), r) require.Error(t, err) - require.Equal(t, storm.ErrNotFound, err) + require.Equal(t, ErrKeyNotFound, err) // coverage: delete deleted h.OnRetainMessage(client, pk, -1) - err = h.db.One("ID", retainedKey(pk.TopicName), r) + err = h.getKv(retainedKey(pk.TopicName), r) require.Error(t, err) - require.Equal(t, storm.ErrNotFound, err) + require.Equal(t, ErrKeyNotFound, err) } func TestOnRetainedExpired(t *testing.T) { @@ -364,18 +364,18 @@ func TestOnRetainedExpired(t *testing.T) { TopicName: "a/b/c", } - err = h.db.Save(m) + err = h.setKv(m.ID, m) require.NoError(t, err) r := new(storage.Message) - err = h.db.One("ID", m.ID, r) + err = h.getKv(m.ID, r) require.NoError(t, err) require.Equal(t, m.TopicName, r.TopicName) h.OnRetainedExpired(m.TopicName) - err = h.db.One("ID", m.ID, r) + err = h.getKv(m.ID, r) require.Error(t, err) - require.Equal(t, storm.ErrNotFound, err) + require.Equal(t, ErrKeyNotFound, err) } func TestOnRetainedExpiredClosedDB(t *testing.T) { @@ -427,7 +427,7 @@ func TestOnQosPublishThenQOSComplete(t *testing.T) { h.OnQosPublish(client, pk, time.Now().Unix(), 0) r := new(storage.Message) - err = h.db.One("ID", inflightKey(client, pk), r) + err = h.getKv(inflightKey(client, pk), r) require.NoError(t, err) require.Equal(t, pk.TopicName, r.TopicName) require.Equal(t, pk.Payload, r.Payload) @@ -438,9 +438,9 @@ func TestOnQosPublishThenQOSComplete(t *testing.T) { // OnQosDropped is a passthrough to OnQosComplete here h.OnQosDropped(client, pk) - err = h.db.One("ID", inflightKey(client, pk), r) + err = h.getKv(inflightKey(client, pk), r) require.Error(t, err) - require.Equal(t, storm.ErrNotFound, err) + require.Equal(t, ErrKeyNotFound, err) } func TestOnQosPublishNoDB(t *testing.T) { @@ -494,7 +494,7 @@ func TestOnSysInfoTick(t *testing.T) { h.OnSysInfoTick(info) r := new(storage.SystemInfo) - err = h.db.One("ID", storage.SysInfoKey, r) + err = h.getKv(storage.SysInfoKey, r) require.NoError(t, err) require.Equal(t, info.Version, r.Version) require.Equal(t, info.BytesReceived, r.BytesReceived) @@ -524,13 +524,13 @@ func TestStoredClients(t *testing.T) { defer teardown(t, h.config.Path, h) // populate with clients - err = h.db.Save(&storage.Client{ID: "cl1", T: storage.ClientKey}) + err = h.setKv(storage.ClientKey+"_"+"cl1", &storage.Client{ID: "cl1"}) require.NoError(t, err) - err = h.db.Save(&storage.Client{ID: "cl2", T: storage.ClientKey}) + err = h.setKv(storage.ClientKey+"_"+"cl2", &storage.Client{ID: "cl2"}) require.NoError(t, err) - err = h.db.Save(&storage.Client{ID: "cl3", T: storage.ClientKey}) + err = h.setKv(storage.ClientKey+"_"+"cl3", &storage.Client{ID: "cl3"}) require.NoError(t, err) r, err := h.StoredClients() @@ -546,7 +546,7 @@ func TestStoredClientsNoDB(t *testing.T) { h.SetOpts(logger, nil) v, err := h.StoredClients() require.Empty(t, v) - require.NoError(t, err) + require.ErrorIs(t, storage.ErrDBFileNotOpen, err) } func TestStoredClientsClosedDB(t *testing.T) { @@ -557,7 +557,7 @@ func TestStoredClientsClosedDB(t *testing.T) { teardown(t, h.config.Path, h) v, err := h.StoredClients() require.Empty(t, v) - require.Error(t, err) + require.ErrorIs(t, storage.ErrDBFileNotOpen, err) } func TestStoredSubscriptions(t *testing.T) { @@ -568,13 +568,13 @@ func TestStoredSubscriptions(t *testing.T) { defer teardown(t, h.config.Path, h) // populate with subscriptions - err = h.db.Save(&storage.Subscription{ID: "sub1", T: storage.SubscriptionKey}) + err = h.setKv(storage.SubscriptionKey+"_"+"sub1", &storage.Subscription{ID: "sub1"}) require.NoError(t, err) - err = h.db.Save(&storage.Subscription{ID: "sub2", T: storage.SubscriptionKey}) + err = h.setKv(storage.SubscriptionKey+"_"+"sub2", &storage.Subscription{ID: "sub2"}) require.NoError(t, err) - err = h.db.Save(&storage.Subscription{ID: "sub3", T: storage.SubscriptionKey}) + err = h.setKv(storage.SubscriptionKey+"_"+"sub3", &storage.Subscription{ID: "sub3"}) require.NoError(t, err) r, err := h.StoredSubscriptions() @@ -590,7 +590,7 @@ func TestStoredSubscriptionsNoDB(t *testing.T) { h.SetOpts(logger, nil) v, err := h.StoredSubscriptions() require.Empty(t, v) - require.NoError(t, err) + require.ErrorIs(t, storage.ErrDBFileNotOpen, err) } func TestStoredSubscriptionsClosedDB(t *testing.T) { @@ -601,7 +601,7 @@ func TestStoredSubscriptionsClosedDB(t *testing.T) { teardown(t, h.config.Path, h) v, err := h.StoredSubscriptions() require.Empty(t, v) - require.Error(t, err) + require.ErrorIs(t, storage.ErrDBFileNotOpen, err) } func TestStoredRetainedMessages(t *testing.T) { @@ -612,16 +612,16 @@ func TestStoredRetainedMessages(t *testing.T) { defer teardown(t, h.config.Path, h) // populate with messages - err = h.db.Save(&storage.Message{ID: "m1", T: storage.RetainedKey}) + err = h.setKv(storage.RetainedKey+"_"+"m1", &storage.Message{ID: "m1"}) require.NoError(t, err) - err = h.db.Save(&storage.Message{ID: "m2", T: storage.RetainedKey}) + err = h.setKv(storage.RetainedKey+"_"+"m2", &storage.Message{ID: "m2"}) require.NoError(t, err) - err = h.db.Save(&storage.Message{ID: "m3", T: storage.RetainedKey}) + err = h.setKv(storage.RetainedKey+"_"+"m3", &storage.Message{ID: "m3"}) require.NoError(t, err) - err = h.db.Save(&storage.Message{ID: "i3", T: storage.InflightKey}) + err = h.setKv(storage.InflightKey+"_"+"i3", &storage.Message{ID: "i3"}) require.NoError(t, err) r, err := h.StoredRetainedMessages() @@ -637,7 +637,7 @@ func TestStoredRetainedMessagesNoDB(t *testing.T) { h.SetOpts(logger, nil) v, err := h.StoredRetainedMessages() require.Empty(t, v) - require.NoError(t, err) + require.ErrorIs(t, storage.ErrDBFileNotOpen, err) } func TestStoredRetainedMessagesClosedDB(t *testing.T) { @@ -659,16 +659,16 @@ func TestStoredInflightMessages(t *testing.T) { defer teardown(t, h.config.Path, h) // populate with messages - err = h.db.Save(&storage.Message{ID: "i1", T: storage.InflightKey}) + err = h.setKv(storage.InflightKey+"_"+"i1", &storage.Message{ID: "i1"}) require.NoError(t, err) - err = h.db.Save(&storage.Message{ID: "i2", T: storage.InflightKey}) + err = h.setKv(storage.InflightKey+"_"+"i2", &storage.Message{ID: "i2"}) require.NoError(t, err) - err = h.db.Save(&storage.Message{ID: "i3", T: storage.InflightKey}) + err = h.setKv(storage.InflightKey+"_"+"i3", &storage.Message{ID: "i3"}) require.NoError(t, err) - err = h.db.Save(&storage.Message{ID: "m1", T: storage.RetainedKey}) + err = h.setKv(storage.RetainedKey+"_"+"m1", &storage.Message{ID: "m1"}) require.NoError(t, err) r, err := h.StoredInflightMessages() @@ -684,7 +684,7 @@ func TestStoredInflightMessagesNoDB(t *testing.T) { h.SetOpts(logger, nil) v, err := h.StoredInflightMessages() require.Empty(t, v) - require.NoError(t, err) + require.ErrorIs(t, storage.ErrDBFileNotOpen, err) } func TestStoredInflightMessagesClosedDB(t *testing.T) { @@ -695,7 +695,7 @@ func TestStoredInflightMessagesClosedDB(t *testing.T) { teardown(t, h.config.Path, h) v, err := h.StoredInflightMessages() require.Empty(t, v) - require.Error(t, err) + require.ErrorIs(t, storage.ErrDBFileNotOpen, err) } func TestStoredSysInfo(t *testing.T) { @@ -706,7 +706,7 @@ func TestStoredSysInfo(t *testing.T) { defer teardown(t, h.config.Path, h) // populate with sys info - err = h.db.Save(&storage.SystemInfo{ + err = h.setKv(storage.SysInfoKey, &storage.SystemInfo{ ID: storage.SysInfoKey, Info: system.Info{ Version: "2.0.0", @@ -725,7 +725,7 @@ func TestStoredSysInfoNoDB(t *testing.T) { h.SetOpts(logger, nil) v, err := h.StoredSysInfo() require.Empty(t, v) - require.NoError(t, err) + require.ErrorIs(t, storage.ErrDBFileNotOpen, err) } func TestStoredSysInfoClosedDB(t *testing.T) { @@ -738,3 +738,54 @@ func TestStoredSysInfoClosedDB(t *testing.T) { require.Empty(t, v) require.Error(t, err) } + +func TestGetSetDelKv(t *testing.T) { + h := new(Hook) + h.SetOpts(logger, nil) + err := h.Init(nil) + defer teardown(t, h.config.Path, h) + require.NoError(t, err) + + err = h.setKv("testId", &storage.Client{ID: "testId"}) + require.NoError(t, err) + + var obj storage.Client + err = h.getKv("testId", &obj) + require.NoError(t, err) + + err = h.delKv("testId") + require.NoError(t, err) + + err = h.getKv("testId", &obj) + require.Error(t, err) + require.ErrorIs(t, ErrKeyNotFound, err) +} + +func TestIterKv(t *testing.T) { + h := new(Hook) + h.SetOpts(logger, nil) + + err := h.Init(nil) + defer teardown(t, h.config.Path, h) + require.NoError(t, err) + + h.setKv("prefix_a_1", &storage.Client{ID: "1"}) + h.setKv("prefix_a_2", &storage.Client{ID: "2"}) + h.setKv("prefix_b_2", &storage.Client{ID: "3"}) + + var clients []storage.Client + err = h.iterKv("prefix_a", func(data []byte) error { + var item storage.Client + item.UnmarshalBinary(data) + clients = append(clients, item) + return nil + }) + require.Equal(t, 2, len(clients)) + require.NoError(t, err) + + visitErr := errors.New("iter visit error") + err = h.iterKv("prefix_b", func(data []byte) error { + return visitErr + }) + require.ErrorIs(t, visitErr, err) +} diff --git a/hooks/storage/pebble/pebble.go b/hooks/storage/pebble/pebble.go index 3e43b91..9dbf210 100644 --- a/hooks/storage/pebble/pebble.go +++ b/hooks/storage/pebble/pebble.go @@ -1,6 +1,6 @@ // SPDX-License-Identifier: MIT // SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co -// SPDX-FileContributor: mochi-co, gsagula +// SPDX-FileContributor: werbenhu package pebble @@ -467,23 +467,23 @@ func (h *Hook) StoredSysInfo() (v storage.SystemInfo, err error) { } // Errorf satisfies the pebble interface for an error logger. -func (h *Hook) Errorf(m string, v ...interface{}) { +func (h *Hook) Errorf(m string, v ...any) { h.Log.Error(fmt.Sprintf(strings.ToLower(strings.Trim(m, "\n")), v...), "v", v) } // Warningf satisfies the pebble interface for a warning logger. -func (h *Hook) Warningf(m string, v ...interface{}) { +func (h *Hook) Warningf(m string, v ...any) { h.Log.Warn(fmt.Sprintf(strings.ToLower(strings.Trim(m, "\n")), v...), "v", v) } // Infof satisfies the pebble interface for an info logger. -func (h *Hook) Infof(m string, v ...interface{}) { +func (h *Hook) Infof(m string, v ...any) { h.Log.Info(fmt.Sprintf(strings.ToLower(strings.Trim(m, "\n")), v...), "v", v) } // Debugf satisfies the pebble interface for a debug logger. -func (h *Hook) Debugf(m string, v ...interface{}) { +func (h *Hook) Debugf(m string, v ...any) { h.Log.Debug(fmt.Sprintf(strings.ToLower(strings.Trim(m, "\n")), v...), "v", v) } diff --git a/hooks/storage/pebble/pebble_test.go b/hooks/storage/pebble/pebble_test.go index 4a12985..6f3df50 100644 --- a/hooks/storage/pebble/pebble_test.go +++ b/hooks/storage/pebble/pebble_test.go @@ -1,6 +1,6 @@ // SPDX-License-Identifier: MIT // SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co -// SPDX-FileContributor: mochi-co +// SPDX-FileContributor: werbenhu package pebble