mirror of
https://github.com/EchoVault/SugarDB.git
synced 2025-10-21 15:09:30 +08:00
Created IsMaxMemoryExceeded utility function that is used to check whether the server has exceeded the max memory.
Check whether max memory is exceeded before setting value or creating new key. Return error from SetValue function when max memory is exceeded. This change has been propagated to the aof and snapshot engines as well. Snapshot engine now only accepts SetValue option. No longer does it neet to use CreateKeyAndLock & KeyUnlock functions itself.
This commit is contained in:
@@ -26,7 +26,7 @@ type Engine struct {
|
||||
startRewrite func()
|
||||
finishRewrite func()
|
||||
getState func() map[string]interface{}
|
||||
setValue func(key string, value interface{})
|
||||
setValue func(key string, value interface{}) error
|
||||
handleCommand func(command []byte)
|
||||
}
|
||||
|
||||
@@ -60,7 +60,7 @@ func WithGetStateFunc(f func() map[string]interface{}) func(engine *Engine) {
|
||||
}
|
||||
}
|
||||
|
||||
func WithSetValueFunc(f func(key string, value interface{})) func(engine *Engine) {
|
||||
func WithSetValueFunc(f func(key string, value interface{}) error) func(engine *Engine) {
|
||||
return func(engine *Engine) {
|
||||
engine.setValue = f
|
||||
}
|
||||
@@ -94,7 +94,10 @@ func NewAOFEngine(options ...func(engine *Engine)) *Engine {
|
||||
startRewrite: func() {},
|
||||
finishRewrite: func() {},
|
||||
getState: func() map[string]interface{} { return nil },
|
||||
setValue: func(key string, value interface{}) {},
|
||||
setValue: func(key string, value interface{}) error {
|
||||
// No-Op by default
|
||||
return nil
|
||||
},
|
||||
handleCommand: func(command []byte) {},
|
||||
}
|
||||
|
||||
|
@@ -22,7 +22,7 @@ type PreambleStore struct {
|
||||
mut sync.Mutex
|
||||
directory string
|
||||
getState func() map[string]interface{}
|
||||
setValue func(key string, value interface{})
|
||||
setValue func(key string, value interface{}) error
|
||||
}
|
||||
|
||||
func WithReadWriter(rw PreambleReadWriter) func(store *PreambleStore) {
|
||||
@@ -37,7 +37,7 @@ func WithGetStateFunc(f func() map[string]interface{}) func(store *PreambleStore
|
||||
}
|
||||
}
|
||||
|
||||
func WithSetValueFunc(f func(key string, value interface{})) func(store *PreambleStore) {
|
||||
func WithSetValueFunc(f func(key string, value interface{}) error) func(store *PreambleStore) {
|
||||
return func(store *PreambleStore) {
|
||||
store.setValue = f
|
||||
}
|
||||
@@ -58,8 +58,9 @@ func NewPreambleStore(options ...func(store *PreambleStore)) *PreambleStore {
|
||||
// No-Op by default
|
||||
return nil
|
||||
},
|
||||
setValue: func(key string, value interface{}) {
|
||||
setValue: func(key string, value interface{}) error {
|
||||
// No-Op by default
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
@@ -71,7 +72,7 @@ func NewPreambleStore(options ...func(store *PreambleStore)) *PreambleStore {
|
||||
if store.rw == nil {
|
||||
err := os.MkdirAll(path.Join(store.directory, "aof"), os.ModePerm)
|
||||
if err != nil {
|
||||
log.Println(fmt.Errorf("new preamle store -> mkdir error: %+v", err))
|
||||
log.Println(fmt.Errorf("new preamble store -> mkdir error: %+v", err))
|
||||
}
|
||||
f, err := os.OpenFile(path.Join(store.directory, "aof", "preamble.bin"), os.O_RDWR|os.O_CREATE, os.ModePerm)
|
||||
if err != nil {
|
||||
@@ -136,7 +137,9 @@ func (store *PreambleStore) Restore() error {
|
||||
}
|
||||
|
||||
for key, value := range state {
|
||||
store.setValue(key, value)
|
||||
if err = store.setValue(key, value); err != nil {
|
||||
return fmt.Errorf("preamble store -> restore: %+v", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@@ -2,6 +2,7 @@ package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"github.com/echovault/echovault/src/utils"
|
||||
"slices"
|
||||
"strings"
|
||||
@@ -60,6 +61,10 @@ func (server *Server) KeyExists(key string) bool {
|
||||
// CreateKeyAndLock creates a new key lock and immediately locks it if the key does not exist.
|
||||
// If the key exists, the existing key is locked.
|
||||
func (server *Server) CreateKeyAndLock(ctx context.Context, key string) (bool, error) {
|
||||
if utils.IsMaxMemoryExceeded() && server.Config.EvictionPolicy == utils.NoEviction {
|
||||
return false, errors.New("max memory reached, key not created")
|
||||
}
|
||||
|
||||
server.keyCreationLock.Lock()
|
||||
defer server.keyCreationLock.Unlock()
|
||||
|
||||
@@ -85,8 +90,10 @@ func (server *Server) GetValue(key string) interface{} {
|
||||
// in the snapshot engine.
|
||||
// This count triggers a snapshot when the threshold is reached.
|
||||
// The key must be locked prior to calling this function.
|
||||
func (server *Server) SetValue(_ context.Context, key string, value interface{}) {
|
||||
// TODO: If max-memory is exceeded and eviction policy is noeviction, do not store the new value
|
||||
func (server *Server) SetValue(_ context.Context, key string, value interface{}) error {
|
||||
if utils.IsMaxMemoryExceeded() && server.Config.EvictionPolicy == utils.NoEviction {
|
||||
return errors.New("max memory reached, key value not set")
|
||||
}
|
||||
|
||||
server.store[key] = value
|
||||
|
||||
@@ -95,6 +102,8 @@ func (server *Server) SetValue(_ context.Context, key string, value interface{})
|
||||
if !server.IsInCluster() {
|
||||
server.SnapshotEngine.IncrementChangeCount()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// The SetKeyExpiry receiver function sets the expiry time of a key.
|
||||
|
@@ -95,9 +95,16 @@ func NewServer(opts Opts) *Server {
|
||||
GetState: server.GetState,
|
||||
SetLatestSnapshotMilliseconds: server.SetLatestSnapshot,
|
||||
GetLatestSnapshotMilliseconds: server.GetLatestSnapshot,
|
||||
CreateKeyAndLock: server.CreateKeyAndLock,
|
||||
KeyUnlock: server.KeyUnlock,
|
||||
SetValue: server.SetValue,
|
||||
SetValue: func(key string, value interface{}) error {
|
||||
if _, err := server.CreateKeyAndLock(context.Background(), key); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := server.SetValue(context.Background(), key, value); err != nil {
|
||||
return err
|
||||
}
|
||||
server.KeyUnlock(key)
|
||||
return nil
|
||||
},
|
||||
})
|
||||
// Set up standalone AOF engine
|
||||
server.AOFEngine = aof.NewAOFEngine(
|
||||
@@ -106,13 +113,15 @@ func NewServer(opts Opts) *Server {
|
||||
aof.WithStartRewriteFunc(server.StartRewriteAOF),
|
||||
aof.WithFinishRewriteFunc(server.FinishRewriteAOF),
|
||||
aof.WithGetStateFunc(server.GetState),
|
||||
aof.WithSetValueFunc(func(key string, value interface{}) {
|
||||
aof.WithSetValueFunc(func(key string, value interface{}) error {
|
||||
if _, err := server.CreateKeyAndLock(context.Background(), key); err != nil {
|
||||
log.Println(err)
|
||||
return
|
||||
return err
|
||||
}
|
||||
if err := server.SetValue(context.Background(), key, value); err != nil {
|
||||
return err
|
||||
}
|
||||
server.SetValue(context.Background(), key, value)
|
||||
server.KeyUnlock(key)
|
||||
return nil
|
||||
}),
|
||||
aof.WithHandleCommandFunc(func(command []byte) {
|
||||
_, err := server.handleCommand(context.Background(), command, nil, true)
|
||||
|
@@ -30,9 +30,7 @@ type Opts struct {
|
||||
GetState func() map[string]interface{}
|
||||
SetLatestSnapshotMilliseconds func(msec int64)
|
||||
GetLatestSnapshotMilliseconds func() int64
|
||||
CreateKeyAndLock func(ctx context.Context, key string) (bool, error)
|
||||
KeyUnlock func(key string)
|
||||
SetValue func(ctx context.Context, key string, value interface{})
|
||||
SetValue func(key string, value interface{}) error
|
||||
}
|
||||
|
||||
type Engine struct {
|
||||
@@ -264,11 +262,9 @@ func (engine *Engine) Restore(ctx context.Context) error {
|
||||
engine.options.SetLatestSnapshotMilliseconds(snapshotObject.LatestSnapshotMilliseconds)
|
||||
|
||||
for key, value := range snapshotObject.State {
|
||||
if _, err = engine.options.CreateKeyAndLock(ctx, key); err != nil {
|
||||
log.Println(fmt.Errorf("could not load value at key %s with error: %s", key, err.Error()))
|
||||
if err = engine.options.SetValue(key, value); err != nil {
|
||||
return fmt.Errorf("snapshot engine -> restore: %+v", err)
|
||||
}
|
||||
engine.options.SetValue(ctx, key, value)
|
||||
engine.options.KeyUnlock(key)
|
||||
}
|
||||
|
||||
log.Println("successfully restored latest snapshot")
|
||||
|
@@ -14,7 +14,7 @@ type Server interface {
|
||||
KeyExists(key string) bool
|
||||
CreateKeyAndLock(ctx context.Context, key string) (bool, error)
|
||||
GetValue(key string) interface{}
|
||||
SetValue(ctx context.Context, key string, value interface{})
|
||||
SetValue(ctx context.Context, key string, value interface{}) error
|
||||
SetKeyExpiry(key string, expire time.Time, touch bool)
|
||||
RemoveKeyExpiry(key string)
|
||||
GetState() map[string]interface{}
|
||||
|
@@ -9,6 +9,7 @@ import (
|
||||
"log"
|
||||
"math/big"
|
||||
"net"
|
||||
"runtime"
|
||||
"slices"
|
||||
"strconv"
|
||||
"strings"
|
||||
@@ -160,3 +161,22 @@ func ParseMemory(memory string) (uint64, error) {
|
||||
|
||||
return uint64(bytesInt), nil
|
||||
}
|
||||
|
||||
// IsMaxMemoryExceeded checks whether we have exceeded the current maximum memory limit
|
||||
func IsMaxMemoryExceeded(config Config) bool {
|
||||
var memStats runtime.MemStats
|
||||
runtime.ReadMemStats(&memStats)
|
||||
|
||||
// If we're currently using less than the configured max memory, return false
|
||||
if memStats.HeapInuse < config.MaxMemory {
|
||||
return false
|
||||
}
|
||||
|
||||
// If we're currently using more than max memory, force a garbage collection before we start deleting keys.
|
||||
// This measure is to prevent deleting keys that may be important when some memory can be reclaimed
|
||||
// by just collecting garbage.
|
||||
runtime.GC()
|
||||
|
||||
// Return true when whe are above or equal to max memory.
|
||||
return memStats.HeapInuse >= config.MaxMemory
|
||||
}
|
||||
|
Reference in New Issue
Block a user