Added support for multiple databases on the evictKeysWithExpiredTTL function

This commit is contained in:
Kelvin Mwinuka
2024-06-20 20:58:49 +08:00
parent 33d7eb2f6e
commit 663c097be8
6 changed files with 121 additions and 107 deletions

View File

@@ -24,6 +24,7 @@ import (
"github.com/echovault/echovault/internal/aof"
"github.com/echovault/echovault/internal/clock"
"github.com/echovault/echovault/internal/config"
"github.com/echovault/echovault/internal/constants"
"github.com/echovault/echovault/internal/eviction"
"github.com/echovault/echovault/internal/memberlist"
"github.com/echovault/echovault/internal/modules/acl"
@@ -89,14 +90,14 @@ type EchoVault struct {
// Mutex as only one goroutine can edit the LFU cache at a time.
mutex sync.Mutex
// LFU cache for each database represented by a min heap.
cache map[int]eviction.CacheLFU
cache map[int]*eviction.CacheLFU
}
// LRU cache used when eviction policy is allkeys-lru or volatile-lru.
lruCache struct {
// Mutex as only one goroutine can edit the LRU at a time.
mutex sync.Mutex
// LRU cache represented by a max heap.
cache map[int]eviction.CacheLRU
cache map[int]*eviction.CacheLRU
}
// Holds the list of all commands supported by the echovault.
@@ -298,24 +299,34 @@ func NewEchoVault(options ...func(echovault *EchoVault)) (*EchoVault, error) {
}
// If eviction policy is not noeviction, start a goroutine to evict keys every 100 milliseconds.
//if echovault.config.EvictionPolicy != constants.NoEviction {
// go func() {
// ticker := time.NewTicker(echovault.config.EvictionInterval)
// defer func() {
// ticker.Stop()
// }()
// for {
// select {
// case <-ticker.C:
// if err := echovault.evictKeysWithExpiredTTL(context.Background()); err != nil {
// log.Printf("evict with ttl: %v\n", err)
// }
// case <-echovault.stopTTL:
// break
// }
// }
// }()
//}
if echovault.config.EvictionPolicy != constants.NoEviction {
go func() {
ticker := time.NewTicker(echovault.config.EvictionInterval)
defer func() {
ticker.Stop()
}()
for {
select {
case <-ticker.C:
// Run key eviction for each database that has volatile keys.
wg := sync.WaitGroup{}
for database, _ := range echovault.keysWithExpiry.keys {
wg.Add(1)
ctx := context.WithValue(context.Background(), "Database", database)
go func(ctx context.Context, wg *sync.WaitGroup) {
if err := echovault.evictKeysWithExpiredTTL(ctx); err != nil {
log.Printf("evict with ttl: %v\n", err)
}
wg.Done()
}(ctx, &wg)
}
wg.Wait()
case <-echovault.stopTTL:
break
}
}
}()
}
if echovault.config.TLS && len(echovault.config.CertKeyPairs) <= 0 {
return nil, errors.New("must provide certificate and key file paths for TLS mode")
@@ -614,23 +625,23 @@ func (server *EchoVault) ShutDown() {
}
func (server *EchoVault) initialiseCaches() {
// Set up LFU cache
// Set up LFU cache.
server.lfuCache = struct {
mutex sync.Mutex
cache map[int]eviction.CacheLFU
cache map[int]*eviction.CacheLFU
}{
mutex: sync.Mutex{},
cache: make(map[int]eviction.CacheLFU),
cache: make(map[int]*eviction.CacheLFU),
}
// set up LRU cache
// set up LRU cache.
server.lruCache = struct {
mutex sync.Mutex
cache map[int]eviction.CacheLRU
cache map[int]*eviction.CacheLRU
}{
mutex: sync.Mutex{},
cache: make(map[int]eviction.CacheLRU),
cache: make(map[int]*eviction.CacheLRU),
}
// Initialise caches for each pre-loaded database.
// Initialise caches for each preloaded database.
for database, _ := range server.store {
server.lfuCache.cache[database] = eviction.NewCacheLFU()
server.lruCache.cache[database] = eviction.NewCacheLRU()

View File

@@ -17,9 +17,11 @@ package echovault
import (
"context"
"errors"
"fmt"
"github.com/echovault/echovault/internal"
"github.com/echovault/echovault/internal/constants"
"log"
"math/rand"
"slices"
"time"
)
@@ -157,8 +159,8 @@ func (server *EchoVault) setExpiry(ctx context.Context, key string, expireAt tim
// If the slice of keys associated with expiry time does not contain the current key, add the key.
server.keysWithExpiry.rwMutex.Lock()
if !slices.Contains(server.keysWithExpiry.keys, key) {
server.keysWithExpiry.keys = append(server.keysWithExpiry.keys, key)
if !slices.Contains(server.keysWithExpiry.keys[database], key) {
server.keysWithExpiry.keys[database] = append(server.keysWithExpiry.keys[database], key)
}
server.keysWithExpiry.rwMutex.Unlock()
@@ -183,17 +185,16 @@ func (server *EchoVault) deleteKey(ctx context.Context, key string) error {
// Remove key from slice of keys associated with expiry.
server.keysWithExpiry.rwMutex.Lock()
defer server.keysWithExpiry.rwMutex.Unlock()
server.keysWithExpiry.keys = slices.DeleteFunc(server.keysWithExpiry.keys, func(k string) bool {
server.keysWithExpiry.keys[database] = slices.DeleteFunc(server.keysWithExpiry.keys[database], func(k string) bool {
return k == key
})
// TODO: Update the cache for the specific database.
// Remove the key from the cache.
// Remove the key from the cache associated with the database.
switch {
case slices.Contains([]string{constants.AllKeysLFU, constants.VolatileLFU}, server.config.EvictionPolicy):
server.lfuCache.cache.Delete(key)
server.lfuCache.cache[database].Delete(key)
case slices.Contains([]string{constants.AllKeysLRU, constants.VolatileLRU}, server.config.EvictionPolicy):
server.lruCache.cache.Delete(key)
server.lruCache.cache[database].Delete(key)
}
log.Printf("deleted key %s\n", key)
@@ -426,70 +427,72 @@ func (server *EchoVault) getState() map[int]map[string]interface{} {
// This function will sample 20 keys from the list of keys with an associated TTL,
// if the key is expired, it will be evicted.
// This function is only executed in standalone mode or by the raft cluster leader.
//func (server *EchoVault) evictKeysWithExpiredTTL(ctx context.Context) error {
// // Only execute this if we're in standalone mode, or raft cluster leader.
// if server.isInCluster() && !server.raft.IsRaftLeader() {
// return nil
// }
//
// server.keysWithExpiry.rwMutex.RLock()
//
// // Sample size should be the configured sample size, or the size of the keys with expiry,
// // whichever one is smaller.
// sampleSize := int(server.config.EvictionSample)
// if len(server.keysWithExpiry.keys) < sampleSize {
// sampleSize = len(server.keysWithExpiry.keys)
// }
// keys := make([]string, sampleSize)
//
// deletedCount := 0
// thresholdPercentage := 20
//
// var idx int
// var key string
// for i := 0; i < len(keys); i++ {
// for {
// // Retry retrieval of a random key until we find a key that is not already in the list of sampled keys.
// idx = rand.Intn(len(server.keysWithExpiry.keys))
// key = server.keysWithExpiry.keys[idx]
// if !slices.Contains(keys, key) {
// keys[i] = key
// break
// }
// }
// }
// server.keysWithExpiry.rwMutex.RUnlock()
//
// // Loop through the keys and delete them if they're expired
// server.storeLock.Lock()
// defer server.storeLock.Unlock()
// for _, k := range keys {
// // Delete the expired key
// deletedCount += 1
// if !server.isInCluster() {
// if err := server.deleteKey(k); err != nil {
// return fmt.Errorf("evictKeysWithExpiredTTL -> standalone delete: %+v", err)
// }
// } else if server.isInCluster() && server.raft.IsRaftLeader() {
// if err := server.raftApplyDeleteKey(ctx, k); err != nil {
// return fmt.Errorf("evictKeysWithExpiredTTL -> cluster delete: %+v", err)
// }
// }
// }
//
// // If sampleSize is 0, there's no need to calculate deleted percentage.
// if sampleSize == 0 {
// return nil
// }
//
// log.Printf("%d keys sampled, %d keys deleted\n", sampleSize, deletedCount)
//
// // If the deleted percentage is over 20% of the sample size, execute the function again immediately.
// if (deletedCount/sampleSize)*100 >= thresholdPercentage {
// log.Printf("deletion ratio (%d percent) reached threshold (%d percent), sampling again\n",
// (deletedCount/sampleSize)*100, thresholdPercentage)
// return server.evictKeysWithExpiredTTL(ctx)
// }
//
// return nil
//}
func (server *EchoVault) evictKeysWithExpiredTTL(ctx context.Context) error {
// Only execute this if we're in standalone mode, or raft cluster leader.
if server.isInCluster() && !server.raft.IsRaftLeader() {
return nil
}
server.keysWithExpiry.rwMutex.RLock()
database := ctx.Value("Database").(int)
// Sample size should be the configured sample size, or the size of the keys with expiry,
// whichever one is smaller.
sampleSize := int(server.config.EvictionSample)
if len(server.keysWithExpiry.keys[database]) < sampleSize {
sampleSize = len(server.keysWithExpiry.keys)
}
keys := make([]string, sampleSize)
deletedCount := 0
thresholdPercentage := 20
var idx int
var key string
for i := 0; i < len(keys); i++ {
for {
// Retry retrieval of a random key until we find a key that is not already in the list of sampled keys.
idx = rand.Intn(len(server.keysWithExpiry.keys))
key = server.keysWithExpiry.keys[database][idx]
if !slices.Contains(keys, key) {
keys[i] = key
break
}
}
}
server.keysWithExpiry.rwMutex.RUnlock()
// Loop through the keys and delete them if they're expired
server.storeLock.Lock()
defer server.storeLock.Unlock()
for _, k := range keys {
// Delete the expired key
deletedCount += 1
if !server.isInCluster() {
if err := server.deleteKey(ctx, k); err != nil {
return fmt.Errorf("evictKeysWithExpiredTTL -> standalone delete: %+v", err)
}
} else if server.isInCluster() && server.raft.IsRaftLeader() {
if err := server.raftApplyDeleteKey(ctx, k); err != nil {
return fmt.Errorf("evictKeysWithExpiredTTL -> cluster delete: %+v", err)
}
}
}
// If sampleSize is 0, there's no need to calculate deleted percentage.
if sampleSize == 0 {
return nil
}
log.Printf("%d keys sampled, %d keys deleted\n", sampleSize, deletedCount)
// If the deleted percentage is over 20% of the sample size, execute the function again immediately.
if (deletedCount/sampleSize)*100 >= thresholdPercentage {
log.Printf("deletion ratio (%d percent) reached threshold (%d percent), sampling again\n",
(deletedCount/sampleSize)*100, thresholdPercentage)
return server.evictKeysWithExpiredTTL(ctx)
}
return nil
}

View File

@@ -32,13 +32,13 @@ type CacheLFU struct {
entries []*EntryLFU
}
func NewCacheLFU() CacheLFU {
func NewCacheLFU() *CacheLFU {
cache := CacheLFU{
keys: make(map[string]bool),
entries: make([]*EntryLFU, 0),
}
heap.Init(&cache)
return cache
return &cache
}
func (cache *CacheLFU) Len() int {

View File

@@ -57,7 +57,7 @@ func Test_CacheLFU(t *testing.T) {
mut.Lock()
for i := 0; i < len(expectedKeys); i++ {
key := heap.Pop(&cache).(string)
key := heap.Pop(cache).(string)
if key != expectedKeys[i] {
t.Errorf("expected popped key at index %d to be %s, got %s", i, expectedKeys[i], key)
}

View File

@@ -31,13 +31,13 @@ type CacheLRU struct {
entries []*EntryLRU
}
func NewCacheLRU() CacheLRU {
func NewCacheLRU() *CacheLRU {
cache := CacheLRU{
keys: make(map[string]bool),
entries: make([]*EntryLRU, 0),
}
heap.Init(&cache)
return cache
return &cache
}
func (cache *CacheLRU) Len() int {

View File

@@ -40,7 +40,7 @@ func Test_CacheLRU(t *testing.T) {
ticker.Stop()
for i := len(access) - 1; i >= 0; i-- {
key := heap.Pop(&cache).(string)
key := heap.Pop(cache).(string)
if key != access[i] {
t.Errorf("expected key at index %d to be %s, got %s", i, access[i], key)
}