From 52646d15642b07c2ca6c1e4f4c992ba92b34d5ba Mon Sep 17 00:00:00 2001 From: Kelvin Mwinuka Date: Tue, 12 Mar 2024 21:35:39 +0800 Subject: [PATCH] Take expiry into account when creating snashot and AOF preamble. If the key is already expired when snapshot is taken, it will not be persisted. If the key is expired when loading a snapshot/preamble, it will not be restored. --- docker-compose.yaml | 16 ++++++++-------- src/aof/engine.go | 17 ++++++++++++++--- src/aof/preamble/store.go | 39 ++++++++++++++++++++++++++++++++------- src/raft/fms_snapshot.go | 10 +++++----- src/raft/fsm.go | 7 ++++--- src/server/keyspace.go | 5 +++-- src/server/server.go | 18 ++++++++++++++++++ src/snapshot/snapshot.go | 14 +++++++++----- src/utils/types.go | 4 ++-- src/utils/utils.go | 19 +++++++++++++++++++ 10 files changed, 114 insertions(+), 35 deletions(-) diff --git a/docker-compose.yaml b/docker-compose.yaml index 6251cb1..caa25b4 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -27,11 +27,11 @@ services: - FORWARD_COMMAND=false - SNAPSHOT_THRESHOLD=1000 - SNAPSHOT_INTERVAL=5m30s - - RESTORE_SNAPSHOT=false - - RESTORE_AOF=true + - RESTORE_SNAPSHOT=true + - RESTORE_AOF=false - AOF_SYNC_STRATEGY=everysec - MAX_MEMORY=2000kb - - EVICTION_POLICY=allkeys-lfu + - EVICTION_POLICY=noeviction - EVICTION_SAMPLE=20 - EVICTION_INTERVAL=100ms # List of server cert/key pairs @@ -75,7 +75,7 @@ services: - RESTORE_AOF=false - AOF_SYNC_STRATEGY=everysec - MAX_MEMORY=2000kb - - EVICTION_POLICY=allkeys-lfu + - EVICTION_POLICY=noeviction - EVICTION_SAMPLE=20 - EVICTION_INTERVAL=100ms # List of server cert/key pairs @@ -120,7 +120,7 @@ services: - RESTORE_AOF=false - AOF_SYNC_STRATEGY=everysec - MAX_MEMORY=2000kb - - EVICTION_POLICY=allkeys-lfu + - EVICTION_POLICY=noeviction - EVICTION_SAMPLE=20 - EVICTION_INTERVAL=100ms # List of server cert/key pairs @@ -165,7 +165,7 @@ services: - RESTORE_AOF=false - AOF_SYNC_STRATEGY=everysec - MAX_MEMORY=2000kb - - EVICTION_POLICY=allkeys-lfu + - EVICTION_POLICY=noeviction - EVICTION_SAMPLE=20 - EVICTION_INTERVAL=100ms # List of server cert/key pairs @@ -210,7 +210,7 @@ services: - RESTORE_AOF=false - AOF_SYNC_STRATEGY=everysec - MAX_MEMORY=2000kb - - EVICTION_POLICY=allkeys-lfu + - EVICTION_POLICY=noeviction - EVICTION_SAMPLE=20 - EVICTION_INTERVAL=100ms # List of server cert/key pairs @@ -255,7 +255,7 @@ services: - RESTORE_AOF=false - AOF_SYNC_STRATEGY=everysec - MAX_MEMORY=2000kb - - EVICTION_POLICY=allkeys-lfu + - EVICTION_POLICY=noeviction - EVICTION_SAMPLE=20 - EVICTION_INTERVAL=100ms # List of server cert/key pairs diff --git a/src/aof/engine.go b/src/aof/engine.go index ebbf5a6..78ab8ce 100644 --- a/src/aof/engine.go +++ b/src/aof/engine.go @@ -4,8 +4,10 @@ import ( "fmt" logstore "github.com/echovault/echovault/src/aof/log" "github.com/echovault/echovault/src/aof/preamble" + "github.com/echovault/echovault/src/utils" "log" "sync" + "time" ) // This package handles AOF logging in standalone mode only. @@ -25,8 +27,9 @@ type Engine struct { startRewrite func() finishRewrite func() - getState func() map[string]interface{} + getState func() map[string]utils.KeyData setValue func(key string, value interface{}) error + setExpiry func(key string, expireAt time.Time) error handleCommand func(command []byte) } @@ -54,7 +57,7 @@ func WithFinishRewriteFunc(f func()) func(engine *Engine) { } } -func WithGetStateFunc(f func() map[string]interface{}) func(engine *Engine) { +func WithGetStateFunc(f func() map[string]utils.KeyData) func(engine *Engine) { return func(engine *Engine) { engine.getState = f } @@ -66,6 +69,12 @@ func WithSetValueFunc(f func(key string, value interface{}) error) func(engine * } } +func WithSetExpiryFunc(f func(key string, expireAt time.Time) error) func(engine *Engine) { + return func(engine *Engine) { + engine.setExpiry = f + } +} + func WithHandleCommandFunc(f func(command []byte)) func(engine *Engine) { return func(engine *Engine) { engine.handleCommand = f @@ -93,11 +102,12 @@ func NewAOFEngine(options ...func(engine *Engine)) *Engine { logCount: 0, startRewrite: func() {}, finishRewrite: func() {}, - getState: func() map[string]interface{} { return nil }, + getState: func() map[string]utils.KeyData { return nil }, setValue: func(key string, value interface{}) error { // No-Op by default return nil }, + setExpiry: func(key string, expireAt time.Time) error { return nil }, handleCommand: func(command []byte) {}, } @@ -111,6 +121,7 @@ func NewAOFEngine(options ...func(engine *Engine)) *Engine { preamble.WithReadWriter(engine.preambleRW), preamble.WithGetStateFunc(engine.getState), preamble.WithSetValueFunc(engine.setValue), + preamble.WithSetExpiryFunc(engine.setExpiry), ) // Setup AOF log store engine diff --git a/src/aof/preamble/store.go b/src/aof/preamble/store.go index 66b30e4..e895350 100644 --- a/src/aof/preamble/store.go +++ b/src/aof/preamble/store.go @@ -3,11 +3,13 @@ package preamble import ( "encoding/json" "fmt" + "github.com/echovault/echovault/src/utils" "io" "log" "os" "path" "sync" + "time" ) type PreambleReadWriter interface { @@ -21,8 +23,9 @@ type PreambleStore struct { rw PreambleReadWriter mut sync.Mutex directory string - getState func() map[string]interface{} + getState func() map[string]utils.KeyData setValue func(key string, value interface{}) error + setExpiry func(key string, expireAt time.Time) error } func WithReadWriter(rw PreambleReadWriter) func(store *PreambleStore) { @@ -31,7 +34,7 @@ func WithReadWriter(rw PreambleReadWriter) func(store *PreambleStore) { } } -func WithGetStateFunc(f func() map[string]interface{}) func(store *PreambleStore) { +func WithGetStateFunc(f func() map[string]utils.KeyData) func(store *PreambleStore) { return func(store *PreambleStore) { store.getState = f } @@ -43,6 +46,12 @@ func WithSetValueFunc(f func(key string, value interface{}) error) func(store *P } } +func WithSetExpiryFunc(f func(key string, expireAt time.Time) error) func(store *PreambleStore) { + return func(store *PreambleStore) { + store.setExpiry = f + } +} + func WithDirectory(directory string) func(store *PreambleStore) { return func(store *PreambleStore) { store.directory = directory @@ -54,7 +63,7 @@ func NewPreambleStore(options ...func(store *PreambleStore)) *PreambleStore { rw: nil, mut: sync.Mutex{}, directory: "", - getState: func() map[string]interface{} { + getState: func() map[string]utils.KeyData { // No-Op by default return nil }, @@ -62,6 +71,7 @@ func NewPreambleStore(options ...func(store *PreambleStore)) *PreambleStore { // No-Op by default return nil }, + setExpiry: func(key string, expireAt time.Time) error { return nil }, } for _, option := range options { @@ -89,7 +99,7 @@ func (store *PreambleStore) CreatePreamble() error { store.mut.Unlock() // Get current state. - state := store.getState() + state := store.filterExpiredKeys(store.getState()) o, err := json.Marshal(state) if err != nil { return err @@ -130,16 +140,17 @@ func (store *PreambleStore) Restore() error { return nil } - state := make(map[string]interface{}) + state := make(map[string]utils.KeyData) if err = json.Unmarshal(b, &state); err != nil { return err } - for key, value := range state { - if err = store.setValue(key, value); err != nil { + for key, value := range store.filterExpiredKeys(state) { + if err = store.setValue(key, value.Value); err != nil { return fmt.Errorf("preamble store -> restore: %+v", err) } + store.setExpiry(key, value.ExpireAt) } return nil @@ -150,3 +161,17 @@ func (store *PreambleStore) Close() error { defer store.mut.Unlock() return store.rw.Close() } + +// filterExpiredKeys filters out keys that are already expired, so they are not persisted. +func (store *PreambleStore) filterExpiredKeys(state map[string]utils.KeyData) map[string]utils.KeyData { + var keysToDelete []string + for k, v := range state { + if v.ExpireAt.Before(time.Now()) { + keysToDelete = append(keysToDelete, k) + } + } + for _, key := range keysToDelete { + delete(state, key) + } + return state +} diff --git a/src/raft/fms_snapshot.go b/src/raft/fms_snapshot.go index 3694454..5f71fc8 100644 --- a/src/raft/fms_snapshot.go +++ b/src/raft/fms_snapshot.go @@ -10,7 +10,7 @@ import ( type SnapshotOpts struct { config utils.Config - data map[string]interface{} + data map[string]utils.KeyData startSnapshot func() finishSnapshot func() setLatestSnapshot func(msec int64) @@ -32,24 +32,24 @@ func (s *Snapshot) Persist(sink raft.SnapshotSink) error { msec, err := strconv.Atoi(strings.Split(sink.ID(), "-")[2]) if err != nil { - sink.Cancel() + _ = sink.Cancel() return err } snapshotObject := utils.SnapshotObject{ - State: s.options.data, + State: utils.FilterExpiredKeys(s.options.data), LatestSnapshotMilliseconds: int64(msec), } o, err := json.Marshal(snapshotObject) if err != nil { - sink.Cancel() + _ = sink.Cancel() return err } if _, err = sink.Write(o); err != nil { - sink.Cancel() + _ = sink.Cancel() return err } diff --git a/src/raft/fsm.go b/src/raft/fsm.go index 09037bd..9dd2d74 100644 --- a/src/raft/fsm.go +++ b/src/raft/fsm.go @@ -120,7 +120,7 @@ func (fsm *FSM) Restore(snapshot io.ReadCloser) error { } data := utils.SnapshotObject{ - State: make(map[string]interface{}), + State: make(map[string]utils.KeyData), LatestSnapshotMilliseconds: 0, } @@ -131,13 +131,14 @@ func (fsm *FSM) Restore(snapshot io.ReadCloser) error { // Set state ctx := context.Background() - for k, v := range data.State { + for k, v := range utils.FilterExpiredKeys(data.State) { if _, err = fsm.options.Server.CreateKeyAndLock(ctx, k); err != nil { log.Fatal(err) } - if err = fsm.options.Server.SetValue(ctx, k, v); err != nil { + if err = fsm.options.Server.SetValue(ctx, k, v.Value); err != nil { log.Fatal(err) } + fsm.options.Server.SetExpiry(ctx, k, v.ExpireAt, false) fsm.options.Server.KeyUnlock(ctx, k) } // Set latest snapshot milliseconds diff --git a/src/server/keyspace.go b/src/server/keyspace.go index 055ed89..21937ac 100644 --- a/src/server/keyspace.go +++ b/src/server/keyspace.go @@ -230,14 +230,15 @@ func (server *Server) RemoveExpiry(key string) { // functions that require a deep copy of the state. // The copy only starts when there's no current copy in progress (represented by StateCopyInProgress atomic boolean) // and when there's no current state mutation in progress (represented by StateMutationInProgress atomic boolean) -func (server *Server) GetState() map[string]interface{} { +func (server *Server) GetState() map[string]utils.KeyData { + // Wait unit there's no state mutation or copy in progress before starting a new copy process. for { if !server.StateCopyInProgress.Load() && !server.StateMutationInProgress.Load() { server.StateCopyInProgress.Store(true) break } } - data := make(map[string]interface{}) + data := make(map[string]utils.KeyData) for k, v := range server.store { data[k] = v } diff --git a/src/server/server.go b/src/server/server.go index 685e8e9..8ab56b4 100644 --- a/src/server/server.go +++ b/src/server/server.go @@ -125,6 +125,15 @@ func NewServer(opts Opts) *Server { server.KeyUnlock(ctx, key) return nil }, + SetExpiry: func(key string, expireAt time.Time) error { + ctx := context.Background() + if _, err := server.KeyLock(ctx, key); err != nil { + return err + } + server.SetExpiry(ctx, key, expireAt, false) + server.KeyUnlock(ctx, key) + return nil + }, }) // Set up standalone AOF engine server.AOFEngine = aof.NewAOFEngine( @@ -144,6 +153,15 @@ func NewServer(opts Opts) *Server { server.KeyUnlock(ctx, key) return nil }), + aof.WithSetExpiryFunc(func(key string, expireAt time.Time) error { + ctx := context.Background() + if _, err := server.KeyLock(ctx, key); err != nil { + return err + } + server.SetExpiry(ctx, key, expireAt, false) + server.KeyUnlock(ctx, key) + return nil + }), aof.WithHandleCommandFunc(func(command []byte) { _, err := server.handleCommand(context.Background(), command, nil, true) if err != nil { diff --git a/src/snapshot/snapshot.go b/src/snapshot/snapshot.go index c7ce631..9db8e14 100644 --- a/src/snapshot/snapshot.go +++ b/src/snapshot/snapshot.go @@ -27,10 +27,11 @@ type Opts struct { Config utils.Config StartSnapshot func() FinishSnapshot func() - GetState func() map[string]interface{} + GetState func() map[string]utils.KeyData SetLatestSnapshotMilliseconds func(msec int64) GetLatestSnapshotMilliseconds func() int64 SetValue func(key string, value interface{}) error + SetExpiry func(key string, expireAt time.Time) error } type Engine struct { @@ -125,7 +126,7 @@ func (engine *Engine) TakeSnapshot() error { // Get current state snapshotObject := utils.SnapshotObject{ - State: engine.options.GetState(), + State: utils.FilterExpiredKeys(engine.options.GetState()), LatestSnapshotMilliseconds: engine.options.GetLatestSnapshotMilliseconds(), } out, err := json.Marshal(snapshotObject) @@ -261,9 +262,12 @@ func (engine *Engine) Restore(ctx context.Context) error { engine.options.SetLatestSnapshotMilliseconds(snapshotObject.LatestSnapshotMilliseconds) - for key, value := range snapshotObject.State { - if err = engine.options.SetValue(key, value); err != nil { - return fmt.Errorf("snapshot engine -> restore: %+v", err) + for key, value := range utils.FilterExpiredKeys(snapshotObject.State) { + if err = engine.options.SetValue(key, value.Value); err != nil { + return fmt.Errorf("snapshot engine -> restore value: %+v", err) + } + if err = engine.options.SetExpiry(key, value.ExpireAt); err != nil { + return fmt.Errorf("snapshot engine -> restore expiry: %+v", err) } } diff --git a/src/utils/types.go b/src/utils/types.go index 2ceb0d9..528676e 100644 --- a/src/utils/types.go +++ b/src/utils/types.go @@ -25,7 +25,7 @@ type Server interface { SetExpiry(ctx context.Context, key string, expire time.Time, touch bool) RemoveExpiry(key string) DeleteKey(ctx context.Context, key string) error - GetState() map[string]interface{} + GetState() map[string]KeyData GetAllCommands(ctx context.Context) []Command GetACL() interface{} GetPubSub() interface{} @@ -89,6 +89,6 @@ type ACL interface { type PubSub interface{} type SnapshotObject struct { - State map[string]interface{} + State map[string]KeyData LatestSnapshotMilliseconds int64 } diff --git a/src/utils/utils.go b/src/utils/utils.go index 8fb237d..a56862a 100644 --- a/src/utils/utils.go +++ b/src/utils/utils.go @@ -185,3 +185,22 @@ func IsMaxMemoryExceeded(maxMemory uint64) bool { // Return true when whe are above or equal to max memory. return memStats.HeapInuse >= maxMemory } + +// FilterExpiredKeys filters out keys that are already expired, so they are not persisted. +func FilterExpiredKeys(state map[string]KeyData) map[string]KeyData { + var keysToDelete []string + for k, v := range state { + // Skip keys with no expiry time. + if v.ExpireAt == (time.Time{}) { + continue + } + // If the key is already expired, mark it for deletion. + if v.ExpireAt.Before(time.Now()) { + keysToDelete = append(keysToDelete, k) + } + } + for _, key := range keysToDelete { + delete(state, key) + } + return state +}