Implemented restore function in standalone snapshot engine

This commit is contained in:
Kelvin Clement Mwinuka
2024-01-29 00:22:47 +08:00
parent d31acfbbdd
commit a081971427
2 changed files with 72 additions and 2 deletions

View File

@@ -245,8 +245,14 @@ func (server *Server) Start(ctx context.Context) {
GetState: server.GetState,
SetLatestSnapshotMilliseconds: server.SetLatestSnapshot,
GetLatestSnapshotMilliseconds: server.GetLatestSnapshot,
CreateKeyAndLock: server.CreateKeyAndLock,
KeyUnlock: server.KeyUnlock,
SetValue: server.SetValue,
})
server.SnapshotEngine.Start()
if err := server.SnapshotEngine.Restore(ctx); err != nil {
log.Println(err)
}
server.SnapshotEngine.Start(ctx)
}
server.StartTCP(ctx)

View File

@@ -1,6 +1,7 @@
package snapshot
import (
"context"
"crypto/md5"
"encoding/json"
"errors"
@@ -29,6 +30,9 @@ type Opts struct {
GetState func() map[string]interface{}
SetLatestSnapshotMilliseconds func(msec int64)
GetLatestSnapshotMilliseconds func() int64
CreateKeyAndLock func(ctx context.Context, key string) (bool, error)
KeyUnlock func(key string)
SetValue func(ctx context.Context, key string, value interface{})
}
type Engine struct {
@@ -41,7 +45,7 @@ func NewSnapshotEngine(opts Opts) *Engine {
}
}
func (engine *Engine) Start() {
func (engine *Engine) Start(ctx context.Context) {
// TODO: Start goroutine for periodic snapshots
}
@@ -185,3 +189,63 @@ func (engine *Engine) TakeSnapshot() error {
return nil
}
func (engine *Engine) Restore(ctx context.Context) error {
mf, err := os.Open(path.Join(engine.options.Config.DataDir, "snapshots", "manifest.bin"))
if err != nil && errors.Is(err, fs.ErrNotExist) {
return errors.New("no snapshot manifest, skipping snapshot restore")
}
if err != nil {
return err
}
manifest := new(Manifest)
md, err := io.ReadAll(mf)
if err != nil {
return err
}
if err = json.Unmarshal(md, manifest); err != nil {
return err
}
if manifest.LatestSnapshotMilliseconds == 0 {
return errors.New("no snapshot to restore")
}
sf, err := os.Open(path.Join(
engine.options.Config.DataDir,
"snapshots",
fmt.Sprintf("%d", manifest.LatestSnapshotMilliseconds),
"state.bin"))
if err != nil && errors.Is(err, fs.ErrNotExist) {
return fmt.Errorf("snapshot file %d/state.bin not found, skipping snapshot", manifest.LatestSnapshotMilliseconds)
}
if err != nil {
return err
}
sd, err := io.ReadAll(sf)
if err != nil {
return nil
}
snapshotObject := new(utils.SnapshotObject)
if err = json.Unmarshal(sd, snapshotObject); err != nil {
return err
}
engine.options.SetLatestSnapshotMilliseconds(snapshotObject.LatestSnapshotMilliseconds)
for key, value := range snapshotObject.State {
if _, err = engine.options.CreateKeyAndLock(ctx, key); err != nil {
log.Println(fmt.Errorf("could not load value at key %s with error: %s", key, err.Error()))
}
engine.options.SetValue(ctx, key, value)
engine.options.KeyUnlock(key)
}
return nil
}