mirror of
https://github.com/mochi-mqtt/server.git
synced 2025-09-26 20:21:12 +08:00
Add BadgerDB garbage collection. (#371)
* For issues #370, #369, and #363, add BadgerDB garbage collection. * Add default configuration for defaultGcInterval. * Solve DATA RACE. * Place Badger's configuration in main.go for users to adjust as needed. * Add TestGcLoop() for coverage. * Modify GcInterval to shorten test time. * Add the GcDiscardRatio option for the Badger hook, and include more detailed comments in the example. --------- Co-authored-by: JB <28275108+mochi-co@users.noreply.github.com>
This commit is contained in:
@@ -9,11 +9,14 @@ import (
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
badgerdb "github.com/dgraph-io/badger"
|
||||
mqtt "github.com/mochi-mqtt/server/v2"
|
||||
"github.com/mochi-mqtt/server/v2/hooks/auth"
|
||||
"github.com/mochi-mqtt/server/v2/hooks/storage/badger"
|
||||
"github.com/mochi-mqtt/server/v2/listeners"
|
||||
"github.com/timshannon/badgerhold"
|
||||
)
|
||||
|
||||
func main() {
|
||||
@@ -31,8 +34,30 @@ func main() {
|
||||
server := mqtt.New(nil)
|
||||
_ = server.AddHook(new(auth.AllowHook), nil)
|
||||
|
||||
// AddHook adds a BadgerDB hook to the server with the specified options.
|
||||
// GcInterval specifies the interval at which BadgerDB garbage collection process runs.
|
||||
// Refer to https://dgraph.io/docs/badger/get-started/#garbage-collection for more information.
|
||||
err := server.AddHook(new(badger.Hook), &badger.Options{
|
||||
Path: badgerPath,
|
||||
Path: badgerPath,
|
||||
|
||||
// Set the interval for garbage collection. Adjust according to your actual scenario.
|
||||
GcInterval: 5 * time.Minute,
|
||||
|
||||
// GcDiscardRatio specifies the ratio of log discard compared to the maximum possible log discard.
|
||||
// Setting it to a higher value would result in fewer space reclaims, while setting it to a lower value
|
||||
// would result in more space reclaims at the cost of increased activity on the LSM tree.
|
||||
// discardRatio must be in the range (0.0, 1.0), both endpoints excluded, otherwise, it will be set to the default value of 0.5.
|
||||
// Adjust according to your actual scenario.
|
||||
GcDiscardRatio: 0.5,
|
||||
|
||||
Options: &badgerhold.Options{
|
||||
// BadgerDB options. Adjust according to your actual scenario.
|
||||
Options: badgerdb.Options{
|
||||
NumCompactors: 2, // Number of compactors. Compactions can be expensive.
|
||||
MaxTableSize: 64 << 20, // Maximum size of each table (64 MB).
|
||||
ValueLogFileSize: 100 * (1 << 20), // Set the default size of the log file to 100 MB.
|
||||
},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
|
@@ -9,6 +9,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
mqtt "github.com/mochi-mqtt/server/v2"
|
||||
"github.com/mochi-mqtt/server/v2/hooks/storage"
|
||||
@@ -21,6 +22,8 @@ import (
|
||||
const (
|
||||
// defaultDbFile is the default file path for the badger db file.
|
||||
defaultDbFile = ".badger"
|
||||
defaultGcInterval = 5 * time.Minute
|
||||
defaultGcDiscardRatio = 0.5
|
||||
)
|
||||
|
||||
// clientKey returns a primary key for a client.
|
||||
@@ -51,6 +54,15 @@ func sysInfoKey() string {
|
||||
// Options contains configuration settings for the BadgerDB instance.
|
||||
type Options struct {
|
||||
Options *badgerhold.Options
|
||||
|
||||
// The interval for garbage collection.
|
||||
GcInterval time.Duration
|
||||
|
||||
// GcDiscardRatio specifies the ratio of log discard compared to the maximum possible log discard.
|
||||
// Setting it to a higher value would result in fewer space reclaims, while setting it to a lower value
|
||||
// would result in more space reclaims at the cost of increased activity on the LSM tree.
|
||||
// discardRatio must be in the range (0.0, 1.0), both endpoints excluded, otherwise, it will be set to the default value of 0.5.
|
||||
GcDiscardRatio float64
|
||||
Path string
|
||||
}
|
||||
|
||||
@@ -58,6 +70,7 @@ type Options struct {
|
||||
type Hook struct {
|
||||
mqtt.HookBase
|
||||
config *Options // options for configuring the BadgerDB instance.
|
||||
gcTicker *time.Ticker // Ticker for BadgerDB garbage collection.
|
||||
db *badgerhold.Store // the BadgerDB instance.
|
||||
}
|
||||
|
||||
@@ -89,6 +102,21 @@ func (h *Hook) Provides(b byte) bool {
|
||||
}, []byte{b})
|
||||
}
|
||||
|
||||
// GcLoop periodically runs the garbage collection process to reclaim space in the value log files.
|
||||
// It uses a ticker to trigger the garbage collection at regular intervals specified by the configuration.
|
||||
// Refer to: https://dgraph.io/docs/badger/get-started/#garbage-collection
|
||||
func (h *Hook) GcLoop() {
|
||||
for range h.gcTicker.C {
|
||||
again:
|
||||
// Run the garbage collection process with a threshold.
|
||||
// If the process returns nil (success), repeat the process.
|
||||
err := h.db.Badger().RunValueLogGC(h.config.GcDiscardRatio)
|
||||
if err == nil {
|
||||
goto again // Retry garbage collection if successful.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Init initializes and connects to the badger instance.
|
||||
func (h *Hook) Init(config any) error {
|
||||
if _, ok := config.(*Options); !ok && config != nil {
|
||||
@@ -104,6 +132,14 @@ func (h *Hook) Init(config any) error {
|
||||
h.config.Path = defaultDbFile
|
||||
}
|
||||
|
||||
if h.config.GcInterval == 0 {
|
||||
h.config.GcInterval = defaultGcInterval
|
||||
}
|
||||
|
||||
if h.config.GcDiscardRatio <= 0.0 || h.config.GcDiscardRatio >= 1.0{
|
||||
h.config.GcDiscardRatio = defaultGcDiscardRatio
|
||||
}
|
||||
|
||||
options := badgerhold.DefaultOptions
|
||||
options.Dir = h.config.Path
|
||||
options.ValueDir = h.config.Path
|
||||
@@ -115,11 +151,17 @@ func (h *Hook) Init(config any) error {
|
||||
return err
|
||||
}
|
||||
|
||||
h.gcTicker = time.NewTicker(h.config.GcInterval)
|
||||
go h.GcLoop()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop closes the badger instance.
|
||||
func (h *Hook) Stop() error {
|
||||
if h.gcTicker != nil {
|
||||
h.gcTicker.Stop()
|
||||
}
|
||||
return h.db.Close()
|
||||
}
|
||||
|
||||
|
@@ -11,6 +11,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
badgerdb "github.com/dgraph-io/badger"
|
||||
mqtt "github.com/mochi-mqtt/server/v2"
|
||||
"github.com/mochi-mqtt/server/v2/hooks/storage"
|
||||
"github.com/mochi-mqtt/server/v2/packets"
|
||||
@@ -702,3 +703,21 @@ func TestDebugf(t *testing.T) {
|
||||
h.SetOpts(logger, nil)
|
||||
h.Debugf("test", 1, 2, 3)
|
||||
}
|
||||
|
||||
func TestGcLoop(t *testing.T) {
|
||||
h := new(Hook)
|
||||
h.SetOpts(logger, nil)
|
||||
h.Init(&Options{
|
||||
GcInterval: 2 * time.Second, // Set the interval for garbage collection.
|
||||
Options: &badgerhold.Options{
|
||||
// BadgerDB options. Modify as needed.
|
||||
Options: badgerdb.Options{
|
||||
ValueLogFileSize: 1 << 20, // Set the default size of the log file to 1 MB.
|
||||
},
|
||||
},
|
||||
})
|
||||
defer teardown(t, h.config.Path, h)
|
||||
h.OnSessionEstablished(client, packets.Packet{})
|
||||
h.OnDisconnect(client, nil, true)
|
||||
time.Sleep(3 * time.Second)
|
||||
}
|
Reference in New Issue
Block a user