diff --git a/Dockerfile.dev b/Dockerfile.dev index f89dbf9..eb05020 100644 --- a/Dockerfile.dev +++ b/Dockerfile.dev @@ -23,6 +23,8 @@ CMD "./server" \ "--snapshot-interval" "${SNAPSHOT_INTERVAL}" \ "--max-memory" "${MAX_MEMORY}" \ "--eviction-policy" "${EVICTION_POLICY}" \ + "--eviction-sample" "${EVICTION_SAMPLE}" \ + "--eviction-interval" "${EVICTION_INTERVAL}" \ "--tls=${TLS}" \ "--mtls=${MTLS}" \ "--in-memory=${IN_MEMORY}" \ diff --git a/docker-compose.yaml b/docker-compose.yaml index e80559c..6251cb1 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -32,6 +32,8 @@ services: - AOF_SYNC_STRATEGY=everysec - MAX_MEMORY=2000kb - EVICTION_POLICY=allkeys-lfu + - EVICTION_SAMPLE=20 + - EVICTION_INTERVAL=100ms # List of server cert/key pairs - CERT_KEY_PAIR_1=/etc/ssl/certs/echovault/server/server1.crt,/etc/ssl/certs/echovault/server/server1.key - CERT_KEY_PAIR_2=/etc/ssl/certs/echovault/server/server2.crt,/etc/ssl/certs/echovault/server/server2.key @@ -74,6 +76,8 @@ services: - AOF_SYNC_STRATEGY=everysec - MAX_MEMORY=2000kb - EVICTION_POLICY=allkeys-lfu + - EVICTION_SAMPLE=20 + - EVICTION_INTERVAL=100ms # List of server cert/key pairs - CERT_KEY_PAIR_1=/generic/ssl/certs/echovault/server/server1.crt,/generic/ssl/certs/echovault/server/server1.key - CERT_KEY_PAIR_2=/generic/ssl/certs/echovault/server/server2.crt,/generic/ssl/certs/echovault/server/server2.key @@ -117,6 +121,8 @@ services: - AOF_SYNC_STRATEGY=everysec - MAX_MEMORY=2000kb - EVICTION_POLICY=allkeys-lfu + - EVICTION_SAMPLE=20 + - EVICTION_INTERVAL=100ms # List of server cert/key pairs - CERT_KEY_PAIR_1=/generic/ssl/certs/echovault/server/server1.crt,/generic/ssl/certs/echovault/server/server1.key - CERT_KEY_PAIR_2=/generic/ssl/certs/echovault/server/server2.crt,/generic/ssl/certs/echovault/server/server2.key @@ -160,6 +166,8 @@ services: - AOF_SYNC_STRATEGY=everysec - MAX_MEMORY=2000kb - EVICTION_POLICY=allkeys-lfu + - EVICTION_SAMPLE=20 + - EVICTION_INTERVAL=100ms # List of server cert/key pairs - CERT_KEY_PAIR_1=/generic/ssl/certs/echovault/server/server1.crt,/generic/ssl/certs/echovault/server/server1.key - CERT_KEY_PAIR_2=/generic/ssl/certs/echovault/server/server2.crt,/generic/ssl/certs/echovault/server/server2.key @@ -203,6 +211,8 @@ services: - AOF_SYNC_STRATEGY=everysec - MAX_MEMORY=2000kb - EVICTION_POLICY=allkeys-lfu + - EVICTION_SAMPLE=20 + - EVICTION_INTERVAL=100ms # List of server cert/key pairs - CERT_KEY_PAIR_1=/generic/ssl/certs/echovault/server/server1.crt,/generic/ssl/certs/echovault/server/server1.key - CERT_KEY_PAIR_2=/generic/ssl/certs/echovault/server/server2.crt,/generic/ssl/certs/echovault/server/server2.key @@ -246,6 +256,8 @@ services: - AOF_SYNC_STRATEGY=everysec - MAX_MEMORY=2000kb - EVICTION_POLICY=allkeys-lfu + - EVICTION_SAMPLE=20 + - EVICTION_INTERVAL=100ms # List of server cert/key pairs - CERT_KEY_PAIR_1=/generic/ssl/certs/echovault/server/server1.crt,/generic/ssl/certs/echovault/server/server1.key - CERT_KEY_PAIR_2=/generic/ssl/certs/echovault/server/server2.crt,/generic/ssl/certs/echovault/server/server2.key diff --git a/src/raft/raft.go b/src/raft/raft.go index 7b1d0fd..24538d9 100644 --- a/src/raft/raft.go +++ b/src/raft/raft.go @@ -121,10 +121,6 @@ func (r *Raft) RaftInit(ctx context.Context) { } r.raft = raftServer - - // TODO - // Listen on leadership change channel and initiate key eviction goroutine - // if current node is the leader. } func (r *Raft) Apply(cmd []byte, timeout time.Duration) raft.ApplyFuture { diff --git a/src/server/keyspace.go b/src/server/keyspace.go index 05cd12e..055ed89 100644 --- a/src/server/keyspace.go +++ b/src/server/keyspace.go @@ -245,20 +245,20 @@ func (server *Server) GetState() map[string]interface{} { return data } -// DeleteKey removes the key from store, keyLocks and keyExpiry maps +// DeleteKey removes the key from store, keyLocks and keyExpiry maps. func (server *Server) DeleteKey(ctx context.Context, key string) error { if _, err := server.KeyLock(ctx, key); err != nil { - return fmt.Errorf("deleteKey: %+v", err) + return fmt.Errorf("deleteKey error: %+v", err) } - // Remove key expiry + // Remove key expiry. server.RemoveExpiry(key) - // Delete the key from keyLocks and store + // Delete the key from keyLocks and store. delete(server.keyLocks, key) delete(server.store, key) - // Remove the key from the cache + // Remove the key from the cache. switch { case slices.Contains([]string{utils.AllKeysLFU, utils.VolatileLFU}, server.Config.EvictionPolicy): server.lfuCache.cache.Delete(key) @@ -466,3 +466,86 @@ func (server *Server) adjustMemoryUsage(ctx context.Context) error { return nil } } + +// evictKeysWithExpiredTTL is a function that samples keys with an associated TTL +// and evicts keys that are currently expired. +// This function will sample 20 keys from the list of keys with an associated TTL, +// if the key is expired, it will be evicted. +// This function is only executed in standalone mode or by the raft cluster leader. +func (server *Server) evictKeysWithExpiredTTL(ctx context.Context) error { + // Only execute this if we're in standalone mode, or raft cluster leader. + if server.IsInCluster() && !server.raft.IsRaftLeader() { + return nil + } + + server.keysWithExpiry.rwMutex.RLock() + + // Sample size should be the configured sample size, or the size of the keys with expiry, + // whichever one is smaller. + sampleSize := int(server.Config.EvictionSample) + if len(server.keysWithExpiry.keys) < sampleSize { + sampleSize = len(server.keysWithExpiry.keys) + } + keys := make([]string, sampleSize) + + deletedCount := 0 + thresholdPercentage := 20 + + var idx int + var key string + for i := 0; i < len(keys); i++ { + for { + // Retry retrieval of a random key until we find a key that is not already in the list of sampled keys. + idx = rand.Intn(len(server.keysWithExpiry.keys)) + key = server.keysWithExpiry.keys[idx] + if !slices.Contains(keys, key) { + keys[i] = key + break + } + } + } + server.keysWithExpiry.rwMutex.RUnlock() + + // Loop through the keys and delete them if they're expired + for _, k := range keys { + if _, err := server.KeyRLock(ctx, k); err != nil { + continue + } + + // If the current key is not expired, skip to the next key + if server.store[k].ExpireAt.After(time.Now()) { + server.KeyRUnlock(ctx, k) + continue + } + + // Delete the expired key + deletedCount += 1 + server.KeyRUnlock(ctx, k) + if !server.IsInCluster() { + if err := server.DeleteKey(ctx, k); err != nil { + return fmt.Errorf("evictKeysWithExpiredTTL -> standalone delete: %+v", err) + } + } else if server.IsInCluster() && server.raft.IsRaftLeader() { + if err := server.raftApplyDeleteKey(ctx, k); err != nil { + return fmt.Errorf("evictKeysWithExpiredTTL -> cluster delete: %+v", err) + } + } + } + + // If sampleSize is 0, there's no need to calculate deleted percentage. + if sampleSize == 0 { + log.Println("no keys to sample, skipping eviction") + return nil + } + + log.Printf("%d keys sampled, %d keys deleted\n", sampleSize, deletedCount) + + // If the deleted percentage is over 20% of the sample size, execute the function again immediately. + if (deletedCount/sampleSize)*100 >= thresholdPercentage { + log.Printf("deletion ratio (%d percent) reached threshold (%d percent), sampling again\n", + (deletedCount/sampleSize)*100, thresholdPercentage) + return server.evictKeysWithExpiredTTL(ctx) + } + + return nil +} diff --git a/src/server/server.go b/src/server/server.go index 3c07c88..685e8e9 100644 --- a/src/server/server.go +++ b/src/server/server.go @@ -153,9 +153,17 @@ func NewServer(opts Opts) *Server { ) } - // TODO - // If eviction policy is not noeviction and the server is in standalone mode, - // start a goroutine to evict keys every 100 milliseconds. + // If eviction policy is not noeviction, start a goroutine to evict keys every 100 milliseconds. + if server.Config.EvictionPolicy != utils.NoEviction { + go func() { + for { + <-time.After(server.Config.EvictionInterval) + if err := server.evictKeysWithExpiredTTL(context.Background()); err != nil { + log.Println(err) + } + } + }() + } return server } diff --git a/src/utils/config.go b/src/utils/config.go index eae8dd9..7a18138 100644 --- a/src/utils/config.go +++ b/src/utils/config.go @@ -40,6 +40,8 @@ type Config struct { AOFSyncStrategy string `json:"AOFSyncStrategy" yaml:"AOFSyncStrategy"` MaxMemory uint64 `json:"MaxMemory" yaml:"MaxMemory"` EvictionPolicy string `json:"EvictionPolicy" yaml:"EvictionPolicy"` + EvictionSample uint `json:"EvictionSample" yaml:"EvictionSample"` + EvictionInterval time.Duration `json:"EvictionInterval" yaml:"EvictionInterval"` } func GetConfig() (Config, error) { @@ -128,6 +130,8 @@ There is no limit by default.`, func(memory string) error { snapshotInterval := flag.Duration("snapshot-interval", 5*time.Minute, "The time interval between snapshots (in seconds). Default is 5 minutes.") restoreSnapshot := flag.Bool("restore-snapshot", false, "This flag prompts the server to restore state from snapshot when set to true. Only works in standalone mode. Higher priority than restoreAOF.") restoreAOF := flag.Bool("restore-aof", false, "This flag prompts the server to restore state from append-only logs. Only works in standalone mode. Lower priority than restoreSnapshot.") + evictionSample := flag.Uint("eviction-sample", 20, "An integer specifying the number of keys to sample when checking for expired keys.") + evictionInterval := flag.Duration("eviction-interval", 100*time.Millisecond, "The interval between each sampling of keys to evict.") forwardCommand := flag.Bool( "forward-commands", false, @@ -177,6 +181,8 @@ It is a plain text value by default but you can provide a SHA256 hash by adding AOFSyncStrategy: aofSyncStrategy, MaxMemory: maxMemory, EvictionPolicy: evictionPolicy, + EvictionSample: *evictionSample, + EvictionInterval: *evictionInterval, } if len(*config) > 0 { @@ -193,20 +199,17 @@ It is a plain text value by default but you can provide a SHA256 hash by adding ext := path.Ext(f.Name()) if ext == ".json" { - err := json.NewDecoder(f).Decode(&conf) - if err != nil { + if err = json.NewDecoder(f).Decode(&conf); err != nil { return Config{}, nil } } if ext == ".yaml" || ext == ".yml" { - err := yaml.NewDecoder(f).Decode(&conf) - if err != nil { + if err = yaml.NewDecoder(f).Decode(&conf); err != nil { return Config{}, err } } } - } // If requirePass is set to true, then password must be provided as well