Implemented eviction algorithm that samples a configurable number of keys with TTLs (default 20) at a configurable interval (default 100ms) and deletes the keys that are expired.

This commit is contained in:
Kelvin Mwinuka
2024-03-12 02:12:55 +08:00
parent 52b39d5b0f
commit f27a0dda79
6 changed files with 121 additions and 17 deletions

View File

@@ -23,6 +23,8 @@ CMD "./server" \
"--snapshot-interval" "${SNAPSHOT_INTERVAL}" \ "--snapshot-interval" "${SNAPSHOT_INTERVAL}" \
"--max-memory" "${MAX_MEMORY}" \ "--max-memory" "${MAX_MEMORY}" \
"--eviction-policy" "${EVICTION_POLICY}" \ "--eviction-policy" "${EVICTION_POLICY}" \
"--eviction-sample" "${EVICTION_SAMPLE}" \
"--eviction-interval" "${EVICTION_INTERVAL}" \
"--tls=${TLS}" \ "--tls=${TLS}" \
"--mtls=${MTLS}" \ "--mtls=${MTLS}" \
"--in-memory=${IN_MEMORY}" \ "--in-memory=${IN_MEMORY}" \

View File

@@ -32,6 +32,8 @@ services:
- AOF_SYNC_STRATEGY=everysec - AOF_SYNC_STRATEGY=everysec
- MAX_MEMORY=2000kb - MAX_MEMORY=2000kb
- EVICTION_POLICY=allkeys-lfu - EVICTION_POLICY=allkeys-lfu
- EVICTION_SAMPLE=20
- EVICTION_INTERVAL=100ms
# List of server cert/key pairs # List of server cert/key pairs
- CERT_KEY_PAIR_1=/etc/ssl/certs/echovault/server/server1.crt,/etc/ssl/certs/echovault/server/server1.key - CERT_KEY_PAIR_1=/etc/ssl/certs/echovault/server/server1.crt,/etc/ssl/certs/echovault/server/server1.key
- CERT_KEY_PAIR_2=/etc/ssl/certs/echovault/server/server2.crt,/etc/ssl/certs/echovault/server/server2.key - CERT_KEY_PAIR_2=/etc/ssl/certs/echovault/server/server2.crt,/etc/ssl/certs/echovault/server/server2.key
@@ -74,6 +76,8 @@ services:
- AOF_SYNC_STRATEGY=everysec - AOF_SYNC_STRATEGY=everysec
- MAX_MEMORY=2000kb - MAX_MEMORY=2000kb
- EVICTION_POLICY=allkeys-lfu - EVICTION_POLICY=allkeys-lfu
- EVICTION_SAMPLE=20
- EVICTION_INTERVAL=100ms
# List of server cert/key pairs # List of server cert/key pairs
- CERT_KEY_PAIR_1=/generic/ssl/certs/echovault/server/server1.crt,/generic/ssl/certs/echovault/server/server1.key - CERT_KEY_PAIR_1=/generic/ssl/certs/echovault/server/server1.crt,/generic/ssl/certs/echovault/server/server1.key
- CERT_KEY_PAIR_2=/generic/ssl/certs/echovault/server/server2.crt,/generic/ssl/certs/echovault/server/server2.key - CERT_KEY_PAIR_2=/generic/ssl/certs/echovault/server/server2.crt,/generic/ssl/certs/echovault/server/server2.key
@@ -117,6 +121,8 @@ services:
- AOF_SYNC_STRATEGY=everysec - AOF_SYNC_STRATEGY=everysec
- MAX_MEMORY=2000kb - MAX_MEMORY=2000kb
- EVICTION_POLICY=allkeys-lfu - EVICTION_POLICY=allkeys-lfu
- EVICTION_SAMPLE=20
- EVICTION_INTERVAL=100ms
# List of server cert/key pairs # List of server cert/key pairs
- CERT_KEY_PAIR_1=/generic/ssl/certs/echovault/server/server1.crt,/generic/ssl/certs/echovault/server/server1.key - CERT_KEY_PAIR_1=/generic/ssl/certs/echovault/server/server1.crt,/generic/ssl/certs/echovault/server/server1.key
- CERT_KEY_PAIR_2=/generic/ssl/certs/echovault/server/server2.crt,/generic/ssl/certs/echovault/server/server2.key - CERT_KEY_PAIR_2=/generic/ssl/certs/echovault/server/server2.crt,/generic/ssl/certs/echovault/server/server2.key
@@ -160,6 +166,8 @@ services:
- AOF_SYNC_STRATEGY=everysec - AOF_SYNC_STRATEGY=everysec
- MAX_MEMORY=2000kb - MAX_MEMORY=2000kb
- EVICTION_POLICY=allkeys-lfu - EVICTION_POLICY=allkeys-lfu
- EVICTION_SAMPLE=20
- EVICTION_INTERVAL=100ms
# List of server cert/key pairs # List of server cert/key pairs
- CERT_KEY_PAIR_1=/generic/ssl/certs/echovault/server/server1.crt,/generic/ssl/certs/echovault/server/server1.key - CERT_KEY_PAIR_1=/generic/ssl/certs/echovault/server/server1.crt,/generic/ssl/certs/echovault/server/server1.key
- CERT_KEY_PAIR_2=/generic/ssl/certs/echovault/server/server2.crt,/generic/ssl/certs/echovault/server/server2.key - CERT_KEY_PAIR_2=/generic/ssl/certs/echovault/server/server2.crt,/generic/ssl/certs/echovault/server/server2.key
@@ -203,6 +211,8 @@ services:
- AOF_SYNC_STRATEGY=everysec - AOF_SYNC_STRATEGY=everysec
- MAX_MEMORY=2000kb - MAX_MEMORY=2000kb
- EVICTION_POLICY=allkeys-lfu - EVICTION_POLICY=allkeys-lfu
- EVICTION_SAMPLE=20
- EVICTION_INTERVAL=100ms
# List of server cert/key pairs # List of server cert/key pairs
- CERT_KEY_PAIR_1=/generic/ssl/certs/echovault/server/server1.crt,/generic/ssl/certs/echovault/server/server1.key - CERT_KEY_PAIR_1=/generic/ssl/certs/echovault/server/server1.crt,/generic/ssl/certs/echovault/server/server1.key
- CERT_KEY_PAIR_2=/generic/ssl/certs/echovault/server/server2.crt,/generic/ssl/certs/echovault/server/server2.key - CERT_KEY_PAIR_2=/generic/ssl/certs/echovault/server/server2.crt,/generic/ssl/certs/echovault/server/server2.key
@@ -246,6 +256,8 @@ services:
- AOF_SYNC_STRATEGY=everysec - AOF_SYNC_STRATEGY=everysec
- MAX_MEMORY=2000kb - MAX_MEMORY=2000kb
- EVICTION_POLICY=allkeys-lfu - EVICTION_POLICY=allkeys-lfu
- EVICTION_SAMPLE=20
- EVICTION_INTERVAL=100ms
# List of server cert/key pairs # List of server cert/key pairs
- CERT_KEY_PAIR_1=/generic/ssl/certs/echovault/server/server1.crt,/generic/ssl/certs/echovault/server/server1.key - CERT_KEY_PAIR_1=/generic/ssl/certs/echovault/server/server1.crt,/generic/ssl/certs/echovault/server/server1.key
- CERT_KEY_PAIR_2=/generic/ssl/certs/echovault/server/server2.crt,/generic/ssl/certs/echovault/server/server2.key - CERT_KEY_PAIR_2=/generic/ssl/certs/echovault/server/server2.crt,/generic/ssl/certs/echovault/server/server2.key

View File

@@ -121,10 +121,6 @@ func (r *Raft) RaftInit(ctx context.Context) {
} }
r.raft = raftServer r.raft = raftServer
// TODO
// Listen on leadership change channel and initiate key eviction goroutine
// if current node is the leader.
} }
func (r *Raft) Apply(cmd []byte, timeout time.Duration) raft.ApplyFuture { func (r *Raft) Apply(cmd []byte, timeout time.Duration) raft.ApplyFuture {

View File

@@ -245,20 +245,20 @@ func (server *Server) GetState() map[string]interface{} {
return data return data
} }
// DeleteKey removes the key from store, keyLocks and keyExpiry maps // DeleteKey removes the key from store, keyLocks and keyExpiry maps.
func (server *Server) DeleteKey(ctx context.Context, key string) error { func (server *Server) DeleteKey(ctx context.Context, key string) error {
if _, err := server.KeyLock(ctx, key); err != nil { if _, err := server.KeyLock(ctx, key); err != nil {
return fmt.Errorf("deleteKey: %+v", err) return fmt.Errorf("deleteKey error: %+v", err)
} }
// Remove key expiry // Remove key expiry.
server.RemoveExpiry(key) server.RemoveExpiry(key)
// Delete the key from keyLocks and store // Delete the key from keyLocks and store.
delete(server.keyLocks, key) delete(server.keyLocks, key)
delete(server.store, key) delete(server.store, key)
// Remove the key from the cache // Remove the key from the cache.
switch { switch {
case slices.Contains([]string{utils.AllKeysLFU, utils.VolatileLFU}, server.Config.EvictionPolicy): case slices.Contains([]string{utils.AllKeysLFU, utils.VolatileLFU}, server.Config.EvictionPolicy):
server.lfuCache.cache.Delete(key) server.lfuCache.cache.Delete(key)
@@ -466,3 +466,86 @@ func (server *Server) adjustMemoryUsage(ctx context.Context) error {
return nil return nil
} }
} }
// evictKeysWithExpiredTTL is a function that samples keys with an associated TTL
// and evicts keys that are currently expired.
// This function will sample 20 keys from the list of keys with an associated TTL,
// if the key is expired, it will be evicted.
// This function is only executed in standalone mode or by the raft cluster leader.
func (server *Server) evictKeysWithExpiredTTL(ctx context.Context) error {
// Only execute this if we're in standalone mode, or raft cluster leader.
if server.IsInCluster() && !server.raft.IsRaftLeader() {
return nil
}
server.keysWithExpiry.rwMutex.RLock()
// Sample size should be the configured sample size, or the size of the keys with expiry,
// whichever one is smaller.
sampleSize := int(server.Config.EvictionSample)
if len(server.keysWithExpiry.keys) < sampleSize {
sampleSize = len(server.keysWithExpiry.keys)
}
keys := make([]string, sampleSize)
deletedCount := 0
thresholdPercentage := 20
var idx int
var key string
for i := 0; i < len(keys); i++ {
for {
// Retry retrieval of a random key until we find a key that is not already in the list of sampled keys.
idx = rand.Intn(len(server.keysWithExpiry.keys))
key = server.keysWithExpiry.keys[idx]
if !slices.Contains(keys, key) {
keys[i] = key
break
}
}
}
server.keysWithExpiry.rwMutex.RUnlock()
// Loop through the keys and delete them if they're expired
for _, k := range keys {
if _, err := server.KeyRLock(ctx, k); err != nil {
continue
}
// If the current key is not expired, skip to the next key
if server.store[k].ExpireAt.After(time.Now()) {
server.KeyRUnlock(ctx, k)
continue
}
// Delete the expired key
deletedCount += 1
server.KeyRUnlock(ctx, k)
if !server.IsInCluster() {
if err := server.DeleteKey(ctx, k); err != nil {
return fmt.Errorf("evictKeysWithExpiredTTL -> standalone delete: %+v", err)
}
} else if server.IsInCluster() && server.raft.IsRaftLeader() {
if err := server.raftApplyDeleteKey(ctx, k); err != nil {
return fmt.Errorf("evictKeysWithExpiredTTL -> cluster delete: %+v", err)
}
}
}
// If sampleSize is 0, there's no need to calculate deleted percentage.
if sampleSize == 0 {
log.Println("no keys to sample, skipping eviction")
return nil
}
log.Printf("%d keys sampled, %d keys deleted\n", sampleSize, deletedCount)
// If the deleted percentage is over 20% of the sample size, execute the function again immediately.
if (deletedCount/sampleSize)*100 >= thresholdPercentage {
log.Printf("deletion ratio (%d percent) reached threshold (%d percent), sampling again\n",
(deletedCount/sampleSize)*100, thresholdPercentage)
return server.evictKeysWithExpiredTTL(ctx)
}
return nil
}

View File

@@ -153,9 +153,17 @@ func NewServer(opts Opts) *Server {
) )
} }
// TODO // If eviction policy is not noeviction, start a goroutine to evict keys every 100 milliseconds.
// If eviction policy is not noeviction and the server is in standalone mode, if server.Config.EvictionPolicy != utils.NoEviction {
// start a goroutine to evict keys every 100 milliseconds. go func() {
for {
<-time.After(server.Config.EvictionInterval)
if err := server.evictKeysWithExpiredTTL(context.Background()); err != nil {
log.Println(err)
}
}
}()
}
return server return server
} }

View File

@@ -40,6 +40,8 @@ type Config struct {
AOFSyncStrategy string `json:"AOFSyncStrategy" yaml:"AOFSyncStrategy"` AOFSyncStrategy string `json:"AOFSyncStrategy" yaml:"AOFSyncStrategy"`
MaxMemory uint64 `json:"MaxMemory" yaml:"MaxMemory"` MaxMemory uint64 `json:"MaxMemory" yaml:"MaxMemory"`
EvictionPolicy string `json:"EvictionPolicy" yaml:"EvictionPolicy"` EvictionPolicy string `json:"EvictionPolicy" yaml:"EvictionPolicy"`
EvictionSample uint `json:"EvictionSample" yaml:"EvictionSample"`
EvictionInterval time.Duration `json:"EvictionInterval" yaml:"EvictionInterval"`
} }
func GetConfig() (Config, error) { func GetConfig() (Config, error) {
@@ -128,6 +130,8 @@ There is no limit by default.`, func(memory string) error {
snapshotInterval := flag.Duration("snapshot-interval", 5*time.Minute, "The time interval between snapshots (in seconds). Default is 5 minutes.") snapshotInterval := flag.Duration("snapshot-interval", 5*time.Minute, "The time interval between snapshots (in seconds). Default is 5 minutes.")
restoreSnapshot := flag.Bool("restore-snapshot", false, "This flag prompts the server to restore state from snapshot when set to true. Only works in standalone mode. Higher priority than restoreAOF.") restoreSnapshot := flag.Bool("restore-snapshot", false, "This flag prompts the server to restore state from snapshot when set to true. Only works in standalone mode. Higher priority than restoreAOF.")
restoreAOF := flag.Bool("restore-aof", false, "This flag prompts the server to restore state from append-only logs. Only works in standalone mode. Lower priority than restoreSnapshot.") restoreAOF := flag.Bool("restore-aof", false, "This flag prompts the server to restore state from append-only logs. Only works in standalone mode. Lower priority than restoreSnapshot.")
evictionSample := flag.Uint("eviction-sample", 20, "An integer specifying the number of keys to sample when checking for expired keys.")
evictionInterval := flag.Duration("eviction-interval", 100*time.Millisecond, "The interval between each sampling of keys to evict.")
forwardCommand := flag.Bool( forwardCommand := flag.Bool(
"forward-commands", "forward-commands",
false, false,
@@ -177,6 +181,8 @@ It is a plain text value by default but you can provide a SHA256 hash by adding
AOFSyncStrategy: aofSyncStrategy, AOFSyncStrategy: aofSyncStrategy,
MaxMemory: maxMemory, MaxMemory: maxMemory,
EvictionPolicy: evictionPolicy, EvictionPolicy: evictionPolicy,
EvictionSample: *evictionSample,
EvictionInterval: *evictionInterval,
} }
if len(*config) > 0 { if len(*config) > 0 {
@@ -193,20 +199,17 @@ It is a plain text value by default but you can provide a SHA256 hash by adding
ext := path.Ext(f.Name()) ext := path.Ext(f.Name())
if ext == ".json" { if ext == ".json" {
err := json.NewDecoder(f).Decode(&conf) if err = json.NewDecoder(f).Decode(&conf); err != nil {
if err != nil {
return Config{}, nil return Config{}, nil
} }
} }
if ext == ".yaml" || ext == ".yml" { if ext == ".yaml" || ext == ".yml" {
err := yaml.NewDecoder(f).Decode(&conf) if err = yaml.NewDecoder(f).Decode(&conf); err != nil {
if err != nil {
return Config{}, err return Config{}, err
} }
} }
} }
} }
// If requirePass is set to true, then password must be provided as well // If requirePass is set to true, then password must be provided as well