From 4eb0258ba47e1b20ee47d0bad5e2987cae31bfad Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Tue, 2 May 2023 15:49:46 +0200 Subject: [PATCH] Add automatic bootstrap and recovery --- app/api/api.go | 1 + cluster/cluster.go | 54 ++++++++++++++++++++++++---------- cluster/{fsm.go => store.go} | 56 ++++++++++++++++++++++++++++++++---- 3 files changed, 91 insertions(+), 20 deletions(-) rename cluster/{fsm.go => store.go} (70%) diff --git a/app/api/api.go b/app/api/api.go index 5c115efb..bfb2b6bb 100644 --- a/app/api/api.go +++ b/app/api/api.go @@ -628,6 +628,7 @@ func (a *api) start() error { Name: cfg.Name, Path: filepath.Join(cfg.DB.Dir, "cluster"), Bootstrap: cfg.Cluster.Bootstrap, + Recover: cfg.Cluster.Recover, Address: cfg.Cluster.Address, JoinAddress: cfg.Cluster.JoinAddress, CoreAPIAddress: cfg.Address, diff --git a/cluster/cluster.go b/cluster/cluster.go index 436e35ce..cc5ef015 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -941,26 +941,50 @@ func (c *cluster) startRaft(fsm raft.FSM, bootstrap, recover, inmem bool) error cfg.LocalID = raft.ServerID(c.id) cfg.Logger = NewLogger(c.logger, hclog.Debug).Named("raft") - if bootstrap { - hasState, err := raft.HasExistingState(logStore, stableStore, snapshots) + hasState, err := raft.HasExistingState(logStore, stableStore, snapshots) + if err != nil { + return err + } + + if !hasState { + // Bootstrap cluster + configuration := raft.Configuration{ + Servers: []raft.Server{ + { + Suffrage: raft.Voter, + ID: raft.ServerID(c.id), + Address: transport.LocalAddr(), + }, + }, + } + + if err := raft.BootstrapCluster(cfg, logStore, stableStore, snapshots, transport, configuration); err != nil { + return err + } + + c.logger.Debug().Log("raft node bootstrapped") + } else { + // Recover cluster + fsm, err := NewStore() if err != nil { return err } - if !hasState { - configuration := raft.Configuration{ - Servers: []raft.Server{ - { - Suffrage: raft.Voter, - ID: raft.ServerID(c.id), - Address: transport.LocalAddr(), - }, - }, - } - if err := raft.BootstrapCluster(cfg, logStore, stableStore, snapshots, transport, configuration); err != nil { - return err - } + configuration := raft.Configuration{ + Servers: []raft.Server{ + { + Suffrage: raft.Voter, + ID: raft.ServerID(c.id), + Address: transport.LocalAddr(), + }, + }, } + + if err := raft.RecoverCluster(cfg, fsm, logStore, stableStore, snapshots, transport, configuration); err != nil { + return err + } + + c.logger.Debug().Log("raft node recoverd") } // Set up a channel for reliable leader notifications. diff --git a/cluster/fsm.go b/cluster/store.go similarity index 70% rename from cluster/fsm.go rename to cluster/store.go index 37db667f..ac84ffac 100644 --- a/cluster/fsm.go +++ b/cluster/store.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "io" + "sync" "github.com/hashicorp/raft" ) @@ -45,10 +46,15 @@ type addProcessCommand struct { } // Implement a FSM -type store struct{} +type store struct { + lock sync.RWMutex + Nodes map[string]string +} func NewStore() (Store, error) { - return &store{}, nil + return &store{ + Nodes: map[string]string{}, + }, nil } func (s *store) Apply(log *raft.Log) interface{} { @@ -72,23 +78,53 @@ func (s *store) Apply(log *raft.Log) interface{} { json.Unmarshal(b, &cmd) fmt.Printf("addNode: %+v\n", cmd) + + s.lock.Lock() + s.Nodes[cmd.ID] = cmd.Address + s.lock.Unlock() case opRemoveNode: b, _ := json.Marshal(c.Data) cmd := removeNodeCommand{} json.Unmarshal(b, &cmd) fmt.Printf("removeNode: %+v\n", cmd) + + s.lock.Lock() + delete(s.Nodes, cmd.ID) + s.lock.Unlock() } return nil } func (s *store) Snapshot() (raft.FSMSnapshot, error) { fmt.Printf("a snapshot is requested\n") - return &fsmSnapshot{}, nil + + s.lock.Lock() + defer s.lock.Unlock() + + data, err := json.Marshal(s) + if err != nil { + return nil, err + } + + return &fsmSnapshot{ + data: data, + }, nil } func (s *store) Restore(snapshot io.ReadCloser) error { fmt.Printf("a snapshot is restored\n") + + defer snapshot.Close() + + s.lock.Lock() + defer s.lock.Unlock() + + dec := json.NewDecoder(snapshot) + if err := dec.Decode(s); err != nil { + return err + } + return nil } @@ -100,10 +136,20 @@ func (s *store) GetNode(id string) string { return "" } -type fsmSnapshot struct{} +type fsmSnapshot struct { + data []byte +} func (s *fsmSnapshot) Persist(sink raft.SnapshotSink) error { + if _, err := sink.Write(s.data); err != nil { + sink.Cancel() + return err + } + + sink.Close() return nil } -func (s *fsmSnapshot) Release() {} +func (s *fsmSnapshot) Release() { + s.data = nil +}