From aea3e0675c3b68c5377e93168615ae4e64b356f2 Mon Sep 17 00:00:00 2001 From: Kelvin Mwinuka Date: Mon, 25 Mar 2024 11:29:49 +0800 Subject: [PATCH] Updated snapshot engine to user options pattern. Updated SetValue and SetExpiry functions in snapshot and aof engine to SetKeyData function that handles setting both value and expiry of a key upon restore. --- src/aof/engine.go | 71 ++++++++----------- src/aof/preamble/store.go | 40 ++++------- src/server/server.go | 64 ++++++----------- src/snapshot/snapshot.go | 142 +++++++++++++++++++++++++++----------- 4 files changed, 165 insertions(+), 152 deletions(-) diff --git a/src/aof/engine.go b/src/aof/engine.go index 31b1df7..11fad71 100644 --- a/src/aof/engine.go +++ b/src/aof/engine.go @@ -21,11 +21,10 @@ import ( "github.com/echovault/echovault/src/utils" "log" "sync" - "time" ) // This package handles AOF logging in standalone mode only. -// Logging in clusters is handled in the raft layer. +// Logging in replication clusters is handled in the raft layer. type Engine struct { syncStrategy string @@ -39,12 +38,11 @@ type Engine struct { preambleStore *preamble.PreambleStore appendStore *logstore.AppendStore - startRewrite func() - finishRewrite func() - getState func() map[string]utils.KeyData - setValue func(key string, value interface{}) error - setExpiry func(key string, expireAt time.Time) error - handleCommand func(command []byte) + startRewriteFunc func() + finishRewriteFunc func() + getStateFunc func() map[string]utils.KeyData + setKeyDataFunc func(key string, data utils.KeyData) + handleCommand func(command []byte) } func WithStrategy(strategy string) func(engine *Engine) { @@ -61,31 +59,25 @@ func WithDirectory(directory string) func(engine *Engine) { func WithStartRewriteFunc(f func()) func(engine *Engine) { return func(engine *Engine) { - engine.startRewrite = f + engine.startRewriteFunc = f } } func WithFinishRewriteFunc(f func()) func(engine *Engine) { return func(engine *Engine) { - engine.finishRewrite = f + engine.finishRewriteFunc = f } } func WithGetStateFunc(f func() map[string]utils.KeyData) func(engine *Engine) { return func(engine *Engine) { - engine.getState = f + engine.getStateFunc = f } } -func WithSetValueFunc(f func(key string, value interface{}) error) func(engine *Engine) { +func WithSetKeyDataFunc(f func(key string, data utils.KeyData)) func(engine *Engine) { return func(engine *Engine) { - engine.setValue = f - } -} - -func WithSetExpiryFunc(f func(key string, expireAt time.Time) error) func(engine *Engine) { - return func(engine *Engine) { - engine.setExpiry = f + engine.setKeyDataFunc = f } } @@ -109,33 +101,24 @@ func WithAppendReadWriter(rw logstore.AppendReadWriter) func(engine *Engine) { func NewAOFEngine(options ...func(engine *Engine)) *Engine { engine := &Engine{ - syncStrategy: "everysec", - directory: "", - mut: sync.Mutex{}, - logChan: make(chan []byte, 4096), - logCount: 0, - startRewrite: func() {}, - finishRewrite: func() {}, - getState: func() map[string]utils.KeyData { return nil }, - setValue: func(key string, value interface{}) error { - // No-Op by default - return nil - }, - setExpiry: func(key string, expireAt time.Time) error { return nil }, - handleCommand: func(command []byte) {}, - } - - for _, option := range options { - option(engine) + syncStrategy: "everysec", + directory: "", + mut: sync.Mutex{}, + logChan: make(chan []byte, 4096), + logCount: 0, + startRewriteFunc: func() {}, + finishRewriteFunc: func() {}, + getStateFunc: func() map[string]utils.KeyData { return nil }, + setKeyDataFunc: func(key string, data utils.KeyData) {}, + handleCommand: func(command []byte) {}, } // Setup Preamble engine engine.preambleStore = preamble.NewPreambleStore( preamble.WithDirectory(engine.directory), preamble.WithReadWriter(engine.preambleRW), - preamble.WithGetStateFunc(engine.getState), - preamble.WithSetValueFunc(engine.setValue), - preamble.WithSetExpiryFunc(engine.setExpiry), + preamble.WithGetStateFunc(engine.getStateFunc), + preamble.WithSetKeyDataFunc(engine.setKeyDataFunc), ) // Setup AOF log store engine @@ -146,6 +129,10 @@ func NewAOFEngine(options ...func(engine *Engine)) *Engine { logstore.WithHandleCommandFunc(engine.handleCommand), ) + for _, option := range options { + option(engine) + } + // 3. Start the goroutine to pick up queued commands in order to write them to the file. // LogCommand will get the open file handler from the struct top perform the AOF operation. go func() { @@ -168,8 +155,8 @@ func (engine *Engine) RewriteLog() error { engine.mut.Lock() defer engine.mut.Unlock() - engine.startRewrite() - defer engine.finishRewrite() + engine.startRewriteFunc() + defer engine.finishRewriteFunc() // Create AOF preamble if err := engine.preambleStore.CreatePreamble(); err != nil { diff --git a/src/aof/preamble/store.go b/src/aof/preamble/store.go index cd7f90c..e785955 100644 --- a/src/aof/preamble/store.go +++ b/src/aof/preamble/store.go @@ -34,12 +34,11 @@ type PreambleReadWriter interface { } type PreambleStore struct { - rw PreambleReadWriter - mut sync.Mutex - directory string - getState func() map[string]utils.KeyData - setValue func(key string, value interface{}) error - setExpiry func(key string, expireAt time.Time) error + rw PreambleReadWriter + mut sync.Mutex + directory string + getStateFunc func() map[string]utils.KeyData + setKeyDataFunc func(key string, data utils.KeyData) } func WithReadWriter(rw PreambleReadWriter) func(store *PreambleStore) { @@ -50,19 +49,13 @@ func WithReadWriter(rw PreambleReadWriter) func(store *PreambleStore) { func WithGetStateFunc(f func() map[string]utils.KeyData) func(store *PreambleStore) { return func(store *PreambleStore) { - store.getState = f + store.getStateFunc = f } } -func WithSetValueFunc(f func(key string, value interface{}) error) func(store *PreambleStore) { +func WithSetKeyDataFunc(f func(key string, data utils.KeyData)) func(store *PreambleStore) { return func(store *PreambleStore) { - store.setValue = f - } -} - -func WithSetExpiryFunc(f func(key string, expireAt time.Time) error) func(store *PreambleStore) { - return func(store *PreambleStore) { - store.setExpiry = f + store.setKeyDataFunc = f } } @@ -77,15 +70,11 @@ func NewPreambleStore(options ...func(store *PreambleStore)) *PreambleStore { rw: nil, mut: sync.Mutex{}, directory: "", - getState: func() map[string]utils.KeyData { + getStateFunc: func() map[string]utils.KeyData { // No-Op by default return nil }, - setValue: func(key string, value interface{}) error { - // No-Op by default - return nil - }, - setExpiry: func(key string, expireAt time.Time) error { return nil }, + setKeyDataFunc: func(key string, data utils.KeyData) {}, } for _, option := range options { @@ -113,7 +102,7 @@ func (store *PreambleStore) CreatePreamble() error { store.mut.Unlock() // Get current state. - state := store.filterExpiredKeys(store.getState()) + state := store.filterExpiredKeys(store.getStateFunc()) o, err := json.Marshal(state) if err != nil { return err @@ -160,11 +149,8 @@ func (store *PreambleStore) Restore() error { return err } - for key, value := range store.filterExpiredKeys(state) { - if err = store.setValue(key, value.Value); err != nil { - return fmt.Errorf("preamble store -> restore: %+v", err) - } - store.setExpiry(key, value.ExpireAt) + for key, data := range store.filterExpiredKeys(state) { + store.setKeyDataFunc(key, data) } return nil diff --git a/src/server/server.go b/src/server/server.go index 019af5d..a4f0d87 100644 --- a/src/server/server.go +++ b/src/server/server.go @@ -121,34 +121,27 @@ func NewServer(opts Opts) *Server { }) } else { // Set up standalone snapshot engine - server.SnapshotEngine = snapshot.NewSnapshotEngine(snapshot.Opts{ - Config: opts.Config, - StartSnapshot: server.StartSnapshot, - FinishSnapshot: server.FinishSnapshot, - GetState: server.GetState, - SetLatestSnapshotMilliseconds: server.SetLatestSnapshot, - GetLatestSnapshotMilliseconds: server.GetLatestSnapshot, - SetValue: func(key string, value interface{}) error { + server.SnapshotEngine = snapshot.NewSnapshotEngine( + snapshot.WithDirectory(opts.Config.DataDir), + snapshot.WithThreshold(opts.Config.SnapShotThreshold), + snapshot.WithInterval(opts.Config.SnapshotInterval), + snapshot.WithGetStateFunc(server.GetState), + snapshot.WithStartSnapshotFunc(server.StartSnapshot), + snapshot.WithFinishSnapshotFunc(server.FinishSnapshot), + snapshot.WithSetLatestSnapshotTimeFunc(server.SetLatestSnapshot), + snapshot.WithGetLatestSnapshotTimeFunc(server.GetLatestSnapshot), + snapshot.WithSetKeyDataFunc(func(key string, data utils.KeyData) { ctx := context.Background() if _, err := server.CreateKeyAndLock(ctx, key); err != nil { - return err + log.Println(err) } - if err := server.SetValue(ctx, key, value); err != nil { - return err + if err := server.SetValue(ctx, key, data.Value); err != nil { + log.Println(err) } + server.SetExpiry(ctx, key, data.ExpireAt, false) server.KeyUnlock(ctx, key) - return nil - }, - SetExpiry: func(key string, expireAt time.Time) error { - ctx := context.Background() - if _, err := server.KeyLock(ctx, key); err != nil { - return err - } - server.SetExpiry(ctx, key, expireAt, false) - server.KeyUnlock(ctx, key) - return nil - }, - }) + }), + ) // Set up standalone AOF engine server.AOFEngine = aof.NewAOFEngine( aof.WithDirectory(opts.Config.DataDir), @@ -156,25 +149,16 @@ func NewServer(opts Opts) *Server { aof.WithStartRewriteFunc(server.StartRewriteAOF), aof.WithFinishRewriteFunc(server.FinishRewriteAOF), aof.WithGetStateFunc(server.GetState), - aof.WithSetValueFunc(func(key string, value interface{}) error { + aof.WithSetKeyDataFunc(func(key string, value utils.KeyData) { ctx := context.Background() if _, err := server.CreateKeyAndLock(ctx, key); err != nil { - return err + log.Println(err) } - if err := server.SetValue(ctx, key, value); err != nil { - return err + if err := server.SetValue(ctx, key, value.Value); err != nil { + log.Println(err) } + server.SetExpiry(ctx, key, value.ExpireAt, false) server.KeyUnlock(ctx, key) - return nil - }), - aof.WithSetExpiryFunc(func(key string, expireAt time.Time) error { - ctx := context.Background() - if _, err := server.KeyLock(ctx, key); err != nil { - return err - } - server.SetExpiry(ctx, key, expireAt, false) - server.KeyUnlock(ctx, key) - return nil }), aof.WithHandleCommandFunc(func(command []byte) { _, err := server.handleCommand(context.Background(), command, nil, true) @@ -378,13 +362,11 @@ func (server *Server) Start(ctx context.Context) { // Restore from snapshot if snapshot restore is enabled and AOF restore is disabled if conf.RestoreSnapshot && !conf.RestoreAOF { - err := server.SnapshotEngine.Restore(ctx) + err := server.SnapshotEngine.Restore() if err != nil { log.Println(err) } } - server.SnapshotEngine.Start(ctx) - } server.StartTCP(ctx) @@ -472,6 +454,4 @@ func (server *Server) InitialiseCaches() { mutex: sync.Mutex{}, cache: eviction.NewCacheLRU(), } - // TODO: If eviction policy is volatile-ttl, start goroutine that continuously reads the mem stats - // TODO: before triggering purge once max-memory is reached } diff --git a/src/snapshot/snapshot.go b/src/snapshot/snapshot.go index e8fe7d3..28038b9 100644 --- a/src/snapshot/snapshot.go +++ b/src/snapshot/snapshot.go @@ -15,7 +15,6 @@ package snapshot import ( - "context" "crypto/md5" "encoding/json" "errors" @@ -37,48 +36,114 @@ type Manifest struct { LatestSnapshotHash [16]byte } -type Opts struct { - Config utils.Config - StartSnapshot func() - FinishSnapshot func() - GetState func() map[string]utils.KeyData - SetLatestSnapshotMilliseconds func(msec int64) - GetLatestSnapshotMilliseconds func() int64 - SetValue func(key string, value interface{}) error - SetExpiry func(key string, expireAt time.Time) error -} - type Engine struct { - options Opts - changeCount uint64 + changeCount uint64 + directory string + snapshotInterval time.Duration + snapshotThreshold uint64 + startSnapshotFunc func() + finishSnapshotFunc func() + getStateFunc func() map[string]utils.KeyData + setLatestSnapshotTimeFunc func(msec int64) + getLatestSnapshotTimeFunc func() int64 + setKeyDataFunc func(key string, data utils.KeyData) } -func NewSnapshotEngine(opts Opts) *Engine { - return &Engine{ - options: opts, +func WithDirectory(directory string) func(engine *Engine) { + return func(engine *Engine) { + engine.directory = directory } } -func (engine *Engine) Start(ctx context.Context) { - if engine.options.Config.SnapshotInterval != 0 { +func WithInterval(interval time.Duration) func(engine *Engine) { + return func(engine *Engine) { + engine.snapshotInterval = interval + } +} + +func WithThreshold(threshold uint64) func(engine *Engine) { + return func(engine *Engine) { + engine.snapshotThreshold = threshold + } +} + +func WithStartSnapshotFunc(f func()) func(engine *Engine) { + return func(engine *Engine) { + engine.startSnapshotFunc = f + } +} + +func WithFinishSnapshotFunc(f func()) func(engine *Engine) { + return func(engine *Engine) { + engine.finishSnapshotFunc = f + } +} + +func WithGetStateFunc(f func() map[string]utils.KeyData) func(engine *Engine) { + return func(engine *Engine) { + engine.getStateFunc = f + } +} + +func WithSetLatestSnapshotTimeFunc(f func(mset int64)) func(engine *Engine) { + return func(engine *Engine) { + engine.setLatestSnapshotTimeFunc = f + } +} + +func WithGetLatestSnapshotTimeFunc(f func() int64) func(engine *Engine) { + return func(engine *Engine) { + engine.getLatestSnapshotTimeFunc = f + } +} + +func WithSetKeyDataFunc(f func(key string, data utils.KeyData)) func(engine *Engine) { + return func(engine *Engine) { + engine.setKeyDataFunc = f + } +} + +func NewSnapshotEngine(options ...func(engine *Engine)) *Engine { + engine := &Engine{ + changeCount: 0, + directory: "", + snapshotInterval: 5 * time.Minute, + snapshotThreshold: 1000, + startSnapshotFunc: func() {}, + finishSnapshotFunc: func() {}, + getStateFunc: func() map[string]utils.KeyData { + return map[string]utils.KeyData{} + }, + setLatestSnapshotTimeFunc: func(msec int64) {}, + getLatestSnapshotTimeFunc: func() int64 { + return 0 + }, + setKeyDataFunc: func(key string, data utils.KeyData) {}, + } + + for _, option := range options { + option(engine) + } + + if engine.snapshotInterval != 0 { go func() { for { - <-time.After(engine.options.Config.SnapshotInterval) - if engine.changeCount == engine.options.Config.SnapShotThreshold { + <-time.After(engine.snapshotInterval) + if engine.changeCount == engine.snapshotThreshold { if err := engine.TakeSnapshot(); err != nil { log.Println(err) } } } }() - // Reset change count at startup - engine.resetChangeCount() } + + return engine } func (engine *Engine) TakeSnapshot() error { - engine.options.StartSnapshot() - defer engine.options.FinishSnapshot() + engine.startSnapshotFunc() + defer engine.finishSnapshotFunc() // Extract current time now := time.Now() @@ -95,7 +160,7 @@ func (engine *Engine) TakeSnapshot() error { var firstSnapshot bool // Tracks whether the snapshot being attempted is the first one - dirname := path.Join(engine.options.Config.DataDir, "snapshots") + dirname := path.Join(engine.directory, "snapshots") if err := os.MkdirAll(dirname, os.ModePerm); err != nil { log.Println(err) return err @@ -140,8 +205,8 @@ func (engine *Engine) TakeSnapshot() error { // Get current state snapshotObject := utils.SnapshotObject{ - State: utils.FilterExpiredKeys(engine.options.GetState()), - LatestSnapshotMilliseconds: engine.options.GetLatestSnapshotMilliseconds(), + State: utils.FilterExpiredKeys(engine.getStateFunc()), + LatestSnapshotMilliseconds: engine.getLatestSnapshotTimeFunc(), } out, err := json.Marshal(snapshotObject) if err != nil { @@ -193,7 +258,7 @@ func (engine *Engine) TakeSnapshot() error { } // Create snapshot directory - dirname = path.Join(engine.options.Config.DataDir, "snapshots", fmt.Sprintf("%d", msec)) + dirname = path.Join(engine.directory, "snapshots", fmt.Sprintf("%d", msec)) if err := os.MkdirAll(dirname, os.ModePerm); err != nil { return err } @@ -219,7 +284,7 @@ func (engine *Engine) TakeSnapshot() error { } // Set the latest snapshot in unix milliseconds - engine.options.SetLatestSnapshotMilliseconds(msec) + engine.setLatestSnapshotTimeFunc(msec) // Reset the change count engine.resetChangeCount() @@ -227,8 +292,8 @@ func (engine *Engine) TakeSnapshot() error { return nil } -func (engine *Engine) Restore(ctx context.Context) error { - mf, err := os.Open(path.Join(engine.options.Config.DataDir, "snapshots", "manifest.bin")) +func (engine *Engine) Restore() error { + mf, err := os.Open(path.Join(engine.directory, "snapshots", "manifest.bin")) if err != nil && errors.Is(err, fs.ErrNotExist) { return errors.New("no snapshot manifest, skipping snapshot restore") } @@ -252,7 +317,7 @@ func (engine *Engine) Restore(ctx context.Context) error { } sf, err := os.Open(path.Join( - engine.options.Config.DataDir, + engine.directory, "snapshots", fmt.Sprintf("%d", manifest.LatestSnapshotMilliseconds), "state.bin")) @@ -274,15 +339,10 @@ func (engine *Engine) Restore(ctx context.Context) error { return err } - engine.options.SetLatestSnapshotMilliseconds(snapshotObject.LatestSnapshotMilliseconds) + engine.setLatestSnapshotTimeFunc(snapshotObject.LatestSnapshotMilliseconds) - for key, value := range utils.FilterExpiredKeys(snapshotObject.State) { - if err = engine.options.SetValue(key, value.Value); err != nil { - return fmt.Errorf("snapshot engine -> restore value: %+v", err) - } - if err = engine.options.SetExpiry(key, value.ExpireAt); err != nil { - return fmt.Errorf("snapshot engine -> restore expiry: %+v", err) - } + for key, data := range utils.FilterExpiredKeys(snapshotObject.State) { + engine.setKeyDataFunc(key, data) } log.Println("successfully restored latest snapshot")