Implemented snapshot persistence on raft layer

This commit is contained in:
Kelvin Clement Mwinuka
2024-01-25 17:52:22 +08:00
parent 1c00706f88
commit 4e12064ce8
6 changed files with 34 additions and 30 deletions

View File

@@ -7,7 +7,8 @@ import (
)
type SnapshotOpts struct {
Config utils.Config
config utils.Config
data map[string]interface{}
}
type Snapshot struct {
@@ -22,10 +23,7 @@ func NewFSMSnapshot(opts SnapshotOpts) *Snapshot {
// Persist implements FSMSnapshot interface
func (s *Snapshot) Persist(sink raft.SnapshotSink) error {
data := map[string]interface{}{}
// TODO: Copy current store contents
o, err := json.Marshal(data)
o, err := json.Marshal(s.options.data)
if err != nil {
sink.Cancel()
@@ -37,8 +35,6 @@ func (s *Snapshot) Persist(sink raft.SnapshotSink) error {
return err
}
// TODO: Store data in separate snapshot file
return nil
}

View File

@@ -6,12 +6,12 @@ import (
"github.com/echovault/echovault/src/utils"
"github.com/hashicorp/raft"
"io"
"log"
)
type FSMOpts struct {
Config utils.Config
Server utils.Server
Snapshot raft.FSMSnapshot
GetCommand func(command string) (utils.Command, error)
}
@@ -75,7 +75,10 @@ func (fsm *FSM) Apply(log *raft.Log) interface{} {
// Snapshot implements raft.FSM interface
func (fsm *FSM) Snapshot() (raft.FSMSnapshot, error) {
return fsm.options.Snapshot, nil
return NewFSMSnapshot(SnapshotOpts{
config: fsm.options.Config,
data: fsm.options.Server.GetState(),
}), nil
}
// Restore implements raft.FSM interface
@@ -83,20 +86,25 @@ func (fsm *FSM) Restore(snapshot io.ReadCloser) error {
b, err := io.ReadAll(snapshot)
if err != nil {
log.Fatal(err)
return err
}
data := make(map[string]interface{})
if err := json.Unmarshal(b, &data); err != nil {
log.Fatal(err)
return err
}
// for k, v := range data {
// server.keyLocks[k].Lock()
// server.SetValue(context.Background(), k, v)
// server.keyLocks[k].Unlock()
// }
for k, v := range data {
_, err := fsm.options.Server.CreateKeyAndLock(context.Background(), k)
if err != nil {
log.Fatal(err)
}
fsm.options.Server.SetValue(context.Background(), k, v)
fsm.options.Server.KeyUnlock(k)
}
return nil
}

View File

@@ -17,18 +17,18 @@ import (
raftboltdb "github.com/hashicorp/raft-boltdb"
)
type RaftOpts struct {
type Opts struct {
Config utils.Config
Server utils.Server
GetCommand func(command string) (utils.Command, error)
}
type Raft struct {
options RaftOpts
options Opts
raft *raft.Raft
}
func NewRaft(opts RaftOpts) *Raft {
func NewRaft(opts Opts) *Raft {
return &Raft{
options: opts,
}
@@ -91,11 +91,8 @@ func (r *Raft) RaftInit(ctx context.Context) {
raftServer, err := raft.NewRaft(
raftConfig,
NewFSM(FSMOpts{
Config: r.options.Config,
Server: r.options.Server,
Snapshot: NewFSMSnapshot(SnapshotOpts{
Config: r.options.Config,
}),
Config: r.options.Config,
Server: r.options.Server,
GetCommand: r.options.GetCommand,
}),
logStore,
@@ -108,11 +105,9 @@ func (r *Raft) RaftInit(ctx context.Context) {
log.Fatalf("Could not start node with error; %s", err)
}
r.raft = raftServer
if conf.BootstrapCluster {
// Bootstrap raft cluster
if err := r.raft.BootstrapCluster(raft.Configuration{
// Error can be safely ignored if we're already leader
_ = raftServer.BootstrapCluster(raft.Configuration{
Servers: []raft.Server{
{
Suffrage: raft.Voter,
@@ -120,10 +115,10 @@ func (r *Raft) RaftInit(ctx context.Context) {
Address: raft.ServerAddress(addr),
},
},
}).Error(); err != nil {
log.Fatal(err)
}
}).Error()
}
r.raft = raftServer
}
func (r *Raft) Apply(cmd []byte, timeout time.Duration) raft.ApplyFuture {

View File

@@ -71,3 +71,7 @@ func (server *Server) GetValue(key string) interface{} {
func (server *Server) SetValue(ctx context.Context, key string, value interface{}) {
server.store[key] = value
}
func (server *Server) GetState() map[string]interface{} {
return server.store
}

View File

@@ -220,7 +220,7 @@ func (server *Server) Start(ctx context.Context) {
if server.IsInCluster() {
// Initialise raft and memberlist
server.raft = raft.NewRaft(raft.RaftOpts{
server.raft = raft.NewRaft(raft.Opts{
Config: conf,
Server: server,
GetCommand: server.getCommand,

View File

@@ -14,6 +14,7 @@ type Server interface {
CreateKeyAndLock(ctx context.Context, key string) (bool, error)
GetValue(key string) interface{}
SetValue(ctx context.Context, key string, value interface{})
GetState() map[string]interface{}
GetAllCommands(ctx context.Context) []Command
GetACL() interface{}
GetPubSub() interface{}