isInCluster is not a private function

This commit is contained in:
Kelvin Mwinuka
2024-03-25 20:06:26 +08:00
parent 37a6958c1a
commit 893d577ed2
5 changed files with 512 additions and 512 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -22,7 +22,7 @@ import (
"time" "time"
) )
func (server *EchoVault) IsInCluster() bool { func (server *EchoVault) isInCluster() bool {
return server.Config.BootstrapCluster || server.Config.JoinAddr != "" return server.Config.BootstrapCluster || server.Config.JoinAddr != ""
} }

View File

@@ -127,7 +127,7 @@ func NewEchoVault(options ...func(echovault *EchoVault)) *EchoVault {
option(echovault) option(echovault)
} }
if echovault.IsInCluster() { if echovault.isInCluster() {
echovault.raft = raft.NewRaft(raft.Opts{ echovault.raft = raft.NewRaft(raft.Opts{
Config: echovault.Config, Config: echovault.Config,
EchoVault: echovault, EchoVault: echovault,
@@ -365,7 +365,7 @@ func (server *EchoVault) Start(ctx context.Context) {
return return
} }
if server.IsInCluster() { if server.isInCluster() {
// Initialise raft and memberlist // Initialise raft and memberlist
server.raft.RaftInit(ctx) server.raft.RaftInit(ctx)
server.memberList.MemberListInit(ctx) server.memberList.MemberListInit(ctx)
@@ -374,7 +374,7 @@ func (server *EchoVault) Start(ctx context.Context) {
} }
} }
if !server.IsInCluster() { if !server.isInCluster() {
server.initialiseCaches() server.initialiseCaches()
// Restore from AOF by default if it's enabled // Restore from AOF by default if it's enabled
if conf.RestoreAOF { if conf.RestoreAOF {
@@ -402,7 +402,7 @@ func (server *EchoVault) TakeSnapshot() error {
} }
go func() { go func() {
if server.IsInCluster() { if server.isInCluster() {
// Handle snapshot in cluster mode // Handle snapshot in cluster mode
if err := server.raft.TakeSnapshot(); err != nil { if err := server.raft.TakeSnapshot(); err != nil {
log.Println(err) log.Println(err)
@@ -455,7 +455,7 @@ func (server *EchoVault) RewriteAOF() error {
} }
func (server *EchoVault) ShutDown(ctx context.Context) { func (server *EchoVault) ShutDown(ctx context.Context) {
if server.IsInCluster() { if server.isInCluster() {
server.raft.RaftShutdown(ctx) server.raft.RaftShutdown(ctx)
server.memberList.MemberListShutdown(ctx) server.memberList.MemberListShutdown(ctx)
} }

View File

@@ -99,19 +99,19 @@ func (server *EchoVault) KeyExists(ctx context.Context, key string) bool {
} }
if entry.ExpireAt != (time.Time{}) && entry.ExpireAt.Before(time.Now()) { if entry.ExpireAt != (time.Time{}) && entry.ExpireAt.Before(time.Now()) {
if !server.IsInCluster() { if !server.isInCluster() {
// If in standalone mode, delete the key directly. // If in standalone mode, delete the key directly.
err := server.DeleteKey(ctx, key) err := server.DeleteKey(ctx, key)
if err != nil { if err != nil {
log.Printf("keyExists: %+v\n", err) log.Printf("keyExists: %+v\n", err)
} }
} else if server.IsInCluster() && server.raft.IsRaftLeader() { } else if server.isInCluster() && server.raft.IsRaftLeader() {
// If we're in a raft cluster, and we're the leader, send command to delete the key in the cluster. // If we're in a raft cluster, and we're the leader, send command to delete the key in the cluster.
err := server.raftApplyDeleteKey(ctx, key) err := server.raftApplyDeleteKey(ctx, key)
if err != nil { if err != nil {
log.Printf("keyExists: %+v\n", err) log.Printf("keyExists: %+v\n", err)
} }
} else if server.IsInCluster() && !server.raft.IsRaftLeader() { } else if server.isInCluster() && !server.raft.IsRaftLeader() {
// Forward message to leader to initiate key deletion. // Forward message to leader to initiate key deletion.
// This is always called regardless of ForwardCommand config value // This is always called regardless of ForwardCommand config value
// because we always want to remove expired keys. // because we always want to remove expired keys.
@@ -179,7 +179,7 @@ func (server *EchoVault) SetValue(ctx context.Context, key string, value interfa
log.Printf("SetValue error: %+v\n", err) log.Printf("SetValue error: %+v\n", err)
} }
if !server.IsInCluster() { if !server.isInCluster() {
server.SnapshotEngine.IncrementChangeCount() server.SnapshotEngine.IncrementChangeCount()
} }
@@ -290,7 +290,7 @@ func (server *EchoVault) DeleteKey(ctx context.Context, key string) error {
// depending on whether an LFU or LRU strategy was used. // depending on whether an LFU or LRU strategy was used.
func (server *EchoVault) updateKeyInCache(ctx context.Context, key string) error { func (server *EchoVault) updateKeyInCache(ctx context.Context, key string) error {
// Only update cache when in standalone mode or when raft leader // Only update cache when in standalone mode or when raft leader
if server.IsInCluster() || (server.IsInCluster() && !server.raft.IsRaftLeader()) { if server.isInCluster() || (server.isInCluster() && !server.raft.IsRaftLeader()) {
return nil return nil
} }
// If max memory is 0, there's no max so no need to update caches // If max memory is 0, there's no max so no need to update caches
@@ -361,12 +361,12 @@ func (server *EchoVault) adjustMemoryUsage(ctx context.Context) error {
} }
key := server.lfuCache.cache.Pop().(string) key := server.lfuCache.cache.Pop().(string)
if !server.IsInCluster() { if !server.isInCluster() {
// If in standalone mode, directly delete the key // If in standalone mode, directly delete the key
if err := server.DeleteKey(ctx, key); err != nil { if err := server.DeleteKey(ctx, key); err != nil {
return fmt.Errorf("adjustMemoryUsage -> LFU cache eviction: %+v", err) return fmt.Errorf("adjustMemoryUsage -> LFU cache eviction: %+v", err)
} }
} else if server.IsInCluster() && server.raft.IsRaftLeader() { } else if server.isInCluster() && server.raft.IsRaftLeader() {
// If in raft cluster, send command to delete key from cluster // If in raft cluster, send command to delete key from cluster
if err := server.raftApplyDeleteKey(ctx, key); err != nil { if err := server.raftApplyDeleteKey(ctx, key); err != nil {
return fmt.Errorf("adjustMemoryUsage -> LFU cache eviction: %+v", err) return fmt.Errorf("adjustMemoryUsage -> LFU cache eviction: %+v", err)
@@ -393,12 +393,12 @@ func (server *EchoVault) adjustMemoryUsage(ctx context.Context) error {
} }
key := server.lruCache.cache.Pop().(string) key := server.lruCache.cache.Pop().(string)
if !server.IsInCluster() { if !server.isInCluster() {
// If in standalone mode, directly delete the key. // If in standalone mode, directly delete the key.
if err := server.DeleteKey(ctx, key); err != nil { if err := server.DeleteKey(ctx, key); err != nil {
return fmt.Errorf("adjustMemoryUsage -> LRU cache eviction: %+v", err) return fmt.Errorf("adjustMemoryUsage -> LRU cache eviction: %+v", err)
} }
} else if server.IsInCluster() && server.raft.IsRaftLeader() { } else if server.isInCluster() && server.raft.IsRaftLeader() {
// If in cluster mode and the node is a cluster leader, // If in cluster mode and the node is a cluster leader,
// send command to delete the key from the cluster. // send command to delete the key from the cluster.
if err := server.raftApplyDeleteKey(ctx, key); err != nil { if err := server.raftApplyDeleteKey(ctx, key); err != nil {
@@ -427,12 +427,12 @@ func (server *EchoVault) adjustMemoryUsage(ctx context.Context) error {
idx := rand.Intn(len(server.keyLocks)) idx := rand.Intn(len(server.keyLocks))
for key, _ := range server.keyLocks { for key, _ := range server.keyLocks {
if idx == 0 { if idx == 0 {
if !server.IsInCluster() { if !server.isInCluster() {
// If in standalone mode, directly delete the key // If in standalone mode, directly delete the key
if err := server.DeleteKey(ctx, key); err != nil { if err := server.DeleteKey(ctx, key); err != nil {
return fmt.Errorf("adjustMemoryUsage -> all keys random: %+v", err) return fmt.Errorf("adjustMemoryUsage -> all keys random: %+v", err)
} }
} else if server.IsInCluster() && server.raft.IsRaftLeader() { } else if server.isInCluster() && server.raft.IsRaftLeader() {
if err := server.raftApplyDeleteKey(ctx, key); err != nil { if err := server.raftApplyDeleteKey(ctx, key); err != nil {
return fmt.Errorf("adjustMemoryUsage -> all keys random: %+v", err) return fmt.Errorf("adjustMemoryUsage -> all keys random: %+v", err)
} }
@@ -458,12 +458,12 @@ func (server *EchoVault) adjustMemoryUsage(ctx context.Context) error {
key := server.keysWithExpiry.keys[idx] key := server.keysWithExpiry.keys[idx]
server.keysWithExpiry.rwMutex.RUnlock() server.keysWithExpiry.rwMutex.RUnlock()
if !server.IsInCluster() { if !server.isInCluster() {
// If in standalone mode, directly delete the key // If in standalone mode, directly delete the key
if err := server.DeleteKey(ctx, key); err != nil { if err := server.DeleteKey(ctx, key); err != nil {
return fmt.Errorf("adjustMemoryUsage -> volatile keys random: %+v", err) return fmt.Errorf("adjustMemoryUsage -> volatile keys random: %+v", err)
} }
} else if server.IsInCluster() && server.raft.IsRaftLeader() { } else if server.isInCluster() && server.raft.IsRaftLeader() {
if err := server.raftApplyDeleteKey(ctx, key); err != nil { if err := server.raftApplyDeleteKey(ctx, key); err != nil {
return fmt.Errorf("adjustMemoryUsage -> volatile keys randome: %+v", err) return fmt.Errorf("adjustMemoryUsage -> volatile keys randome: %+v", err)
} }
@@ -489,7 +489,7 @@ func (server *EchoVault) adjustMemoryUsage(ctx context.Context) error {
// This function is only executed in standalone mode or by the raft cluster leader. // This function is only executed in standalone mode or by the raft cluster leader.
func (server *EchoVault) evictKeysWithExpiredTTL(ctx context.Context) error { func (server *EchoVault) evictKeysWithExpiredTTL(ctx context.Context) error {
// Only execute this if we're in standalone mode, or raft cluster leader. // Only execute this if we're in standalone mode, or raft cluster leader.
if server.IsInCluster() && !server.raft.IsRaftLeader() { if server.isInCluster() && !server.raft.IsRaftLeader() {
return nil return nil
} }
@@ -536,11 +536,11 @@ func (server *EchoVault) evictKeysWithExpiredTTL(ctx context.Context) error {
// Delete the expired key // Delete the expired key
deletedCount += 1 deletedCount += 1
server.KeyRUnlock(ctx, k) server.KeyRUnlock(ctx, k)
if !server.IsInCluster() { if !server.isInCluster() {
if err := server.DeleteKey(ctx, k); err != nil { if err := server.DeleteKey(ctx, k); err != nil {
return fmt.Errorf("evictKeysWithExpiredTTL -> standalone delete: %+v", err) return fmt.Errorf("evictKeysWithExpiredTTL -> standalone delete: %+v", err)
} }
} else if server.IsInCluster() && server.raft.IsRaftLeader() { } else if server.isInCluster() && server.raft.IsRaftLeader() {
if err := server.raftApplyDeleteKey(ctx, k); err != nil { if err := server.raftApplyDeleteKey(ctx, k); err != nil {
return fmt.Errorf("evictKeysWithExpiredTTL -> cluster delete: %+v", err) return fmt.Errorf("evictKeysWithExpiredTTL -> cluster delete: %+v", err)
} }

View File

@@ -83,7 +83,7 @@ func (server *EchoVault) handleCommand(ctx context.Context, message []byte, conn
} }
} }
if !server.IsInCluster() || !synchronize { if !server.isInCluster() || !synchronize {
res, err := handler(ctx, cmd, server, conn) res, err := handler(ctx, cmd, server, conn)
if err != nil { if err != nil {
return nil, err return nil, err