Refactored EchoVault struct fields to make them private

This commit is contained in:
Kelvin Mwinuka
2024-03-25 20:21:38 +08:00
parent 893d577ed2
commit c72e982833
5 changed files with 882 additions and 882 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -23,7 +23,7 @@ import (
)
func (server *EchoVault) isInCluster() bool {
return server.Config.BootstrapCluster || server.Config.JoinAddr != ""
return server.config.BootstrapCluster || server.config.JoinAddr != ""
}
func (server *EchoVault) raftApplyDeleteKey(ctx context.Context, key string) error {

View File

@@ -36,13 +36,13 @@ import (
)
type EchoVault struct {
// Config holds the echovault configuration variables.
Config utils.Config
// config holds the echovault configuration variables.
config utils.Config
// The current index for the latest connection id.
// This number is incremented everytime there's a new connection and
// the new number is the new connection's ID.
ConnID atomic.Uint64
connId atomic.Uint64
store map[string]utils.KeyData // Data store to hold the keys and their associated data, expiry time, etc.
keyLocks map[string]*sync.RWMutex // Map to hold all the individual key locks.
@@ -65,7 +65,7 @@ type EchoVault struct {
}
// Holds the list of all commands supported by the echovault.
Commands []utils.Command
commands []utils.Command
raft *raft.Raft // The raft replication layer for the echovault.
memberList *memberlist.MemberList // The memberlist layer for the echovault.
@@ -75,13 +75,13 @@ type EchoVault struct {
ACL utils.ACL
PubSub utils.PubSub
SnapshotInProgress atomic.Bool // Atomic boolean that's true when actively taking a snapshot.
RewriteAOFInProgress atomic.Bool // Atomic boolean that's true when actively rewriting AOF file is in progress.
StateCopyInProgress atomic.Bool // Atomic boolean that's true when actively copying state for snapshotting or preamble generation.
StateMutationInProgress atomic.Bool // Atomic boolean that is set to true when state mutation is in progress.
LatestSnapshotMilliseconds atomic.Int64 // Unix epoch in milliseconds
SnapshotEngine *snapshot.Engine // Snapshot engine for standalone mode
AOFEngine *aof.Engine // AOF engine for standalone mode
snapshotInProgress atomic.Bool // Atomic boolean that's true when actively taking a snapshot.
rewriteAOFInProgress atomic.Bool // Atomic boolean that's true when actively rewriting AOF file is in progress.
stateCopyInProgress atomic.Bool // Atomic boolean that's true when actively copying state for snapshotting or preamble generation.
stateMutationInProgress atomic.Bool // Atomic boolean that is set to true when state mutation is in progress.
latestSnapshotMilliseconds atomic.Int64 // Unix epoch in milliseconds
snapshotEngine *snapshot.Engine // Snapshot engine for standalone mode
aofEngine *aof.Engine // AOF engine for standalone mode
}
func WithContext(ctx context.Context) func(echovault *EchoVault) {
@@ -92,7 +92,7 @@ func WithContext(ctx context.Context) func(echovault *EchoVault) {
func WithConfig(config utils.Config) func(echovault *EchoVault) {
return func(echovault *EchoVault) {
echovault.Config = config
echovault.config = config
}
}
@@ -110,14 +110,14 @@ func WithPubSub(pubsub utils.PubSub) func(echovault *EchoVault) {
func WithCommands(commands []utils.Command) func(echovault *EchoVault) {
return func(echovault *EchoVault) {
echovault.Commands = commands
echovault.commands = commands
}
}
func NewEchoVault(options ...func(echovault *EchoVault)) *EchoVault {
echovault := &EchoVault{
Context: context.Background(),
Commands: make([]utils.Command, 0),
commands: make([]utils.Command, 0),
store: make(map[string]utils.KeyData),
keyLocks: make(map[string]*sync.RWMutex),
keyCreationLock: &sync.Mutex{},
@@ -129,13 +129,13 @@ func NewEchoVault(options ...func(echovault *EchoVault)) *EchoVault {
if echovault.isInCluster() {
echovault.raft = raft.NewRaft(raft.Opts{
Config: echovault.Config,
Config: echovault.config,
EchoVault: echovault,
GetCommand: echovault.getCommand,
DeleteKey: echovault.DeleteKey,
})
echovault.memberList = memberlist.NewMemberList(memberlist.Opts{
Config: echovault.Config,
Config: echovault.config,
HasJoinedCluster: echovault.raft.HasJoinedCluster,
AddVoter: echovault.raft.AddVoter,
RemoveRaftServer: echovault.raft.RemoveServer,
@@ -145,10 +145,10 @@ func NewEchoVault(options ...func(echovault *EchoVault)) *EchoVault {
})
} else {
// Set up standalone snapshot engine
echovault.SnapshotEngine = snapshot.NewSnapshotEngine(
snapshot.WithDirectory(echovault.Config.DataDir),
snapshot.WithThreshold(echovault.Config.SnapShotThreshold),
snapshot.WithInterval(echovault.Config.SnapshotInterval),
echovault.snapshotEngine = snapshot.NewSnapshotEngine(
snapshot.WithDirectory(echovault.config.DataDir),
snapshot.WithThreshold(echovault.config.SnapShotThreshold),
snapshot.WithInterval(echovault.config.SnapshotInterval),
snapshot.WithGetStateFunc(echovault.GetState),
snapshot.WithStartSnapshotFunc(echovault.StartSnapshot),
snapshot.WithFinishSnapshotFunc(echovault.FinishSnapshot),
@@ -167,9 +167,9 @@ func NewEchoVault(options ...func(echovault *EchoVault)) *EchoVault {
}),
)
// Set up standalone AOF engine
echovault.AOFEngine = aof.NewAOFEngine(
aof.WithDirectory(echovault.Config.DataDir),
aof.WithStrategy(echovault.Config.AOFSyncStrategy),
echovault.aofEngine = aof.NewAOFEngine(
aof.WithDirectory(echovault.config.DataDir),
aof.WithStrategy(echovault.config.AOFSyncStrategy),
aof.WithStartRewriteFunc(echovault.StartRewriteAOF),
aof.WithFinishRewriteFunc(echovault.FinishRewriteAOF),
aof.WithGetStateFunc(echovault.GetState),
@@ -194,10 +194,10 @@ func NewEchoVault(options ...func(echovault *EchoVault)) *EchoVault {
}
// If eviction policy is not noeviction, start a goroutine to evict keys every 100 milliseconds.
if echovault.Config.EvictionPolicy != utils.NoEviction {
if echovault.config.EvictionPolicy != utils.NoEviction {
go func() {
for {
<-time.After(echovault.Config.EvictionInterval)
<-time.After(echovault.config.EvictionInterval)
if err := echovault.evictKeysWithExpiredTTL(context.Background()); err != nil {
log.Println(err)
}
@@ -209,7 +209,7 @@ func NewEchoVault(options ...func(echovault *EchoVault)) *EchoVault {
}
func (server *EchoVault) StartTCP(ctx context.Context) {
conf := server.Config
conf := server.config
listenConfig := net.ListenConfig{
KeepAlive: 200 * time.Millisecond,
@@ -290,7 +290,7 @@ func (server *EchoVault) handleConnection(ctx context.Context, conn net.Conn) {
w, r := io.Writer(conn), io.Reader(conn)
cid := server.ConnID.Add(1)
cid := server.connId.Add(1)
ctx = context.WithValue(ctx, utils.ContextConnID("ConnectionID"),
fmt.Sprintf("%s-%d", ctx.Value(utils.ContextServerID("ServerID")), cid))
@@ -358,7 +358,7 @@ func (server *EchoVault) handleConnection(ctx context.Context, conn net.Conn) {
}
func (server *EchoVault) Start(ctx context.Context) {
conf := server.Config
conf := server.config
if conf.TLS && len(conf.CertKeyPairs) <= 0 {
log.Fatal("must provide certificate and key file paths for TLS mode")
@@ -378,7 +378,7 @@ func (server *EchoVault) Start(ctx context.Context) {
server.initialiseCaches()
// Restore from AOF by default if it's enabled
if conf.RestoreAOF {
err := server.AOFEngine.Restore()
err := server.aofEngine.Restore()
if err != nil {
log.Println(err)
}
@@ -386,7 +386,7 @@ func (server *EchoVault) Start(ctx context.Context) {
// Restore from snapshot if snapshot restore is enabled and AOF restore is disabled
if conf.RestoreSnapshot && !conf.RestoreAOF {
err := server.SnapshotEngine.Restore()
err := server.snapshotEngine.Restore()
if err != nil {
log.Println(err)
}
@@ -397,7 +397,7 @@ func (server *EchoVault) Start(ctx context.Context) {
}
func (server *EchoVault) TakeSnapshot() error {
if server.SnapshotInProgress.Load() {
if server.snapshotInProgress.Load() {
return errors.New("snapshot already in progress")
}
@@ -410,7 +410,7 @@ func (server *EchoVault) TakeSnapshot() error {
return
}
// Handle snapshot in standalone mode
if err := server.SnapshotEngine.TakeSnapshot(); err != nil {
if err := server.snapshotEngine.TakeSnapshot(); err != nil {
log.Println(err)
}
}()
@@ -419,35 +419,35 @@ func (server *EchoVault) TakeSnapshot() error {
}
func (server *EchoVault) StartSnapshot() {
server.SnapshotInProgress.Store(true)
server.snapshotInProgress.Store(true)
}
func (server *EchoVault) FinishSnapshot() {
server.SnapshotInProgress.Store(false)
server.snapshotInProgress.Store(false)
}
func (server *EchoVault) SetLatestSnapshot(msec int64) {
server.LatestSnapshotMilliseconds.Store(msec)
server.latestSnapshotMilliseconds.Store(msec)
}
func (server *EchoVault) GetLatestSnapshot() int64 {
return server.LatestSnapshotMilliseconds.Load()
return server.latestSnapshotMilliseconds.Load()
}
func (server *EchoVault) StartRewriteAOF() {
server.RewriteAOFInProgress.Store(true)
server.rewriteAOFInProgress.Store(true)
}
func (server *EchoVault) FinishRewriteAOF() {
server.RewriteAOFInProgress.Store(false)
server.rewriteAOFInProgress.Store(false)
}
func (server *EchoVault) RewriteAOF() error {
if server.RewriteAOFInProgress.Load() {
if server.rewriteAOFInProgress.Load() {
return errors.New("aof rewrite in progress")
}
go func() {
if err := server.AOFEngine.RewriteLog(); err != nil {
if err := server.aofEngine.RewriteLog(); err != nil {
log.Println(err)
}
}()

View File

@@ -127,7 +127,7 @@ func (server *EchoVault) KeyExists(ctx context.Context, key string) bool {
// CreateKeyAndLock creates a new key lock and immediately locks it if the key does not exist.
// If the key exists, the existing key is locked.
func (server *EchoVault) CreateKeyAndLock(ctx context.Context, key string) (bool, error) {
if utils.IsMaxMemoryExceeded(server.Config.MaxMemory) && server.Config.EvictionPolicy == utils.NoEviction {
if utils.IsMaxMemoryExceeded(server.config.MaxMemory) && server.config.EvictionPolicy == utils.NoEviction {
return false, errors.New("max memory reached, key not created")
}
@@ -165,7 +165,7 @@ func (server *EchoVault) GetValue(ctx context.Context, key string) interface{} {
// This count triggers a snapshot when the threshold is reached.
// The key must be locked prior to calling this function.
func (server *EchoVault) SetValue(ctx context.Context, key string, value interface{}) error {
if utils.IsMaxMemoryExceeded(server.Config.MaxMemory) && server.Config.EvictionPolicy == utils.NoEviction {
if utils.IsMaxMemoryExceeded(server.config.MaxMemory) && server.config.EvictionPolicy == utils.NoEviction {
return errors.New("max memory reached, key value not set")
}
@@ -180,7 +180,7 @@ func (server *EchoVault) SetValue(ctx context.Context, key string, value interfa
}
if !server.isInCluster() {
server.SnapshotEngine.IncrementChangeCount()
server.snapshotEngine.IncrementChangeCount()
}
return nil
@@ -242,13 +242,13 @@ func (server *EchoVault) RemoveExpiry(key string) {
// GetState creates a deep copy of the store map.
// It is used to retrieve the current state for persistence but can also be used for other
// functions that require a deep copy of the state.
// The copy only starts when there's no current copy in progress (represented by StateCopyInProgress atomic boolean)
// and when there's no current state mutation in progress (represented by StateMutationInProgress atomic boolean)
// The copy only starts when there's no current copy in progress (represented by stateCopyInProgress atomic boolean)
// and when there's no current state mutation in progress (represented by stateMutationInProgress atomic boolean)
func (server *EchoVault) GetState() map[string]utils.KeyData {
// Wait unit there's no state mutation or copy in progress before starting a new copy process.
for {
if !server.StateCopyInProgress.Load() && !server.StateMutationInProgress.Load() {
server.StateCopyInProgress.Store(true)
if !server.stateCopyInProgress.Load() && !server.stateMutationInProgress.Load() {
server.stateCopyInProgress.Store(true)
break
}
}
@@ -256,7 +256,7 @@ func (server *EchoVault) GetState() map[string]utils.KeyData {
for k, v := range server.store {
data[k] = v
}
server.StateCopyInProgress.Store(false)
server.stateCopyInProgress.Store(false)
return data
}
@@ -275,9 +275,9 @@ func (server *EchoVault) DeleteKey(ctx context.Context, key string) error {
// Remove the key from the cache.
switch {
case slices.Contains([]string{utils.AllKeysLFU, utils.VolatileLFU}, server.Config.EvictionPolicy):
case slices.Contains([]string{utils.AllKeysLFU, utils.VolatileLFU}, server.config.EvictionPolicy):
server.lfuCache.cache.Delete(key)
case slices.Contains([]string{utils.AllKeysLRU, utils.VolatileLRU}, server.Config.EvictionPolicy):
case slices.Contains([]string{utils.AllKeysLRU, utils.VolatileLRU}, server.config.EvictionPolicy):
server.lruCache.cache.Delete(key)
}
@@ -294,10 +294,10 @@ func (server *EchoVault) updateKeyInCache(ctx context.Context, key string) error
return nil
}
// If max memory is 0, there's no max so no need to update caches
if server.Config.MaxMemory == 0 {
if server.config.MaxMemory == 0 {
return nil
}
switch strings.ToLower(server.Config.EvictionPolicy) {
switch strings.ToLower(server.config.EvictionPolicy) {
case utils.AllKeysLFU:
server.lfuCache.mutex.Lock()
defer server.lfuCache.mutex.Unlock()
@@ -328,7 +328,7 @@ func (server *EchoVault) updateKeyInCache(ctx context.Context, key string) error
// adjustMemoryUsage should only be called from standalone echovault or from raft cluster leader.
func (server *EchoVault) adjustMemoryUsage(ctx context.Context) error {
// If max memory is 0, there's no need to adjust memory usage.
if server.Config.MaxMemory == 0 {
if server.config.MaxMemory == 0 {
return nil
}
// Check if memory usage is above max-memory.
@@ -336,20 +336,20 @@ func (server *EchoVault) adjustMemoryUsage(ctx context.Context) error {
var memStats runtime.MemStats
runtime.ReadMemStats(&memStats)
// If we're using less memory than the max-memory, there's no need to evict.
if memStats.HeapInuse < server.Config.MaxMemory {
if memStats.HeapInuse < server.config.MaxMemory {
return nil
}
// Force a garbage collection first before we start evicting key.
runtime.GC()
runtime.ReadMemStats(&memStats)
if memStats.HeapInuse < server.Config.MaxMemory {
if memStats.HeapInuse < server.config.MaxMemory {
return nil
}
// We've done a GC, but we're still at or above the max memory limit.
// Start a loop that evicts keys until either the heap is empty or
// we're below the max memory limit.
switch {
case slices.Contains([]string{utils.AllKeysLFU, utils.VolatileLFU}, strings.ToLower(server.Config.EvictionPolicy)):
case slices.Contains([]string{utils.AllKeysLFU, utils.VolatileLFU}, strings.ToLower(server.config.EvictionPolicy)):
// Remove keys from LFU cache until we're below the max memory limit or
// until the LFU cache is empty.
server.lfuCache.mutex.Lock()
@@ -377,11 +377,11 @@ func (server *EchoVault) adjustMemoryUsage(ctx context.Context) error {
runtime.GC()
// Return if we're below max memory
runtime.ReadMemStats(&memStats)
if memStats.HeapInuse < server.Config.MaxMemory {
if memStats.HeapInuse < server.config.MaxMemory {
return nil
}
}
case slices.Contains([]string{utils.AllKeysLRU, utils.VolatileLRU}, strings.ToLower(server.Config.EvictionPolicy)):
case slices.Contains([]string{utils.AllKeysLRU, utils.VolatileLRU}, strings.ToLower(server.config.EvictionPolicy)):
// Remove keys from th LRU cache until we're below the max memory limit or
// until the LRU cache is empty.
server.lruCache.mutex.Lock()
@@ -410,11 +410,11 @@ func (server *EchoVault) adjustMemoryUsage(ctx context.Context) error {
runtime.GC()
// Return if we're below max memory
runtime.ReadMemStats(&memStats)
if memStats.HeapInuse < server.Config.MaxMemory {
if memStats.HeapInuse < server.config.MaxMemory {
return nil
}
}
case slices.Contains([]string{utils.AllKeysRandom}, strings.ToLower(server.Config.EvictionPolicy)):
case slices.Contains([]string{utils.AllKeysRandom}, strings.ToLower(server.config.EvictionPolicy)):
// Remove random keys until we're below the max memory limit
// or there are no more keys remaining.
for {
@@ -441,14 +441,14 @@ func (server *EchoVault) adjustMemoryUsage(ctx context.Context) error {
runtime.GC()
// Return if we're below max memory
runtime.ReadMemStats(&memStats)
if memStats.HeapInuse < server.Config.MaxMemory {
if memStats.HeapInuse < server.config.MaxMemory {
return nil
}
}
idx--
}
}
case slices.Contains([]string{utils.VolatileRandom}, strings.ToLower(server.Config.EvictionPolicy)):
case slices.Contains([]string{utils.VolatileRandom}, strings.ToLower(server.config.EvictionPolicy)):
// Remove random keys with an associated expiry time until we're below the max memory limit
// or there are no more keys with expiry time.
for {
@@ -473,7 +473,7 @@ func (server *EchoVault) adjustMemoryUsage(ctx context.Context) error {
runtime.GC()
// Return if we're below max memory
runtime.ReadMemStats(&memStats)
if memStats.HeapInuse < server.Config.MaxMemory {
if memStats.HeapInuse < server.config.MaxMemory {
return nil
}
}
@@ -497,7 +497,7 @@ func (server *EchoVault) evictKeysWithExpiredTTL(ctx context.Context) error {
// Sample size should be the configured sample size, or the size of the keys with expiry,
// whichever one is smaller.
sampleSize := int(server.Config.EvictionSample)
sampleSize := int(server.config.EvictionSample)
if len(server.keysWithExpiry.keys) < sampleSize {
sampleSize = len(server.keysWithExpiry.keys)
}

View File

@@ -24,7 +24,7 @@ import (
)
func (server *EchoVault) GetAllCommands() []utils.Command {
return server.Commands
return server.commands
}
func (server *EchoVault) GetACL() interface{} {
@@ -36,7 +36,7 @@ func (server *EchoVault) GetPubSub() interface{} {
}
func (server *EchoVault) getCommand(cmd string) (utils.Command, error) {
for _, command := range server.Commands {
for _, command := range server.commands {
if strings.EqualFold(command.Command, cmd) {
return command, nil
}
@@ -76,8 +76,8 @@ func (server *EchoVault) handleCommand(ctx context.Context, message []byte, conn
// If the command is a write command, wait for state copy to finish.
if utils.IsWriteCommand(command, subCommand) {
for {
if !server.StateCopyInProgress.Load() {
server.StateMutationInProgress.Store(true)
if !server.stateCopyInProgress.Load() {
server.stateMutationInProgress.Store(true)
break
}
}
@@ -90,10 +90,10 @@ func (server *EchoVault) handleCommand(ctx context.Context, message []byte, conn
}
if utils.IsWriteCommand(command, subCommand) && !replay {
go server.AOFEngine.QueueCommand(message)
go server.aofEngine.QueueCommand(message)
}
server.StateMutationInProgress.Store(false)
server.stateMutationInProgress.Store(false)
return res, err
}
@@ -109,7 +109,7 @@ func (server *EchoVault) handleCommand(ctx context.Context, message []byte, conn
}
// Forward message to leader and return immediate OK response
if server.Config.ForwardCommand {
if server.config.ForwardCommand {
server.memberList.ForwardDataMutation(ctx, message)
return []byte(utils.OkResponse), nil
}