diff --git a/src/raft/fsm.go b/src/raft/fsm.go
index fb6cab8..11136f1 100644
--- a/src/raft/fsm.go
+++ b/src/raft/fsm.go
@@ -28,6 +28,8 @@ func NewFSM(opts FSMOpts) raft.FSM {
 // Apply implements the raft.FSM interface.
 func (fsm *FSM) Apply(log *raft.Log) interface{} {
 	switch log.Type {
+	default:
+		// No-op for log entry types we don't handle.
 	case raft.LogCommand:
 		var request utils.ApplyRequest
 
diff --git a/src/raft/raft.go b/src/raft/raft.go
index e2c0841..7385042 100644
--- a/src/raft/raft.go
+++ b/src/raft/raft.go
@@ -119,6 +119,10 @@ func (r *Raft) RaftInit(ctx context.Context) {
 	}
 
 	r.raft = raftServer
+
+	// TODO:
+	// Listen on the leadership change channel and start the key eviction
+	// goroutine if the current node is the leader.
 }
 
 func (r *Raft) Apply(cmd []byte, timeout time.Duration) raft.ApplyFuture {
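Note: a minimal sketch of how the RaftInit TODO above could be implemented, assuming the LeaderCh API from hashicorp/raft (which this package already wraps). The watchLeadership name and the evictKeys callback are hypothetical and not part of this changeset:

	// Hypothetical sketch (not part of this diff): listen for leadership
	// changes and run key eviction only while this node is the leader.
	package raft

	import (
		"context"

		"github.com/hashicorp/raft"
	)

	// watchLeadership starts evictKeys when leadership is gained and cancels
	// it when leadership is lost or ctx is done.
	func watchLeadership(ctx context.Context, r *raft.Raft, evictKeys func(context.Context)) {
		go func() {
			var stopEviction context.CancelFunc
			for {
				select {
				case <-ctx.Done():
					if stopEviction != nil {
						stopEviction()
					}
					return
				case isLeader := <-r.LeaderCh():
					if isLeader && stopEviction == nil {
						// Gained leadership: start the eviction goroutine.
						var evictionCtx context.Context
						evictionCtx, stopEviction = context.WithCancel(ctx)
						go evictKeys(evictionCtx)
					} else if !isLeader && stopEviction != nil {
						// Lost leadership: stop the eviction goroutine.
						stopEviction()
						stopEviction = nil
					}
				}
			}
		}()
	}
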
diff --git a/src/server/keyspace.go b/src/server/keyspace.go
index f9071cc..86efc08 100644
--- a/src/server/keyspace.go
+++ b/src/server/keyspace.go
@@ -177,6 +177,15 @@ func (server *Server) SetExpiry(ctx context.Context, key string, expireAt time.T
 		Value:    server.store[key].Value,
 		ExpireAt: expireAt,
 	}
+
+	// If the slice of keys associated with expiry times does not yet contain the current key, add it.
+	server.keysWithExpiry.rwMutex.Lock()
+	if !slices.Contains(server.keysWithExpiry.keys, key) {
+		server.keysWithExpiry.keys = append(server.keysWithExpiry.keys, key)
+	}
+	server.keysWithExpiry.rwMutex.Unlock()
+
+	// If touch is true, update the key's status in the cache.
 	if touch {
 		err := server.updateKeyInCache(ctx, key)
 		if err != nil {
@@ -188,16 +197,18 @@ func (server *Server) SetExpiry(ctx context.Context, key string, expireAt time.T
 // RemoveExpiry is called by commands that remove key expiry (e.g. PERSIST).
 // The key must be locked prior to calling this function.
 func (server *Server) RemoveExpiry(key string) {
+	// Reset the expiry time.
 	server.store[key] = utils.KeyData{
 		Value:    server.store[key].Value,
 		ExpireAt: time.Time{},
 	}
-	switch {
-	case slices.Contains([]string{utils.AllKeysLFU, utils.VolatileLFU}, server.Config.EvictionPolicy):
-		server.lfuCache.cache.Delete(key)
-	case slices.Contains([]string{utils.AllKeysLRU, utils.VolatileLRU}, server.Config.EvictionPolicy):
-		server.lruCache.cache.Delete(key)
-	}
+
+	// Remove the key from the slice of keys associated with expiry times.
+	server.keysWithExpiry.rwMutex.Lock()
+	defer server.keysWithExpiry.rwMutex.Unlock()
+	server.keysWithExpiry.keys = slices.DeleteFunc(server.keysWithExpiry.keys, func(k string) bool {
+		return k == key
+	})
 }
 
 // GetState creates a deep copy of the store map.
@@ -225,11 +236,22 @@ func (server *Server) DeleteKey(ctx context.Context, key string) error {
 	if _, err := server.KeyLock(ctx, key); err != nil {
 		return fmt.Errorf("deleteKey: %+v", err)
 	}
+	// Remove the key's expiry.
 	server.RemoveExpiry(key)
+	// Delete the key from keyLocks and store.
 	delete(server.keyLocks, key)
 	delete(server.store, key)
+
+	// Remove the key from the cache.
+	switch {
+	case slices.Contains([]string{utils.AllKeysLFU, utils.VolatileLFU}, server.Config.EvictionPolicy):
+		server.lfuCache.cache.Delete(key)
+	case slices.Contains([]string{utils.AllKeysLRU, utils.VolatileLRU}, server.Config.EvictionPolicy):
+		server.lruCache.cache.Delete(key)
+	}
+
 	return nil
 }
@@ -368,30 +390,31 @@ func (server *Server) adjustMemoryUsage(ctx context.Context) error {
 			}
 		}
 	case slices.Contains([]string{utils.VolatileRandom}, strings.ToLower(server.Config.EvictionPolicy)):
-		// Remove random keys with expiry time until we're below the max memory limit
+		// Remove random keys with an associated expiry time until we're below the max memory limit
 		// or there are no more keys with expiry time.
 		for {
 			// Get random volatile key
-			idx := rand.Intn(len(server.keyLocks))
-			for key, _ := range server.keyLocks {
-				if idx == 0 {
-					// If the key is not volatile, break the loop
-					if server.store[key].ExpireAt == (time.Time{}) {
-						break
-					}
-					// Delete the key
-					if err := server.DeleteKey(ctx, key); err != nil {
-						return fmt.Errorf("adjustMemoryUsage -> volatile keys random: %+v", err)
-					}
-					// Run garbage collection
-					runtime.GC()
-					// Return if we're below max memory
-					runtime.ReadMemStats(&memStats)
-					if memStats.HeapInuse < server.Config.MaxMemory {
-						return nil
-					}
-				}
-				idx--
+			server.keysWithExpiry.rwMutex.RLock()
+			if len(server.keysWithExpiry.keys) == 0 {
+				// No keys with an expiry time are left, so stop evicting.
+				server.keysWithExpiry.rwMutex.RUnlock()
+				return nil
+			}
+			idx := rand.Intn(len(server.keysWithExpiry.keys))
+			key := server.keysWithExpiry.keys[idx]
+			server.keysWithExpiry.rwMutex.RUnlock()
+
+			// Delete the key
+			if err := server.DeleteKey(ctx, key); err != nil {
+				return fmt.Errorf("adjustMemoryUsage -> volatile keys random: %+v", err)
+			}
+
+			// Run garbage collection
+			runtime.GC()
+			// Return if we're below max memory
+			runtime.ReadMemStats(&memStats)
+			if memStats.HeapInuse < server.Config.MaxMemory {
+				return nil
+			}
 		}
 	default:
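Note: the sampling logic in adjustMemoryUsage could also be factored into a small helper on *Server. A hypothetical sketch (randomVolatileKey is not part of this changeset), shown mainly to highlight the empty-slice guard that keeps rand.Intn from panicking:

	// Hypothetical sketch (not part of this diff): sample a random key from
	// the keysWithExpiry index under a read lock. The boolean is false when
	// no keys currently have an expiry, so callers can stop evicting instead
	// of panicking on rand.Intn(0).
	func (server *Server) randomVolatileKey() (string, bool) {
		server.keysWithExpiry.rwMutex.RLock()
		defer server.keysWithExpiry.rwMutex.RUnlock()
		if len(server.keysWithExpiry.keys) == 0 {
			return "", false
		}
		return server.keysWithExpiry.keys[rand.Intn(len(server.keysWithExpiry.keys))], true
	}

Either way, sampling this index is O(1) per eviction, compared to the old O(n) walk over keyLocks, at the cost of the extra bookkeeping in SetExpiry and RemoveExpiry.
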
diff --git a/src/server/server.go b/src/server/server.go
index 4b8b936..9a5f02f 100644
--- a/src/server/server.go
+++ b/src/server/server.go
@@ -22,26 +22,39 @@ import (
 )
 
 type Server struct {
+	// Config holds the server configuration variables.
 	Config utils.Config
+
+	// The current index for the latest connection ID.
+	// This number is incremented every time there's a new connection, and
+	// the new number is the new connection's ID.
 	ConnID atomic.Uint64
 
-	store           map[string]utils.KeyData
-	keyLocks        map[string]*sync.RWMutex
-	keyCreationLock *sync.Mutex
-	lfuCache        struct {
-		mutex sync.Mutex
-		cache eviction.CacheLFU
+	store           map[string]utils.KeyData // Data store that holds the keys and their associated data, expiry times, etc.
+	keyLocks        map[string]*sync.RWMutex // Map that holds all the individual key locks.
+	keyCreationLock *sync.Mutex              // Mutex for creating a new key. Only one goroutine should be able to create a key at a time.
+
+	// Holds all the keys that are currently associated with an expiry.
+	keysWithExpiry struct {
+		rwMutex sync.RWMutex // Read-write mutex: many goroutines may read the list concurrently, but only one may update it at a time.
+		keys    []string     // String slice of the volatile keys.
 	}
+
+	// LFU cache used when the eviction policy is allkeys-lfu or volatile-lfu.
+	lfuCache struct {
+		mutex sync.Mutex        // Mutex, as only one goroutine can edit the LFU cache at a time.
+		cache eviction.CacheLFU // LFU cache represented by a min heap.
+	}
+
+	// LRU cache used when the eviction policy is allkeys-lru or volatile-lru.
 	lruCache struct {
-		mutex sync.Mutex
-		cache eviction.CacheLRU
+		mutex sync.Mutex        // Mutex, as only one goroutine can edit the LRU cache at a time.
+		cache eviction.CacheLRU // LRU cache represented by a max heap.
 	}
+
+	// Holds the list of all commands supported by the server.
 	Commands []utils.Command
-	raft       *raft.Raft
-	memberList *memberlist.MemberList
+
+	raft       *raft.Raft             // The raft replication layer for the server.
+	memberList *memberlist.MemberList // The memberlist layer for the server.
 
 	CancelCh *chan os.Signal
@@ -138,6 +151,10 @@ func NewServer(opts Opts) *Server {
 		)
 	}
 
+	// TODO:
+	// If the eviction policy is not noeviction and the server is in standalone mode,
+	// start a goroutine to evict keys every 100 milliseconds.
+
 	return server
}
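Note: a possible shape for the NewServer TODO above, written as a method inside the server package (so it assumes the imports and *Server type shown in this diff). The isInCluster check and the evictExpiredKeys step are hypothetical placeholders; only the 100 millisecond interval comes from the TODO itself:

	// Hypothetical sketch (not part of this diff): periodically evict keys
	// while the server runs standalone with a policy other than noeviction.
	// Assumes context, log, strings, and time are imported.
	func (server *Server) startEvictionLoop(ctx context.Context) {
		if strings.ToLower(server.Config.EvictionPolicy) == "noeviction" || server.isInCluster() {
			return
		}
		go func() {
			ticker := time.NewTicker(100 * time.Millisecond)
			defer ticker.Stop()
			for {
				select {
				case <-ctx.Done():
					return
				case <-ticker.C:
					// evictExpiredKeys is a hypothetical stand-in for the
					// actual eviction step.
					if err := server.evictExpiredKeys(ctx); err != nil {
						log.Printf("eviction loop: %v", err)
					}
				}
			}
		}()
	}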