Replace badgerhold with directly using BadgerDB v4 (#376)

* Replace badgerhold with directly using BadgerDB v4.

* Optimize code and test cases.

* Optimize code.

* Set the default size of the log file to 100 MB.

* Resolve merge conflicts.
This commit is contained in:
werben
2024-04-02 00:51:02 +08:00
committed by GitHub
parent d048e4bef7
commit e2cb688869
5 changed files with 309 additions and 186 deletions

View File

@@ -5,16 +5,16 @@
package main
import (
badgerdb "github.com/dgraph-io/badger"
mqtt "github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/hooks/auth"
"github.com/mochi-mqtt/server/v2/hooks/storage/badger"
"github.com/mochi-mqtt/server/v2/listeners"
"github.com/timshannon/badgerhold"
"log"
"os"
"os/signal"
"syscall"
badgerdb "github.com/dgraph-io/badger/v4"
mqtt "github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/hooks/auth"
"github.com/mochi-mqtt/server/v2/hooks/storage/badger"
"github.com/mochi-mqtt/server/v2/listeners"
)
func main() {
@@ -32,6 +32,9 @@ func main() {
server := mqtt.New(nil)
_ = server.AddHook(new(auth.AllowHook), nil)
badgerOpts := badgerdb.DefaultOptions(badgerPath) // BadgerDB options. Adjust according to your actual scenario.
badgerOpts.ValueLogFileSize = 100 * (1 << 20) // Set the default size of the log file to 100 MB.
// AddHook adds a BadgerDB hook to the server with the specified options.
// GcInterval specifies the interval at which BadgerDB garbage collection process runs.
// Refer to https://dgraph.io/docs/badger/get-started/#garbage-collection for more information.
@@ -48,14 +51,7 @@ func main() {
// Adjust according to your actual scenario.
GcDiscardRatio: 0.5,
Options: &badgerhold.Options{
// BadgerDB options. Adjust according to your actual scenario.
Options: badgerdb.Options{
NumCompactors: 2, // Number of compactors. Compactions can be expensive.
MaxTableSize: 64 << 20, // Maximum size of each table (64 MB).
ValueLogFileSize: 100 * (1 << 20), // Set the default size of the log file to 100 MB.
},
},
Options: &badgerOpts,
})
if err != nil {
log.Fatal(err)

10
go.mod
View File

@@ -7,19 +7,17 @@ require (
github.com/asdine/storm v2.1.2+incompatible
github.com/asdine/storm/v3 v3.2.1
github.com/cockroachdb/pebble v1.1.0
github.com/dgraph-io/badger v1.6.0
github.com/dgraph-io/badger/v4 v4.2.0
github.com/go-redis/redis/v8 v8.11.5
github.com/gorilla/websocket v1.5.0
github.com/jinzhu/copier v0.3.5
github.com/rs/xid v1.4.0
github.com/stretchr/testify v1.8.1
github.com/timshannon/badgerhold v1.0.0
go.etcd.io/bbolt v1.3.5
gopkg.in/yaml.v3 v3.0.1
)
require (
github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96 // indirect
github.com/DataDog/zstd v1.4.5 // indirect
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
github.com/beorn7/perks v1.0.1 // indirect
@@ -29,13 +27,16 @@ require (
github.com/cockroachdb/redact v1.1.5 // indirect
github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 // indirect
github.com/dgraph-io/ristretto v0.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/getsentry/sentry-go v0.18.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/glog v1.0.0 // indirect
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/flatbuffers v1.12.1 // indirect
github.com/klauspost/compress v1.15.15 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
@@ -48,6 +49,7 @@ require (
github.com/prometheus/procfs v0.7.3 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 // indirect
go.opencensus.io v0.22.5 // indirect
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.13.0 // indirect

43
go.sum
View File

@@ -31,9 +31,6 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96 h1:cTp8I5+VIoKjsnZuH8vjyaysT/ses3EvZeaV/1UkF2M=
github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DataDog/zstd v1.4.1/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
@@ -50,7 +47,6 @@ github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZp
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
github.com/alicebob/miniredis/v2 v2.23.0 h1:+lwAJYjvvdIVg6doFHuotFjueJ/7KY10xo/vm3X3Scw=
github.com/alicebob/miniredis/v2 v2.23.0/go.mod h1:XNqvJdQJv5mSuVMc0ynneafpnL/zv52acZ6kqeS0t88=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/asdine/storm v2.1.2+incompatible h1:dczuIkyqwY2LrtXPz8ixMrU/OFgZp71kbKTHGrXYt/Q=
github.com/asdine/storm v2.1.2+incompatible/go.mod h1:RarYDc9hq1UPLImuiXK3BIWPJLdIygvV3PsInK0FbVQ=
github.com/asdine/storm/v3 v3.2.1 h1:I5AqhkPK6nBZ/qJXySdI7ot5BlXSZ7qvDY1zAn5ZJac=
@@ -81,16 +77,14 @@ github.com/cockroachdb/redact v1.1.5 h1:u1PMllDkdFfPWaNGMyLD1+so+aq3uUItthCFqzwP
github.com/cockroachdb/redact v1.1.5/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg=
github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 h1:zuQyyAKVxetITBuuhv3BI9cMrmStnpT18zmgmTxunpo=
github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06/go.mod h1:7nc4anLGjupUW/PeY5qiNYsdNXj7zopG+eqsS7To5IQ=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/badger v1.6.0 h1:DshxFxZWXUcO0xX476VJC07Xsr6ZCBVRHKZ93Oh7Evo=
github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4=
github.com/dgraph-io/badger/v4 v4.2.0 h1:kJrlajbXXL9DFTNuhhu9yCx7JJa4qpYWxtE8BzuWsEs=
github.com/dgraph-io/badger/v4 v4.2.0/go.mod h1:qfCqhPoWDFJRx1gp5QwwyGo8xk1lbHUxvK9nK0OGAak=
github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8=
github.com/dgraph-io/ristretto v0.1.1/go.mod h1:S1GPSBCYCIhmVNfcth17y2zZtQT6wzkzgwUve0VDWWA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
@@ -101,7 +95,6 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/getsentry/sentry-go v0.18.0 h1:MtBW5H9QgdcJabtZcuJG80BMOwaBpkRDZkxRkNC1sN0=
@@ -124,8 +117,11 @@ github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/glog v1.0.0 h1:nfP3RFugxnNRyKgeWd4oI1nYvXpxrx8ck8ZrcizshdQ=
github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e h1:1r7pUrabqp18hOBcwBwiTsbnFeTZHV9eER/QT5JVZxY=
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
@@ -156,6 +152,8 @@ github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/flatbuffers v1.12.1 h1:MVlul7pQNoDzWRLTw5imwYsl+usrS1TXG2H4jg6ImGw=
github.com/google/flatbuffers v1.12.1/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
@@ -184,9 +182,7 @@ github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWm
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/jinzhu/copier v0.3.5 h1:GlvfUwHk62RokgqVNvYsku0TATCF7bAHVwEXoBh3iJg=
github.com/jinzhu/copier v0.3.5/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
@@ -212,12 +208,9 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 h1:I0XW9+e1XWDxdcEniV4rQAIOPUGDq67JSCiRCgGCZLI=
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
@@ -231,7 +224,6 @@ github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU=
github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4=
github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
@@ -269,16 +261,9 @@ github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZV
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/rs/xid v1.4.0 h1:qd7wPTDkN6KQx2VmMBLrpHkiyQwgFXRnkOLacUiaSNY=
github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tLCHU=
github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
@@ -290,12 +275,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/timshannon/badgerhold v1.0.0 h1:LtqnDRVP7294FWRiZCIfQa6Tt0bGmlzbO8c364QC2Y8=
github.com/timshannon/badgerhold v1.0.0/go.mod h1:Vv2Jj0PAfzqViEpGvJzLP8PY07x1iXLgKRuLY7bqPOE=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/vmihailenco/msgpack v4.0.4+incompatible h1:dSLoQfGFAo3F6OoNhwUmLwVgaUXK79GlxNBwueZn0xI=
github.com/vmihailenco/msgpack v4.0.4+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6AcGMlsjHatGryHQYUTf1ShIgkk=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
@@ -310,8 +291,9 @@ go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.5 h1:dntmOdLpSpHlVqbW5Eay97DelsZHe+55D+xC6i0dDS0=
go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
@@ -403,7 +385,6 @@ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -413,7 +394,6 @@ golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -440,6 +420,7 @@ golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=

View File

@@ -11,12 +11,11 @@ import (
"strings"
"time"
badgerdb "github.com/dgraph-io/badger/v4"
mqtt "github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/hooks/storage"
"github.com/mochi-mqtt/server/v2/packets"
"github.com/mochi-mqtt/server/v2/system"
"github.com/timshannon/badgerhold"
)
const (
@@ -28,7 +27,7 @@ const (
// clientKey returns a primary key for a client.
func clientKey(cl *mqtt.Client) string {
return cl.ID
return storage.ClientKey + "_" + cl.ID
}
// subscriptionKey returns a primary key for a subscription.
@@ -51,9 +50,15 @@ func sysInfoKey() string {
return storage.SysInfoKey
}
// Serializable is an interface for objects that can be serialized and deserialized.
type Serializable interface {
UnmarshalBinary([]byte) error
MarshalBinary() (data []byte, err error)
}
// Options contains configuration settings for the BadgerDB instance.
type Options struct {
Options *badgerhold.Options
Options *badgerdb.Options
Path string `yaml:"path" json:"path"`
// GcDiscardRatio specifies the ratio of log discard compared to the maximum possible log discard.
// Setting it to a higher value would result in fewer space reclaims, while setting it to a lower value
@@ -66,9 +71,9 @@ type Options struct {
// Hook is a persistent storage hook based using BadgerDB file store as a backend.
type Hook struct {
mqtt.HookBase
config *Options // options for configuring the BadgerDB instance.
gcTicker *time.Ticker // Ticker for BadgerDB garbage collection.
db *badgerhold.Store // the BadgerDB instance.
config *Options // options for configuring the BadgerDB instance.
gcTicker *time.Ticker // Ticker for BadgerDB garbage collection.
db *badgerdb.DB // the BadgerDB instance.
}
// ID returns the id of the hook.
@@ -102,12 +107,12 @@ func (h *Hook) Provides(b byte) bool {
// GcLoop periodically runs the garbage collection process to reclaim space in the value log files.
// It uses a ticker to trigger the garbage collection at regular intervals specified by the configuration.
// Refer to: https://dgraph.io/docs/badger/get-started/#garbage-collection
func (h *Hook) GcLoop() {
func (h *Hook) gcLoop() {
for range h.gcTicker.C {
again:
// Run the garbage collection process with a threshold.
// If the process returns nil (success), repeat the process.
err := h.db.Badger().RunValueLogGC(h.config.GcDiscardRatio)
err := h.db.RunValueLogGC(h.config.GcDiscardRatio)
if err == nil {
goto again // Retry garbage collection if successful.
}
@@ -121,11 +126,12 @@ func (h *Hook) Init(config any) error {
}
if config == nil {
config = new(Options)
h.config = new(Options)
} else {
h.config = config.(*Options)
}
h.config = config.(*Options)
if h.config.Path == "" {
if len(h.config.Path) == 0 {
h.config.Path = defaultDbFile
}
@@ -137,19 +143,20 @@ func (h *Hook) Init(config any) error {
h.config.GcDiscardRatio = defaultGcDiscardRatio
}
options := badgerhold.DefaultOptions
options.Dir = h.config.Path
options.ValueDir = h.config.Path
options.Logger = h
if h.config.Options == nil {
defaultOpts := badgerdb.DefaultOptions(h.config.Path)
h.config.Options = &defaultOpts
}
h.config.Options.Logger = h
var err error
h.db, err = badgerhold.Open(options)
h.db, err = badgerdb.Open(*h.config.Options)
if err != nil {
return err
}
h.gcTicker = time.NewTicker(time.Duration(h.config.GcInterval) * time.Second)
go h.GcLoop()
go h.gcLoop()
return nil
}
@@ -181,7 +188,7 @@ func (h *Hook) updateClient(cl *mqtt.Client) {
props := cl.Properties.Props.Copy(false)
in := &storage.Client{
ID: clientKey(cl),
ID: cl.ID,
T: storage.ClientKey,
Remote: cl.Net.Remote,
Listener: cl.Net.Listener,
@@ -202,7 +209,7 @@ func (h *Hook) updateClient(cl *mqtt.Client) {
Will: storage.ClientWill(cl.Properties.Will),
}
err := h.db.Upsert(in.ID, in)
err := h.setKv(clientKey(cl), in)
if err != nil {
h.Log.Error("failed to upsert client data", "error", err, "data", in)
}
@@ -225,10 +232,7 @@ func (h *Hook) OnDisconnect(cl *mqtt.Client, _ error, expire bool) {
return
}
err := h.db.Delete(clientKey(cl), new(storage.Client))
if err != nil {
h.Log.Error("failed to delete client data", "error", err, "data", clientKey(cl))
}
_ = h.delKv(clientKey(cl))
}
// OnSubscribed adds one or more client subscriptions to the store.
@@ -252,10 +256,7 @@ func (h *Hook) OnSubscribed(cl *mqtt.Client, pk packets.Packet, reasonCodes []by
RetainAsPublished: pk.Filters[i].RetainAsPublished,
}
err := h.db.Upsert(in.ID, in)
if err != nil {
h.Log.Error("failed to upsert subscription data", "error", err, "data", in)
}
_ = h.setKv(in.ID, in)
}
}
@@ -267,10 +268,7 @@ func (h *Hook) OnUnsubscribed(cl *mqtt.Client, pk packets.Packet) {
}
for i := 0; i < len(pk.Filters); i++ {
err := h.db.Delete(subscriptionKey(cl, pk.Filters[i].Filter), new(storage.Subscription))
if err != nil {
h.Log.Error("failed to delete subscription data", "error", err, "data", subscriptionKey(cl, pk.Filters[i].Filter))
}
_ = h.delKv(subscriptionKey(cl, pk.Filters[i].Filter))
}
}
@@ -282,11 +280,7 @@ func (h *Hook) OnRetainMessage(cl *mqtt.Client, pk packets.Packet, r int64) {
}
if r == -1 {
err := h.db.Delete(retainedKey(pk.TopicName), new(storage.Message))
if err != nil {
h.Log.Error("failed to delete retained message data", "error", err, "data", retainedKey(pk.TopicName))
}
_ = h.delKv(retainedKey(pk.TopicName))
return
}
@@ -311,10 +305,7 @@ func (h *Hook) OnRetainMessage(cl *mqtt.Client, pk packets.Packet, r int64) {
},
}
err := h.db.Upsert(in.ID, in)
if err != nil {
h.Log.Error("failed to upsert retained message data", "error", err, "data", in)
}
_ = h.setKv(in.ID, in)
}
// OnQosPublish adds or updates an inflight message in the store.
@@ -347,10 +338,7 @@ func (h *Hook) OnQosPublish(cl *mqtt.Client, pk packets.Packet, sent int64, rese
},
}
err := h.db.Upsert(in.ID, in)
if err != nil {
h.Log.Error("failed to upsert qos inflight data", "error", err, "data", in)
}
_ = h.setKv(in.ID, in)
}
// OnQosComplete removes a resolved inflight message from the store.
@@ -360,10 +348,7 @@ func (h *Hook) OnQosComplete(cl *mqtt.Client, pk packets.Packet) {
return
}
err := h.db.Delete(inflightKey(cl, pk), new(storage.Message))
if err != nil {
h.Log.Error("failed to delete inflight message data", "error", err, "data", inflightKey(cl, pk))
}
_ = h.delKv(inflightKey(cl, pk))
}
// OnQosDropped removes a dropped inflight message from the store.
@@ -388,10 +373,7 @@ func (h *Hook) OnSysInfoTick(sys *system.Info) {
Info: *sys.Clone(),
}
err := h.db.Upsert(in.ID, in)
if err != nil {
h.Log.Error("failed to upsert $SYS data", "error", err, "data", in)
}
_ = h.setKv(in.ID, in)
}
// OnRetainedExpired deletes expired retained messages from the store.
@@ -401,10 +383,7 @@ func (h *Hook) OnRetainedExpired(filter string) {
return
}
err := h.db.Delete(retainedKey(filter), new(storage.Message))
if err != nil {
h.Log.Error("failed to delete expired retained message data", "error", err, "id", retainedKey(filter))
}
_ = h.delKv(retainedKey(filter))
}
// OnClientExpired deleted expired clients from the store.
@@ -414,10 +393,7 @@ func (h *Hook) OnClientExpired(cl *mqtt.Client) {
return
}
err := h.db.Delete(clientKey(cl), new(storage.Client))
if err != nil {
h.Log.Error("failed to delete expired client data", "error", err, "id", clientKey(cl))
}
_ = h.delKv(clientKey(cl))
}
// StoredClients returns all stored clients from the store.
@@ -427,12 +403,15 @@ func (h *Hook) StoredClients() (v []storage.Client, err error) {
return
}
err = h.db.Find(&v, badgerhold.Where("T").Eq(storage.ClientKey))
if err != nil && !errors.Is(err, badgerhold.ErrNotFound) {
return
}
return v, nil
err = h.iterKv(storage.ClientKey, func(value []byte) error {
obj := storage.Client{}
err = obj.UnmarshalBinary(value)
if err == nil {
v = append(v, obj)
}
return err
})
return
}
// StoredSubscriptions returns all stored subscriptions from the store.
@@ -442,12 +421,16 @@ func (h *Hook) StoredSubscriptions() (v []storage.Subscription, err error) {
return
}
err = h.db.Find(&v, badgerhold.Where("T").Eq(storage.SubscriptionKey))
if err != nil && !errors.Is(err, badgerhold.ErrNotFound) {
return
}
return v, nil
v = make([]storage.Subscription, 0)
err = h.iterKv(storage.SubscriptionKey, func(value []byte) error {
obj := storage.Subscription{}
err = obj.UnmarshalBinary(value)
if err == nil {
v = append(v, obj)
}
return err
})
return
}
// StoredRetainedMessages returns all stored retained messages from the store.
@@ -457,12 +440,20 @@ func (h *Hook) StoredRetainedMessages() (v []storage.Message, err error) {
return
}
err = h.db.Find(&v, badgerhold.Where("T").Eq(storage.RetainedKey))
if err != nil && !errors.Is(err, badgerhold.ErrNotFound) {
v = make([]storage.Message, 0)
err = h.iterKv(storage.RetainedKey, func(value []byte) error {
obj := storage.Message{}
err = obj.UnmarshalBinary(value)
if err == nil {
v = append(v, obj)
}
return err
})
if err != nil && !errors.Is(err, badgerdb.ErrKeyNotFound) {
return
}
return v, nil
return
}
// StoredInflightMessages returns all stored inflight messages from the store.
@@ -472,12 +463,16 @@ func (h *Hook) StoredInflightMessages() (v []storage.Message, err error) {
return
}
err = h.db.Find(&v, badgerhold.Where("T").Eq(storage.InflightKey))
if err != nil && !errors.Is(err, badgerhold.ErrNotFound) {
return
}
return v, nil
v = make([]storage.Message, 0)
err = h.iterKv(storage.InflightKey, func(value []byte) error {
obj := storage.Message{}
err = obj.UnmarshalBinary(value)
if err == nil {
v = append(v, obj)
}
return err
})
return
}
// StoredSysInfo returns the system info from the store.
@@ -487,11 +482,10 @@ func (h *Hook) StoredSysInfo() (v storage.SystemInfo, err error) {
return
}
err = h.db.Get(storage.SysInfoKey, &v)
if err != nil && !errors.Is(err, badgerhold.ErrNotFound) {
err = h.getKv(storage.SysInfoKey, &v)
if err != nil && !errors.Is(err, badgerdb.ErrKeyNotFound) {
return
}
return v, nil
}
@@ -515,3 +509,68 @@ func (h *Hook) Infof(m string, v ...interface{}) {
func (h *Hook) Debugf(m string, v ...interface{}) {
h.Log.Debug(fmt.Sprintf(strings.ToLower(strings.Trim(m, "\n")), v...), "v", v)
}
// setKv stores a key-value pair in the database.
func (h *Hook) setKv(k string, v storage.Serializable) error {
err := h.db.Update(func(txn *badgerdb.Txn) error {
data, _ := v.MarshalBinary()
return txn.Set([]byte(k), data)
})
if err != nil {
h.Log.Error("failed to upsert data", "error", err, "key", k)
}
return err
}
// delKv deletes a key-value pair from the database.
func (h *Hook) delKv(k string) error {
err := h.db.Update(func(txn *badgerdb.Txn) error {
return txn.Delete([]byte(k))
})
if err != nil {
h.Log.Error("failed to delete data", "error", err, "key", k)
}
return err
}
// getKv retrieves the value associated with a key from the database.
func (h *Hook) getKv(k string, v storage.Serializable) error {
return h.db.View(func(txn *badgerdb.Txn) error {
item, err := txn.Get([]byte(k))
if err != nil {
return err
}
value, err := item.ValueCopy(nil)
if err != nil {
return err
}
return v.UnmarshalBinary(value)
})
}
// iterKv iterates over key-value pairs with keys having the specified prefix in the database.
func (h *Hook) iterKv(prefix string, visit func([]byte) error) error {
err := h.db.View(func(txn *badgerdb.Txn) error {
iterator := txn.NewIterator(badgerdb.DefaultIteratorOptions)
defer iterator.Close()
for iterator.Seek([]byte(prefix)); iterator.ValidForPrefix([]byte(prefix)); iterator.Next() {
item := iterator.Item()
value, err := item.ValueCopy(nil)
if err != nil {
return err
}
if err := visit(value); err != nil {
return err
}
}
return nil
})
if err != nil {
h.Log.Error("failed to find data", "error", err, "prefix", prefix)
}
return err
}

View File

@@ -5,19 +5,19 @@
package badger
import (
"errors"
"log/slog"
"os"
"strings"
"testing"
"time"
badgerdb "github.com/dgraph-io/badger"
badgerdb "github.com/dgraph-io/badger/v4"
mqtt "github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/hooks/storage"
"github.com/mochi-mqtt/server/v2/packets"
"github.com/mochi-mqtt/server/v2/system"
"github.com/stretchr/testify/require"
"github.com/timshannon/badgerhold"
)
var (
@@ -40,14 +40,37 @@ var (
func teardown(t *testing.T, path string, h *Hook) {
_ = h.Stop()
_ = h.db.Badger().Close()
_ = h.db.Close()
err := os.RemoveAll("./" + strings.Replace(path, "..", "", -1))
require.NoError(t, err)
}
func TestSetGetDelKv(t *testing.T) {
h := new(Hook)
h.SetOpts(logger, nil)
h.Init(nil)
defer teardown(t, h.config.Path, h)
key := "testKey"
value := &storage.Client{ID: "cl1"}
err := h.setKv(key, value)
require.NoError(t, err)
var client storage.Client
err = h.getKv(key, &client)
require.NoError(t, err)
require.Equal(t, "cl1", client.ID)
err = h.delKv(key)
require.NoError(t, err)
err = h.getKv(key, &client)
require.ErrorIs(t, badgerdb.ErrKeyNotFound, err)
}
func TestClientKey(t *testing.T) {
k := clientKey(&mqtt.Client{ID: "cl1"})
require.Equal(t, "cl1", k)
require.Equal(t, storage.ClientKey+"_cl1", k)
}
func TestSubscriptionKey(t *testing.T) {
@@ -102,6 +125,19 @@ func TestInitBadConfig(t *testing.T) {
require.Error(t, err)
}
func TestInitBadOption(t *testing.T) {
h := new(Hook)
h.SetOpts(logger, nil)
err := h.Init(&Options{
Options: &badgerdb.Options{
NumCompactors: 1,
},
})
// Cannot have 1 compactor. Need at least 2
require.Error(t, err)
}
func TestInitUseDefaults(t *testing.T) {
h := new(Hook)
h.SetOpts(logger, nil)
@@ -122,7 +158,7 @@ func TestOnSessionEstablishedThenOnDisconnect(t *testing.T) {
h.OnSessionEstablished(client, packets.Packet{})
r := new(storage.Client)
err = h.db.Get(clientKey(client), r)
err = h.getKv(clientKey(client), r)
require.NoError(t, err)
require.Equal(t, client.ID, r.ID)
require.Equal(t, client.Properties.Username, r.Username)
@@ -133,15 +169,15 @@ func TestOnSessionEstablishedThenOnDisconnect(t *testing.T) {
h.OnDisconnect(client, nil, false)
r2 := new(storage.Client)
err = h.db.Get(clientKey(client), r2)
err = h.getKv(clientKey(client), r2)
require.NoError(t, err)
require.Equal(t, client.ID, r.ID)
h.OnDisconnect(client, nil, true)
r3 := new(storage.Client)
err = h.db.Get(clientKey(client), r3)
err = h.getKv(clientKey(client), r3)
require.Error(t, err)
require.ErrorIs(t, badgerhold.ErrNotFound, err)
require.ErrorIs(t, badgerdb.ErrKeyNotFound, err)
require.Empty(t, r3.ID)
}
@@ -155,18 +191,19 @@ func TestOnClientExpired(t *testing.T) {
cl := &mqtt.Client{ID: "cl1"}
clientKey := clientKey(cl)
err = h.db.Upsert(clientKey, &storage.Client{ID: cl.ID})
err = h.setKv(clientKey, &storage.Client{ID: cl.ID})
require.NoError(t, err)
r := new(storage.Client)
err = h.db.Get(clientKey, r)
err = h.getKv(clientKey, r)
require.NoError(t, err)
require.Equal(t, cl.ID, r.ID)
h.OnClientExpired(cl)
err = h.db.Get(clientKey, r)
err = h.getKv(clientKey, r)
require.Error(t, err)
require.ErrorIs(t, badgerhold.ErrNotFound, err)
require.ErrorIs(t, badgerdb.ErrKeyNotFound, err)
}
func TestOnClientExpiredNoDB(t *testing.T) {
@@ -211,7 +248,7 @@ func TestOnWillSent(t *testing.T) {
h.OnWillSent(c1, packets.Packet{})
r := new(storage.Client)
err = h.db.Get(clientKey(client), r)
err = h.getKv(clientKey(client), r)
require.NoError(t, err)
require.Equal(t, uint32(1), r.Will.Flag)
@@ -266,16 +303,16 @@ func TestOnSubscribedThenOnUnsubscribed(t *testing.T) {
h.OnSubscribed(client, pkf, []byte{0})
r := new(storage.Subscription)
err = h.db.Get(subscriptionKey(client, pkf.Filters[0].Filter), r)
err = h.getKv(subscriptionKey(client, pkf.Filters[0].Filter), r)
require.NoError(t, err)
require.Equal(t, client.ID, r.Client)
require.Equal(t, pkf.Filters[0].Filter, r.Filter)
require.Equal(t, byte(0), r.Qos)
h.OnUnsubscribed(client, pkf)
err = h.db.Get(subscriptionKey(client, pkf.Filters[0].Filter), r)
err = h.getKv(subscriptionKey(client, pkf.Filters[0].Filter), r)
require.Error(t, err)
require.Equal(t, badgerhold.ErrNotFound, err)
require.Equal(t, badgerdb.ErrKeyNotFound, err)
}
func TestOnSubscribedNoDB(t *testing.T) {
@@ -326,21 +363,21 @@ func TestOnRetainMessageThenUnset(t *testing.T) {
h.OnRetainMessage(client, pk, 1)
r := new(storage.Message)
err = h.db.Get(retainedKey(pk.TopicName), r)
err = h.getKv(retainedKey(pk.TopicName), r)
require.NoError(t, err)
require.Equal(t, pk.TopicName, r.TopicName)
require.Equal(t, pk.Payload, r.Payload)
h.OnRetainMessage(client, pk, -1)
err = h.db.Get(retainedKey(pk.TopicName), r)
err = h.getKv(retainedKey(pk.TopicName), r)
require.Error(t, err)
require.ErrorIs(t, err, badgerhold.ErrNotFound)
require.ErrorIs(t, err, badgerdb.ErrKeyNotFound)
// coverage: delete deleted
h.OnRetainMessage(client, pk, -1)
err = h.db.Get(retainedKey(pk.TopicName), r)
err = h.getKv(retainedKey(pk.TopicName), r)
require.Error(t, err)
require.ErrorIs(t, err, badgerhold.ErrNotFound)
require.ErrorIs(t, err, badgerdb.ErrKeyNotFound)
}
func TestOnRetainedExpired(t *testing.T) {
@@ -356,18 +393,18 @@ func TestOnRetainedExpired(t *testing.T) {
TopicName: "a/b/c",
}
err = h.db.Upsert(m.ID, m)
err = h.setKv(m.ID, m)
require.NoError(t, err)
r := new(storage.Message)
err = h.db.Get(m.ID, r)
err = h.getKv(m.ID, r)
require.NoError(t, err)
require.Equal(t, m.TopicName, r.TopicName)
h.OnRetainedExpired(m.TopicName)
err = h.db.Get(m.ID, r)
err = h.getKv(m.ID, r)
require.Error(t, err)
require.ErrorIs(t, err, badgerhold.ErrNotFound)
require.ErrorIs(t, err, badgerdb.ErrKeyNotFound)
}
func TestOnRetainExpiredNoDB(t *testing.T) {
@@ -419,7 +456,7 @@ func TestOnQosPublishThenQOSComplete(t *testing.T) {
h.OnQosPublish(client, pk, time.Now().Unix(), 0)
r := new(storage.Message)
err = h.db.Get(inflightKey(client, pk), r)
err = h.getKv(inflightKey(client, pk), r)
require.NoError(t, err)
require.Equal(t, pk.TopicName, r.TopicName)
require.Equal(t, pk.Payload, r.Payload)
@@ -430,9 +467,9 @@ func TestOnQosPublishThenQOSComplete(t *testing.T) {
// OnQosDropped is a passthrough to OnQosComplete here
h.OnQosDropped(client, pk)
err = h.db.Get(inflightKey(client, pk), r)
err = h.getKv(inflightKey(client, pk), r)
require.Error(t, err)
require.ErrorIs(t, err, badgerhold.ErrNotFound)
require.ErrorIs(t, err, badgerdb.ErrKeyNotFound)
}
func TestOnQosPublishNoDB(t *testing.T) {
@@ -486,7 +523,7 @@ func TestOnSysInfoTick(t *testing.T) {
h.OnSysInfoTick(info)
r := new(storage.SystemInfo)
err = h.db.Get(storage.SysInfoKey, r)
err = h.getKv(storage.SysInfoKey, r)
require.NoError(t, err)
require.Equal(t, info.Version, r.Version)
require.Equal(t, info.BytesReceived, r.BytesReceived)
@@ -516,13 +553,13 @@ func TestStoredClients(t *testing.T) {
defer teardown(t, h.config.Path, h)
// populate with clients
err = h.db.Upsert("cl1", &storage.Client{ID: "cl1", T: storage.ClientKey})
err = h.setKv(storage.ClientKey+"_cl1", &storage.Client{ID: "cl1", T: storage.ClientKey})
require.NoError(t, err)
err = h.db.Upsert("cl2", &storage.Client{ID: "cl2", T: storage.ClientKey})
err = h.setKv(storage.ClientKey+"_cl2", &storage.Client{ID: "cl2", T: storage.ClientKey})
require.NoError(t, err)
err = h.db.Upsert("cl3", &storage.Client{ID: "cl3", T: storage.ClientKey})
err = h.setKv(storage.ClientKey+"_cl3", &storage.Client{ID: "cl3", T: storage.ClientKey})
require.NoError(t, err)
r, err := h.StoredClients()
@@ -549,13 +586,13 @@ func TestStoredSubscriptions(t *testing.T) {
defer teardown(t, h.config.Path, h)
// populate with subscriptions
err = h.db.Upsert("sub1", &storage.Subscription{ID: "sub1", T: storage.SubscriptionKey})
err = h.setKv(storage.SubscriptionKey+"_sub1", &storage.Subscription{ID: "sub1", T: storage.SubscriptionKey})
require.NoError(t, err)
err = h.db.Upsert("sub2", &storage.Subscription{ID: "sub2", T: storage.SubscriptionKey})
err = h.setKv(storage.SubscriptionKey+"_sub2", &storage.Subscription{ID: "sub2", T: storage.SubscriptionKey})
require.NoError(t, err)
err = h.db.Upsert("sub3", &storage.Subscription{ID: "sub3", T: storage.SubscriptionKey})
err = h.setKv(storage.SubscriptionKey+"_sub3", &storage.Subscription{ID: "sub3", T: storage.SubscriptionKey})
require.NoError(t, err)
r, err := h.StoredSubscriptions()
@@ -582,16 +619,16 @@ func TestStoredRetainedMessages(t *testing.T) {
defer teardown(t, h.config.Path, h)
// populate with messages
err = h.db.Upsert("m1", &storage.Message{ID: "m1", T: storage.RetainedKey})
err = h.setKv(storage.RetainedKey+"_m1", &storage.Message{ID: "m1", T: storage.RetainedKey})
require.NoError(t, err)
err = h.db.Upsert("m2", &storage.Message{ID: "m2", T: storage.RetainedKey})
err = h.setKv(storage.RetainedKey+"_m2", &storage.Message{ID: "m2", T: storage.RetainedKey})
require.NoError(t, err)
err = h.db.Upsert("m3", &storage.Message{ID: "m3", T: storage.RetainedKey})
err = h.setKv(storage.RetainedKey+"_m3", &storage.Message{ID: "m3", T: storage.RetainedKey})
require.NoError(t, err)
err = h.db.Upsert("i3", &storage.Message{ID: "i3", T: storage.InflightKey})
err = h.setKv(storage.InflightKey+"_i3", &storage.Message{ID: "i3", T: storage.InflightKey})
require.NoError(t, err)
r, err := h.StoredRetainedMessages()
@@ -618,16 +655,16 @@ func TestStoredInflightMessages(t *testing.T) {
defer teardown(t, h.config.Path, h)
// populate with messages
err = h.db.Upsert("i1", &storage.Message{ID: "i1", T: storage.InflightKey})
err = h.setKv(storage.InflightKey+"_i1", &storage.Message{ID: "i1", T: storage.InflightKey})
require.NoError(t, err)
err = h.db.Upsert("i2", &storage.Message{ID: "i2", T: storage.InflightKey})
err = h.setKv(storage.InflightKey+"_i2", &storage.Message{ID: "i2", T: storage.InflightKey})
require.NoError(t, err)
err = h.db.Upsert("i3", &storage.Message{ID: "i3", T: storage.InflightKey})
err = h.setKv(storage.InflightKey+"_i3", &storage.Message{ID: "i3", T: storage.InflightKey})
require.NoError(t, err)
err = h.db.Upsert("m1", &storage.Message{ID: "m1", T: storage.RetainedKey})
err = h.setKv(storage.RetainedKey+"_m1", &storage.Message{ID: "m1", T: storage.RetainedKey})
require.NoError(t, err)
r, err := h.StoredInflightMessages()
@@ -654,7 +691,7 @@ func TestStoredSysInfo(t *testing.T) {
defer teardown(t, h.config.Path, h)
// populate with messages
err = h.db.Upsert(storage.SysInfoKey, &storage.SystemInfo{
err = h.setKv(storage.SysInfoKey, &storage.SystemInfo{
ID: storage.SysInfoKey,
Info: system.Info{
Version: "2.0.0",
@@ -707,17 +744,65 @@ func TestDebugf(t *testing.T) {
func TestGcLoop(t *testing.T) {
h := new(Hook)
h.SetOpts(logger, nil)
opts := badgerdb.DefaultOptions(defaultDbFile)
opts.ValueLogFileSize = 1 << 20
h.Init(&Options{
GcInterval: 2, // Set the interval for garbage collection.
Options: &badgerhold.Options{
// BadgerDB options. Modify as needed.
Options: badgerdb.Options{
ValueLogFileSize: 1 << 20, // Set the default size of the log file to 1 MB.
},
},
Options: &opts,
})
defer teardown(t, h.config.Path, h)
defer teardown(t, defaultDbFile, h)
h.OnSessionEstablished(client, packets.Packet{})
h.OnDisconnect(client, nil, true)
time.Sleep(3 * time.Second)
}
func TestGetSetDelKv(t *testing.T) {
h := new(Hook)
h.SetOpts(logger, nil)
err := h.Init(nil)
defer teardown(t, h.config.Path, h)
require.NoError(t, err)
err = h.setKv("testKey", &storage.Client{ID: "testId"})
require.NoError(t, err)
var obj storage.Client
err = h.getKv("testKey", &obj)
require.NoError(t, err)
err = h.delKv("testKey")
require.NoError(t, err)
err = h.getKv("testKey", &obj)
require.Error(t, err)
require.ErrorIs(t, badgerdb.ErrKeyNotFound, err)
}
func TestIterKv(t *testing.T) {
h := new(Hook)
h.SetOpts(logger, nil)
err := h.Init(nil)
defer teardown(t, h.config.Path, h)
require.NoError(t, err)
h.setKv("prefix_a_1", &storage.Client{ID: "1"})
h.setKv("prefix_a_2", &storage.Client{ID: "2"})
h.setKv("prefix_b_2", &storage.Client{ID: "3"})
var clients []storage.Client
err = h.iterKv("prefix_a", func(data []byte) error {
var item storage.Client
item.UnmarshalBinary(data)
clients = append(clients, item)
return nil
})
require.Equal(t, 2, len(clients))
require.NoError(t, err)
visitErr := errors.New("iter visit error")
err = h.iterKv("prefix_b", func(data []byte) error {
return visitErr
})
require.ErrorIs(t, visitErr, err)
}