diff --git a/docker-compose.yaml b/docker-compose.yaml index 6251cb1..caa25b4 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -27,11 +27,11 @@ services: - FORWARD_COMMAND=false - SNAPSHOT_THRESHOLD=1000 - SNAPSHOT_INTERVAL=5m30s - - RESTORE_SNAPSHOT=false - - RESTORE_AOF=true + - RESTORE_SNAPSHOT=true + - RESTORE_AOF=false - AOF_SYNC_STRATEGY=everysec - MAX_MEMORY=2000kb - - EVICTION_POLICY=allkeys-lfu + - EVICTION_POLICY=noeviction - EVICTION_SAMPLE=20 - EVICTION_INTERVAL=100ms # List of server cert/key pairs @@ -75,7 +75,7 @@ services: - RESTORE_AOF=false - AOF_SYNC_STRATEGY=everysec - MAX_MEMORY=2000kb - - EVICTION_POLICY=allkeys-lfu + - EVICTION_POLICY=noeviction - EVICTION_SAMPLE=20 - EVICTION_INTERVAL=100ms # List of server cert/key pairs @@ -120,7 +120,7 @@ services: - RESTORE_AOF=false - AOF_SYNC_STRATEGY=everysec - MAX_MEMORY=2000kb - - EVICTION_POLICY=allkeys-lfu + - EVICTION_POLICY=noeviction - EVICTION_SAMPLE=20 - EVICTION_INTERVAL=100ms # List of server cert/key pairs @@ -165,7 +165,7 @@ services: - RESTORE_AOF=false - AOF_SYNC_STRATEGY=everysec - MAX_MEMORY=2000kb - - EVICTION_POLICY=allkeys-lfu + - EVICTION_POLICY=noeviction - EVICTION_SAMPLE=20 - EVICTION_INTERVAL=100ms # List of server cert/key pairs @@ -210,7 +210,7 @@ services: - RESTORE_AOF=false - AOF_SYNC_STRATEGY=everysec - MAX_MEMORY=2000kb - - EVICTION_POLICY=allkeys-lfu + - EVICTION_POLICY=noeviction - EVICTION_SAMPLE=20 - EVICTION_INTERVAL=100ms # List of server cert/key pairs @@ -255,7 +255,7 @@ services: - RESTORE_AOF=false - AOF_SYNC_STRATEGY=everysec - MAX_MEMORY=2000kb - - EVICTION_POLICY=allkeys-lfu + - EVICTION_POLICY=noeviction - EVICTION_SAMPLE=20 - EVICTION_INTERVAL=100ms # List of server cert/key pairs diff --git a/src/aof/engine.go b/src/aof/engine.go index ebbf5a6..78ab8ce 100644 --- a/src/aof/engine.go +++ b/src/aof/engine.go @@ -4,8 +4,10 @@ import ( "fmt" logstore "github.com/echovault/echovault/src/aof/log" "github.com/echovault/echovault/src/aof/preamble" + "github.com/echovault/echovault/src/utils" "log" "sync" + "time" ) // This package handles AOF logging in standalone mode only. @@ -25,8 +27,9 @@ type Engine struct { startRewrite func() finishRewrite func() - getState func() map[string]interface{} + getState func() map[string]utils.KeyData setValue func(key string, value interface{}) error + setExpiry func(key string, expireAt time.Time) error handleCommand func(command []byte) } @@ -54,7 +57,7 @@ func WithFinishRewriteFunc(f func()) func(engine *Engine) { } } -func WithGetStateFunc(f func() map[string]interface{}) func(engine *Engine) { +func WithGetStateFunc(f func() map[string]utils.KeyData) func(engine *Engine) { return func(engine *Engine) { engine.getState = f } @@ -66,6 +69,12 @@ func WithSetValueFunc(f func(key string, value interface{}) error) func(engine * } } +func WithSetExpiryFunc(f func(key string, expireAt time.Time) error) func(engine *Engine) { + return func(engine *Engine) { + engine.setExpiry = f + } +} + func WithHandleCommandFunc(f func(command []byte)) func(engine *Engine) { return func(engine *Engine) { engine.handleCommand = f @@ -93,11 +102,12 @@ func NewAOFEngine(options ...func(engine *Engine)) *Engine { logCount: 0, startRewrite: func() {}, finishRewrite: func() {}, - getState: func() map[string]interface{} { return nil }, + getState: func() map[string]utils.KeyData { return nil }, setValue: func(key string, value interface{}) error { // No-Op by default return nil }, + setExpiry: func(key string, expireAt time.Time) error { return nil }, handleCommand: func(command []byte) {}, } @@ -111,6 +121,7 @@ func NewAOFEngine(options ...func(engine *Engine)) *Engine { preamble.WithReadWriter(engine.preambleRW), preamble.WithGetStateFunc(engine.getState), preamble.WithSetValueFunc(engine.setValue), + preamble.WithSetExpiryFunc(engine.setExpiry), ) // Setup AOF log store engine diff --git a/src/aof/preamble/store.go b/src/aof/preamble/store.go index 66b30e4..e895350 100644 --- a/src/aof/preamble/store.go +++ b/src/aof/preamble/store.go @@ -3,11 +3,13 @@ package preamble import ( "encoding/json" "fmt" + "github.com/echovault/echovault/src/utils" "io" "log" "os" "path" "sync" + "time" ) type PreambleReadWriter interface { @@ -21,8 +23,9 @@ type PreambleStore struct { rw PreambleReadWriter mut sync.Mutex directory string - getState func() map[string]interface{} + getState func() map[string]utils.KeyData setValue func(key string, value interface{}) error + setExpiry func(key string, expireAt time.Time) error } func WithReadWriter(rw PreambleReadWriter) func(store *PreambleStore) { @@ -31,7 +34,7 @@ func WithReadWriter(rw PreambleReadWriter) func(store *PreambleStore) { } } -func WithGetStateFunc(f func() map[string]interface{}) func(store *PreambleStore) { +func WithGetStateFunc(f func() map[string]utils.KeyData) func(store *PreambleStore) { return func(store *PreambleStore) { store.getState = f } @@ -43,6 +46,12 @@ func WithSetValueFunc(f func(key string, value interface{}) error) func(store *P } } +func WithSetExpiryFunc(f func(key string, expireAt time.Time) error) func(store *PreambleStore) { + return func(store *PreambleStore) { + store.setExpiry = f + } +} + func WithDirectory(directory string) func(store *PreambleStore) { return func(store *PreambleStore) { store.directory = directory @@ -54,7 +63,7 @@ func NewPreambleStore(options ...func(store *PreambleStore)) *PreambleStore { rw: nil, mut: sync.Mutex{}, directory: "", - getState: func() map[string]interface{} { + getState: func() map[string]utils.KeyData { // No-Op by default return nil }, @@ -62,6 +71,7 @@ func NewPreambleStore(options ...func(store *PreambleStore)) *PreambleStore { // No-Op by default return nil }, + setExpiry: func(key string, expireAt time.Time) error { return nil }, } for _, option := range options { @@ -89,7 +99,7 @@ func (store *PreambleStore) CreatePreamble() error { store.mut.Unlock() // Get current state. - state := store.getState() + state := store.filterExpiredKeys(store.getState()) o, err := json.Marshal(state) if err != nil { return err @@ -130,16 +140,17 @@ func (store *PreambleStore) Restore() error { return nil } - state := make(map[string]interface{}) + state := make(map[string]utils.KeyData) if err = json.Unmarshal(b, &state); err != nil { return err } - for key, value := range state { - if err = store.setValue(key, value); err != nil { + for key, value := range store.filterExpiredKeys(state) { + if err = store.setValue(key, value.Value); err != nil { return fmt.Errorf("preamble store -> restore: %+v", err) } + store.setExpiry(key, value.ExpireAt) } return nil @@ -150,3 +161,17 @@ func (store *PreambleStore) Close() error { defer store.mut.Unlock() return store.rw.Close() } + +// filterExpiredKeys filters out keys that are already expired, so they are not persisted. +func (store *PreambleStore) filterExpiredKeys(state map[string]utils.KeyData) map[string]utils.KeyData { + var keysToDelete []string + for k, v := range state { + if v.ExpireAt.Before(time.Now()) { + keysToDelete = append(keysToDelete, k) + } + } + for _, key := range keysToDelete { + delete(state, key) + } + return state +} diff --git a/src/raft/fms_snapshot.go b/src/raft/fms_snapshot.go index 3694454..5f71fc8 100644 --- a/src/raft/fms_snapshot.go +++ b/src/raft/fms_snapshot.go @@ -10,7 +10,7 @@ import ( type SnapshotOpts struct { config utils.Config - data map[string]interface{} + data map[string]utils.KeyData startSnapshot func() finishSnapshot func() setLatestSnapshot func(msec int64) @@ -32,24 +32,24 @@ func (s *Snapshot) Persist(sink raft.SnapshotSink) error { msec, err := strconv.Atoi(strings.Split(sink.ID(), "-")[2]) if err != nil { - sink.Cancel() + _ = sink.Cancel() return err } snapshotObject := utils.SnapshotObject{ - State: s.options.data, + State: utils.FilterExpiredKeys(s.options.data), LatestSnapshotMilliseconds: int64(msec), } o, err := json.Marshal(snapshotObject) if err != nil { - sink.Cancel() + _ = sink.Cancel() return err } if _, err = sink.Write(o); err != nil { - sink.Cancel() + _ = sink.Cancel() return err } diff --git a/src/raft/fsm.go b/src/raft/fsm.go index 09037bd..9dd2d74 100644 --- a/src/raft/fsm.go +++ b/src/raft/fsm.go @@ -120,7 +120,7 @@ func (fsm *FSM) Restore(snapshot io.ReadCloser) error { } data := utils.SnapshotObject{ - State: make(map[string]interface{}), + State: make(map[string]utils.KeyData), LatestSnapshotMilliseconds: 0, } @@ -131,13 +131,14 @@ func (fsm *FSM) Restore(snapshot io.ReadCloser) error { // Set state ctx := context.Background() - for k, v := range data.State { + for k, v := range utils.FilterExpiredKeys(data.State) { if _, err = fsm.options.Server.CreateKeyAndLock(ctx, k); err != nil { log.Fatal(err) } - if err = fsm.options.Server.SetValue(ctx, k, v); err != nil { + if err = fsm.options.Server.SetValue(ctx, k, v.Value); err != nil { log.Fatal(err) } + fsm.options.Server.SetExpiry(ctx, k, v.ExpireAt, false) fsm.options.Server.KeyUnlock(ctx, k) } // Set latest snapshot milliseconds diff --git a/src/server/keyspace.go b/src/server/keyspace.go index 055ed89..21937ac 100644 --- a/src/server/keyspace.go +++ b/src/server/keyspace.go @@ -230,14 +230,15 @@ func (server *Server) RemoveExpiry(key string) { // functions that require a deep copy of the state. // The copy only starts when there's no current copy in progress (represented by StateCopyInProgress atomic boolean) // and when there's no current state mutation in progress (represented by StateMutationInProgress atomic boolean) -func (server *Server) GetState() map[string]interface{} { +func (server *Server) GetState() map[string]utils.KeyData { + // Wait unit there's no state mutation or copy in progress before starting a new copy process. for { if !server.StateCopyInProgress.Load() && !server.StateMutationInProgress.Load() { server.StateCopyInProgress.Store(true) break } } - data := make(map[string]interface{}) + data := make(map[string]utils.KeyData) for k, v := range server.store { data[k] = v } diff --git a/src/server/server.go b/src/server/server.go index 685e8e9..8ab56b4 100644 --- a/src/server/server.go +++ b/src/server/server.go @@ -125,6 +125,15 @@ func NewServer(opts Opts) *Server { server.KeyUnlock(ctx, key) return nil }, + SetExpiry: func(key string, expireAt time.Time) error { + ctx := context.Background() + if _, err := server.KeyLock(ctx, key); err != nil { + return err + } + server.SetExpiry(ctx, key, expireAt, false) + server.KeyUnlock(ctx, key) + return nil + }, }) // Set up standalone AOF engine server.AOFEngine = aof.NewAOFEngine( @@ -144,6 +153,15 @@ func NewServer(opts Opts) *Server { server.KeyUnlock(ctx, key) return nil }), + aof.WithSetExpiryFunc(func(key string, expireAt time.Time) error { + ctx := context.Background() + if _, err := server.KeyLock(ctx, key); err != nil { + return err + } + server.SetExpiry(ctx, key, expireAt, false) + server.KeyUnlock(ctx, key) + return nil + }), aof.WithHandleCommandFunc(func(command []byte) { _, err := server.handleCommand(context.Background(), command, nil, true) if err != nil { diff --git a/src/snapshot/snapshot.go b/src/snapshot/snapshot.go index c7ce631..9db8e14 100644 --- a/src/snapshot/snapshot.go +++ b/src/snapshot/snapshot.go @@ -27,10 +27,11 @@ type Opts struct { Config utils.Config StartSnapshot func() FinishSnapshot func() - GetState func() map[string]interface{} + GetState func() map[string]utils.KeyData SetLatestSnapshotMilliseconds func(msec int64) GetLatestSnapshotMilliseconds func() int64 SetValue func(key string, value interface{}) error + SetExpiry func(key string, expireAt time.Time) error } type Engine struct { @@ -125,7 +126,7 @@ func (engine *Engine) TakeSnapshot() error { // Get current state snapshotObject := utils.SnapshotObject{ - State: engine.options.GetState(), + State: utils.FilterExpiredKeys(engine.options.GetState()), LatestSnapshotMilliseconds: engine.options.GetLatestSnapshotMilliseconds(), } out, err := json.Marshal(snapshotObject) @@ -261,9 +262,12 @@ func (engine *Engine) Restore(ctx context.Context) error { engine.options.SetLatestSnapshotMilliseconds(snapshotObject.LatestSnapshotMilliseconds) - for key, value := range snapshotObject.State { - if err = engine.options.SetValue(key, value); err != nil { - return fmt.Errorf("snapshot engine -> restore: %+v", err) + for key, value := range utils.FilterExpiredKeys(snapshotObject.State) { + if err = engine.options.SetValue(key, value.Value); err != nil { + return fmt.Errorf("snapshot engine -> restore value: %+v", err) + } + if err = engine.options.SetExpiry(key, value.ExpireAt); err != nil { + return fmt.Errorf("snapshot engine -> restore expiry: %+v", err) } } diff --git a/src/utils/types.go b/src/utils/types.go index 2ceb0d9..528676e 100644 --- a/src/utils/types.go +++ b/src/utils/types.go @@ -25,7 +25,7 @@ type Server interface { SetExpiry(ctx context.Context, key string, expire time.Time, touch bool) RemoveExpiry(key string) DeleteKey(ctx context.Context, key string) error - GetState() map[string]interface{} + GetState() map[string]KeyData GetAllCommands(ctx context.Context) []Command GetACL() interface{} GetPubSub() interface{} @@ -89,6 +89,6 @@ type ACL interface { type PubSub interface{} type SnapshotObject struct { - State map[string]interface{} + State map[string]KeyData LatestSnapshotMilliseconds int64 } diff --git a/src/utils/utils.go b/src/utils/utils.go index 8fb237d..a56862a 100644 --- a/src/utils/utils.go +++ b/src/utils/utils.go @@ -185,3 +185,22 @@ func IsMaxMemoryExceeded(maxMemory uint64) bool { // Return true when whe are above or equal to max memory. return memStats.HeapInuse >= maxMemory } + +// FilterExpiredKeys filters out keys that are already expired, so they are not persisted. +func FilterExpiredKeys(state map[string]KeyData) map[string]KeyData { + var keysToDelete []string + for k, v := range state { + // Skip keys with no expiry time. + if v.ExpireAt == (time.Time{}) { + continue + } + // If the key is already expired, mark it for deletion. + if v.ExpireAt.Before(time.Now()) { + keysToDelete = append(keysToDelete, k) + } + } + for _, key := range keysToDelete { + delete(state, key) + } + return state +}