Updated snapshot engine to user options pattern. Updated SetValue and SetExpiry functions in snapshot and aof engine to SetKeyData function that handles setting both value and expiry of a key upon restore.

This commit is contained in:
Kelvin Mwinuka
2024-03-25 11:29:49 +08:00
parent 88a3d8b6c6
commit aea3e0675c
4 changed files with 165 additions and 152 deletions

View File

@@ -21,11 +21,10 @@ import (
"github.com/echovault/echovault/src/utils" "github.com/echovault/echovault/src/utils"
"log" "log"
"sync" "sync"
"time"
) )
// This package handles AOF logging in standalone mode only. // This package handles AOF logging in standalone mode only.
// Logging in clusters is handled in the raft layer. // Logging in replication clusters is handled in the raft layer.
type Engine struct { type Engine struct {
syncStrategy string syncStrategy string
@@ -39,12 +38,11 @@ type Engine struct {
preambleStore *preamble.PreambleStore preambleStore *preamble.PreambleStore
appendStore *logstore.AppendStore appendStore *logstore.AppendStore
startRewrite func() startRewriteFunc func()
finishRewrite func() finishRewriteFunc func()
getState func() map[string]utils.KeyData getStateFunc func() map[string]utils.KeyData
setValue func(key string, value interface{}) error setKeyDataFunc func(key string, data utils.KeyData)
setExpiry func(key string, expireAt time.Time) error handleCommand func(command []byte)
handleCommand func(command []byte)
} }
func WithStrategy(strategy string) func(engine *Engine) { func WithStrategy(strategy string) func(engine *Engine) {
@@ -61,31 +59,25 @@ func WithDirectory(directory string) func(engine *Engine) {
func WithStartRewriteFunc(f func()) func(engine *Engine) { func WithStartRewriteFunc(f func()) func(engine *Engine) {
return func(engine *Engine) { return func(engine *Engine) {
engine.startRewrite = f engine.startRewriteFunc = f
} }
} }
func WithFinishRewriteFunc(f func()) func(engine *Engine) { func WithFinishRewriteFunc(f func()) func(engine *Engine) {
return func(engine *Engine) { return func(engine *Engine) {
engine.finishRewrite = f engine.finishRewriteFunc = f
} }
} }
func WithGetStateFunc(f func() map[string]utils.KeyData) func(engine *Engine) { func WithGetStateFunc(f func() map[string]utils.KeyData) func(engine *Engine) {
return func(engine *Engine) { return func(engine *Engine) {
engine.getState = f engine.getStateFunc = f
} }
} }
func WithSetValueFunc(f func(key string, value interface{}) error) func(engine *Engine) { func WithSetKeyDataFunc(f func(key string, data utils.KeyData)) func(engine *Engine) {
return func(engine *Engine) { return func(engine *Engine) {
engine.setValue = f engine.setKeyDataFunc = f
}
}
func WithSetExpiryFunc(f func(key string, expireAt time.Time) error) func(engine *Engine) {
return func(engine *Engine) {
engine.setExpiry = f
} }
} }
@@ -109,33 +101,24 @@ func WithAppendReadWriter(rw logstore.AppendReadWriter) func(engine *Engine) {
func NewAOFEngine(options ...func(engine *Engine)) *Engine { func NewAOFEngine(options ...func(engine *Engine)) *Engine {
engine := &Engine{ engine := &Engine{
syncStrategy: "everysec", syncStrategy: "everysec",
directory: "", directory: "",
mut: sync.Mutex{}, mut: sync.Mutex{},
logChan: make(chan []byte, 4096), logChan: make(chan []byte, 4096),
logCount: 0, logCount: 0,
startRewrite: func() {}, startRewriteFunc: func() {},
finishRewrite: func() {}, finishRewriteFunc: func() {},
getState: func() map[string]utils.KeyData { return nil }, getStateFunc: func() map[string]utils.KeyData { return nil },
setValue: func(key string, value interface{}) error { setKeyDataFunc: func(key string, data utils.KeyData) {},
// No-Op by default handleCommand: func(command []byte) {},
return nil
},
setExpiry: func(key string, expireAt time.Time) error { return nil },
handleCommand: func(command []byte) {},
}
for _, option := range options {
option(engine)
} }
// Setup Preamble engine // Setup Preamble engine
engine.preambleStore = preamble.NewPreambleStore( engine.preambleStore = preamble.NewPreambleStore(
preamble.WithDirectory(engine.directory), preamble.WithDirectory(engine.directory),
preamble.WithReadWriter(engine.preambleRW), preamble.WithReadWriter(engine.preambleRW),
preamble.WithGetStateFunc(engine.getState), preamble.WithGetStateFunc(engine.getStateFunc),
preamble.WithSetValueFunc(engine.setValue), preamble.WithSetKeyDataFunc(engine.setKeyDataFunc),
preamble.WithSetExpiryFunc(engine.setExpiry),
) )
// Setup AOF log store engine // Setup AOF log store engine
@@ -146,6 +129,10 @@ func NewAOFEngine(options ...func(engine *Engine)) *Engine {
logstore.WithHandleCommandFunc(engine.handleCommand), logstore.WithHandleCommandFunc(engine.handleCommand),
) )
for _, option := range options {
option(engine)
}
// 3. Start the goroutine to pick up queued commands in order to write them to the file. // 3. Start the goroutine to pick up queued commands in order to write them to the file.
// LogCommand will get the open file handler from the struct top perform the AOF operation. // LogCommand will get the open file handler from the struct top perform the AOF operation.
go func() { go func() {
@@ -168,8 +155,8 @@ func (engine *Engine) RewriteLog() error {
engine.mut.Lock() engine.mut.Lock()
defer engine.mut.Unlock() defer engine.mut.Unlock()
engine.startRewrite() engine.startRewriteFunc()
defer engine.finishRewrite() defer engine.finishRewriteFunc()
// Create AOF preamble // Create AOF preamble
if err := engine.preambleStore.CreatePreamble(); err != nil { if err := engine.preambleStore.CreatePreamble(); err != nil {

View File

@@ -34,12 +34,11 @@ type PreambleReadWriter interface {
} }
type PreambleStore struct { type PreambleStore struct {
rw PreambleReadWriter rw PreambleReadWriter
mut sync.Mutex mut sync.Mutex
directory string directory string
getState func() map[string]utils.KeyData getStateFunc func() map[string]utils.KeyData
setValue func(key string, value interface{}) error setKeyDataFunc func(key string, data utils.KeyData)
setExpiry func(key string, expireAt time.Time) error
} }
func WithReadWriter(rw PreambleReadWriter) func(store *PreambleStore) { func WithReadWriter(rw PreambleReadWriter) func(store *PreambleStore) {
@@ -50,19 +49,13 @@ func WithReadWriter(rw PreambleReadWriter) func(store *PreambleStore) {
func WithGetStateFunc(f func() map[string]utils.KeyData) func(store *PreambleStore) { func WithGetStateFunc(f func() map[string]utils.KeyData) func(store *PreambleStore) {
return func(store *PreambleStore) { return func(store *PreambleStore) {
store.getState = f store.getStateFunc = f
} }
} }
func WithSetValueFunc(f func(key string, value interface{}) error) func(store *PreambleStore) { func WithSetKeyDataFunc(f func(key string, data utils.KeyData)) func(store *PreambleStore) {
return func(store *PreambleStore) { return func(store *PreambleStore) {
store.setValue = f store.setKeyDataFunc = f
}
}
func WithSetExpiryFunc(f func(key string, expireAt time.Time) error) func(store *PreambleStore) {
return func(store *PreambleStore) {
store.setExpiry = f
} }
} }
@@ -77,15 +70,11 @@ func NewPreambleStore(options ...func(store *PreambleStore)) *PreambleStore {
rw: nil, rw: nil,
mut: sync.Mutex{}, mut: sync.Mutex{},
directory: "", directory: "",
getState: func() map[string]utils.KeyData { getStateFunc: func() map[string]utils.KeyData {
// No-Op by default // No-Op by default
return nil return nil
}, },
setValue: func(key string, value interface{}) error { setKeyDataFunc: func(key string, data utils.KeyData) {},
// No-Op by default
return nil
},
setExpiry: func(key string, expireAt time.Time) error { return nil },
} }
for _, option := range options { for _, option := range options {
@@ -113,7 +102,7 @@ func (store *PreambleStore) CreatePreamble() error {
store.mut.Unlock() store.mut.Unlock()
// Get current state. // Get current state.
state := store.filterExpiredKeys(store.getState()) state := store.filterExpiredKeys(store.getStateFunc())
o, err := json.Marshal(state) o, err := json.Marshal(state)
if err != nil { if err != nil {
return err return err
@@ -160,11 +149,8 @@ func (store *PreambleStore) Restore() error {
return err return err
} }
for key, value := range store.filterExpiredKeys(state) { for key, data := range store.filterExpiredKeys(state) {
if err = store.setValue(key, value.Value); err != nil { store.setKeyDataFunc(key, data)
return fmt.Errorf("preamble store -> restore: %+v", err)
}
store.setExpiry(key, value.ExpireAt)
} }
return nil return nil

View File

@@ -121,34 +121,27 @@ func NewServer(opts Opts) *Server {
}) })
} else { } else {
// Set up standalone snapshot engine // Set up standalone snapshot engine
server.SnapshotEngine = snapshot.NewSnapshotEngine(snapshot.Opts{ server.SnapshotEngine = snapshot.NewSnapshotEngine(
Config: opts.Config, snapshot.WithDirectory(opts.Config.DataDir),
StartSnapshot: server.StartSnapshot, snapshot.WithThreshold(opts.Config.SnapShotThreshold),
FinishSnapshot: server.FinishSnapshot, snapshot.WithInterval(opts.Config.SnapshotInterval),
GetState: server.GetState, snapshot.WithGetStateFunc(server.GetState),
SetLatestSnapshotMilliseconds: server.SetLatestSnapshot, snapshot.WithStartSnapshotFunc(server.StartSnapshot),
GetLatestSnapshotMilliseconds: server.GetLatestSnapshot, snapshot.WithFinishSnapshotFunc(server.FinishSnapshot),
SetValue: func(key string, value interface{}) error { snapshot.WithSetLatestSnapshotTimeFunc(server.SetLatestSnapshot),
snapshot.WithGetLatestSnapshotTimeFunc(server.GetLatestSnapshot),
snapshot.WithSetKeyDataFunc(func(key string, data utils.KeyData) {
ctx := context.Background() ctx := context.Background()
if _, err := server.CreateKeyAndLock(ctx, key); err != nil { if _, err := server.CreateKeyAndLock(ctx, key); err != nil {
return err log.Println(err)
} }
if err := server.SetValue(ctx, key, value); err != nil { if err := server.SetValue(ctx, key, data.Value); err != nil {
return err log.Println(err)
} }
server.SetExpiry(ctx, key, data.ExpireAt, false)
server.KeyUnlock(ctx, key) server.KeyUnlock(ctx, key)
return nil }),
}, )
SetExpiry: func(key string, expireAt time.Time) error {
ctx := context.Background()
if _, err := server.KeyLock(ctx, key); err != nil {
return err
}
server.SetExpiry(ctx, key, expireAt, false)
server.KeyUnlock(ctx, key)
return nil
},
})
// Set up standalone AOF engine // Set up standalone AOF engine
server.AOFEngine = aof.NewAOFEngine( server.AOFEngine = aof.NewAOFEngine(
aof.WithDirectory(opts.Config.DataDir), aof.WithDirectory(opts.Config.DataDir),
@@ -156,25 +149,16 @@ func NewServer(opts Opts) *Server {
aof.WithStartRewriteFunc(server.StartRewriteAOF), aof.WithStartRewriteFunc(server.StartRewriteAOF),
aof.WithFinishRewriteFunc(server.FinishRewriteAOF), aof.WithFinishRewriteFunc(server.FinishRewriteAOF),
aof.WithGetStateFunc(server.GetState), aof.WithGetStateFunc(server.GetState),
aof.WithSetValueFunc(func(key string, value interface{}) error { aof.WithSetKeyDataFunc(func(key string, value utils.KeyData) {
ctx := context.Background() ctx := context.Background()
if _, err := server.CreateKeyAndLock(ctx, key); err != nil { if _, err := server.CreateKeyAndLock(ctx, key); err != nil {
return err log.Println(err)
} }
if err := server.SetValue(ctx, key, value); err != nil { if err := server.SetValue(ctx, key, value.Value); err != nil {
return err log.Println(err)
} }
server.SetExpiry(ctx, key, value.ExpireAt, false)
server.KeyUnlock(ctx, key) server.KeyUnlock(ctx, key)
return nil
}),
aof.WithSetExpiryFunc(func(key string, expireAt time.Time) error {
ctx := context.Background()
if _, err := server.KeyLock(ctx, key); err != nil {
return err
}
server.SetExpiry(ctx, key, expireAt, false)
server.KeyUnlock(ctx, key)
return nil
}), }),
aof.WithHandleCommandFunc(func(command []byte) { aof.WithHandleCommandFunc(func(command []byte) {
_, err := server.handleCommand(context.Background(), command, nil, true) _, err := server.handleCommand(context.Background(), command, nil, true)
@@ -378,13 +362,11 @@ func (server *Server) Start(ctx context.Context) {
// Restore from snapshot if snapshot restore is enabled and AOF restore is disabled // Restore from snapshot if snapshot restore is enabled and AOF restore is disabled
if conf.RestoreSnapshot && !conf.RestoreAOF { if conf.RestoreSnapshot && !conf.RestoreAOF {
err := server.SnapshotEngine.Restore(ctx) err := server.SnapshotEngine.Restore()
if err != nil { if err != nil {
log.Println(err) log.Println(err)
} }
} }
server.SnapshotEngine.Start(ctx)
} }
server.StartTCP(ctx) server.StartTCP(ctx)
@@ -472,6 +454,4 @@ func (server *Server) InitialiseCaches() {
mutex: sync.Mutex{}, mutex: sync.Mutex{},
cache: eviction.NewCacheLRU(), cache: eviction.NewCacheLRU(),
} }
// TODO: If eviction policy is volatile-ttl, start goroutine that continuously reads the mem stats
// TODO: before triggering purge once max-memory is reached
} }

View File

@@ -15,7 +15,6 @@
package snapshot package snapshot
import ( import (
"context"
"crypto/md5" "crypto/md5"
"encoding/json" "encoding/json"
"errors" "errors"
@@ -37,48 +36,114 @@ type Manifest struct {
LatestSnapshotHash [16]byte LatestSnapshotHash [16]byte
} }
type Opts struct {
Config utils.Config
StartSnapshot func()
FinishSnapshot func()
GetState func() map[string]utils.KeyData
SetLatestSnapshotMilliseconds func(msec int64)
GetLatestSnapshotMilliseconds func() int64
SetValue func(key string, value interface{}) error
SetExpiry func(key string, expireAt time.Time) error
}
type Engine struct { type Engine struct {
options Opts changeCount uint64
changeCount uint64 directory string
snapshotInterval time.Duration
snapshotThreshold uint64
startSnapshotFunc func()
finishSnapshotFunc func()
getStateFunc func() map[string]utils.KeyData
setLatestSnapshotTimeFunc func(msec int64)
getLatestSnapshotTimeFunc func() int64
setKeyDataFunc func(key string, data utils.KeyData)
} }
func NewSnapshotEngine(opts Opts) *Engine { func WithDirectory(directory string) func(engine *Engine) {
return &Engine{ return func(engine *Engine) {
options: opts, engine.directory = directory
} }
} }
func (engine *Engine) Start(ctx context.Context) { func WithInterval(interval time.Duration) func(engine *Engine) {
if engine.options.Config.SnapshotInterval != 0 { return func(engine *Engine) {
engine.snapshotInterval = interval
}
}
func WithThreshold(threshold uint64) func(engine *Engine) {
return func(engine *Engine) {
engine.snapshotThreshold = threshold
}
}
func WithStartSnapshotFunc(f func()) func(engine *Engine) {
return func(engine *Engine) {
engine.startSnapshotFunc = f
}
}
func WithFinishSnapshotFunc(f func()) func(engine *Engine) {
return func(engine *Engine) {
engine.finishSnapshotFunc = f
}
}
func WithGetStateFunc(f func() map[string]utils.KeyData) func(engine *Engine) {
return func(engine *Engine) {
engine.getStateFunc = f
}
}
func WithSetLatestSnapshotTimeFunc(f func(mset int64)) func(engine *Engine) {
return func(engine *Engine) {
engine.setLatestSnapshotTimeFunc = f
}
}
func WithGetLatestSnapshotTimeFunc(f func() int64) func(engine *Engine) {
return func(engine *Engine) {
engine.getLatestSnapshotTimeFunc = f
}
}
func WithSetKeyDataFunc(f func(key string, data utils.KeyData)) func(engine *Engine) {
return func(engine *Engine) {
engine.setKeyDataFunc = f
}
}
func NewSnapshotEngine(options ...func(engine *Engine)) *Engine {
engine := &Engine{
changeCount: 0,
directory: "",
snapshotInterval: 5 * time.Minute,
snapshotThreshold: 1000,
startSnapshotFunc: func() {},
finishSnapshotFunc: func() {},
getStateFunc: func() map[string]utils.KeyData {
return map[string]utils.KeyData{}
},
setLatestSnapshotTimeFunc: func(msec int64) {},
getLatestSnapshotTimeFunc: func() int64 {
return 0
},
setKeyDataFunc: func(key string, data utils.KeyData) {},
}
for _, option := range options {
option(engine)
}
if engine.snapshotInterval != 0 {
go func() { go func() {
for { for {
<-time.After(engine.options.Config.SnapshotInterval) <-time.After(engine.snapshotInterval)
if engine.changeCount == engine.options.Config.SnapShotThreshold { if engine.changeCount == engine.snapshotThreshold {
if err := engine.TakeSnapshot(); err != nil { if err := engine.TakeSnapshot(); err != nil {
log.Println(err) log.Println(err)
} }
} }
} }
}() }()
// Reset change count at startup
engine.resetChangeCount()
} }
return engine
} }
func (engine *Engine) TakeSnapshot() error { func (engine *Engine) TakeSnapshot() error {
engine.options.StartSnapshot() engine.startSnapshotFunc()
defer engine.options.FinishSnapshot() defer engine.finishSnapshotFunc()
// Extract current time // Extract current time
now := time.Now() now := time.Now()
@@ -95,7 +160,7 @@ func (engine *Engine) TakeSnapshot() error {
var firstSnapshot bool // Tracks whether the snapshot being attempted is the first one var firstSnapshot bool // Tracks whether the snapshot being attempted is the first one
dirname := path.Join(engine.options.Config.DataDir, "snapshots") dirname := path.Join(engine.directory, "snapshots")
if err := os.MkdirAll(dirname, os.ModePerm); err != nil { if err := os.MkdirAll(dirname, os.ModePerm); err != nil {
log.Println(err) log.Println(err)
return err return err
@@ -140,8 +205,8 @@ func (engine *Engine) TakeSnapshot() error {
// Get current state // Get current state
snapshotObject := utils.SnapshotObject{ snapshotObject := utils.SnapshotObject{
State: utils.FilterExpiredKeys(engine.options.GetState()), State: utils.FilterExpiredKeys(engine.getStateFunc()),
LatestSnapshotMilliseconds: engine.options.GetLatestSnapshotMilliseconds(), LatestSnapshotMilliseconds: engine.getLatestSnapshotTimeFunc(),
} }
out, err := json.Marshal(snapshotObject) out, err := json.Marshal(snapshotObject)
if err != nil { if err != nil {
@@ -193,7 +258,7 @@ func (engine *Engine) TakeSnapshot() error {
} }
// Create snapshot directory // Create snapshot directory
dirname = path.Join(engine.options.Config.DataDir, "snapshots", fmt.Sprintf("%d", msec)) dirname = path.Join(engine.directory, "snapshots", fmt.Sprintf("%d", msec))
if err := os.MkdirAll(dirname, os.ModePerm); err != nil { if err := os.MkdirAll(dirname, os.ModePerm); err != nil {
return err return err
} }
@@ -219,7 +284,7 @@ func (engine *Engine) TakeSnapshot() error {
} }
// Set the latest snapshot in unix milliseconds // Set the latest snapshot in unix milliseconds
engine.options.SetLatestSnapshotMilliseconds(msec) engine.setLatestSnapshotTimeFunc(msec)
// Reset the change count // Reset the change count
engine.resetChangeCount() engine.resetChangeCount()
@@ -227,8 +292,8 @@ func (engine *Engine) TakeSnapshot() error {
return nil return nil
} }
func (engine *Engine) Restore(ctx context.Context) error { func (engine *Engine) Restore() error {
mf, err := os.Open(path.Join(engine.options.Config.DataDir, "snapshots", "manifest.bin")) mf, err := os.Open(path.Join(engine.directory, "snapshots", "manifest.bin"))
if err != nil && errors.Is(err, fs.ErrNotExist) { if err != nil && errors.Is(err, fs.ErrNotExist) {
return errors.New("no snapshot manifest, skipping snapshot restore") return errors.New("no snapshot manifest, skipping snapshot restore")
} }
@@ -252,7 +317,7 @@ func (engine *Engine) Restore(ctx context.Context) error {
} }
sf, err := os.Open(path.Join( sf, err := os.Open(path.Join(
engine.options.Config.DataDir, engine.directory,
"snapshots", "snapshots",
fmt.Sprintf("%d", manifest.LatestSnapshotMilliseconds), fmt.Sprintf("%d", manifest.LatestSnapshotMilliseconds),
"state.bin")) "state.bin"))
@@ -274,15 +339,10 @@ func (engine *Engine) Restore(ctx context.Context) error {
return err return err
} }
engine.options.SetLatestSnapshotMilliseconds(snapshotObject.LatestSnapshotMilliseconds) engine.setLatestSnapshotTimeFunc(snapshotObject.LatestSnapshotMilliseconds)
for key, value := range utils.FilterExpiredKeys(snapshotObject.State) { for key, data := range utils.FilterExpiredKeys(snapshotObject.State) {
if err = engine.options.SetValue(key, value.Value); err != nil { engine.setKeyDataFunc(key, data)
return fmt.Errorf("snapshot engine -> restore value: %+v", err)
}
if err = engine.options.SetExpiry(key, value.ExpireAt); err != nil {
return fmt.Errorf("snapshot engine -> restore expiry: %+v", err)
}
} }
log.Println("successfully restored latest snapshot") log.Println("successfully restored latest snapshot")