Added RWMutex to keysWithExpiry in order to improve thread safety

This commit is contained in:
Kelvin Mwinuka
2024-03-11 02:10:23 +08:00
parent 8cc8f6c02c
commit 8000cef72d
4 changed files with 78 additions and 37 deletions

View File

@@ -28,6 +28,8 @@ func NewFSM(opts FSMOpts) raft.FSM {
// Apply Implements raft.FSM interface // Apply Implements raft.FSM interface
func (fsm *FSM) Apply(log *raft.Log) interface{} { func (fsm *FSM) Apply(log *raft.Log) interface{} {
switch log.Type { switch log.Type {
default:
// No-Op
case raft.LogCommand: case raft.LogCommand:
var request utils.ApplyRequest var request utils.ApplyRequest

View File

@@ -119,6 +119,10 @@ func (r *Raft) RaftInit(ctx context.Context) {
} }
r.raft = raftServer r.raft = raftServer
// TODO
// Listen on leadership change channel and initiate key eviction goroutine
// if current node is the leader.
} }
func (r *Raft) Apply(cmd []byte, timeout time.Duration) raft.ApplyFuture { func (r *Raft) Apply(cmd []byte, timeout time.Duration) raft.ApplyFuture {

View File

@@ -177,6 +177,15 @@ func (server *Server) SetExpiry(ctx context.Context, key string, expireAt time.T
Value: server.store[key].Value, Value: server.store[key].Value,
ExpireAt: expireAt, ExpireAt: expireAt,
} }
// If the slice of keys associated with expiry time does not contain the current key, add the key.
server.keysWithExpiry.rwMutex.Lock()
if !slices.Contains(server.keysWithExpiry.keys, key) {
server.keysWithExpiry.keys = append(server.keysWithExpiry.keys, key)
}
server.keysWithExpiry.rwMutex.Unlock()
// If touch is true, update the keys status in the cache.
if touch { if touch {
err := server.updateKeyInCache(ctx, key) err := server.updateKeyInCache(ctx, key)
if err != nil { if err != nil {
@@ -188,16 +197,18 @@ func (server *Server) SetExpiry(ctx context.Context, key string, expireAt time.T
// RemoveExpiry is called by commands that remove key expiry (e.g. PERSIST). // RemoveExpiry is called by commands that remove key expiry (e.g. PERSIST).
// The key must be locked prior ro calling this function. // The key must be locked prior ro calling this function.
func (server *Server) RemoveExpiry(key string) { func (server *Server) RemoveExpiry(key string) {
// Reset expiry time
server.store[key] = utils.KeyData{ server.store[key] = utils.KeyData{
Value: server.store[key].Value, Value: server.store[key].Value,
ExpireAt: time.Time{}, ExpireAt: time.Time{},
} }
switch {
case slices.Contains([]string{utils.AllKeysLFU, utils.VolatileLFU}, server.Config.EvictionPolicy): // Remove key from slice of keys associated with expiry
server.lfuCache.cache.Delete(key) server.keysWithExpiry.rwMutex.Lock()
case slices.Contains([]string{utils.AllKeysLRU, utils.VolatileLRU}, server.Config.EvictionPolicy): defer server.keysWithExpiry.rwMutex.Unlock()
server.lruCache.cache.Delete(key) server.keysWithExpiry.keys = slices.DeleteFunc(server.keysWithExpiry.keys, func(k string) bool {
} return k == key
})
} }
// GetState creates a deep copy of the store map. // GetState creates a deep copy of the store map.
@@ -225,11 +236,22 @@ func (server *Server) DeleteKey(ctx context.Context, key string) error {
if _, err := server.KeyLock(ctx, key); err != nil { if _, err := server.KeyLock(ctx, key); err != nil {
return fmt.Errorf("deleteKey: %+v", err) return fmt.Errorf("deleteKey: %+v", err)
} }
// Remove key expiry // Remove key expiry
server.RemoveExpiry(key) server.RemoveExpiry(key)
// Delete the key from keyLocks and store // Delete the key from keyLocks and store
delete(server.keyLocks, key) delete(server.keyLocks, key)
delete(server.store, key) delete(server.store, key)
// Remove the key from the cache
switch {
case slices.Contains([]string{utils.AllKeysLFU, utils.VolatileLFU}, server.Config.EvictionPolicy):
server.lfuCache.cache.Delete(key)
case slices.Contains([]string{utils.AllKeysLRU, utils.VolatileLRU}, server.Config.EvictionPolicy):
server.lruCache.cache.Delete(key)
}
return nil return nil
} }
@@ -368,30 +390,26 @@ func (server *Server) adjustMemoryUsage(ctx context.Context) error {
} }
} }
case slices.Contains([]string{utils.VolatileRandom}, strings.ToLower(server.Config.EvictionPolicy)): case slices.Contains([]string{utils.VolatileRandom}, strings.ToLower(server.Config.EvictionPolicy)):
// Remove random keys with expiry time until we're below the max memory limit // Remove random keys with an associated expiry time until we're below the max memory limit
// or there are no more keys with expiry time. // or there are no more keys with expiry time.
for { for {
// Get random volatile key // Get random volatile key
idx := rand.Intn(len(server.keyLocks)) server.keysWithExpiry.rwMutex.RLock()
for key, _ := range server.keyLocks { idx := rand.Intn(len(server.keysWithExpiry.keys))
if idx == 0 { key := server.keysWithExpiry.keys[idx]
// If the key is not volatile, break the loop server.keysWithExpiry.rwMutex.RUnlock()
if server.store[key].ExpireAt == (time.Time{}) {
break // Delete the key
} if err := server.DeleteKey(ctx, key); err != nil {
// Delete the key return fmt.Errorf("adjustMemoryUsage -> volatile keys random: %+v", err)
if err := server.DeleteKey(ctx, key); err != nil { }
return fmt.Errorf("adjustMemoryUsage -> volatile keys random: %+v", err)
} // Run garbage collection
// Run garbage collection runtime.GC()
runtime.GC() // Return if we're below max memory
// Return if we're below max memory runtime.ReadMemStats(&memStats)
runtime.ReadMemStats(&memStats) if memStats.HeapInuse < server.Config.MaxMemory {
if memStats.HeapInuse < server.Config.MaxMemory { return nil
return nil
}
}
idx--
} }
} }
default: default:

View File

@@ -22,26 +22,39 @@ import (
) )
type Server struct { type Server struct {
// Config holds the server configuration variables.
Config utils.Config Config utils.Config
// The current index for the latest connection id.
// This number is incremented everytime there's a new connection and
// the new number is the new connection's ID.
ConnID atomic.Uint64 ConnID atomic.Uint64
store map[string]utils.KeyData store map[string]utils.KeyData // Data store to hold the keys and their associated data, expiry time, etc.
keyLocks map[string]*sync.RWMutex keyLocks map[string]*sync.RWMutex // Map to hold all the individual key locks.
keyCreationLock *sync.Mutex keyCreationLock *sync.Mutex // The mutex for creating a new key. Only one goroutine should be able to create a key at a time.
lfuCache struct {
mutex sync.Mutex // Holds all the keys that are currently associated with an expiry.
cache eviction.CacheLFU keysWithExpiry struct {
rwMutex sync.RWMutex // Mutex as only one process should be able to update this list at a time.
keys []string // string slice of the volatile keys
} }
// LFU cache used when eviction policy is allkeys-lfu or volatile-lfu
lfuCache struct {
mutex sync.Mutex // Mutex as only one goroutine can edit the LFU cache at a time.
cache eviction.CacheLFU // LFU cache represented by a min head.
}
// LRU cache used when eviction policy is allkeys-lru or volatile-lru
lruCache struct { lruCache struct {
mutex sync.Mutex mutex sync.Mutex // Mutex as only one goroutine can edit the LRU at a time.
cache eviction.CacheLRU cache eviction.CacheLRU // LRU cache represented by a max head.
} }
// Holds the list of all commands supported by the server.
Commands []utils.Command Commands []utils.Command
raft *raft.Raft raft *raft.Raft // The raft replication layer for the server.
memberList *memberlist.MemberList memberList *memberlist.MemberList // The memberlist layer for the server.
CancelCh *chan os.Signal CancelCh *chan os.Signal
@@ -138,6 +151,10 @@ func NewServer(opts Opts) *Server {
) )
} }
// TODO
// If eviction policy is not noeviction and the server is in standalone mode,
// start a goroutine to evict keys every 100 milliseconds.
return server return server
} }