From e2cb6888696628b1880bd34d703331f8bfa11fa1 Mon Sep 17 00:00:00 2001 From: werben Date: Tue, 2 Apr 2024 00:51:02 +0800 Subject: [PATCH] Replace badgerhold with directly using BadgerDB v4 (#376) * Replace badgerhold with directly using BadgerDB v4. * Optimize code and test cases. * Optimize code. * Set the default size of the log file to 100 MB. * Resolve merge conflicts. --- examples/persistence/badger/main.go | 24 ++- go.mod | 10 +- go.sum | 43 ++---- hooks/storage/badger/badger.go | 231 +++++++++++++++++----------- hooks/storage/badger/badger_test.go | 187 ++++++++++++++++------ 5 files changed, 309 insertions(+), 186 deletions(-) diff --git a/examples/persistence/badger/main.go b/examples/persistence/badger/main.go index f402d4f..49f4f6b 100644 --- a/examples/persistence/badger/main.go +++ b/examples/persistence/badger/main.go @@ -5,16 +5,16 @@ package main import ( - badgerdb "github.com/dgraph-io/badger" - mqtt "github.com/mochi-mqtt/server/v2" - "github.com/mochi-mqtt/server/v2/hooks/auth" - "github.com/mochi-mqtt/server/v2/hooks/storage/badger" - "github.com/mochi-mqtt/server/v2/listeners" - "github.com/timshannon/badgerhold" "log" "os" "os/signal" "syscall" + + badgerdb "github.com/dgraph-io/badger/v4" + mqtt "github.com/mochi-mqtt/server/v2" + "github.com/mochi-mqtt/server/v2/hooks/auth" + "github.com/mochi-mqtt/server/v2/hooks/storage/badger" + "github.com/mochi-mqtt/server/v2/listeners" ) func main() { @@ -32,6 +32,9 @@ func main() { server := mqtt.New(nil) _ = server.AddHook(new(auth.AllowHook), nil) + badgerOpts := badgerdb.DefaultOptions(badgerPath) // BadgerDB options. Adjust according to your actual scenario. + badgerOpts.ValueLogFileSize = 100 * (1 << 20) // Set the default size of the log file to 100 MB. + // AddHook adds a BadgerDB hook to the server with the specified options. // GcInterval specifies the interval at which BadgerDB garbage collection process runs. // Refer to https://dgraph.io/docs/badger/get-started/#garbage-collection for more information. @@ -48,14 +51,7 @@ func main() { // Adjust according to your actual scenario. GcDiscardRatio: 0.5, - Options: &badgerhold.Options{ - // BadgerDB options. Adjust according to your actual scenario. - Options: badgerdb.Options{ - NumCompactors: 2, // Number of compactors. Compactions can be expensive. - MaxTableSize: 64 << 20, // Maximum size of each table (64 MB). - ValueLogFileSize: 100 * (1 << 20), // Set the default size of the log file to 100 MB. - }, - }, + Options: &badgerOpts, }) if err != nil { log.Fatal(err) diff --git a/go.mod b/go.mod index 4dc706d..81a20c1 100644 --- a/go.mod +++ b/go.mod @@ -7,19 +7,17 @@ require ( github.com/asdine/storm v2.1.2+incompatible github.com/asdine/storm/v3 v3.2.1 github.com/cockroachdb/pebble v1.1.0 - github.com/dgraph-io/badger v1.6.0 + github.com/dgraph-io/badger/v4 v4.2.0 github.com/go-redis/redis/v8 v8.11.5 github.com/gorilla/websocket v1.5.0 github.com/jinzhu/copier v0.3.5 github.com/rs/xid v1.4.0 github.com/stretchr/testify v1.8.1 - github.com/timshannon/badgerhold v1.0.0 go.etcd.io/bbolt v1.3.5 gopkg.in/yaml.v3 v3.0.1 ) require ( - github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96 // indirect github.com/DataDog/zstd v1.4.5 // indirect github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect github.com/beorn7/perks v1.0.1 // indirect @@ -29,13 +27,16 @@ require ( github.com/cockroachdb/redact v1.1.5 // indirect github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect github.com/davecgh/go-spew v1.1.1 // indirect - github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 // indirect + github.com/dgraph-io/ristretto v0.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dustin/go-humanize v1.0.0 // indirect github.com/getsentry/sentry-go v0.18.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/glog v1.0.0 // indirect + github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/golang/snappy v0.0.4 // indirect + github.com/google/flatbuffers v1.12.1 // indirect github.com/klauspost/compress v1.15.15 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/kr/text v0.2.0 // indirect @@ -48,6 +49,7 @@ require ( github.com/prometheus/procfs v0.7.3 // indirect github.com/rogpeppe/go-internal v1.9.0 // indirect github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 // indirect + go.opencensus.io v0.22.5 // indirect golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df // indirect golang.org/x/net v0.17.0 // indirect golang.org/x/sys v0.13.0 // indirect diff --git a/go.sum b/go.sum index db860e6..66267f9 100644 --- a/go.sum +++ b/go.sum @@ -31,9 +31,6 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= -github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= -github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96 h1:cTp8I5+VIoKjsnZuH8vjyaysT/ses3EvZeaV/1UkF2M= -github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/DataDog/zstd v1.4.1/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= @@ -50,7 +47,6 @@ github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZp github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc= github.com/alicebob/miniredis/v2 v2.23.0 h1:+lwAJYjvvdIVg6doFHuotFjueJ/7KY10xo/vm3X3Scw= github.com/alicebob/miniredis/v2 v2.23.0/go.mod h1:XNqvJdQJv5mSuVMc0ynneafpnL/zv52acZ6kqeS0t88= -github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/asdine/storm v2.1.2+incompatible h1:dczuIkyqwY2LrtXPz8ixMrU/OFgZp71kbKTHGrXYt/Q= github.com/asdine/storm v2.1.2+incompatible/go.mod h1:RarYDc9hq1UPLImuiXK3BIWPJLdIygvV3PsInK0FbVQ= github.com/asdine/storm/v3 v3.2.1 h1:I5AqhkPK6nBZ/qJXySdI7ot5BlXSZ7qvDY1zAn5ZJac= @@ -81,16 +77,14 @@ github.com/cockroachdb/redact v1.1.5 h1:u1PMllDkdFfPWaNGMyLD1+so+aq3uUItthCFqzwP github.com/cockroachdb/redact v1.1.5/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg= github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 h1:zuQyyAKVxetITBuuhv3BI9cMrmStnpT18zmgmTxunpo= github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06/go.mod h1:7nc4anLGjupUW/PeY5qiNYsdNXj7zopG+eqsS7To5IQ= -github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= -github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= -github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= -github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dgraph-io/badger v1.6.0 h1:DshxFxZWXUcO0xX476VJC07Xsr6ZCBVRHKZ93Oh7Evo= -github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4= +github.com/dgraph-io/badger/v4 v4.2.0 h1:kJrlajbXXL9DFTNuhhu9yCx7JJa4qpYWxtE8BzuWsEs= +github.com/dgraph-io/badger/v4 v4.2.0/go.mod h1:qfCqhPoWDFJRx1gp5QwwyGo8xk1lbHUxvK9nK0OGAak= +github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8= +github.com/dgraph-io/ristretto v0.1.1/go.mod h1:S1GPSBCYCIhmVNfcth17y2zZtQT6wzkzgwUve0VDWWA= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= @@ -101,7 +95,6 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= -github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/getsentry/sentry-go v0.18.0 h1:MtBW5H9QgdcJabtZcuJG80BMOwaBpkRDZkxRkNC1sN0= @@ -124,8 +117,11 @@ github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/glog v1.0.0 h1:nfP3RFugxnNRyKgeWd4oI1nYvXpxrx8ck8ZrcizshdQ= +github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e h1:1r7pUrabqp18hOBcwBwiTsbnFeTZHV9eER/QT5JVZxY= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= @@ -156,6 +152,8 @@ github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= +github.com/google/flatbuffers v1.12.1 h1:MVlul7pQNoDzWRLTw5imwYsl+usrS1TXG2H4jg6ImGw= +github.com/google/flatbuffers v1.12.1/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= @@ -184,9 +182,7 @@ github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWm github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= -github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= -github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/jinzhu/copier v0.3.5 h1:GlvfUwHk62RokgqVNvYsku0TATCF7bAHVwEXoBh3iJg= github.com/jinzhu/copier v0.3.5/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= @@ -212,12 +208,9 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 h1:I0XW9+e1XWDxdcEniV4rQAIOPUGDq67JSCiRCgGCZLI= github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= -github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= -github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= @@ -231,7 +224,6 @@ github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs= -github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= @@ -269,16 +261,9 @@ github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZV github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rs/xid v1.4.0 h1:qd7wPTDkN6KQx2VmMBLrpHkiyQwgFXRnkOLacUiaSNY= github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= -github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= -github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= -github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= -github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tLCHU= -github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo= -github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= -github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= @@ -290,12 +275,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/timshannon/badgerhold v1.0.0 h1:LtqnDRVP7294FWRiZCIfQa6Tt0bGmlzbO8c364QC2Y8= -github.com/timshannon/badgerhold v1.0.0/go.mod h1:Vv2Jj0PAfzqViEpGvJzLP8PY07x1iXLgKRuLY7bqPOE= -github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/vmihailenco/msgpack v4.0.4+incompatible h1:dSLoQfGFAo3F6OoNhwUmLwVgaUXK79GlxNBwueZn0xI= github.com/vmihailenco/msgpack v4.0.4+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6AcGMlsjHatGryHQYUTf1ShIgkk= -github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -310,8 +291,9 @@ go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= +go.opencensus.io v0.22.5 h1:dntmOdLpSpHlVqbW5Eay97DelsZHe+55D+xC6i0dDS0= +go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= -golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -403,7 +385,6 @@ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -413,7 +394,6 @@ golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -440,6 +420,7 @@ golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= diff --git a/hooks/storage/badger/badger.go b/hooks/storage/badger/badger.go index 3868ee2..2d3a1be 100644 --- a/hooks/storage/badger/badger.go +++ b/hooks/storage/badger/badger.go @@ -11,12 +11,11 @@ import ( "strings" "time" + badgerdb "github.com/dgraph-io/badger/v4" mqtt "github.com/mochi-mqtt/server/v2" "github.com/mochi-mqtt/server/v2/hooks/storage" "github.com/mochi-mqtt/server/v2/packets" "github.com/mochi-mqtt/server/v2/system" - - "github.com/timshannon/badgerhold" ) const ( @@ -28,7 +27,7 @@ const ( // clientKey returns a primary key for a client. func clientKey(cl *mqtt.Client) string { - return cl.ID + return storage.ClientKey + "_" + cl.ID } // subscriptionKey returns a primary key for a subscription. @@ -51,9 +50,15 @@ func sysInfoKey() string { return storage.SysInfoKey } +// Serializable is an interface for objects that can be serialized and deserialized. +type Serializable interface { + UnmarshalBinary([]byte) error + MarshalBinary() (data []byte, err error) +} + // Options contains configuration settings for the BadgerDB instance. type Options struct { - Options *badgerhold.Options + Options *badgerdb.Options Path string `yaml:"path" json:"path"` // GcDiscardRatio specifies the ratio of log discard compared to the maximum possible log discard. // Setting it to a higher value would result in fewer space reclaims, while setting it to a lower value @@ -66,9 +71,9 @@ type Options struct { // Hook is a persistent storage hook based using BadgerDB file store as a backend. type Hook struct { mqtt.HookBase - config *Options // options for configuring the BadgerDB instance. - gcTicker *time.Ticker // Ticker for BadgerDB garbage collection. - db *badgerhold.Store // the BadgerDB instance. + config *Options // options for configuring the BadgerDB instance. + gcTicker *time.Ticker // Ticker for BadgerDB garbage collection. + db *badgerdb.DB // the BadgerDB instance. } // ID returns the id of the hook. @@ -102,12 +107,12 @@ func (h *Hook) Provides(b byte) bool { // GcLoop periodically runs the garbage collection process to reclaim space in the value log files. // It uses a ticker to trigger the garbage collection at regular intervals specified by the configuration. // Refer to: https://dgraph.io/docs/badger/get-started/#garbage-collection -func (h *Hook) GcLoop() { +func (h *Hook) gcLoop() { for range h.gcTicker.C { again: // Run the garbage collection process with a threshold. // If the process returns nil (success), repeat the process. - err := h.db.Badger().RunValueLogGC(h.config.GcDiscardRatio) + err := h.db.RunValueLogGC(h.config.GcDiscardRatio) if err == nil { goto again // Retry garbage collection if successful. } @@ -121,11 +126,12 @@ func (h *Hook) Init(config any) error { } if config == nil { - config = new(Options) + h.config = new(Options) + } else { + h.config = config.(*Options) } - h.config = config.(*Options) - if h.config.Path == "" { + if len(h.config.Path) == 0 { h.config.Path = defaultDbFile } @@ -137,19 +143,20 @@ func (h *Hook) Init(config any) error { h.config.GcDiscardRatio = defaultGcDiscardRatio } - options := badgerhold.DefaultOptions - options.Dir = h.config.Path - options.ValueDir = h.config.Path - options.Logger = h + if h.config.Options == nil { + defaultOpts := badgerdb.DefaultOptions(h.config.Path) + h.config.Options = &defaultOpts + } + h.config.Options.Logger = h var err error - h.db, err = badgerhold.Open(options) + h.db, err = badgerdb.Open(*h.config.Options) if err != nil { return err } h.gcTicker = time.NewTicker(time.Duration(h.config.GcInterval) * time.Second) - go h.GcLoop() + go h.gcLoop() return nil } @@ -181,7 +188,7 @@ func (h *Hook) updateClient(cl *mqtt.Client) { props := cl.Properties.Props.Copy(false) in := &storage.Client{ - ID: clientKey(cl), + ID: cl.ID, T: storage.ClientKey, Remote: cl.Net.Remote, Listener: cl.Net.Listener, @@ -202,7 +209,7 @@ func (h *Hook) updateClient(cl *mqtt.Client) { Will: storage.ClientWill(cl.Properties.Will), } - err := h.db.Upsert(in.ID, in) + err := h.setKv(clientKey(cl), in) if err != nil { h.Log.Error("failed to upsert client data", "error", err, "data", in) } @@ -225,10 +232,7 @@ func (h *Hook) OnDisconnect(cl *mqtt.Client, _ error, expire bool) { return } - err := h.db.Delete(clientKey(cl), new(storage.Client)) - if err != nil { - h.Log.Error("failed to delete client data", "error", err, "data", clientKey(cl)) - } + _ = h.delKv(clientKey(cl)) } // OnSubscribed adds one or more client subscriptions to the store. @@ -252,10 +256,7 @@ func (h *Hook) OnSubscribed(cl *mqtt.Client, pk packets.Packet, reasonCodes []by RetainAsPublished: pk.Filters[i].RetainAsPublished, } - err := h.db.Upsert(in.ID, in) - if err != nil { - h.Log.Error("failed to upsert subscription data", "error", err, "data", in) - } + _ = h.setKv(in.ID, in) } } @@ -267,10 +268,7 @@ func (h *Hook) OnUnsubscribed(cl *mqtt.Client, pk packets.Packet) { } for i := 0; i < len(pk.Filters); i++ { - err := h.db.Delete(subscriptionKey(cl, pk.Filters[i].Filter), new(storage.Subscription)) - if err != nil { - h.Log.Error("failed to delete subscription data", "error", err, "data", subscriptionKey(cl, pk.Filters[i].Filter)) - } + _ = h.delKv(subscriptionKey(cl, pk.Filters[i].Filter)) } } @@ -282,11 +280,7 @@ func (h *Hook) OnRetainMessage(cl *mqtt.Client, pk packets.Packet, r int64) { } if r == -1 { - err := h.db.Delete(retainedKey(pk.TopicName), new(storage.Message)) - if err != nil { - h.Log.Error("failed to delete retained message data", "error", err, "data", retainedKey(pk.TopicName)) - } - + _ = h.delKv(retainedKey(pk.TopicName)) return } @@ -311,10 +305,7 @@ func (h *Hook) OnRetainMessage(cl *mqtt.Client, pk packets.Packet, r int64) { }, } - err := h.db.Upsert(in.ID, in) - if err != nil { - h.Log.Error("failed to upsert retained message data", "error", err, "data", in) - } + _ = h.setKv(in.ID, in) } // OnQosPublish adds or updates an inflight message in the store. @@ -347,10 +338,7 @@ func (h *Hook) OnQosPublish(cl *mqtt.Client, pk packets.Packet, sent int64, rese }, } - err := h.db.Upsert(in.ID, in) - if err != nil { - h.Log.Error("failed to upsert qos inflight data", "error", err, "data", in) - } + _ = h.setKv(in.ID, in) } // OnQosComplete removes a resolved inflight message from the store. @@ -360,10 +348,7 @@ func (h *Hook) OnQosComplete(cl *mqtt.Client, pk packets.Packet) { return } - err := h.db.Delete(inflightKey(cl, pk), new(storage.Message)) - if err != nil { - h.Log.Error("failed to delete inflight message data", "error", err, "data", inflightKey(cl, pk)) - } + _ = h.delKv(inflightKey(cl, pk)) } // OnQosDropped removes a dropped inflight message from the store. @@ -388,10 +373,7 @@ func (h *Hook) OnSysInfoTick(sys *system.Info) { Info: *sys.Clone(), } - err := h.db.Upsert(in.ID, in) - if err != nil { - h.Log.Error("failed to upsert $SYS data", "error", err, "data", in) - } + _ = h.setKv(in.ID, in) } // OnRetainedExpired deletes expired retained messages from the store. @@ -401,10 +383,7 @@ func (h *Hook) OnRetainedExpired(filter string) { return } - err := h.db.Delete(retainedKey(filter), new(storage.Message)) - if err != nil { - h.Log.Error("failed to delete expired retained message data", "error", err, "id", retainedKey(filter)) - } + _ = h.delKv(retainedKey(filter)) } // OnClientExpired deleted expired clients from the store. @@ -414,10 +393,7 @@ func (h *Hook) OnClientExpired(cl *mqtt.Client) { return } - err := h.db.Delete(clientKey(cl), new(storage.Client)) - if err != nil { - h.Log.Error("failed to delete expired client data", "error", err, "id", clientKey(cl)) - } + _ = h.delKv(clientKey(cl)) } // StoredClients returns all stored clients from the store. @@ -427,12 +403,15 @@ func (h *Hook) StoredClients() (v []storage.Client, err error) { return } - err = h.db.Find(&v, badgerhold.Where("T").Eq(storage.ClientKey)) - if err != nil && !errors.Is(err, badgerhold.ErrNotFound) { - return - } - - return v, nil + err = h.iterKv(storage.ClientKey, func(value []byte) error { + obj := storage.Client{} + err = obj.UnmarshalBinary(value) + if err == nil { + v = append(v, obj) + } + return err + }) + return } // StoredSubscriptions returns all stored subscriptions from the store. @@ -442,12 +421,16 @@ func (h *Hook) StoredSubscriptions() (v []storage.Subscription, err error) { return } - err = h.db.Find(&v, badgerhold.Where("T").Eq(storage.SubscriptionKey)) - if err != nil && !errors.Is(err, badgerhold.ErrNotFound) { - return - } - - return v, nil + v = make([]storage.Subscription, 0) + err = h.iterKv(storage.SubscriptionKey, func(value []byte) error { + obj := storage.Subscription{} + err = obj.UnmarshalBinary(value) + if err == nil { + v = append(v, obj) + } + return err + }) + return } // StoredRetainedMessages returns all stored retained messages from the store. @@ -457,12 +440,20 @@ func (h *Hook) StoredRetainedMessages() (v []storage.Message, err error) { return } - err = h.db.Find(&v, badgerhold.Where("T").Eq(storage.RetainedKey)) - if err != nil && !errors.Is(err, badgerhold.ErrNotFound) { + v = make([]storage.Message, 0) + err = h.iterKv(storage.RetainedKey, func(value []byte) error { + obj := storage.Message{} + err = obj.UnmarshalBinary(value) + if err == nil { + v = append(v, obj) + } + return err + }) + + if err != nil && !errors.Is(err, badgerdb.ErrKeyNotFound) { return } - - return v, nil + return } // StoredInflightMessages returns all stored inflight messages from the store. @@ -472,12 +463,16 @@ func (h *Hook) StoredInflightMessages() (v []storage.Message, err error) { return } - err = h.db.Find(&v, badgerhold.Where("T").Eq(storage.InflightKey)) - if err != nil && !errors.Is(err, badgerhold.ErrNotFound) { - return - } - - return v, nil + v = make([]storage.Message, 0) + err = h.iterKv(storage.InflightKey, func(value []byte) error { + obj := storage.Message{} + err = obj.UnmarshalBinary(value) + if err == nil { + v = append(v, obj) + } + return err + }) + return } // StoredSysInfo returns the system info from the store. @@ -487,11 +482,10 @@ func (h *Hook) StoredSysInfo() (v storage.SystemInfo, err error) { return } - err = h.db.Get(storage.SysInfoKey, &v) - if err != nil && !errors.Is(err, badgerhold.ErrNotFound) { + err = h.getKv(storage.SysInfoKey, &v) + if err != nil && !errors.Is(err, badgerdb.ErrKeyNotFound) { return } - return v, nil } @@ -515,3 +509,68 @@ func (h *Hook) Infof(m string, v ...interface{}) { func (h *Hook) Debugf(m string, v ...interface{}) { h.Log.Debug(fmt.Sprintf(strings.ToLower(strings.Trim(m, "\n")), v...), "v", v) } + +// setKv stores a key-value pair in the database. +func (h *Hook) setKv(k string, v storage.Serializable) error { + err := h.db.Update(func(txn *badgerdb.Txn) error { + data, _ := v.MarshalBinary() + return txn.Set([]byte(k), data) + }) + if err != nil { + h.Log.Error("failed to upsert data", "error", err, "key", k) + } + return err +} + +// delKv deletes a key-value pair from the database. +func (h *Hook) delKv(k string) error { + err := h.db.Update(func(txn *badgerdb.Txn) error { + return txn.Delete([]byte(k)) + }) + + if err != nil { + h.Log.Error("failed to delete data", "error", err, "key", k) + } + return err +} + +// getKv retrieves the value associated with a key from the database. +func (h *Hook) getKv(k string, v storage.Serializable) error { + return h.db.View(func(txn *badgerdb.Txn) error { + item, err := txn.Get([]byte(k)) + if err != nil { + return err + } + value, err := item.ValueCopy(nil) + if err != nil { + return err + } + return v.UnmarshalBinary(value) + }) +} + +// iterKv iterates over key-value pairs with keys having the specified prefix in the database. +func (h *Hook) iterKv(prefix string, visit func([]byte) error) error { + err := h.db.View(func(txn *badgerdb.Txn) error { + iterator := txn.NewIterator(badgerdb.DefaultIteratorOptions) + defer iterator.Close() + + for iterator.Seek([]byte(prefix)); iterator.ValidForPrefix([]byte(prefix)); iterator.Next() { + item := iterator.Item() + value, err := item.ValueCopy(nil) + + if err != nil { + return err + } + + if err := visit(value); err != nil { + return err + } + } + return nil + }) + if err != nil { + h.Log.Error("failed to find data", "error", err, "prefix", prefix) + } + return err +} diff --git a/hooks/storage/badger/badger_test.go b/hooks/storage/badger/badger_test.go index 7553fe7..028c00c 100644 --- a/hooks/storage/badger/badger_test.go +++ b/hooks/storage/badger/badger_test.go @@ -5,19 +5,19 @@ package badger import ( + "errors" "log/slog" "os" "strings" "testing" "time" - badgerdb "github.com/dgraph-io/badger" + badgerdb "github.com/dgraph-io/badger/v4" mqtt "github.com/mochi-mqtt/server/v2" "github.com/mochi-mqtt/server/v2/hooks/storage" "github.com/mochi-mqtt/server/v2/packets" "github.com/mochi-mqtt/server/v2/system" "github.com/stretchr/testify/require" - "github.com/timshannon/badgerhold" ) var ( @@ -40,14 +40,37 @@ var ( func teardown(t *testing.T, path string, h *Hook) { _ = h.Stop() - _ = h.db.Badger().Close() + _ = h.db.Close() err := os.RemoveAll("./" + strings.Replace(path, "..", "", -1)) require.NoError(t, err) } +func TestSetGetDelKv(t *testing.T) { + h := new(Hook) + h.SetOpts(logger, nil) + h.Init(nil) + defer teardown(t, h.config.Path, h) + + key := "testKey" + value := &storage.Client{ID: "cl1"} + err := h.setKv(key, value) + require.NoError(t, err) + + var client storage.Client + err = h.getKv(key, &client) + require.NoError(t, err) + require.Equal(t, "cl1", client.ID) + + err = h.delKv(key) + require.NoError(t, err) + + err = h.getKv(key, &client) + require.ErrorIs(t, badgerdb.ErrKeyNotFound, err) +} + func TestClientKey(t *testing.T) { k := clientKey(&mqtt.Client{ID: "cl1"}) - require.Equal(t, "cl1", k) + require.Equal(t, storage.ClientKey+"_cl1", k) } func TestSubscriptionKey(t *testing.T) { @@ -102,6 +125,19 @@ func TestInitBadConfig(t *testing.T) { require.Error(t, err) } +func TestInitBadOption(t *testing.T) { + h := new(Hook) + h.SetOpts(logger, nil) + + err := h.Init(&Options{ + Options: &badgerdb.Options{ + NumCompactors: 1, + }, + }) + // Cannot have 1 compactor. Need at least 2 + require.Error(t, err) +} + func TestInitUseDefaults(t *testing.T) { h := new(Hook) h.SetOpts(logger, nil) @@ -122,7 +158,7 @@ func TestOnSessionEstablishedThenOnDisconnect(t *testing.T) { h.OnSessionEstablished(client, packets.Packet{}) r := new(storage.Client) - err = h.db.Get(clientKey(client), r) + err = h.getKv(clientKey(client), r) require.NoError(t, err) require.Equal(t, client.ID, r.ID) require.Equal(t, client.Properties.Username, r.Username) @@ -133,15 +169,15 @@ func TestOnSessionEstablishedThenOnDisconnect(t *testing.T) { h.OnDisconnect(client, nil, false) r2 := new(storage.Client) - err = h.db.Get(clientKey(client), r2) + err = h.getKv(clientKey(client), r2) require.NoError(t, err) require.Equal(t, client.ID, r.ID) h.OnDisconnect(client, nil, true) r3 := new(storage.Client) - err = h.db.Get(clientKey(client), r3) + err = h.getKv(clientKey(client), r3) require.Error(t, err) - require.ErrorIs(t, badgerhold.ErrNotFound, err) + require.ErrorIs(t, badgerdb.ErrKeyNotFound, err) require.Empty(t, r3.ID) } @@ -155,18 +191,19 @@ func TestOnClientExpired(t *testing.T) { cl := &mqtt.Client{ID: "cl1"} clientKey := clientKey(cl) - err = h.db.Upsert(clientKey, &storage.Client{ID: cl.ID}) + err = h.setKv(clientKey, &storage.Client{ID: cl.ID}) require.NoError(t, err) r := new(storage.Client) - err = h.db.Get(clientKey, r) + err = h.getKv(clientKey, r) + require.NoError(t, err) require.Equal(t, cl.ID, r.ID) h.OnClientExpired(cl) - err = h.db.Get(clientKey, r) + err = h.getKv(clientKey, r) require.Error(t, err) - require.ErrorIs(t, badgerhold.ErrNotFound, err) + require.ErrorIs(t, badgerdb.ErrKeyNotFound, err) } func TestOnClientExpiredNoDB(t *testing.T) { @@ -211,7 +248,7 @@ func TestOnWillSent(t *testing.T) { h.OnWillSent(c1, packets.Packet{}) r := new(storage.Client) - err = h.db.Get(clientKey(client), r) + err = h.getKv(clientKey(client), r) require.NoError(t, err) require.Equal(t, uint32(1), r.Will.Flag) @@ -266,16 +303,16 @@ func TestOnSubscribedThenOnUnsubscribed(t *testing.T) { h.OnSubscribed(client, pkf, []byte{0}) r := new(storage.Subscription) - err = h.db.Get(subscriptionKey(client, pkf.Filters[0].Filter), r) + err = h.getKv(subscriptionKey(client, pkf.Filters[0].Filter), r) require.NoError(t, err) require.Equal(t, client.ID, r.Client) require.Equal(t, pkf.Filters[0].Filter, r.Filter) require.Equal(t, byte(0), r.Qos) h.OnUnsubscribed(client, pkf) - err = h.db.Get(subscriptionKey(client, pkf.Filters[0].Filter), r) + err = h.getKv(subscriptionKey(client, pkf.Filters[0].Filter), r) require.Error(t, err) - require.Equal(t, badgerhold.ErrNotFound, err) + require.Equal(t, badgerdb.ErrKeyNotFound, err) } func TestOnSubscribedNoDB(t *testing.T) { @@ -326,21 +363,21 @@ func TestOnRetainMessageThenUnset(t *testing.T) { h.OnRetainMessage(client, pk, 1) r := new(storage.Message) - err = h.db.Get(retainedKey(pk.TopicName), r) + err = h.getKv(retainedKey(pk.TopicName), r) require.NoError(t, err) require.Equal(t, pk.TopicName, r.TopicName) require.Equal(t, pk.Payload, r.Payload) h.OnRetainMessage(client, pk, -1) - err = h.db.Get(retainedKey(pk.TopicName), r) + err = h.getKv(retainedKey(pk.TopicName), r) require.Error(t, err) - require.ErrorIs(t, err, badgerhold.ErrNotFound) + require.ErrorIs(t, err, badgerdb.ErrKeyNotFound) // coverage: delete deleted h.OnRetainMessage(client, pk, -1) - err = h.db.Get(retainedKey(pk.TopicName), r) + err = h.getKv(retainedKey(pk.TopicName), r) require.Error(t, err) - require.ErrorIs(t, err, badgerhold.ErrNotFound) + require.ErrorIs(t, err, badgerdb.ErrKeyNotFound) } func TestOnRetainedExpired(t *testing.T) { @@ -356,18 +393,18 @@ func TestOnRetainedExpired(t *testing.T) { TopicName: "a/b/c", } - err = h.db.Upsert(m.ID, m) + err = h.setKv(m.ID, m) require.NoError(t, err) r := new(storage.Message) - err = h.db.Get(m.ID, r) + err = h.getKv(m.ID, r) require.NoError(t, err) require.Equal(t, m.TopicName, r.TopicName) h.OnRetainedExpired(m.TopicName) - err = h.db.Get(m.ID, r) + err = h.getKv(m.ID, r) require.Error(t, err) - require.ErrorIs(t, err, badgerhold.ErrNotFound) + require.ErrorIs(t, err, badgerdb.ErrKeyNotFound) } func TestOnRetainExpiredNoDB(t *testing.T) { @@ -419,7 +456,7 @@ func TestOnQosPublishThenQOSComplete(t *testing.T) { h.OnQosPublish(client, pk, time.Now().Unix(), 0) r := new(storage.Message) - err = h.db.Get(inflightKey(client, pk), r) + err = h.getKv(inflightKey(client, pk), r) require.NoError(t, err) require.Equal(t, pk.TopicName, r.TopicName) require.Equal(t, pk.Payload, r.Payload) @@ -430,9 +467,9 @@ func TestOnQosPublishThenQOSComplete(t *testing.T) { // OnQosDropped is a passthrough to OnQosComplete here h.OnQosDropped(client, pk) - err = h.db.Get(inflightKey(client, pk), r) + err = h.getKv(inflightKey(client, pk), r) require.Error(t, err) - require.ErrorIs(t, err, badgerhold.ErrNotFound) + require.ErrorIs(t, err, badgerdb.ErrKeyNotFound) } func TestOnQosPublishNoDB(t *testing.T) { @@ -486,7 +523,7 @@ func TestOnSysInfoTick(t *testing.T) { h.OnSysInfoTick(info) r := new(storage.SystemInfo) - err = h.db.Get(storage.SysInfoKey, r) + err = h.getKv(storage.SysInfoKey, r) require.NoError(t, err) require.Equal(t, info.Version, r.Version) require.Equal(t, info.BytesReceived, r.BytesReceived) @@ -516,13 +553,13 @@ func TestStoredClients(t *testing.T) { defer teardown(t, h.config.Path, h) // populate with clients - err = h.db.Upsert("cl1", &storage.Client{ID: "cl1", T: storage.ClientKey}) + err = h.setKv(storage.ClientKey+"_cl1", &storage.Client{ID: "cl1", T: storage.ClientKey}) require.NoError(t, err) - err = h.db.Upsert("cl2", &storage.Client{ID: "cl2", T: storage.ClientKey}) + err = h.setKv(storage.ClientKey+"_cl2", &storage.Client{ID: "cl2", T: storage.ClientKey}) require.NoError(t, err) - err = h.db.Upsert("cl3", &storage.Client{ID: "cl3", T: storage.ClientKey}) + err = h.setKv(storage.ClientKey+"_cl3", &storage.Client{ID: "cl3", T: storage.ClientKey}) require.NoError(t, err) r, err := h.StoredClients() @@ -549,13 +586,13 @@ func TestStoredSubscriptions(t *testing.T) { defer teardown(t, h.config.Path, h) // populate with subscriptions - err = h.db.Upsert("sub1", &storage.Subscription{ID: "sub1", T: storage.SubscriptionKey}) + err = h.setKv(storage.SubscriptionKey+"_sub1", &storage.Subscription{ID: "sub1", T: storage.SubscriptionKey}) require.NoError(t, err) - err = h.db.Upsert("sub2", &storage.Subscription{ID: "sub2", T: storage.SubscriptionKey}) + err = h.setKv(storage.SubscriptionKey+"_sub2", &storage.Subscription{ID: "sub2", T: storage.SubscriptionKey}) require.NoError(t, err) - err = h.db.Upsert("sub3", &storage.Subscription{ID: "sub3", T: storage.SubscriptionKey}) + err = h.setKv(storage.SubscriptionKey+"_sub3", &storage.Subscription{ID: "sub3", T: storage.SubscriptionKey}) require.NoError(t, err) r, err := h.StoredSubscriptions() @@ -582,16 +619,16 @@ func TestStoredRetainedMessages(t *testing.T) { defer teardown(t, h.config.Path, h) // populate with messages - err = h.db.Upsert("m1", &storage.Message{ID: "m1", T: storage.RetainedKey}) + err = h.setKv(storage.RetainedKey+"_m1", &storage.Message{ID: "m1", T: storage.RetainedKey}) require.NoError(t, err) - err = h.db.Upsert("m2", &storage.Message{ID: "m2", T: storage.RetainedKey}) + err = h.setKv(storage.RetainedKey+"_m2", &storage.Message{ID: "m2", T: storage.RetainedKey}) require.NoError(t, err) - err = h.db.Upsert("m3", &storage.Message{ID: "m3", T: storage.RetainedKey}) + err = h.setKv(storage.RetainedKey+"_m3", &storage.Message{ID: "m3", T: storage.RetainedKey}) require.NoError(t, err) - err = h.db.Upsert("i3", &storage.Message{ID: "i3", T: storage.InflightKey}) + err = h.setKv(storage.InflightKey+"_i3", &storage.Message{ID: "i3", T: storage.InflightKey}) require.NoError(t, err) r, err := h.StoredRetainedMessages() @@ -618,16 +655,16 @@ func TestStoredInflightMessages(t *testing.T) { defer teardown(t, h.config.Path, h) // populate with messages - err = h.db.Upsert("i1", &storage.Message{ID: "i1", T: storage.InflightKey}) + err = h.setKv(storage.InflightKey+"_i1", &storage.Message{ID: "i1", T: storage.InflightKey}) require.NoError(t, err) - err = h.db.Upsert("i2", &storage.Message{ID: "i2", T: storage.InflightKey}) + err = h.setKv(storage.InflightKey+"_i2", &storage.Message{ID: "i2", T: storage.InflightKey}) require.NoError(t, err) - err = h.db.Upsert("i3", &storage.Message{ID: "i3", T: storage.InflightKey}) + err = h.setKv(storage.InflightKey+"_i3", &storage.Message{ID: "i3", T: storage.InflightKey}) require.NoError(t, err) - err = h.db.Upsert("m1", &storage.Message{ID: "m1", T: storage.RetainedKey}) + err = h.setKv(storage.RetainedKey+"_m1", &storage.Message{ID: "m1", T: storage.RetainedKey}) require.NoError(t, err) r, err := h.StoredInflightMessages() @@ -654,7 +691,7 @@ func TestStoredSysInfo(t *testing.T) { defer teardown(t, h.config.Path, h) // populate with messages - err = h.db.Upsert(storage.SysInfoKey, &storage.SystemInfo{ + err = h.setKv(storage.SysInfoKey, &storage.SystemInfo{ ID: storage.SysInfoKey, Info: system.Info{ Version: "2.0.0", @@ -707,17 +744,65 @@ func TestDebugf(t *testing.T) { func TestGcLoop(t *testing.T) { h := new(Hook) h.SetOpts(logger, nil) + opts := badgerdb.DefaultOptions(defaultDbFile) + opts.ValueLogFileSize = 1 << 20 h.Init(&Options{ GcInterval: 2, // Set the interval for garbage collection. - Options: &badgerhold.Options{ - // BadgerDB options. Modify as needed. - Options: badgerdb.Options{ - ValueLogFileSize: 1 << 20, // Set the default size of the log file to 1 MB. - }, - }, + Options: &opts, }) - defer teardown(t, h.config.Path, h) + defer teardown(t, defaultDbFile, h) h.OnSessionEstablished(client, packets.Packet{}) h.OnDisconnect(client, nil, true) time.Sleep(3 * time.Second) } + +func TestGetSetDelKv(t *testing.T) { + h := new(Hook) + h.SetOpts(logger, nil) + err := h.Init(nil) + defer teardown(t, h.config.Path, h) + require.NoError(t, err) + + err = h.setKv("testKey", &storage.Client{ID: "testId"}) + require.NoError(t, err) + + var obj storage.Client + err = h.getKv("testKey", &obj) + require.NoError(t, err) + + err = h.delKv("testKey") + require.NoError(t, err) + + err = h.getKv("testKey", &obj) + require.Error(t, err) + require.ErrorIs(t, badgerdb.ErrKeyNotFound, err) +} + +func TestIterKv(t *testing.T) { + h := new(Hook) + h.SetOpts(logger, nil) + + err := h.Init(nil) + defer teardown(t, h.config.Path, h) + require.NoError(t, err) + + h.setKv("prefix_a_1", &storage.Client{ID: "1"}) + h.setKv("prefix_a_2", &storage.Client{ID: "2"}) + h.setKv("prefix_b_2", &storage.Client{ID: "3"}) + + var clients []storage.Client + err = h.iterKv("prefix_a", func(data []byte) error { + var item storage.Client + item.UnmarshalBinary(data) + clients = append(clients, item) + return nil + }) + require.Equal(t, 2, len(clients)) + require.NoError(t, err) + + visitErr := errors.New("iter visit error") + err = h.iterKv("prefix_b", func(data []byte) error { + return visitErr + }) + require.ErrorIs(t, visitErr, err) +}