diff --git a/src/modules/admin/commands.go b/src/modules/admin/commands.go index 08f4f6b..43a3d6b 100644 --- a/src/modules/admin/commands.go +++ b/src/modules/admin/commands.go @@ -3,6 +3,7 @@ package admin import ( "context" "errors" + "fmt" "github.com/echovault/echovault/src/utils" "net" ) @@ -48,13 +49,17 @@ func NewModule() Plugin { { Command: "lastsave", Categories: []string{utils.AdminCategory, utils.FastCategory, utils.DangerousCategory}, - Description: "(LASTSAVE) Get timestamp for the latest snapshot", + Description: "(LASTSAVE) Get unix timestamp for the latest snapshot in milliseconds.", Sync: false, KeyExtractionFunc: func(cmd []string) ([]string, error) { return []string{}, nil }, HandlerFunc: func(ctx context.Context, cmd []string, server utils.Server, conn *net.Conn) ([]byte, error) { - return nil, errors.New("LASTSAVE command not implemented") + msec := server.GetLatestSnapshot() + if msec == 0 { + return nil, errors.New("no snapshot") + } + return []byte(fmt.Sprintf(":%d\r\n\r\n", msec)), nil }, }, { diff --git a/src/raft/fms_snapshot.go b/src/raft/fms_snapshot.go index 4d411fe..3694454 100644 --- a/src/raft/fms_snapshot.go +++ b/src/raft/fms_snapshot.go @@ -4,13 +4,16 @@ import ( "encoding/json" "github.com/echovault/echovault/src/utils" "github.com/hashicorp/raft" + "strconv" + "strings" ) type SnapshotOpts struct { - config utils.Config - data map[string]interface{} - startSnapshot func() - finishSnapshot func() + config utils.Config + data map[string]interface{} + startSnapshot func() + finishSnapshot func() + setLatestSnapshot func(msec int64) } type Snapshot struct { @@ -27,7 +30,18 @@ func NewFSMSnapshot(opts SnapshotOpts) *Snapshot { func (s *Snapshot) Persist(sink raft.SnapshotSink) error { s.options.startSnapshot() - o, err := json.Marshal(s.options.data) + msec, err := strconv.Atoi(strings.Split(sink.ID(), "-")[2]) + if err != nil { + sink.Cancel() + return err + } + + snapshotObject := utils.SnapshotObject{ + State: s.options.data, + LatestSnapshotMilliseconds: int64(msec), + } + + o, err := json.Marshal(snapshotObject) if err != nil { sink.Cancel() @@ -39,6 +53,8 @@ func (s *Snapshot) Persist(sink raft.SnapshotSink) error { return err } + s.options.setLatestSnapshot(int64(msec)) + return nil } diff --git a/src/raft/fsm.go b/src/raft/fsm.go index 3f17784..f6d4f58 100644 --- a/src/raft/fsm.go +++ b/src/raft/fsm.go @@ -76,10 +76,11 @@ func (fsm *FSM) Apply(log *raft.Log) interface{} { // Snapshot implements raft.FSM interface func (fsm *FSM) Snapshot() (raft.FSMSnapshot, error) { return NewFSMSnapshot(SnapshotOpts{ - config: fsm.options.Config, - data: fsm.options.Server.GetState(), - startSnapshot: fsm.options.Server.StartSnapshot, - finishSnapshot: fsm.options.Server.FinishSnapshot, + config: fsm.options.Config, + data: fsm.options.Server.GetState(), + startSnapshot: fsm.options.Server.StartSnapshot, + finishSnapshot: fsm.options.Server.FinishSnapshot, + setLatestSnapshot: fsm.options.Server.SetLatestSnapshot, }), nil } @@ -92,14 +93,18 @@ func (fsm *FSM) Restore(snapshot io.ReadCloser) error { return err } - data := make(map[string]interface{}) + data := utils.SnapshotObject{ + State: make(map[string]interface{}), + LatestSnapshotMilliseconds: 0, + } if err := json.Unmarshal(b, &data); err != nil { log.Fatal(err) return err } - for k, v := range data { + // Set state + for k, v := range data.State { _, err := fsm.options.Server.CreateKeyAndLock(context.Background(), k) if err != nil { log.Fatal(err) @@ -107,6 +112,8 @@ func (fsm *FSM) Restore(snapshot io.ReadCloser) error { fsm.options.Server.SetValue(context.Background(), k, v) fsm.options.Server.KeyUnlock(k) } + // Set latest snapshot milliseconds + fsm.options.Server.SetLatestSnapshot(data.LatestSnapshotMilliseconds) return nil } diff --git a/src/raft/raft.go b/src/raft/raft.go index 552d412..544e55a 100644 --- a/src/raft/raft.go +++ b/src/raft/raft.go @@ -8,7 +8,6 @@ import ( "log" "net" "os" - "path" "path/filepath" "time" @@ -63,7 +62,7 @@ func (r *Raft) RaftInit(ctx context.Context) { stableStore = raft.StableStore(boltdb) - snapshotStore, err = raft.NewFileSnapshotStore(path.Join(conf.DataDir, "snapshots"), 2, os.Stdout) + snapshotStore, err = raft.NewFileSnapshotStore(conf.DataDir, 2, os.Stdout) if err != nil { log.Fatal(err) } @@ -144,10 +143,10 @@ func (r *Raft) HasJoinedCluster() bool { } func (r *Raft) AddVoter( - id raft.ServerID, - address raft.ServerAddress, - prevIndex uint64, - timeout time.Duration, + id raft.ServerID, + address raft.ServerAddress, + prevIndex uint64, + timeout time.Duration, ) error { if r.IsRaftLeader() { raftConfig := r.raft.GetConfiguration() diff --git a/src/server/aof/aof.go b/src/server/aof/aof.go index 7b20395..19d1585 100644 --- a/src/server/aof/aof.go +++ b/src/server/aof/aof.go @@ -1 +1,22 @@ package aof + +import ( + "github.com/echovault/echovault/src/utils" +) + +// This package handles AOF logging in standalone mode only. +// Logging in clusters is handled in the raft layer. + +type Opts struct { + Config utils.Config +} + +type Engine struct { + options Opts +} + +func NewAOFEngine(opts Opts) *Engine { + return &Engine{ + options: opts, + } +} diff --git a/src/server/server.go b/src/server/server.go index 7b92452..2f1daa9 100644 --- a/src/server/server.go +++ b/src/server/server.go @@ -9,6 +9,8 @@ import ( "github.com/echovault/echovault/src/modules/acl" "github.com/echovault/echovault/src/modules/pubsub" "github.com/echovault/echovault/src/raft" + "github.com/echovault/echovault/src/server/aof" + "github.com/echovault/echovault/src/server/snapshot" "github.com/echovault/echovault/src/utils" "io" "log" @@ -38,7 +40,10 @@ type Server struct { ACL *acl.ACL PubSub *pubsub.PubSub - SnapshotInProgress atomic.Bool + SnapshotInProgress atomic.Bool + LatestSnapshotMilliseconds atomic.Int64 // Unix epoch in milliseconds + SnapshotEngine *snapshot.Engine + AOFEngine *aof.Engine } func (server *Server) StartTCP(ctx context.Context) { @@ -99,28 +104,26 @@ func (server *Server) handleConnection(ctx context.Context, conn net.Conn) { if err != nil { if err == io.EOF { // Connection closed - // TODO: Remove this connection from channel subscriptions break } if err, ok := err.(net.Error); ok && err.Timeout() { // Connection timeout - fmt.Println(err) + log.Println(err) break } if err, ok := err.(tls.RecordHeaderError); ok { // TLS verification error - fmt.Println(err) + log.Println(err) break } - fmt.Println(err) + log.Println(err) break } if cmd, err := utils.Decode(message); err != nil { // Return error to client if _, err := w.Write([]byte(fmt.Sprintf("-Error %s\r\n\r\n", err.Error()))); err != nil { - // TODO: Log error at configured logger - fmt.Println(err) + log.Println(err) } continue } else { @@ -128,8 +131,7 @@ func (server *Server) handleConnection(ctx context.Context, conn net.Conn) { if err != nil { if _, err := w.Write([]byte(fmt.Sprintf("-%s\r\n\r\n", err.Error()))); err != nil { - // TODO: Log error at configured logger - fmt.Println(err) + log.Println(err) } continue } @@ -146,8 +148,7 @@ func (server *Server) handleConnection(ctx context.Context, conn net.Conn) { if err := server.ACL.AuthorizeConnection(&conn, cmd, command, subCommand); err != nil { if _, err := w.Write([]byte(fmt.Sprintf("-%s\r\n\r\n", err.Error()))); err != nil { - // TODO: Log error at configured logger - fmt.Println(err) + log.Println(err) } continue } @@ -155,13 +156,11 @@ func (server *Server) handleConnection(ctx context.Context, conn net.Conn) { if !server.IsInCluster() || !synchronize { if res, err := handler(ctx, cmd, server, &conn); err != nil { if _, err := w.Write([]byte(fmt.Sprintf("-%s\r\n\r\n", err.Error()))); err != nil { - // TODO: Log error at configured logger - fmt.Println(err) + log.Println(err) } } else { if _, err := w.Write(res); err != nil { - // TODO: Log error at configured logger - fmt.Println(err) + log.Println(err) } // TODO: Write successful, add entry to AOF } @@ -172,13 +171,11 @@ func (server *Server) handleConnection(ctx context.Context, conn net.Conn) { if server.raft.IsRaftLeader() { if res, err := server.raftApply(ctx, cmd); err != nil { if _, err := w.Write([]byte(fmt.Sprintf("-Error %s\r\n\r\n", err.Error()))); err != nil { - // TODO: Log error at configured logger - fmt.Println(err) + log.Println(err) } } else { if _, err := w.Write(res); err != nil { - // TODO: Log error at configured logger - fmt.Println(err) + log.Println(err) } } continue @@ -188,22 +185,19 @@ func (server *Server) handleConnection(ctx context.Context, conn net.Conn) { if server.Config.ForwardCommand { server.memberList.ForwardDataMutation(ctx, message) if _, err := w.Write([]byte(utils.OK_RESPONSE)); err != nil { - // TODO: Log error at configured logger - fmt.Println(err) + log.Println(err) } continue } if _, err := w.Write([]byte("-Error not cluster leader, cannot carry out command\r\n\r\n")); err != nil { - // TODO: Log error at configured logger - fmt.Println(err) + log.Println(err) } } } if err := conn.Close(); err != nil { - // TODO: Log error at configured logger - fmt.Println(err) + log.Println(err) } } @@ -238,6 +232,21 @@ func (server *Server) Start(ctx context.Context) { }) server.raft.RaftInit(ctx) server.memberList.MemberListInit(ctx) + } else { + // Initialize standalone AOF engine + server.AOFEngine = aof.NewAOFEngine(aof.Opts{ + Config: conf, + }) + // Initialize and start standalone snapshot engine + server.SnapshotEngine = snapshot.NewSnapshotEngine(snapshot.Opts{ + Config: conf, + StartSnapshot: server.StartSnapshot, + FinishSnapshot: server.FinishSnapshot, + GetState: server.GetState, + SetLatestSnapshotMilliseconds: server.SetLatestSnapshot, + GetLatestSnapshotMilliseconds: server.GetLatestSnapshot, + }) + server.SnapshotEngine.Start() } server.StartTCP(ctx) @@ -247,17 +256,32 @@ func (server *Server) TakeSnapshot() error { if server.SnapshotInProgress.Load() { return errors.New("snapshot already in progress") } - if server.IsInCluster() { - // Handle snapshot in cluster mode - go func() { - err := server.raft.TakeSnapshot() + + go func() { + if server.IsInCluster() { + // Handle snapshot in cluster mode + if err := server.raft.TakeSnapshot(); err != nil { + log.Println(err) + } + return + } + // Handle snapshot in standalone mode + if err := server.SnapshotEngine.TakeSnapshot(); err != nil { log.Println(err) - }() - } - // Handle snapshot in standalone mode + } + }() + return nil } +func (server *Server) SetLatestSnapshot(msec int64) { + server.LatestSnapshotMilliseconds.Store(msec) +} + +func (server *Server) GetLatestSnapshot() int64 { + return server.LatestSnapshotMilliseconds.Load() +} + func (server *Server) ShutDown(ctx context.Context) { if server.IsInCluster() { server.raft.RaftShutdown(ctx) diff --git a/src/server/snapshot/snapshot.go b/src/server/snapshot/snapshot.go index 4966db1..ca2c758 100644 --- a/src/server/snapshot/snapshot.go +++ b/src/server/snapshot/snapshot.go @@ -1,4 +1,187 @@ package snapshot +import ( + "crypto/md5" + "encoding/json" + "errors" + "fmt" + "github.com/echovault/echovault/src/utils" + "io" + "io/fs" + "log" + "os" + "path" + "time" +) + // This package contains the snapshot engine for standalone mode. // Snapshots in cluster mode will be handled using the raft package in the raft layer. + +type Manifest struct { + LatestSnapshotMilliseconds int64 + LatestSnapshotHash [16]byte +} + +type Opts struct { + Config utils.Config + StartSnapshot func() + FinishSnapshot func() + GetState func() map[string]interface{} + SetLatestSnapshotMilliseconds func(msec int64) + GetLatestSnapshotMilliseconds func() int64 +} + +type Engine struct { + options Opts +} + +func NewSnapshotEngine(opts Opts) *Engine { + return &Engine{ + options: opts, + } +} + +func (engine *Engine) Start() { + // TODO: Start goroutine for periodic snapshots +} + +func (engine *Engine) TakeSnapshot() error { + engine.options.StartSnapshot() + defer engine.options.FinishSnapshot() + + // Extract current time + now := time.Now() + msec := now.UnixNano() / int64(time.Millisecond) + + // Update manifest file to indicate the latest snapshot. + // If manifest file does not exist, create it. + // Manifest object will contain the following information: + // 1. Hash of the snapshot contents. + // 2. Unix time of the latest snapshot taken. + // The information above will be used to determine whether a snapshot should be taken. + // If the hash of the current state equals the hash in the manifest file, skip the snapshot. + // Otherwise, take the snapshot and update the latest snapshot timestamp and hash in the manifest file. + + var firstSnapshot bool // Tracks whether the snapshot being attempted is the first one + + dirname := path.Join(engine.options.Config.DataDir, "snapshots") + if err := os.MkdirAll(dirname, os.ModePerm); err != nil { + log.Println(err) + return err + } + // Open manifest file + var mf *os.File + mf, err := os.Open(path.Join(dirname, "manifest.bin")) + if err != nil { + if errors.Is(err, fs.ErrNotExist) { + // Create file if it does not exist + mf, err = os.Create(path.Join(dirname, "manifest.bin")) + if err != nil { + log.Println(err) + return err + } + firstSnapshot = true + } else { + log.Println(err) + return err + } + } + + md, err := io.ReadAll(mf) + if err != nil { + log.Println(err) + return err + } + if err := mf.Close(); err != nil { + log.Println(err) + return err + } + + manifest := new(Manifest) + + if !firstSnapshot { + if err = json.Unmarshal(md, manifest); err != nil { + log.Println(err) + return err + } + } + + // Get current state + snapshotObject := utils.SnapshotObject{ + State: engine.options.GetState(), + LatestSnapshotMilliseconds: engine.options.GetLatestSnapshotMilliseconds(), + } + out, err := json.Marshal(snapshotObject) + if err != nil { + log.Println(err) + return err + } + + snapshotHash := md5.Sum(out) + if snapshotHash == manifest.LatestSnapshotHash { + return errors.New("nothing new to snapshot") + } + + // Update the snapshotObject + snapshotObject.LatestSnapshotMilliseconds = msec + // Marshal the updated snapshotObject + out, err = json.Marshal(snapshotObject) + if err != nil { + log.Println(err) + return err + } + + // os.Create will replace the old manifest file + mf, err = os.Create(path.Join(dirname, "manifest.bin")) + if err != nil { + log.Println(err) + return err + } + + // Write the latest manifest data + manifest = &Manifest{ + LatestSnapshotHash: md5.Sum(out), + LatestSnapshotMilliseconds: msec, + } + mo, err := json.Marshal(manifest) + if err != nil { + log.Println(err) + return err + } + if _, err = mf.Write(mo); err != nil { + log.Println(err) + return err + } + if err = mf.Close(); err != nil { + log.Println(err) + return err + } + + // Create snapshot directory + dirname = path.Join(engine.options.Config.DataDir, "snapshots", fmt.Sprintf("%d", msec)) + if err := os.MkdirAll(dirname, os.ModePerm); err != nil { + return err + } + + // Create snapshot file + f, err := os.OpenFile(path.Join(dirname, "state.bin"), os.O_WRONLY|os.O_CREATE, os.ModePerm) + if err != nil { + log.Println(err) + return err + } + defer func() { + if err := f.Close(); err != nil { + log.Println(err) + } + }() + + // Write state to file + if _, err = f.Write(out); err != nil { + return err + } + + // Set the latest snapshot in unix milliseconds + engine.options.SetLatestSnapshotMilliseconds(msec) + + return nil +} diff --git a/src/utils/types.go b/src/utils/types.go index f0d76ca..38ed5f8 100644 --- a/src/utils/types.go +++ b/src/utils/types.go @@ -21,6 +21,8 @@ type Server interface { TakeSnapshot() error StartSnapshot() FinishSnapshot() + SetLatestSnapshot(msec int64) + GetLatestSnapshot() int64 } type ContextServerID string @@ -64,3 +66,8 @@ type Plugin interface { Commands() []Command Description() string } + +type SnapshotObject struct { + State map[string]interface{} + LatestSnapshotMilliseconds int64 +}