Created basic handling for taking snapshots. Will implemented error when snapshot in progress and propagation of snapshot command to the rest of the cluster

This commit is contained in:
Kelvin Clement Mwinuka
2024-01-26 00:58:43 +08:00
parent 63fcb490c9
commit 3c5d6fef2c
5 changed files with 46 additions and 1 deletions

View File

@@ -2,6 +2,7 @@ package admin
import (
"context"
"errors"
"github.com/echovault/echovault/src/utils"
"net"
)
@@ -38,9 +39,36 @@ func NewModule() Plugin {
return []string{}, nil
},
HandlerFunc: func(ctx context.Context, cmd []string, server utils.Server, conn *net.Conn) ([]byte, error) {
if err := server.TakeSnapshot(); err != nil {
return nil, err
}
return []byte(utils.OK_RESPONSE), nil
},
},
{
Command: "lastsave",
Categories: []string{utils.AdminCategory, utils.FastCategory, utils.DangerousCategory},
Description: "(LASTSAVE) Get timestamp for the latest snapshot",
Sync: false,
KeyExtractionFunc: func(cmd []string) ([]string, error) {
return []string{}, nil
},
HandlerFunc: func(ctx context.Context, cmd []string, server utils.Server, conn *net.Conn) ([]byte, error) {
return nil, errors.New("LASTSAVE command not implemented")
},
},
{
Command: "bgrewriteaof",
Categories: []string{utils.AdminCategory, utils.SlowCategory, utils.DangerousCategory},
Description: "(BGREWRITEAOF) Trigger re-writing of append process",
Sync: false,
KeyExtractionFunc: func(cmd []string) ([]string, error) {
return []string{}, nil
},
HandlerFunc: func(ctx context.Context, cmd []string, server utils.Server, conn *net.Conn) ([]byte, error) {
return nil, errors.New("BGREWRITEAOF command not implemented")
},
},
},
}
}

View File

@@ -23,6 +23,7 @@ func NewFSMSnapshot(opts SnapshotOpts) *Snapshot {
// Persist implements FSMSnapshot interface
func (s *Snapshot) Persist(sink raft.SnapshotSink) error {
// TODO: Turn on snapshot in-progress flag
o, err := json.Marshal(s.options.data)
if err != nil {
@@ -39,4 +40,6 @@ func (s *Snapshot) Persist(sink raft.SnapshotSink) error {
}
// Release implements FSMSnapshot interface
func (s *Snapshot) Release() {}
func (s *Snapshot) Release() {
// TODO: Turn off snapshot in-progress flag
}

View File

@@ -183,6 +183,10 @@ func (r *Raft) RemoveServer(meta memberlist.NodeMeta) error {
return nil
}
func (r *Raft) TakeSnapshot() error {
return r.raft.Snapshot().Error()
}
func (r *Raft) RaftShutdown(ctx context.Context) {
// Leadership transfer if current node is the leader
if r.IsRaftLeader() {

View File

@@ -240,6 +240,15 @@ func (server *Server) Start(ctx context.Context) {
server.StartTCP(ctx)
}
func (server *Server) TakeSnapshot() error {
// TODO: Check if there's a snapshot currently in progress
go func() {
err := server.raft.TakeSnapshot()
log.Println(err)
}()
return nil
}
func (server *Server) ShutDown(ctx context.Context) {
if server.IsInCluster() {
server.raft.RaftShutdown(ctx)

View File

@@ -18,6 +18,7 @@ type Server interface {
GetAllCommands(ctx context.Context) []Command
GetACL() interface{}
GetPubSub() interface{}
TakeSnapshot() error
}
type ContextServerID string