Created flow for deleting key consistently within cluster.

This commit is contained in:
Kelvin Mwinuka
2024-03-11 05:22:38 +08:00
parent 8000cef72d
commit c611dd60ee
6 changed files with 120 additions and 37 deletions

View File

@@ -3,10 +3,12 @@ package raft
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"github.com/echovault/echovault/src/utils" "github.com/echovault/echovault/src/utils"
"github.com/hashicorp/raft" "github.com/hashicorp/raft"
"io" "io"
"log" "log"
"strings"
) )
type FSMOpts struct { type FSMOpts struct {
@@ -43,31 +45,43 @@ func (fsm *FSM) Apply(log *raft.Log) interface{} {
ctx := context.WithValue(context.Background(), utils.ContextServerID("ServerID"), request.ServerID) ctx := context.WithValue(context.Background(), utils.ContextServerID("ServerID"), request.ServerID)
ctx = context.WithValue(ctx, utils.ContextConnID("ConnectionID"), request.ConnectionID) ctx = context.WithValue(ctx, utils.ContextConnID("ConnectionID"), request.ConnectionID)
// Handle command switch strings.ToLower(request.Type) {
command, err := fsm.options.GetCommand(request.CMD[0]) default:
if err != nil {
return utils.ApplyResponse{ return utils.ApplyResponse{
Error: err, Error: fmt.Errorf("unsupported raft command type %s", request.Type),
Response: nil, Response: nil,
} }
}
handler := command.HandlerFunc case "delete-key":
// TODO: Handle key deletion
subCommand, ok := utils.GetSubCommand(command, request.CMD).(utils.SubCommand) case "command":
if ok { // Handle command
handler = subCommand.HandlerFunc command, err := fsm.options.GetCommand(request.CMD[0])
} if err != nil {
return utils.ApplyResponse{
if res, err := handler(ctx, request.CMD, fsm.options.Server, nil); err != nil { Error: err,
return utils.ApplyResponse{ Response: nil,
Error: err, }
Response: nil,
} }
} else {
return utils.ApplyResponse{ handler := command.HandlerFunc
Error: nil,
Response: res, subCommand, ok := utils.GetSubCommand(command, request.CMD).(utils.SubCommand)
if ok {
handler = subCommand.HandlerFunc
}
if res, err := handler(ctx, request.CMD, fsm.options.Server, nil); err != nil {
return utils.ApplyResponse{
Error: err,
Response: nil,
}
} else {
return utils.ApplyResponse{
Error: nil,
Response: res,
}
} }
} }
} }

View File

@@ -3,7 +3,6 @@ package server
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"github.com/echovault/echovault/src/utils" "github.com/echovault/echovault/src/utils"
"time" "time"
@@ -13,25 +12,59 @@ func (server *Server) IsInCluster() bool {
return server.Config.BootstrapCluster || server.Config.JoinAddr != "" return server.Config.BootstrapCluster || server.Config.JoinAddr != ""
} }
func (server *Server) raftApply(ctx context.Context, cmd []string) ([]byte, error) { func (server *Server) raftApplyDeleteKey(ctx context.Context, key string) error {
serverId, _ := ctx.Value(utils.ContextServerID("ServerID")).(string)
deleteKeyRequest := utils.ApplyRequest{
Type: "delete-key",
ServerID: serverId,
ConnectionID: "nil",
Key: key,
}
b, err := json.Marshal(deleteKeyRequest)
if err != nil {
return fmt.Errorf("could not parse delete key request for key: %s", key)
}
applyFuture := server.raft.Apply(b, 500*time.Millisecond)
if err = applyFuture.Error(); err != nil {
return err
}
r, ok := applyFuture.Response().(utils.ApplyResponse)
if !ok {
return fmt.Errorf("unprocessable entity %v", r)
}
if r.Error != nil {
return r.Error
}
return nil
}
func (server *Server) raftApplyCommand(ctx context.Context, cmd []string) ([]byte, error) {
serverId, _ := ctx.Value(utils.ContextServerID("ServerID")).(string) serverId, _ := ctx.Value(utils.ContextServerID("ServerID")).(string)
connectionId, _ := ctx.Value(utils.ContextConnID("ConnectionID")).(string) connectionId, _ := ctx.Value(utils.ContextConnID("ConnectionID")).(string)
applyRequest := utils.ApplyRequest{ applyRequest := utils.ApplyRequest{
Type: "command",
ServerID: serverId, ServerID: serverId,
ConnectionID: connectionId, ConnectionID: connectionId,
CMD: cmd, CMD: cmd,
} }
b, err := json.Marshal(applyRequest) b, err := json.Marshal(applyRequest)
if err != nil { if err != nil {
return nil, errors.New("could not parse request") return nil, fmt.Errorf("could not parse command request for commad: %+v", cmd)
} }
applyFuture := server.raft.Apply(b, 500*time.Millisecond) applyFuture := server.raft.Apply(b, 500*time.Millisecond)
if err := applyFuture.Error(); err != nil { if err = applyFuture.Error(); err != nil {
return nil, err return nil, err
} }

View File

@@ -202,7 +202,6 @@ func (server *Server) RemoveExpiry(key string) {
Value: server.store[key].Value, Value: server.store[key].Value,
ExpireAt: time.Time{}, ExpireAt: time.Time{},
} }
// Remove key from slice of keys associated with expiry // Remove key from slice of keys associated with expiry
server.keysWithExpiry.rwMutex.Lock() server.keysWithExpiry.rwMutex.Lock()
defer server.keysWithExpiry.rwMutex.Unlock() defer server.keysWithExpiry.rwMutex.Unlock()
@@ -294,6 +293,7 @@ func (server *Server) updateKeyInCache(ctx context.Context, key string) error {
return nil return nil
} }
// adjustMemoryUsage should only be called from standalone server or from raft cluster leader.
func (server *Server) adjustMemoryUsage(ctx context.Context) error { func (server *Server) adjustMemoryUsage(ctx context.Context) error {
// If max memory is 0, there's no need to adjust memory usage. // If max memory is 0, there's no need to adjust memory usage.
if server.Config.MaxMemory == 0 { if server.Config.MaxMemory == 0 {
@@ -327,10 +327,20 @@ func (server *Server) adjustMemoryUsage(ctx context.Context) error {
if server.lfuCache.cache.Len() == 0 { if server.lfuCache.cache.Len() == 0 {
return fmt.Errorf("adjsutMemoryUsage -> LFU cache empty") return fmt.Errorf("adjsutMemoryUsage -> LFU cache empty")
} }
key := server.lfuCache.cache.Pop().(string) key := server.lfuCache.cache.Pop().(string)
if err := server.DeleteKey(ctx, key); err != nil { if !server.IsInCluster() {
return fmt.Errorf("adjustMemoryUsage -> LFU cache eviction: %+v", err) // If in standalone mode, directly delete the key
if err := server.DeleteKey(ctx, key); err != nil {
return fmt.Errorf("adjustMemoryUsage -> LFU cache eviction: %+v", err)
}
} else if server.IsInCluster() && server.raft.IsRaftLeader() {
// If in raft cluster, send command to delete key from cluster
if err := server.raftApplyDeleteKey(ctx, key); err != nil {
return fmt.Errorf("adjustMemoryUsage -> LFU cache eviction: %+v", err)
}
} }
// Run garbage collection // Run garbage collection
runtime.GC() runtime.GC()
// Return if we're below max memory // Return if we're below max memory
@@ -349,10 +359,21 @@ func (server *Server) adjustMemoryUsage(ctx context.Context) error {
if server.lruCache.cache.Len() == 0 { if server.lruCache.cache.Len() == 0 {
return fmt.Errorf("adjsutMemoryUsage -> LRU cache empty") return fmt.Errorf("adjsutMemoryUsage -> LRU cache empty")
} }
key := server.lruCache.cache.Pop().(string) key := server.lruCache.cache.Pop().(string)
if err := server.DeleteKey(ctx, key); err != nil { if !server.IsInCluster() {
return fmt.Errorf("adjustMemoryUsage -> LRU cache eviction: %+v", err) // If in standalone mode, directly delete the key.
if err := server.DeleteKey(ctx, key); err != nil {
return fmt.Errorf("adjustMemoryUsage -> LRU cache eviction: %+v", err)
}
} else if server.IsInCluster() && server.raft.IsRaftLeader() {
// If in cluster mode and the node is a cluster leader,
// send command to delete the key from the cluster.
if err := server.raftApplyDeleteKey(ctx, key); err != nil {
return fmt.Errorf("adjustMemoryUsage -> LRU cache eviction: %+v", err)
}
} }
// Run garbage collection // Run garbage collection
runtime.GC() runtime.GC()
// Return if we're below max memory // Return if we're below max memory
@@ -374,9 +395,15 @@ func (server *Server) adjustMemoryUsage(ctx context.Context) error {
idx := rand.Intn(len(server.keyLocks)) idx := rand.Intn(len(server.keyLocks))
for key, _ := range server.keyLocks { for key, _ := range server.keyLocks {
if idx == 0 { if idx == 0 {
// Lock the key if !server.IsInCluster() {
if err := server.DeleteKey(ctx, key); err != nil { // If in standalone mode, directly delete the key
return fmt.Errorf("adjustMemoryUsage -> all keys random: %+v", err) if err := server.DeleteKey(ctx, key); err != nil {
return fmt.Errorf("adjustMemoryUsage -> all keys random: %+v", err)
}
} else if server.IsInCluster() && server.raft.IsRaftLeader() {
if err := server.raftApplyDeleteKey(ctx, key); err != nil {
return fmt.Errorf("adjustMemoryUsage -> all keys random: %+v", err)
}
} }
// Run garbage collection // Run garbage collection
runtime.GC() runtime.GC()
@@ -399,9 +426,15 @@ func (server *Server) adjustMemoryUsage(ctx context.Context) error {
key := server.keysWithExpiry.keys[idx] key := server.keysWithExpiry.keys[idx]
server.keysWithExpiry.rwMutex.RUnlock() server.keysWithExpiry.rwMutex.RUnlock()
// Delete the key if !server.IsInCluster() {
if err := server.DeleteKey(ctx, key); err != nil { // If in standalone mode, directly delete the key
return fmt.Errorf("adjustMemoryUsage -> volatile keys random: %+v", err) if err := server.DeleteKey(ctx, key); err != nil {
return fmt.Errorf("adjustMemoryUsage -> volatile keys random: %+v", err)
}
} else if server.IsInCluster() && server.raft.IsRaftLeader() {
if err := server.raftApplyDeleteKey(ctx, key); err != nil {
return fmt.Errorf("adjustMemoryUsage -> volatile keys randome: %+v", err)
}
} }
// Run garbage collection // Run garbage collection

View File

@@ -84,7 +84,8 @@ func (server *Server) handleCommand(ctx context.Context, message []byte, conn *n
// Handle other commands that need to be synced across the cluster // Handle other commands that need to be synced across the cluster
if server.raft.IsRaftLeader() { if server.raft.IsRaftLeader() {
res, err := server.raftApply(ctx, cmd) var res []byte
res, err = server.raftApplyCommand(ctx, cmd)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -101,7 +101,7 @@ func NewServer(opts Opts) *Server {
AddVoter: server.raft.AddVoter, AddVoter: server.raft.AddVoter,
RemoveRaftServer: server.raft.RemoveServer, RemoveRaftServer: server.raft.RemoveServer,
IsRaftLeader: server.raft.IsRaftLeader, IsRaftLeader: server.raft.IsRaftLeader,
ApplyMutate: server.raftApply, ApplyMutate: server.raftApplyCommand,
}) })
} else { } else {
// Set up standalone snapshot engine // Set up standalone snapshot engine

View File

@@ -41,9 +41,11 @@ type ContextServerID string
type ContextConnID string type ContextConnID string
type ApplyRequest struct { type ApplyRequest struct {
Type string `json:"Type"` // command | delete-key
ServerID string `json:"ServerID"` ServerID string `json:"ServerID"`
ConnectionID string `json:"ConnectionID"` ConnectionID string `json:"ConnectionID"`
CMD []string `json:"CMD"` CMD []string `json:"CMD"`
Key string `json:"Key"`
} }
type ApplyResponse struct { type ApplyResponse struct {