Take expiry into account when creating snashot and AOF preamble. If the key is already expired when snapshot is taken, it will not be persisted. If the key is expired when loading a snapshot/preamble, it will not be restored.

This commit is contained in:
Kelvin Mwinuka
2024-03-12 21:35:39 +08:00
parent f27a0dda79
commit 52646d1564
10 changed files with 114 additions and 35 deletions

View File

@@ -27,11 +27,11 @@ services:
- FORWARD_COMMAND=false - FORWARD_COMMAND=false
- SNAPSHOT_THRESHOLD=1000 - SNAPSHOT_THRESHOLD=1000
- SNAPSHOT_INTERVAL=5m30s - SNAPSHOT_INTERVAL=5m30s
- RESTORE_SNAPSHOT=false - RESTORE_SNAPSHOT=true
- RESTORE_AOF=true - RESTORE_AOF=false
- AOF_SYNC_STRATEGY=everysec - AOF_SYNC_STRATEGY=everysec
- MAX_MEMORY=2000kb - MAX_MEMORY=2000kb
- EVICTION_POLICY=allkeys-lfu - EVICTION_POLICY=noeviction
- EVICTION_SAMPLE=20 - EVICTION_SAMPLE=20
- EVICTION_INTERVAL=100ms - EVICTION_INTERVAL=100ms
# List of server cert/key pairs # List of server cert/key pairs
@@ -75,7 +75,7 @@ services:
- RESTORE_AOF=false - RESTORE_AOF=false
- AOF_SYNC_STRATEGY=everysec - AOF_SYNC_STRATEGY=everysec
- MAX_MEMORY=2000kb - MAX_MEMORY=2000kb
- EVICTION_POLICY=allkeys-lfu - EVICTION_POLICY=noeviction
- EVICTION_SAMPLE=20 - EVICTION_SAMPLE=20
- EVICTION_INTERVAL=100ms - EVICTION_INTERVAL=100ms
# List of server cert/key pairs # List of server cert/key pairs
@@ -120,7 +120,7 @@ services:
- RESTORE_AOF=false - RESTORE_AOF=false
- AOF_SYNC_STRATEGY=everysec - AOF_SYNC_STRATEGY=everysec
- MAX_MEMORY=2000kb - MAX_MEMORY=2000kb
- EVICTION_POLICY=allkeys-lfu - EVICTION_POLICY=noeviction
- EVICTION_SAMPLE=20 - EVICTION_SAMPLE=20
- EVICTION_INTERVAL=100ms - EVICTION_INTERVAL=100ms
# List of server cert/key pairs # List of server cert/key pairs
@@ -165,7 +165,7 @@ services:
- RESTORE_AOF=false - RESTORE_AOF=false
- AOF_SYNC_STRATEGY=everysec - AOF_SYNC_STRATEGY=everysec
- MAX_MEMORY=2000kb - MAX_MEMORY=2000kb
- EVICTION_POLICY=allkeys-lfu - EVICTION_POLICY=noeviction
- EVICTION_SAMPLE=20 - EVICTION_SAMPLE=20
- EVICTION_INTERVAL=100ms - EVICTION_INTERVAL=100ms
# List of server cert/key pairs # List of server cert/key pairs
@@ -210,7 +210,7 @@ services:
- RESTORE_AOF=false - RESTORE_AOF=false
- AOF_SYNC_STRATEGY=everysec - AOF_SYNC_STRATEGY=everysec
- MAX_MEMORY=2000kb - MAX_MEMORY=2000kb
- EVICTION_POLICY=allkeys-lfu - EVICTION_POLICY=noeviction
- EVICTION_SAMPLE=20 - EVICTION_SAMPLE=20
- EVICTION_INTERVAL=100ms - EVICTION_INTERVAL=100ms
# List of server cert/key pairs # List of server cert/key pairs
@@ -255,7 +255,7 @@ services:
- RESTORE_AOF=false - RESTORE_AOF=false
- AOF_SYNC_STRATEGY=everysec - AOF_SYNC_STRATEGY=everysec
- MAX_MEMORY=2000kb - MAX_MEMORY=2000kb
- EVICTION_POLICY=allkeys-lfu - EVICTION_POLICY=noeviction
- EVICTION_SAMPLE=20 - EVICTION_SAMPLE=20
- EVICTION_INTERVAL=100ms - EVICTION_INTERVAL=100ms
# List of server cert/key pairs # List of server cert/key pairs

View File

@@ -4,8 +4,10 @@ import (
"fmt" "fmt"
logstore "github.com/echovault/echovault/src/aof/log" logstore "github.com/echovault/echovault/src/aof/log"
"github.com/echovault/echovault/src/aof/preamble" "github.com/echovault/echovault/src/aof/preamble"
"github.com/echovault/echovault/src/utils"
"log" "log"
"sync" "sync"
"time"
) )
// This package handles AOF logging in standalone mode only. // This package handles AOF logging in standalone mode only.
@@ -25,8 +27,9 @@ type Engine struct {
startRewrite func() startRewrite func()
finishRewrite func() finishRewrite func()
getState func() map[string]interface{} getState func() map[string]utils.KeyData
setValue func(key string, value interface{}) error setValue func(key string, value interface{}) error
setExpiry func(key string, expireAt time.Time) error
handleCommand func(command []byte) handleCommand func(command []byte)
} }
@@ -54,7 +57,7 @@ func WithFinishRewriteFunc(f func()) func(engine *Engine) {
} }
} }
func WithGetStateFunc(f func() map[string]interface{}) func(engine *Engine) { func WithGetStateFunc(f func() map[string]utils.KeyData) func(engine *Engine) {
return func(engine *Engine) { return func(engine *Engine) {
engine.getState = f engine.getState = f
} }
@@ -66,6 +69,12 @@ func WithSetValueFunc(f func(key string, value interface{}) error) func(engine *
} }
} }
func WithSetExpiryFunc(f func(key string, expireAt time.Time) error) func(engine *Engine) {
return func(engine *Engine) {
engine.setExpiry = f
}
}
func WithHandleCommandFunc(f func(command []byte)) func(engine *Engine) { func WithHandleCommandFunc(f func(command []byte)) func(engine *Engine) {
return func(engine *Engine) { return func(engine *Engine) {
engine.handleCommand = f engine.handleCommand = f
@@ -93,11 +102,12 @@ func NewAOFEngine(options ...func(engine *Engine)) *Engine {
logCount: 0, logCount: 0,
startRewrite: func() {}, startRewrite: func() {},
finishRewrite: func() {}, finishRewrite: func() {},
getState: func() map[string]interface{} { return nil }, getState: func() map[string]utils.KeyData { return nil },
setValue: func(key string, value interface{}) error { setValue: func(key string, value interface{}) error {
// No-Op by default // No-Op by default
return nil return nil
}, },
setExpiry: func(key string, expireAt time.Time) error { return nil },
handleCommand: func(command []byte) {}, handleCommand: func(command []byte) {},
} }
@@ -111,6 +121,7 @@ func NewAOFEngine(options ...func(engine *Engine)) *Engine {
preamble.WithReadWriter(engine.preambleRW), preamble.WithReadWriter(engine.preambleRW),
preamble.WithGetStateFunc(engine.getState), preamble.WithGetStateFunc(engine.getState),
preamble.WithSetValueFunc(engine.setValue), preamble.WithSetValueFunc(engine.setValue),
preamble.WithSetExpiryFunc(engine.setExpiry),
) )
// Setup AOF log store engine // Setup AOF log store engine

View File

@@ -3,11 +3,13 @@ package preamble
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/echovault/echovault/src/utils"
"io" "io"
"log" "log"
"os" "os"
"path" "path"
"sync" "sync"
"time"
) )
type PreambleReadWriter interface { type PreambleReadWriter interface {
@@ -21,8 +23,9 @@ type PreambleStore struct {
rw PreambleReadWriter rw PreambleReadWriter
mut sync.Mutex mut sync.Mutex
directory string directory string
getState func() map[string]interface{} getState func() map[string]utils.KeyData
setValue func(key string, value interface{}) error setValue func(key string, value interface{}) error
setExpiry func(key string, expireAt time.Time) error
} }
func WithReadWriter(rw PreambleReadWriter) func(store *PreambleStore) { func WithReadWriter(rw PreambleReadWriter) func(store *PreambleStore) {
@@ -31,7 +34,7 @@ func WithReadWriter(rw PreambleReadWriter) func(store *PreambleStore) {
} }
} }
func WithGetStateFunc(f func() map[string]interface{}) func(store *PreambleStore) { func WithGetStateFunc(f func() map[string]utils.KeyData) func(store *PreambleStore) {
return func(store *PreambleStore) { return func(store *PreambleStore) {
store.getState = f store.getState = f
} }
@@ -43,6 +46,12 @@ func WithSetValueFunc(f func(key string, value interface{}) error) func(store *P
} }
} }
func WithSetExpiryFunc(f func(key string, expireAt time.Time) error) func(store *PreambleStore) {
return func(store *PreambleStore) {
store.setExpiry = f
}
}
func WithDirectory(directory string) func(store *PreambleStore) { func WithDirectory(directory string) func(store *PreambleStore) {
return func(store *PreambleStore) { return func(store *PreambleStore) {
store.directory = directory store.directory = directory
@@ -54,7 +63,7 @@ func NewPreambleStore(options ...func(store *PreambleStore)) *PreambleStore {
rw: nil, rw: nil,
mut: sync.Mutex{}, mut: sync.Mutex{},
directory: "", directory: "",
getState: func() map[string]interface{} { getState: func() map[string]utils.KeyData {
// No-Op by default // No-Op by default
return nil return nil
}, },
@@ -62,6 +71,7 @@ func NewPreambleStore(options ...func(store *PreambleStore)) *PreambleStore {
// No-Op by default // No-Op by default
return nil return nil
}, },
setExpiry: func(key string, expireAt time.Time) error { return nil },
} }
for _, option := range options { for _, option := range options {
@@ -89,7 +99,7 @@ func (store *PreambleStore) CreatePreamble() error {
store.mut.Unlock() store.mut.Unlock()
// Get current state. // Get current state.
state := store.getState() state := store.filterExpiredKeys(store.getState())
o, err := json.Marshal(state) o, err := json.Marshal(state)
if err != nil { if err != nil {
return err return err
@@ -130,16 +140,17 @@ func (store *PreambleStore) Restore() error {
return nil return nil
} }
state := make(map[string]interface{}) state := make(map[string]utils.KeyData)
if err = json.Unmarshal(b, &state); err != nil { if err = json.Unmarshal(b, &state); err != nil {
return err return err
} }
for key, value := range state { for key, value := range store.filterExpiredKeys(state) {
if err = store.setValue(key, value); err != nil { if err = store.setValue(key, value.Value); err != nil {
return fmt.Errorf("preamble store -> restore: %+v", err) return fmt.Errorf("preamble store -> restore: %+v", err)
} }
store.setExpiry(key, value.ExpireAt)
} }
return nil return nil
@@ -150,3 +161,17 @@ func (store *PreambleStore) Close() error {
defer store.mut.Unlock() defer store.mut.Unlock()
return store.rw.Close() return store.rw.Close()
} }
// filterExpiredKeys filters out keys that are already expired, so they are not persisted.
func (store *PreambleStore) filterExpiredKeys(state map[string]utils.KeyData) map[string]utils.KeyData {
var keysToDelete []string
for k, v := range state {
if v.ExpireAt.Before(time.Now()) {
keysToDelete = append(keysToDelete, k)
}
}
for _, key := range keysToDelete {
delete(state, key)
}
return state
}

View File

@@ -10,7 +10,7 @@ import (
type SnapshotOpts struct { type SnapshotOpts struct {
config utils.Config config utils.Config
data map[string]interface{} data map[string]utils.KeyData
startSnapshot func() startSnapshot func()
finishSnapshot func() finishSnapshot func()
setLatestSnapshot func(msec int64) setLatestSnapshot func(msec int64)
@@ -32,24 +32,24 @@ func (s *Snapshot) Persist(sink raft.SnapshotSink) error {
msec, err := strconv.Atoi(strings.Split(sink.ID(), "-")[2]) msec, err := strconv.Atoi(strings.Split(sink.ID(), "-")[2])
if err != nil { if err != nil {
sink.Cancel() _ = sink.Cancel()
return err return err
} }
snapshotObject := utils.SnapshotObject{ snapshotObject := utils.SnapshotObject{
State: s.options.data, State: utils.FilterExpiredKeys(s.options.data),
LatestSnapshotMilliseconds: int64(msec), LatestSnapshotMilliseconds: int64(msec),
} }
o, err := json.Marshal(snapshotObject) o, err := json.Marshal(snapshotObject)
if err != nil { if err != nil {
sink.Cancel() _ = sink.Cancel()
return err return err
} }
if _, err = sink.Write(o); err != nil { if _, err = sink.Write(o); err != nil {
sink.Cancel() _ = sink.Cancel()
return err return err
} }

View File

@@ -120,7 +120,7 @@ func (fsm *FSM) Restore(snapshot io.ReadCloser) error {
} }
data := utils.SnapshotObject{ data := utils.SnapshotObject{
State: make(map[string]interface{}), State: make(map[string]utils.KeyData),
LatestSnapshotMilliseconds: 0, LatestSnapshotMilliseconds: 0,
} }
@@ -131,13 +131,14 @@ func (fsm *FSM) Restore(snapshot io.ReadCloser) error {
// Set state // Set state
ctx := context.Background() ctx := context.Background()
for k, v := range data.State { for k, v := range utils.FilterExpiredKeys(data.State) {
if _, err = fsm.options.Server.CreateKeyAndLock(ctx, k); err != nil { if _, err = fsm.options.Server.CreateKeyAndLock(ctx, k); err != nil {
log.Fatal(err) log.Fatal(err)
} }
if err = fsm.options.Server.SetValue(ctx, k, v); err != nil { if err = fsm.options.Server.SetValue(ctx, k, v.Value); err != nil {
log.Fatal(err) log.Fatal(err)
} }
fsm.options.Server.SetExpiry(ctx, k, v.ExpireAt, false)
fsm.options.Server.KeyUnlock(ctx, k) fsm.options.Server.KeyUnlock(ctx, k)
} }
// Set latest snapshot milliseconds // Set latest snapshot milliseconds

View File

@@ -230,14 +230,15 @@ func (server *Server) RemoveExpiry(key string) {
// functions that require a deep copy of the state. // functions that require a deep copy of the state.
// The copy only starts when there's no current copy in progress (represented by StateCopyInProgress atomic boolean) // The copy only starts when there's no current copy in progress (represented by StateCopyInProgress atomic boolean)
// and when there's no current state mutation in progress (represented by StateMutationInProgress atomic boolean) // and when there's no current state mutation in progress (represented by StateMutationInProgress atomic boolean)
func (server *Server) GetState() map[string]interface{} { func (server *Server) GetState() map[string]utils.KeyData {
// Wait unit there's no state mutation or copy in progress before starting a new copy process.
for { for {
if !server.StateCopyInProgress.Load() && !server.StateMutationInProgress.Load() { if !server.StateCopyInProgress.Load() && !server.StateMutationInProgress.Load() {
server.StateCopyInProgress.Store(true) server.StateCopyInProgress.Store(true)
break break
} }
} }
data := make(map[string]interface{}) data := make(map[string]utils.KeyData)
for k, v := range server.store { for k, v := range server.store {
data[k] = v data[k] = v
} }

View File

@@ -125,6 +125,15 @@ func NewServer(opts Opts) *Server {
server.KeyUnlock(ctx, key) server.KeyUnlock(ctx, key)
return nil return nil
}, },
SetExpiry: func(key string, expireAt time.Time) error {
ctx := context.Background()
if _, err := server.KeyLock(ctx, key); err != nil {
return err
}
server.SetExpiry(ctx, key, expireAt, false)
server.KeyUnlock(ctx, key)
return nil
},
}) })
// Set up standalone AOF engine // Set up standalone AOF engine
server.AOFEngine = aof.NewAOFEngine( server.AOFEngine = aof.NewAOFEngine(
@@ -144,6 +153,15 @@ func NewServer(opts Opts) *Server {
server.KeyUnlock(ctx, key) server.KeyUnlock(ctx, key)
return nil return nil
}), }),
aof.WithSetExpiryFunc(func(key string, expireAt time.Time) error {
ctx := context.Background()
if _, err := server.KeyLock(ctx, key); err != nil {
return err
}
server.SetExpiry(ctx, key, expireAt, false)
server.KeyUnlock(ctx, key)
return nil
}),
aof.WithHandleCommandFunc(func(command []byte) { aof.WithHandleCommandFunc(func(command []byte) {
_, err := server.handleCommand(context.Background(), command, nil, true) _, err := server.handleCommand(context.Background(), command, nil, true)
if err != nil { if err != nil {

View File

@@ -27,10 +27,11 @@ type Opts struct {
Config utils.Config Config utils.Config
StartSnapshot func() StartSnapshot func()
FinishSnapshot func() FinishSnapshot func()
GetState func() map[string]interface{} GetState func() map[string]utils.KeyData
SetLatestSnapshotMilliseconds func(msec int64) SetLatestSnapshotMilliseconds func(msec int64)
GetLatestSnapshotMilliseconds func() int64 GetLatestSnapshotMilliseconds func() int64
SetValue func(key string, value interface{}) error SetValue func(key string, value interface{}) error
SetExpiry func(key string, expireAt time.Time) error
} }
type Engine struct { type Engine struct {
@@ -125,7 +126,7 @@ func (engine *Engine) TakeSnapshot() error {
// Get current state // Get current state
snapshotObject := utils.SnapshotObject{ snapshotObject := utils.SnapshotObject{
State: engine.options.GetState(), State: utils.FilterExpiredKeys(engine.options.GetState()),
LatestSnapshotMilliseconds: engine.options.GetLatestSnapshotMilliseconds(), LatestSnapshotMilliseconds: engine.options.GetLatestSnapshotMilliseconds(),
} }
out, err := json.Marshal(snapshotObject) out, err := json.Marshal(snapshotObject)
@@ -261,9 +262,12 @@ func (engine *Engine) Restore(ctx context.Context) error {
engine.options.SetLatestSnapshotMilliseconds(snapshotObject.LatestSnapshotMilliseconds) engine.options.SetLatestSnapshotMilliseconds(snapshotObject.LatestSnapshotMilliseconds)
for key, value := range snapshotObject.State { for key, value := range utils.FilterExpiredKeys(snapshotObject.State) {
if err = engine.options.SetValue(key, value); err != nil { if err = engine.options.SetValue(key, value.Value); err != nil {
return fmt.Errorf("snapshot engine -> restore: %+v", err) return fmt.Errorf("snapshot engine -> restore value: %+v", err)
}
if err = engine.options.SetExpiry(key, value.ExpireAt); err != nil {
return fmt.Errorf("snapshot engine -> restore expiry: %+v", err)
} }
} }

View File

@@ -25,7 +25,7 @@ type Server interface {
SetExpiry(ctx context.Context, key string, expire time.Time, touch bool) SetExpiry(ctx context.Context, key string, expire time.Time, touch bool)
RemoveExpiry(key string) RemoveExpiry(key string)
DeleteKey(ctx context.Context, key string) error DeleteKey(ctx context.Context, key string) error
GetState() map[string]interface{} GetState() map[string]KeyData
GetAllCommands(ctx context.Context) []Command GetAllCommands(ctx context.Context) []Command
GetACL() interface{} GetACL() interface{}
GetPubSub() interface{} GetPubSub() interface{}
@@ -89,6 +89,6 @@ type ACL interface {
type PubSub interface{} type PubSub interface{}
type SnapshotObject struct { type SnapshotObject struct {
State map[string]interface{} State map[string]KeyData
LatestSnapshotMilliseconds int64 LatestSnapshotMilliseconds int64
} }

View File

@@ -185,3 +185,22 @@ func IsMaxMemoryExceeded(maxMemory uint64) bool {
// Return true when whe are above or equal to max memory. // Return true when whe are above or equal to max memory.
return memStats.HeapInuse >= maxMemory return memStats.HeapInuse >= maxMemory
} }
// FilterExpiredKeys filters out keys that are already expired, so they are not persisted.
func FilterExpiredKeys(state map[string]KeyData) map[string]KeyData {
var keysToDelete []string
for k, v := range state {
// Skip keys with no expiry time.
if v.ExpireAt == (time.Time{}) {
continue
}
// If the key is already expired, mark it for deletion.
if v.ExpireAt.Before(time.Now()) {
keysToDelete = append(keysToDelete, k)
}
}
for _, key := range keysToDelete {
delete(state, key)
}
return state
}