diff --git a/src/aof/engine.go b/src/aof/engine.go index 43f8a58..ebbf5a6 100644 --- a/src/aof/engine.go +++ b/src/aof/engine.go @@ -26,7 +26,7 @@ type Engine struct { startRewrite func() finishRewrite func() getState func() map[string]interface{} - setValue func(key string, value interface{}) + setValue func(key string, value interface{}) error handleCommand func(command []byte) } @@ -60,7 +60,7 @@ func WithGetStateFunc(f func() map[string]interface{}) func(engine *Engine) { } } -func WithSetValueFunc(f func(key string, value interface{})) func(engine *Engine) { +func WithSetValueFunc(f func(key string, value interface{}) error) func(engine *Engine) { return func(engine *Engine) { engine.setValue = f } @@ -94,7 +94,10 @@ func NewAOFEngine(options ...func(engine *Engine)) *Engine { startRewrite: func() {}, finishRewrite: func() {}, getState: func() map[string]interface{} { return nil }, - setValue: func(key string, value interface{}) {}, + setValue: func(key string, value interface{}) error { + // No-Op by default + return nil + }, handleCommand: func(command []byte) {}, } diff --git a/src/aof/preamble/store.go b/src/aof/preamble/store.go index 08f2265..66b30e4 100644 --- a/src/aof/preamble/store.go +++ b/src/aof/preamble/store.go @@ -22,7 +22,7 @@ type PreambleStore struct { mut sync.Mutex directory string getState func() map[string]interface{} - setValue func(key string, value interface{}) + setValue func(key string, value interface{}) error } func WithReadWriter(rw PreambleReadWriter) func(store *PreambleStore) { @@ -37,7 +37,7 @@ func WithGetStateFunc(f func() map[string]interface{}) func(store *PreambleStore } } -func WithSetValueFunc(f func(key string, value interface{})) func(store *PreambleStore) { +func WithSetValueFunc(f func(key string, value interface{}) error) func(store *PreambleStore) { return func(store *PreambleStore) { store.setValue = f } @@ -58,8 +58,9 @@ func NewPreambleStore(options ...func(store *PreambleStore)) *PreambleStore { // No-Op by default return nil }, - setValue: func(key string, value interface{}) { + setValue: func(key string, value interface{}) error { // No-Op by default + return nil }, } @@ -71,7 +72,7 @@ func NewPreambleStore(options ...func(store *PreambleStore)) *PreambleStore { if store.rw == nil { err := os.MkdirAll(path.Join(store.directory, "aof"), os.ModePerm) if err != nil { - log.Println(fmt.Errorf("new preamle store -> mkdir error: %+v", err)) + log.Println(fmt.Errorf("new preamble store -> mkdir error: %+v", err)) } f, err := os.OpenFile(path.Join(store.directory, "aof", "preamble.bin"), os.O_RDWR|os.O_CREATE, os.ModePerm) if err != nil { @@ -136,7 +137,9 @@ func (store *PreambleStore) Restore() error { } for key, value := range state { - store.setValue(key, value) + if err = store.setValue(key, value); err != nil { + return fmt.Errorf("preamble store -> restore: %+v", err) + } } return nil diff --git a/src/server/keyspace.go b/src/server/keyspace.go index c854406..a12357f 100644 --- a/src/server/keyspace.go +++ b/src/server/keyspace.go @@ -2,6 +2,7 @@ package server import ( "context" + "errors" "github.com/echovault/echovault/src/utils" "slices" "strings" @@ -60,6 +61,10 @@ func (server *Server) KeyExists(key string) bool { // CreateKeyAndLock creates a new key lock and immediately locks it if the key does not exist. // If the key exists, the existing key is locked. func (server *Server) CreateKeyAndLock(ctx context.Context, key string) (bool, error) { + if utils.IsMaxMemoryExceeded() && server.Config.EvictionPolicy == utils.NoEviction { + return false, errors.New("max memory reached, key not created") + } + server.keyCreationLock.Lock() defer server.keyCreationLock.Unlock() @@ -85,8 +90,10 @@ func (server *Server) GetValue(key string) interface{} { // in the snapshot engine. // This count triggers a snapshot when the threshold is reached. // The key must be locked prior to calling this function. -func (server *Server) SetValue(_ context.Context, key string, value interface{}) { - // TODO: If max-memory is exceeded and eviction policy is noeviction, do not store the new value +func (server *Server) SetValue(_ context.Context, key string, value interface{}) error { + if utils.IsMaxMemoryExceeded() && server.Config.EvictionPolicy == utils.NoEviction { + return errors.New("max memory reached, key value not set") + } server.store[key] = value @@ -95,6 +102,8 @@ func (server *Server) SetValue(_ context.Context, key string, value interface{}) if !server.IsInCluster() { server.SnapshotEngine.IncrementChangeCount() } + + return nil } // The SetKeyExpiry receiver function sets the expiry time of a key. diff --git a/src/server/server.go b/src/server/server.go index 7832c7c..9648b7c 100644 --- a/src/server/server.go +++ b/src/server/server.go @@ -95,9 +95,16 @@ func NewServer(opts Opts) *Server { GetState: server.GetState, SetLatestSnapshotMilliseconds: server.SetLatestSnapshot, GetLatestSnapshotMilliseconds: server.GetLatestSnapshot, - CreateKeyAndLock: server.CreateKeyAndLock, - KeyUnlock: server.KeyUnlock, - SetValue: server.SetValue, + SetValue: func(key string, value interface{}) error { + if _, err := server.CreateKeyAndLock(context.Background(), key); err != nil { + return err + } + if err := server.SetValue(context.Background(), key, value); err != nil { + return err + } + server.KeyUnlock(key) + return nil + }, }) // Set up standalone AOF engine server.AOFEngine = aof.NewAOFEngine( @@ -106,13 +113,15 @@ func NewServer(opts Opts) *Server { aof.WithStartRewriteFunc(server.StartRewriteAOF), aof.WithFinishRewriteFunc(server.FinishRewriteAOF), aof.WithGetStateFunc(server.GetState), - aof.WithSetValueFunc(func(key string, value interface{}) { + aof.WithSetValueFunc(func(key string, value interface{}) error { if _, err := server.CreateKeyAndLock(context.Background(), key); err != nil { - log.Println(err) - return + return err + } + if err := server.SetValue(context.Background(), key, value); err != nil { + return err } - server.SetValue(context.Background(), key, value) server.KeyUnlock(key) + return nil }), aof.WithHandleCommandFunc(func(command []byte) { _, err := server.handleCommand(context.Background(), command, nil, true) diff --git a/src/snapshot/snapshot.go b/src/snapshot/snapshot.go index f1fb680..c7ce631 100644 --- a/src/snapshot/snapshot.go +++ b/src/snapshot/snapshot.go @@ -30,9 +30,7 @@ type Opts struct { GetState func() map[string]interface{} SetLatestSnapshotMilliseconds func(msec int64) GetLatestSnapshotMilliseconds func() int64 - CreateKeyAndLock func(ctx context.Context, key string) (bool, error) - KeyUnlock func(key string) - SetValue func(ctx context.Context, key string, value interface{}) + SetValue func(key string, value interface{}) error } type Engine struct { @@ -264,11 +262,9 @@ func (engine *Engine) Restore(ctx context.Context) error { engine.options.SetLatestSnapshotMilliseconds(snapshotObject.LatestSnapshotMilliseconds) for key, value := range snapshotObject.State { - if _, err = engine.options.CreateKeyAndLock(ctx, key); err != nil { - log.Println(fmt.Errorf("could not load value at key %s with error: %s", key, err.Error())) + if err = engine.options.SetValue(key, value); err != nil { + return fmt.Errorf("snapshot engine -> restore: %+v", err) } - engine.options.SetValue(ctx, key, value) - engine.options.KeyUnlock(key) } log.Println("successfully restored latest snapshot") diff --git a/src/utils/types.go b/src/utils/types.go index 71c464e..2b42cbe 100644 --- a/src/utils/types.go +++ b/src/utils/types.go @@ -14,7 +14,7 @@ type Server interface { KeyExists(key string) bool CreateKeyAndLock(ctx context.Context, key string) (bool, error) GetValue(key string) interface{} - SetValue(ctx context.Context, key string, value interface{}) + SetValue(ctx context.Context, key string, value interface{}) error SetKeyExpiry(key string, expire time.Time, touch bool) RemoveKeyExpiry(key string) GetState() map[string]interface{} diff --git a/src/utils/utils.go b/src/utils/utils.go index 906794c..28ff13c 100644 --- a/src/utils/utils.go +++ b/src/utils/utils.go @@ -9,6 +9,7 @@ import ( "log" "math/big" "net" + "runtime" "slices" "strconv" "strings" @@ -160,3 +161,22 @@ func ParseMemory(memory string) (uint64, error) { return uint64(bytesInt), nil } + +// IsMaxMemoryExceeded checks whether we have exceeded the current maximum memory limit +func IsMaxMemoryExceeded(config Config) bool { + var memStats runtime.MemStats + runtime.ReadMemStats(&memStats) + + // If we're currently using less than the configured max memory, return false + if memStats.HeapInuse < config.MaxMemory { + return false + } + + // If we're currently using more than max memory, force a garbage collection before we start deleting keys. + // This measure is to prevent deleting keys that may be important when some memory can be reclaimed + // by just collecting garbage. + runtime.GC() + + // Return true when whe are above or equal to max memory. + return memStats.HeapInuse >= config.MaxMemory +}