Added mutexes for lfu and lru caches on server struct

Kelvin Mwinuka
2024-03-08 02:21:26 +08:00
parent 82be1f6068
commit 0b9fbb0132
4 changed files with 159 additions and 130 deletions

View File

@@ -18,12 +18,12 @@ type CacheLFU struct {
     entries []*EntryLFU
 }

-func NewCacheLFU() *CacheLFU {
-    cache := &CacheLFU{
+func NewCacheLFU() CacheLFU {
+    cache := CacheLFU{
         keys: make(map[string]bool),
         entries: make([]*EntryLFU, 0),
     }
-    heap.Init(cache)
+    heap.Init(&cache)
     return cache
 }
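The constructor now returns CacheLFU by value, presumably so the cache can sit inside the mutex-wrapped field added to the Server struct further down, while heap.Init still needs &cache because the heap.Interface methods mutate the entries slice through a pointer receiver. A minimal, self-contained sketch of the same pattern, using a hypothetical intHeap rather than the project's types:

package main

import (
	"container/heap"
	"fmt"
)

// intHeap is an illustrative analogue (not the project's CacheLFU): a value
// type whose heap.Interface methods use a pointer receiver, so heap functions
// must be handed &h even though the constructor returns a plain value.
type intHeap struct{ entries []int }

func (h *intHeap) Len() int           { return len(h.entries) }
func (h *intHeap) Less(i, j int) bool { return h.entries[i] < h.entries[j] }
func (h *intHeap) Swap(i, j int)      { h.entries[i], h.entries[j] = h.entries[j], h.entries[i] }
func (h *intHeap) Push(x any)         { h.entries = append(h.entries, x.(int)) }
func (h *intHeap) Pop() any {
	last := len(h.entries) - 1
	x := h.entries[last]
	h.entries = h.entries[:last]
	return x
}

// newIntHeap mirrors the shape of NewCacheLFU above: build the value,
// initialise it through its address, then return the value.
func newIntHeap() intHeap {
	h := intHeap{entries: make([]int, 0)}
	heap.Init(&h)
	return h
}

func main() {
	h := newIntHeap()
	heap.Push(&h, 3)
	heap.Push(&h, 1)
	fmt.Println(heap.Pop(&h)) // 1
}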

View File

@@ -17,12 +17,12 @@ type CacheLRU struct {
     entries []*EntryLRU
 }

-func NewCacheLRU() *CacheLRU {
-    cache := &CacheLRU{
+func NewCacheLRU() CacheLRU {
+    cache := CacheLRU{
         keys: make(map[string]bool),
         entries: make([]*EntryLRU, 0),
     }
-    heap.Init(cache)
+    heap.Init(&cache)
     return cache
 }

View File

@@ -144,9 +144,9 @@ func (server *Server) RemoveKeyExpiry(key string) {
     server.keyExpiry[key] = time.Time{}
     switch {
     case slices.Contains([]string{utils.AllKeysLFU, utils.VolatileLFU}, server.Config.EvictionPolicy):
-        server.lfuCache.Delete(key)
+        server.lfuCache.cache.Delete(key)
     case slices.Contains([]string{utils.AllKeysLRU, utils.VolatileLRU}, server.Config.EvictionPolicy):
-        server.lruCache.Delete(key)
+        server.lruCache.cache.Delete(key)
     }
 }
@@ -179,16 +179,24 @@ func (server *Server) updateKeyInCache(ctx context.Context, key string) error {
     }
     switch strings.ToLower(server.Config.EvictionPolicy) {
     case utils.AllKeysLFU:
-        server.lfuCache.Update(key)
+        server.lfuCache.mutex.Lock()
+        defer server.lfuCache.mutex.Unlock()
+        server.lfuCache.cache.Update(key)
     case utils.AllKeysLRU:
-        server.lruCache.Update(key)
+        server.lruCache.mutex.Lock()
+        defer server.lruCache.mutex.Unlock()
+        server.lruCache.cache.Update(key)
     case utils.VolatileLFU:
+        server.lfuCache.mutex.Lock()
+        defer server.lfuCache.mutex.Unlock()
         if _, ok := server.keyExpiry[key]; ok {
-            server.lfuCache.Update(key)
+            server.lfuCache.cache.Update(key)
         }
     case utils.VolatileLRU:
+        server.lruCache.mutex.Lock()
+        defer server.lruCache.mutex.Unlock()
         if _, ok := server.keyExpiry[key]; ok {
-            server.lruCache.Update(key)
+            server.lruCache.cache.Update(key)
         }
     }
     if err := server.adjustMemoryUsage(ctx); err != nil {
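The change above pairs each cache with its own mutex and holds the lock via defer until the enclosing call returns. A self-contained sketch of that shape, with hypothetical names (guardedCounter, Touch) standing in for the server's cache fields:

package main

import (
	"fmt"
	"sync"
)

// guardedCounter is illustrative only: a mutex stored next to the data it
// guards, locked with defer so every return path releases it.
type guardedCounter struct {
	mutex  sync.Mutex
	counts map[string]int
}

func (g *guardedCounter) Touch(key string) {
	g.mutex.Lock()
	defer g.mutex.Unlock()
	g.counts[key]++
}

func main() {
	g := guardedCounter{counts: make(map[string]int)}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			g.Touch("profile")
		}()
	}
	wg.Wait()
	fmt.Println(g.counts["profile"]) // 4
}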
@@ -219,122 +227,124 @@ func (server *Server) adjustMemoryUsage(ctx context.Context) error {
     // We've done a GC, but we're still at or above the max memory limit.
     // Start a loop that evicts keys until either the heap is empty or
     // we're below the max memory limit.
-    for {
-        switch {
-        case slices.Contains([]string{utils.AllKeysLFU, utils.VolatileLFU}, strings.ToLower(server.Config.EvictionPolicy)):
-            // Remove keys from LFU cache until we're below the max memory limit or
-            // until the LFU cache is empty.
-            for {
-                // Return if cache is empty
-                if server.lfuCache.Len() == 0 {
-                    return fmt.Errorf("adjsutMemoryUsage -> LFU cache empty")
-                }
-                key := server.lfuCache.Pop().(string)
-                if _, err := server.KeyLock(ctx, key); err != nil {
-                    return fmt.Errorf("adjustMemoryUsage -> LFU cache eviction: %+v", err)
-                }
-                // Delete the keys
-                delete(server.store, key)
-                delete(server.keyExpiry, key)
-                delete(server.keyLocks, key)
-                // Run garbage collection
-                runtime.GC()
-                // Return if we're below max memory
-                runtime.ReadMemStats(&memStats)
-                if memStats.HeapInuse < server.Config.MaxMemory {
-                    return nil
-                }
-            }
-        case slices.Contains([]string{utils.AllKeysLRU, utils.VolatileLRU}, strings.ToLower(server.Config.EvictionPolicy)):
-            // Remove keys from th LRU cache until we're below the max memory limit or
-            // until the LRU cache is empty.
-            for {
-                // Return if cache is empty
-                if server.lruCache.Len() == 0 {
-                    return fmt.Errorf("adjsutMemoryUsage -> LRU cache empty")
-                }
-                key := server.lruCache.Pop().(string)
-                if _, err := server.KeyLock(ctx, key); err != nil {
-                    return fmt.Errorf("adjustMemoryUsage -> LRU cache eviction: %+v", err)
-                }
-                // Delete the keys
-                delete(server.store, key)
-                delete(server.keyExpiry, key)
-                delete(server.keyLocks, key)
-                // Run garbage collection
-                runtime.GC()
-                // Return if we're below max memory
-                runtime.ReadMemStats(&memStats)
-                if memStats.HeapInuse < server.Config.MaxMemory {
-                    return nil
-                }
-            }
-        case slices.Contains([]string{utils.AllKeysRandom}, strings.ToLower(server.Config.EvictionPolicy)):
-            // Remove random keys until we're below the max memory limit
-            // or there are no more keys remaining.
-            for {
-                // If there are no keys, return error
-                if len(server.keyLocks) == 0 {
-                    err := errors.New("no keys to evict")
-                    return fmt.Errorf("adjustMemoryUsage -> all keys random: %+v", err)
-                }
-                // Get random key
-                idx := rand.Intn(len(server.keyLocks))
-                for key, _ := range server.keyLocks {
-                    if idx == 0 {
-                        // Lock the key
-                        if _, err := server.KeyLock(ctx, key); err != nil {
-                            return fmt.Errorf("adjustMemoryUsage -> all keys random: %+v", err)
-                        }
-                        // Delete the key
-                        delete(server.keyLocks, key)
-                        delete(server.store, key)
-                        delete(server.keyExpiry, key)
-                        // Run garbage collection
-                        runtime.GC()
-                        // Return if we're below max memory
-                        runtime.ReadMemStats(&memStats)
-                        if memStats.HeapInuse < server.Config.MaxMemory {
-                            return nil
-                        }
-                    }
-                    idx--
-                }
-            }
-        case slices.Contains([]string{utils.VolatileRandom}, strings.ToLower(server.Config.EvictionPolicy)):
-            // Remove random keys with expiry time until we're below the max memory limit
-            // or there are no more keys with expiry time.
-            for {
-                // If there are no volatile keys, return error
-                if len(server.keyExpiry) == 0 {
-                    err := errors.New("no volatile keys to evict")
-                    return fmt.Errorf("adjustMemoryUsage -> volatile keys random: %+v", err)
-                }
-                // Get random volatile key
-                idx := rand.Intn(len(server.keyExpiry))
-                for key, _ := range server.keyExpiry {
-                    if idx == 0 {
-                        // Lock the key
-                        if _, err := server.KeyLock(ctx, key); err != nil {
-                            return fmt.Errorf("adjustMemoryUsage -> volatile keys random: %+v", err)
-                        }
-                        // Delete the key
-                        delete(server.keyLocks, key)
-                        delete(server.store, key)
-                        delete(server.keyExpiry, key)
-                        // Run garbage collection
-                        runtime.GC()
-                        // Return if we're below max memory
-                        runtime.ReadMemStats(&memStats)
-                        if memStats.HeapInuse < server.Config.MaxMemory {
-                            return nil
-                        }
-                    }
-                    idx--
-                }
-            }
-        default:
-            return nil
-        }
-    }
+    switch {
+    case slices.Contains([]string{utils.AllKeysLFU, utils.VolatileLFU}, strings.ToLower(server.Config.EvictionPolicy)):
+        // Remove keys from LFU cache until we're below the max memory limit or
+        // until the LFU cache is empty.
+        server.lfuCache.mutex.Lock()
+        defer server.lfuCache.mutex.Unlock()
+        for {
+            // Return if cache is empty
+            if server.lfuCache.cache.Len() == 0 {
+                return fmt.Errorf("adjsutMemoryUsage -> LFU cache empty")
+            }
+            key := server.lfuCache.cache.Pop().(string)
+            if _, err := server.KeyLock(ctx, key); err != nil {
+                return fmt.Errorf("adjustMemoryUsage -> LFU cache eviction: %+v", err)
+            }
+            // Delete the keys
+            delete(server.store, key)
+            delete(server.keyExpiry, key)
+            delete(server.keyLocks, key)
+            // Run garbage collection
+            runtime.GC()
+            // Return if we're below max memory
+            runtime.ReadMemStats(&memStats)
+            if memStats.HeapInuse < server.Config.MaxMemory {
+                return nil
+            }
+        }
+    case slices.Contains([]string{utils.AllKeysLRU, utils.VolatileLRU}, strings.ToLower(server.Config.EvictionPolicy)):
+        // Remove keys from th LRU cache until we're below the max memory limit or
+        // until the LRU cache is empty.
+        server.lruCache.mutex.Lock()
+        defer server.lruCache.mutex.Unlock()
+        for {
+            // Return if cache is empty
+            if server.lruCache.cache.Len() == 0 {
+                return fmt.Errorf("adjsutMemoryUsage -> LRU cache empty")
+            }
+            key := server.lruCache.cache.Pop().(string)
+            if _, err := server.KeyLock(ctx, key); err != nil {
+                return fmt.Errorf("adjustMemoryUsage -> LRU cache eviction: %+v", err)
+            }
+            // Delete the keys
+            delete(server.store, key)
+            delete(server.keyExpiry, key)
+            delete(server.keyLocks, key)
+            // Run garbage collection
+            runtime.GC()
+            // Return if we're below max memory
+            runtime.ReadMemStats(&memStats)
+            if memStats.HeapInuse < server.Config.MaxMemory {
+                return nil
+            }
+        }
+    case slices.Contains([]string{utils.AllKeysRandom}, strings.ToLower(server.Config.EvictionPolicy)):
+        // Remove random keys until we're below the max memory limit
+        // or there are no more keys remaining.
+        for {
+            // If there are no keys, return error
+            if len(server.keyLocks) == 0 {
+                err := errors.New("no keys to evict")
+                return fmt.Errorf("adjustMemoryUsage -> all keys random: %+v", err)
+            }
+            // Get random key
+            idx := rand.Intn(len(server.keyLocks))
+            for key, _ := range server.keyLocks {
+                if idx == 0 {
+                    // Lock the key
+                    if _, err := server.KeyLock(ctx, key); err != nil {
+                        return fmt.Errorf("adjustMemoryUsage -> all keys random: %+v", err)
+                    }
+                    // Delete the key
+                    delete(server.keyLocks, key)
+                    delete(server.store, key)
+                    delete(server.keyExpiry, key)
+                    // Run garbage collection
+                    runtime.GC()
+                    // Return if we're below max memory
+                    runtime.ReadMemStats(&memStats)
+                    if memStats.HeapInuse < server.Config.MaxMemory {
+                        return nil
+                    }
+                }
+                idx--
+            }
+        }
+    case slices.Contains([]string{utils.VolatileRandom}, strings.ToLower(server.Config.EvictionPolicy)):
+        // Remove random keys with expiry time until we're below the max memory limit
+        // or there are no more keys with expiry time.
+        for {
+            // If there are no volatile keys, return error
+            if len(server.keyExpiry) == 0 {
+                err := errors.New("no volatile keys to evict")
+                return fmt.Errorf("adjustMemoryUsage -> volatile keys random: %+v", err)
+            }
+            // Get random volatile key
+            idx := rand.Intn(len(server.keyExpiry))
+            for key, _ := range server.keyExpiry {
+                if idx == 0 {
+                    // Lock the key
+                    if _, err := server.KeyLock(ctx, key); err != nil {
+                        return fmt.Errorf("adjustMemoryUsage -> volatile keys random: %+v", err)
+                    }
+                    // Delete the key
+                    delete(server.keyLocks, key)
+                    delete(server.store, key)
+                    delete(server.keyExpiry, key)
+                    // Run garbage collection
+                    runtime.GC()
+                    // Return if we're below max memory
+                    runtime.ReadMemStats(&memStats)
+                    if memStats.HeapInuse < server.Config.MaxMemory {
+                        return nil
+                    }
+                }
+                idx--
+            }
+        }
+    default:
+        return nil
+    }
 }
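The allkeys-random and volatile-random branches above pick a victim by drawing a uniform index with rand.Intn and then walking the map until the counter reaches zero; because the index is uniform over the map's size, the chosen key is uniform regardless of Go's map iteration order. A self-contained sketch of just that selection step (randomKey is a hypothetical helper, not part of the server):

package main

import (
	"fmt"
	"math/rand"
)

// randomKey returns a uniformly chosen key from m, or false if m is empty.
func randomKey(m map[string]struct{}) (string, bool) {
	if len(m) == 0 {
		return "", false
	}
	idx := rand.Intn(len(m))
	for key := range m {
		if idx == 0 {
			return key, true
		}
		idx--
	}
	return "", false // unreachable for a non-empty map
}

func main() {
	keys := map[string]struct{}{"a": {}, "b": {}, "c": {}}
	if key, ok := randomKey(keys); ok {
		fmt.Println("eviction candidate:", key)
	}
}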

View File

@@ -30,8 +30,14 @@ type Server struct {
     keyLocks map[string]*sync.RWMutex
     keyCreationLock *sync.Mutex
     keyExpiry map[string]time.Time

-    lfuCache *eviction.CacheLFU
-    lruCache *eviction.CacheLRU
+    lfuCache struct {
+        mutex sync.Mutex
+        cache eviction.CacheLFU
+    }
+    lruCache struct {
+        mutex sync.Mutex
+        cache eviction.CacheLRU
+    }

     Commands []utils.Command
@@ -132,9 +138,22 @@ func NewServer(opts Opts) *Server {
         )
     }

-    // Set up lfu and lru caches
-    server.lfuCache = eviction.NewCacheLFU()
-    server.lruCache = eviction.NewCacheLRU()
+    // Set up LFU cache
+    server.lfuCache = struct {
+        mutex sync.Mutex
+        cache eviction.CacheLFU
+    }{
+        mutex: sync.Mutex{},
+        cache: eviction.NewCacheLFU(),
+    }
+    // set up LRU cache
+    server.lruCache = struct {
+        mutex sync.Mutex
+        cache eviction.CacheLRU
+    }{
+        mutex: sync.Mutex{},
+        cache: eviction.NewCacheLRU(),
+    }

     // TODO: If eviction policy is volatile-ttl, start goroutine that continuously reads the mem stats
     // TODO: before triggering purge once max-memory is reached
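The caches are now declared as anonymous structs that pair a sync.Mutex with the value it protects, and NewServer assigns them by repeating the struct type in a composite literal. A self-contained sketch of that field shape, using hypothetical names (server, items) rather than the real fields:

package main

import (
	"fmt"
	"sync"
)

// server is illustrative only; items mirrors the lfuCache/lruCache field shape.
type server struct {
	items struct {
		mutex sync.Mutex
		cache map[string]int
	}
}

func newServer() *server {
	s := &server{}
	// Assigning an anonymous struct field means spelling the type out again,
	// just as NewServer does for lfuCache and lruCache above.
	s.items = struct {
		mutex sync.Mutex
		cache map[string]int
	}{
		mutex: sync.Mutex{},
		cache: make(map[string]int),
	}
	return s
}

func main() {
	s := newServer()
	s.items.mutex.Lock()
	s.items.cache["x"] = 1
	s.items.mutex.Unlock()
	fmt.Println(s.items.cache["x"]) // 1
}

A named wrapper type would avoid repeating the literal type in the constructor, but the anonymous form keeps the mutex and the cache defined in one place on the parent struct.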