Updated Embedded ACL test and enabled memory adjustment after key update

Kelvin Mwinuka
2024-07-01 08:55:37 +08:00
parent d787c6cc31
commit 5a04bcade0
3 changed files with 941 additions and 849 deletions

File diff suppressed because it is too large


@@ -418,8 +418,6 @@ func TestEchoVault_ACLConfig(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
t.Parallel()
// Create new server instance
conf := DefaultConfig()
conf.DataDir = ""
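The hunk above trims setup comments from a table-driven test in which every subtest calls t.Parallel() and builds its own server from DefaultConfig(), so ACL state never bleeds between cases. A minimal, self-contained sketch of that pattern (fakeServer and newTestServer are hypothetical stand-ins for the suppressed setup code, not EchoVault's API):

    package echovault_test

    import "testing"

    // fakeServer is a hypothetical stand-in for the EchoVault instance
    // the suppressed setup code builds from DefaultConfig().
    type fakeServer struct{ dataDir string }

    func newTestServer() *fakeServer { return &fakeServer{dataDir: ""} }

    func TestACLConfigPattern(t *testing.T) {
        t.Parallel()
        tests := []struct{ name, wantDataDir string }{
            {name: "example case", wantDataDir: ""},
        }
        for _, test := range tests {
            test := test // capture the range variable (required before Go 1.22)
            t.Run(test.name, func(t *testing.T) {
                t.Parallel()
                // Each subtest creates its own server so parallel cases stay isolated.
                server := newTestServer()
                if server.dataDir != test.wantDataDir {
                    t.Errorf("got %q, want %q", server.dataDir, test.wantDataDir)
                }
            })
        }
    }

Capturing the range variable matters here: before Go 1.22, all parallel subtests would otherwise observe the final value of test.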


@@ -15,6 +15,7 @@
package echovault
import (
"container/heap"
"context"
"errors"
"fmt"
@@ -23,8 +24,10 @@ import (
"github.com/echovault/echovault/internal/eviction"
"log"
"math/rand"
"runtime"
"slices"
"strings"
"sync"
"time"
)
@@ -363,174 +366,200 @@ func (server *EchoVault) updateKeysInCache(ctx context.Context, keys []string) e
}
}
// TODO: Adjust memory by taking all databases into account (largest database?).
//if err := server.adjustMemoryUsage(ctx); err != nil {
// return fmt.Errorf("updateKeysInCache: %+v", err)
//}
server.storeLock.Lock()
defer server.storeLock.Unlock()
wg := sync.WaitGroup{}
errChan := make(chan error, len(server.store)) // buffered so goroutines can send after an early return
doneChan := make(chan struct{})
for db := range server.store {
wg.Add(1)
ctx := context.WithValue(ctx, "Database", db)
go func(ctx context.Context, database int, wg *sync.WaitGroup, errChan chan<- error) {
if err := server.adjustMemoryUsage(ctx); err != nil {
errChan <- fmt.Errorf("adjustMemoryUsage error in database %d: %+v", database, err)
}
wg.Done()
}(ctx, db, &wg, errChan)
}
go func() {
wg.Wait()
doneChan <- struct{}{}
}()
select {
case err := <-errChan:
return fmt.Errorf("adjustMemoryUsage error: %+v", err)
case <-doneChan:
}
return nil
}
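updateKeysInCache now fans out one adjustMemoryUsage goroutine per database, carries the database index in the context, and funnels results back through errChan and doneChan, returning on the first error. A self-contained sketch of that fan-out/fan-in shape, with a hypothetical adjust function standing in for server.adjustMemoryUsage:

    package main

    import (
        "context"
        "fmt"
        "sync"
    )

    // adjust is a hypothetical stand-in for server.adjustMemoryUsage.
    func adjust(ctx context.Context) error {
        if db, ok := ctx.Value("Database").(int); ok && db == 2 {
            return fmt.Errorf("simulated failure in database %d", db)
        }
        return nil
    }

    func adjustAll(ctx context.Context, databases []int) error {
        wg := sync.WaitGroup{}
        // Buffered so a failing goroutine can always send and exit.
        errChan := make(chan error, len(databases))
        doneChan := make(chan struct{})
        for _, db := range databases {
            wg.Add(1)
            // The target database travels in the context, as in the hunk above.
            go func(ctx context.Context) {
                defer wg.Done()
                if err := adjust(ctx); err != nil {
                    errChan <- err
                }
            }(context.WithValue(ctx, "Database", db))
        }
        go func() {
            wg.Wait()
            close(doneChan)
        }()
        select {
        case err := <-errChan:
            return fmt.Errorf("adjustMemoryUsage error: %+v", err)
        case <-doneChan:
            return nil
        }
    }

    func main() {
        fmt.Println(adjustAll(context.Background(), []int{0, 1, 2}))
    }

Buffering errChan to the number of databases lets a failing goroutine send and exit even after the caller has already returned with an earlier error; an unbuffered channel can leak blocked goroutines once the first error is consumed.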
// TODO: Implement support for multiple databases.
// adjustMemoryUsage should only be called from standalone echovault or from raft cluster leader.
//func (server *EchoVault) adjustMemoryUsage(ctx context.Context) error {
// // If max memory is 0, there's no need to adjust memory usage.
// if server.config.MaxMemory == 0 {
// return nil
// }
// // Check if memory usage is above max-memory.
// // If it is, pop items from the cache until we get under the limit.
// var memStats runtime.MemStats
// runtime.ReadMemStats(&memStats)
// // If we're using less memory than the max-memory, there's no need to evict.
// if memStats.HeapInuse < server.config.MaxMemory {
// return nil
// }
// // Force a garbage collection first before we start evicting keys.
// runtime.GC()
// runtime.ReadMemStats(&memStats)
// if memStats.HeapInuse < server.config.MaxMemory {
// return nil
// }
// // We've done a GC, but we're still at or above the max memory limit.
// // Start a loop that evicts keys until either the heap is empty or
// // we're below the max memory limit.
// server.storeLock.Lock()
// defer server.storeLock.Unlock()
// switch {
// case slices.Contains([]string{constants.AllKeysLFU, constants.VolatileLFU}, strings.ToLower(server.config.EvictionPolicy)):
// // Remove keys from LFU cache until we're below the max memory limit or
// // until the LFU cache is empty.
// server.lfuCache.mutex.Lock()
// defer server.lfuCache.mutex.Unlock()
// for {
// // Return if cache is empty
// if server.lfuCache.cache.Len() == 0 {
// return fmt.Errorf("adjustMemoryUsage -> LFU cache empty")
// }
//
// key := heap.Pop(&server.lfuCache.cache).(string)
// if !server.isInCluster() {
// // If in standalone mode, directly delete the key
// if err := server.deleteKey(key); err != nil {
// return fmt.Errorf("adjustMemoryUsage -> LFU cache eviction: %+v", err)
// }
// } else if server.isInCluster() && server.raft.IsRaftLeader() {
// // If in raft cluster, send command to delete key from cluster
// if err := server.raftApplyDeleteKey(ctx, key); err != nil {
// return fmt.Errorf("adjustMemoryUsage -> LFU cache eviction: %+v", err)
// }
// }
//
// // Run garbage collection
// runtime.GC()
// // Return if we're below max memory
// runtime.ReadMemStats(&memStats)
// if memStats.HeapInuse < server.config.MaxMemory {
// return nil
// }
// }
// case slices.Contains([]string{constants.AllKeysLRU, constants.VolatileLRU}, strings.ToLower(server.config.EvictionPolicy)):
// // Remove keys from the LRU cache until we're below the max memory limit or
// // until the LRU cache is empty.
// server.lruCache.mutex.Lock()
// defer server.lruCache.mutex.Unlock()
// for {
// // Return if cache is empty
// if server.lruCache.cache.Len() == 0 {
// return fmt.Errorf("adjsutMemoryUsage -> LRU cache empty")
// }
//
// key := heap.Pop(&server.lruCache.cache).(string)
// if !server.isInCluster() {
// // If in standalone mode, directly delete the key.
// if err := server.deleteKey(key); err != nil {
// return fmt.Errorf("adjustMemoryUsage -> LRU cache eviction: %+v", err)
// }
// } else if server.isInCluster() && server.raft.IsRaftLeader() {
// // If in cluster mode and the node is a cluster leader,
// // send command to delete the key from the cluster.
// if err := server.raftApplyDeleteKey(ctx, key); err != nil {
// return fmt.Errorf("adjustMemoryUsage -> LRU cache eviction: %+v", err)
// }
// }
//
// // Run garbage collection
// runtime.GC()
// // Return if we're below max memory
// runtime.ReadMemStats(&memStats)
// if memStats.HeapInuse < server.config.MaxMemory {
// return nil
// }
// }
// case slices.Contains([]string{constants.AllKeysRandom}, strings.ToLower(server.config.EvictionPolicy)):
// // Remove random keys until we're below the max memory limit
// // or there are no more keys remaining.
// for {
// server.storeLock.Lock()
// // If there are no keys, return error
// if len(server.store) == 0 {
// err := errors.New("no keys to evict")
// server.storeLock.Unlock()
// return fmt.Errorf("adjustMemoryUsage -> all keys random: %+v", err)
// }
// // Get random key
// idx := rand.Intn(len(server.store))
// // for key := range server.store {
// if idx == 0 {
// if !server.isInCluster() {
// // If in standalone mode, directly delete the key
// if err := server.deleteKey(key); err != nil {
// return fmt.Errorf("adjustMemoryUsage -> all keys random: %+v", err)
// }
// } else if server.isInCluster() && server.raft.IsRaftLeader() {
// if err := server.raftApplyDeleteKey(ctx, key); err != nil {
// return fmt.Errorf("adjustMemoryUsage -> all keys random: %+v", err)
// }
// }
// // Run garbage collection
// runtime.GC()
// // Return if we're below max memory
// runtime.ReadMemStats(&memStats)
// if memStats.HeapInuse < server.config.MaxMemory {
// return nil
// }
// }
// idx--
// }
// }
// case slices.Contains([]string{constants.VolatileRandom}, strings.ToLower(server.config.EvictionPolicy)):
// // Remove random keys with an associated expiry time until we're below the max memory limit
// // or there are no more keys with expiry time.
// for {
// // Get random volatile key
// server.keysWithExpiry.rwMutex.RLock()
// idx := rand.Intn(len(server.keysWithExpiry.keys))
// key := server.keysWithExpiry.keys[idx]
// server.keysWithExpiry.rwMutex.RUnlock()
//
// if !server.isInCluster() {
// // If in standalone mode, directly delete the key
// if err := server.deleteKey(key); err != nil {
// return fmt.Errorf("adjustMemoryUsage -> volatile keys random: %+v", err)
// }
// } else if server.isInCluster() && server.raft.IsRaftLeader() {
// if err := server.raftApplyDeleteKey(ctx, key); err != nil {
// return fmt.Errorf("adjustMemoryUsage -> volatile keys randome: %+v", err)
// }
// }
//
// // Run garbage collection
// runtime.GC()
// // Return if we're below max memory
// runtime.ReadMemStats(&memStats)
// if memStats.HeapInuse < server.config.MaxMemory {
// return nil
// }
// }
// default:
// return nil
// }
// }
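The rewritten function below differs from the commented-out version mainly in indexing: the LFU/LRU caches and the expiry-key list are now keyed by database, with the target database read from the context. Both versions evict by popping from a container/heap, so the cheapest-to-evict key surfaces first. A minimal illustration of that heap-driven ordering (the entry type and counts are illustrative, not EchoVault's cache internals):

    package main

    import (
        "container/heap"
        "fmt"
    )

    // entry pairs a key with the value the eviction policy orders by;
    // an access count is used here purely for illustration.
    type entry struct {
        key   string
        count int
    }

    // keyHeap is a min-heap of entries implementing heap.Interface.
    type keyHeap []entry

    func (h keyHeap) Len() int           { return len(h) }
    func (h keyHeap) Less(i, j int) bool { return h[i].count < h[j].count }
    func (h keyHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
    func (h *keyHeap) Push(x any)        { *h = append(*h, x.(entry)) }
    func (h *keyHeap) Pop() any {
        old := *h
        item := old[len(old)-1]
        *h = old[:len(old)-1]
        return item
    }

    func main() {
        h := &keyHeap{{"a", 5}, {"b", 1}, {"c", 3}}
        heap.Init(h)
        // The least-frequently-used key pops first: b, then c, then a.
        for h.Len() > 0 {
            fmt.Println(heap.Pop(h).(entry).key)
        }
    }

For LFU the ordering value would be an access count and for LRU a recency stamp; either way, the pop-until-under-limit loop in the function below is the same.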
func (server *EchoVault) adjustMemoryUsage(ctx context.Context) error {
// If max memory is 0, there's no need to adjust memory usage.
if server.config.MaxMemory == 0 {
return nil
}
// The target database is carried in the context, set by updateKeysInCache.
database := ctx.Value("Database").(int)
// Check if memory usage is above max-memory.
// If it is, pop items from the cache until we get under the limit.
var memStats runtime.MemStats
runtime.ReadMemStats(&memStats)
// If we're using less memory than the max-memory, there's no need to evict.
if memStats.HeapInuse < server.config.MaxMemory {
return nil
}
// Force a garbage collection first before we start evicting keys.
runtime.GC()
runtime.ReadMemStats(&memStats)
if memStats.HeapInuse < server.config.MaxMemory {
return nil
}
// We've done a GC, but we're still at or above the max memory limit.
// Start a loop that evicts keys until either the heap is empty or
// we're below the max memory limit.
switch {
case slices.Contains([]string{constants.AllKeysLFU, constants.VolatileLFU}, strings.ToLower(server.config.EvictionPolicy)):
// Remove keys from LFU cache until we're below the max memory limit or
// until the LFU cache is empty.
server.lfuCache.mutex.Lock()
defer server.lfuCache.mutex.Unlock()
for {
// Return if cache is empty
if server.lfuCache.cache[database].Len() == 0 {
return fmt.Errorf("adjustMemoryUsage -> LFU cache empty")
}
key := heap.Pop(server.lfuCache.cache[database]).(string)
if !server.isInCluster() {
// If in standalone mode, directly delete the key
if err := server.deleteKey(ctx, key); err != nil {
return fmt.Errorf("adjustMemoryUsage -> LFU cache eviction: %+v", err)
}
} else if server.isInCluster() && server.raft.IsRaftLeader() {
// If in raft cluster, send command to delete key from cluster
if err := server.raftApplyDeleteKey(ctx, key); err != nil {
return fmt.Errorf("adjustMemoryUsage -> LFU cache eviction: %+v", err)
}
}
// Run garbage collection
runtime.GC()
// Return if we're below max memory
runtime.ReadMemStats(&memStats)
if memStats.HeapInuse < server.config.MaxMemory {
return nil
}
}
case slices.Contains([]string{constants.AllKeysLRU, constants.VolatileLRU}, strings.ToLower(server.config.EvictionPolicy)):
// Remove keys from the LRU cache until we're below the max memory limit or
// until the LRU cache is empty.
server.lruCache.mutex.Lock()
defer server.lruCache.mutex.Unlock()
for {
// Return if cache is empty
if server.lruCache.cache[database].Len() == 0 {
return fmt.Errorf("adjustMemoryUsage -> LRU cache empty")
}
key := heap.Pop(server.lruCache.cache[database]).(string)
if !server.isInCluster() {
// If in standalone mode, directly delete the key.
if err := server.deleteKey(ctx, key); err != nil {
return fmt.Errorf("adjustMemoryUsage -> LRU cache eviction: %+v", err)
}
} else if server.isInCluster() && server.raft.IsRaftLeader() {
// If in cluster mode and the node is a cluster leader,
// send command to delete the key from the cluster.
if err := server.raftApplyDeleteKey(ctx, key); err != nil {
return fmt.Errorf("adjustMemoryUsage -> LRU cache eviction: %+v", err)
}
}
// Run garbage collection
runtime.GC()
// Return if we're below max memory
runtime.ReadMemStats(&memStats)
if memStats.HeapInuse < server.config.MaxMemory {
return nil
}
}
case slices.Contains([]string{constants.AllKeysRandom}, strings.ToLower(server.config.EvictionPolicy)):
// Remove random keys until we're below the max memory limit
// or there are no more keys remaining.
for {
// If there are no keys in this database, return an error.
if len(server.store[database]) == 0 {
err := errors.New("no keys to evict")
return fmt.Errorf("adjustMemoryUsage -> all keys random: %+v", err)
}
// Get random key in the database
idx := rand.Intn(len(server.store[database]))
for key := range server.store[database] {
if idx == 0 {
if !server.isInCluster() {
// If in standalone mode, directly delete the key
if err := server.deleteKey(ctx, key); err != nil {
return fmt.Errorf("adjustMemoryUsage -> all keys random: %+v", err)
}
} else if server.isInCluster() && server.raft.IsRaftLeader() {
// If in cluster mode and this node is the leader, delete via raft.
if err := server.raftApplyDeleteKey(ctx, key); err != nil {
return fmt.Errorf("adjustMemoryUsage -> all keys random: %+v", err)
}
}
// Run garbage collection
runtime.GC()
// Return if we're below max memory
runtime.ReadMemStats(&memStats)
if memStats.HeapInuse < server.config.MaxMemory {
return nil
}
}
idx--
}
}
case slices.Contains([]string{constants.VolatileRandom}, strings.ToLower(server.config.EvictionPolicy)):
// Remove random keys with an associated expiry time until we're below the max memory limit
// or there are no more keys with expiry time.
for {
// Get random volatile key in the database
server.keysWithExpiry.rwMutex.RLock()
if len(server.keysWithExpiry.keys[database]) == 0 {
server.keysWithExpiry.rwMutex.RUnlock()
return fmt.Errorf("adjustMemoryUsage -> volatile keys random: no volatile keys to evict")
}
idx := rand.Intn(len(server.keysWithExpiry.keys[database]))
key := server.keysWithExpiry.keys[database][idx]
server.keysWithExpiry.rwMutex.RUnlock()
if !server.isInCluster() {
// If in standalone mode, directly delete the key
if err := server.deleteKey(ctx, key); err != nil {
return fmt.Errorf("adjustMemoryUsage -> volatile keys random: %+v", err)
}
} else if server.isInCluster() && server.raft.IsRaftLeader() {
if err := server.raftApplyDeleteKey(ctx, key); err != nil {
return fmt.Errorf("adjustMemoryUsage -> volatile keys randome: %+v", err)
}
}
// Run garbage collection
runtime.GC()
// Return if we're below max memory
runtime.ReadMemStats(&memStats)
if memStats.HeapInuse < server.config.MaxMemory {
return nil
}
}
default:
return nil
}
}
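Every eviction branch above shares one gate: read heap usage, and only if it is at or above MaxMemory force a GC and re-check before evicting anything. That gate in isolation, as a hedged sketch (maxMemory stands in for server.config.MaxMemory, and 0 means no limit, as in the function above):

    package main

    import (
        "fmt"
        "runtime"
    )

    // overLimit reports whether heap usage is still at or above maxMemory
    // after a forced garbage collection, mirroring the gate at the top of
    // adjustMemoryUsage. A maxMemory of 0 means no limit is configured.
    func overLimit(maxMemory uint64) bool {
        if maxMemory == 0 {
            return false
        }
        var memStats runtime.MemStats
        runtime.ReadMemStats(&memStats)
        if memStats.HeapInuse < maxMemory {
            return false
        }
        // Collect garbage before concluding eviction is needed: freed
        // objects may already bring the heap back under the limit.
        runtime.GC()
        runtime.ReadMemStats(&memStats)
        return memStats.HeapInuse >= maxMemory
    }

    func main() {
        fmt.Println(overLimit(0))       // false: no limit configured
        fmt.Println(overLimit(1 << 62)) // false: limit far above heap use
        fmt.Println(overLimit(1))       // true: any live heap exceeds one byte
    }

Checking memory before forcing a GC avoids evicting keys whose memory would have been reclaimed anyway. Note that the eviction loops above call runtime.GC() after every deleted key, which is correct but costly; the initial gate at least avoids entering them when the heap is already under the limit.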
// evictKeysWithExpiredTTL is a function that samples keys with an associated TTL
// and evicts keys that are currently expired.