Implemented options pattern for AOF engine

This commit is contained in:
Kelvin Clement Mwinuka
2024-02-25 16:15:27 +08:00
parent 83f070986d
commit 2bdfc2e15c
2 changed files with 112 additions and 57 deletions

View File

@@ -1,71 +1,120 @@
package aof package aof
import ( import (
"context"
logstore "github.com/echovault/echovault/src/server/aof/log" logstore "github.com/echovault/echovault/src/server/aof/log"
"github.com/echovault/echovault/src/server/aof/preamble" "github.com/echovault/echovault/src/server/aof/preamble"
"github.com/echovault/echovault/src/utils"
"log" "log"
"net"
"sync" "sync"
) )
// This package handles AOF logging in standalone mode only. // This package handles AOF logging in standalone mode only.
// Logging in clusters is handled in the raft layer. // Logging in clusters is handled in the raft layer.
type Opts struct {
Config utils.Config
GetState func() map[string]interface{}
StartRewriteAOF func()
FinishRewriteAOF func()
CreateKeyAndLock func(ctx context.Context, key string) (bool, error)
KeyUnlock func(key string)
SetValue func(ctx context.Context, key string, value interface{})
HandleCommand func(ctx context.Context, command []byte, conn *net.Conn, replay bool) ([]byte, error)
}
type Engine struct { type Engine struct {
options Opts syncStrategy string
directory string
preambleRW preamble.PreambleReadWriter
appendRW logstore.AppendReadWriter
mut sync.Mutex mut sync.Mutex
logChan chan []byte logChan chan []byte
logCount uint64 logCount uint64
preambleStore *preamble.PreambleStore preambleStore *preamble.PreambleStore
appendStore *logstore.AppendStore appendStore *logstore.AppendStore
startRewrite func()
finishRewrite func()
getState func() map[string]interface{}
setValue func(key string, value interface{})
handleCommand func(command []byte)
} }
func NewAOFEngine(opts Opts, appendRW logstore.AppendReadWriter, preambleRW preamble.PreambleReadWriter) (*Engine, error) { func WithStrategy(strategy string) func(engine *Engine) {
return func(engine *Engine) {
engine.syncStrategy = strategy
}
}
func WithDirectory(directory string) func(engine *Engine) {
return func(engine *Engine) {
engine.directory = directory
}
}
func WithStartRewriteFunc(f func()) func(engine *Engine) {
return func(engine *Engine) {
engine.startRewrite = f
}
}
func WithFinishRewriteFunc(f func()) func(engine *Engine) {
return func(engine *Engine) {
engine.finishRewrite = f
}
}
func WithGetStateFunc(f func() map[string]interface{}) func(engine *Engine) {
return func(engine *Engine) {
engine.getState = f
}
}
func WithSetValueFunc(f func(key string, value interface{})) func(engine *Engine) {
return func(engine *Engine) {
engine.setValue = f
}
}
func WithHandleCommandFunc(f func(command []byte)) func(engine *Engine) {
return func(engine *Engine) {
engine.handleCommand = f
}
}
func WithPreambleReadWriter(rw preamble.PreambleReadWriter) func(engine *Engine) {
return func(engine *Engine) {
engine.preambleRW = rw
}
}
func WithAppendReadWriter(rw logstore.AppendReadWriter) func(engine *Engine) {
return func(engine *Engine) {
engine.appendRW = rw
}
}
func NewAOFEngine(options ...func(engine *Engine)) *Engine {
engine := &Engine{ engine := &Engine{
options: opts, syncStrategy: "everysec",
directory: "",
mut: sync.Mutex{}, mut: sync.Mutex{},
logChan: make(chan []byte, 4096), logChan: make(chan []byte, 4096),
logCount: 0, logCount: 0,
startRewrite: func() {},
finishRewrite: func() {},
getState: func() map[string]interface{} { return nil },
setValue: func(key string, value interface{}) {},
handleCommand: func(command []byte) {},
}
for _, option := range options {
option(engine)
} }
// Setup Preamble engine // Setup Preamble engine
engine.preambleStore = preamble.NewPreambleStore( engine.preambleStore = preamble.NewPreambleStore(
preamble.WithDirectory(engine.options.Config.DataDir), preamble.WithDirectory(engine.directory),
preamble.WithReadWriter(preambleRW), preamble.WithReadWriter(engine.preambleRW),
preamble.WithGetStateFunc(opts.GetState), preamble.WithGetStateFunc(engine.getState),
preamble.WithSetValueFunc(func(key string, value interface{}) { preamble.WithSetValueFunc(engine.setValue),
if _, err := engine.options.CreateKeyAndLock(context.Background(), key); err != nil {
log.Println(err)
}
engine.options.SetValue(context.Background(), key, value)
engine.options.KeyUnlock(key)
}),
) )
// Setup AOF log store engine // Setup AOF log store engine
engine.appendStore = logstore.NewAppendStore( engine.appendStore = logstore.NewAppendStore(
logstore.WithDirectory(engine.options.Config.DataDir), logstore.WithDirectory(engine.directory),
logstore.WithStrategy(engine.options.Config.AOFSyncStrategy), logstore.WithStrategy(engine.syncStrategy),
logstore.WithReadWriter(appendRW), logstore.WithReadWriter(engine.appendRW),
logstore.WithHandleCommandFunc(func(command []byte) { logstore.WithHandleCommandFunc(engine.handleCommand),
_, err := engine.options.HandleCommand(context.Background(), command, nil, true)
if err != nil {
log.Println(err)
}
}),
) )
// 3. Start the goroutine to pick up queued commands in order to write them to the file. // 3. Start the goroutine to pick up queued commands in order to write them to the file.
@@ -79,7 +128,7 @@ func NewAOFEngine(opts Opts, appendRW logstore.AppendReadWriter, preambleRW prea
} }
}() }()
return engine, nil return engine
} }
func (engine *Engine) QueueCommand(command []byte) { func (engine *Engine) QueueCommand(command []byte) {
@@ -90,8 +139,8 @@ func (engine *Engine) RewriteLog() error {
engine.mut.Lock() engine.mut.Lock()
defer engine.mut.Unlock() defer engine.mut.Unlock()
engine.options.StartRewriteAOF() engine.startRewrite()
defer engine.options.FinishRewriteAOF() defer engine.finishRewrite()
// Create AOF preamble // Create AOF preamble
if err := engine.preambleStore.CreatePreamble(); err != nil { if err := engine.preambleStore.CreatePreamble(); err != nil {

View File

@@ -95,21 +95,27 @@ func NewServer(opts Opts) *Server {
SetValue: server.SetValue, SetValue: server.SetValue,
}) })
// Set up standalone AOF engine // Set up standalone AOF engine
server.AOFEngine = aof.NewAOFEngine(
engine, err := aof.NewAOFEngine(aof.Opts{ aof.WithDirectory(opts.Config.DataDir),
Config: opts.Config, aof.WithStrategy(opts.Config.AOFSyncStrategy),
GetState: server.GetState, aof.WithStartRewriteFunc(server.StartRewriteAOF),
StartRewriteAOF: server.StartRewriteAOF, aof.WithFinishRewriteFunc(server.FinishRewriteAOF),
FinishRewriteAOF: server.FinishRewriteAOF, aof.WithGetStateFunc(server.GetState),
CreateKeyAndLock: server.CreateKeyAndLock, aof.WithSetValueFunc(func(key string, value interface{}) {
KeyUnlock: server.KeyUnlock, if _, err := server.CreateKeyAndLock(context.Background(), key); err != nil {
SetValue: server.SetValue, log.Println(err)
HandleCommand: server.handleCommand, return
}, nil, nil) }
server.SetValue(context.Background(), key, value)
server.KeyUnlock(key)
}),
aof.WithHandleCommandFunc(func(command []byte) {
_, err := server.handleCommand(context.Background(), command, nil, true)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
} }
server.AOFEngine = engine }),
)
} }
return server return server
} }