diff --git a/src/modules/admin/commands.go b/src/modules/admin/commands.go index 046b2a6..48f8ee3 100644 --- a/src/modules/admin/commands.go +++ b/src/modules/admin/commands.go @@ -2,6 +2,7 @@ package admin import ( "context" + "errors" "github.com/echovault/echovault/src/utils" "net" ) @@ -38,9 +39,36 @@ func NewModule() Plugin { return []string{}, nil }, HandlerFunc: func(ctx context.Context, cmd []string, server utils.Server, conn *net.Conn) ([]byte, error) { + if err := server.TakeSnapshot(); err != nil { + return nil, err + } return []byte(utils.OK_RESPONSE), nil }, }, + { + Command: "lastsave", + Categories: []string{utils.AdminCategory, utils.FastCategory, utils.DangerousCategory}, + Description: "(LASTSAVE) Get timestamp for the latest snapshot", + Sync: false, + KeyExtractionFunc: func(cmd []string) ([]string, error) { + return []string{}, nil + }, + HandlerFunc: func(ctx context.Context, cmd []string, server utils.Server, conn *net.Conn) ([]byte, error) { + return nil, errors.New("LASTSAVE command not implemented") + }, + }, + { + Command: "bgrewriteaof", + Categories: []string{utils.AdminCategory, utils.SlowCategory, utils.DangerousCategory}, + Description: "(BGREWRITEAOF) Trigger re-writing of append process", + Sync: false, + KeyExtractionFunc: func(cmd []string) ([]string, error) { + return []string{}, nil + }, + HandlerFunc: func(ctx context.Context, cmd []string, server utils.Server, conn *net.Conn) ([]byte, error) { + return nil, errors.New("BGREWRITEAOF command not implemented") + }, + }, }, } } diff --git a/src/raft/fms_snapshot.go b/src/raft/fms_snapshot.go index 3ff0066..cab4247 100644 --- a/src/raft/fms_snapshot.go +++ b/src/raft/fms_snapshot.go @@ -23,6 +23,7 @@ func NewFSMSnapshot(opts SnapshotOpts) *Snapshot { // Persist implements FSMSnapshot interface func (s *Snapshot) Persist(sink raft.SnapshotSink) error { + // TODO: Turn on snapshot in-progress flag o, err := json.Marshal(s.options.data) if err != nil { @@ -39,4 +40,6 @@ func (s *Snapshot) Persist(sink raft.SnapshotSink) error { } // Release implements FSMSnapshot interface -func (s *Snapshot) Release() {} +func (s *Snapshot) Release() { + // TODO: Turn off snapshot in-progress flag +} diff --git a/src/raft/raft.go b/src/raft/raft.go index 6bc61f4..1f9d4d4 100644 --- a/src/raft/raft.go +++ b/src/raft/raft.go @@ -183,6 +183,10 @@ func (r *Raft) RemoveServer(meta memberlist.NodeMeta) error { return nil } +func (r *Raft) TakeSnapshot() error { + return r.raft.Snapshot().Error() +} + func (r *Raft) RaftShutdown(ctx context.Context) { // Leadership transfer if current node is the leader if r.IsRaftLeader() { diff --git a/src/server/server.go b/src/server/server.go index b32f51b..3646eab 100644 --- a/src/server/server.go +++ b/src/server/server.go @@ -240,6 +240,15 @@ func (server *Server) Start(ctx context.Context) { server.StartTCP(ctx) } +func (server *Server) TakeSnapshot() error { + // TODO: Check if there's a snapshot currently in progress + go func() { + err := server.raft.TakeSnapshot() + log.Println(err) + }() + return nil +} + func (server *Server) ShutDown(ctx context.Context) { if server.IsInCluster() { server.raft.RaftShutdown(ctx) diff --git a/src/utils/types.go b/src/utils/types.go index 127f4ea..b8f4c94 100644 --- a/src/utils/types.go +++ b/src/utils/types.go @@ -18,6 +18,7 @@ type Server interface { GetAllCommands(ctx context.Context) []Command GetACL() interface{} GetPubSub() interface{} + TakeSnapshot() error } type ContextServerID string