Removed StartSnapshot, FinishSnapshot, SetLatestSnapshot, and GetLatestSnapshot funcs from EchoVault interface as these are now private functions passed to the raft module using inversion of control

This commit is contained in:
Kelvin Mwinuka
2024-04-04 03:01:44 +08:00
parent 96968278e2
commit 3b0493e1d4
6 changed files with 581 additions and 552 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -24,11 +24,11 @@ import (
) )
type SnapshotOpts struct { type SnapshotOpts struct {
config config.Config config config.Config
data map[string]internal.KeyData data map[string]internal.KeyData
startSnapshot func() startSnapshot func()
finishSnapshot func() finishSnapshot func()
setLatestSnapshot func(msec int64) setLatestSnapshotTime func(msec int64)
} }
type Snapshot struct { type Snapshot struct {
@@ -68,7 +68,7 @@ func (s *Snapshot) Persist(sink raft.SnapshotSink) error {
return err return err
} }
s.options.setLatestSnapshot(int64(msec)) s.options.setLatestSnapshotTime(int64(msec))
return nil return nil
} }

View File

@@ -28,11 +28,14 @@ import (
) )
type FSMOpts struct { type FSMOpts struct {
Config config.Config Config config.Config
EchoVault types.EchoVault EchoVault types.EchoVault
GetState func() map[string]internal.KeyData GetState func() map[string]internal.KeyData
GetCommand func(command string) (types.Command, error) GetCommand func(command string) (types.Command, error)
DeleteKey func(ctx context.Context, key string) error DeleteKey func(ctx context.Context, key string) error
StartSnapshot func()
FinishSnapshot func()
SetLatestSnapshotTime func(msec int64)
} }
type FSM struct { type FSM struct {
@@ -119,11 +122,11 @@ func (fsm *FSM) Apply(log *raft.Log) interface{} {
// Snapshot implements raft.FSM interface // Snapshot implements raft.FSM interface
func (fsm *FSM) Snapshot() (raft.FSMSnapshot, error) { func (fsm *FSM) Snapshot() (raft.FSMSnapshot, error) {
return NewFSMSnapshot(SnapshotOpts{ return NewFSMSnapshot(SnapshotOpts{
config: fsm.options.Config, config: fsm.options.Config,
startSnapshot: fsm.options.EchoVault.StartSnapshot, startSnapshot: fsm.options.StartSnapshot,
finishSnapshot: fsm.options.EchoVault.FinishSnapshot, finishSnapshot: fsm.options.FinishSnapshot,
setLatestSnapshot: fsm.options.EchoVault.SetLatestSnapshot, setLatestSnapshotTime: fsm.options.SetLatestSnapshotTime,
data: fsm.options.GetState(), data: fsm.options.GetState(),
}), nil }), nil
} }
@@ -159,7 +162,7 @@ func (fsm *FSM) Restore(snapshot io.ReadCloser) error {
fsm.options.EchoVault.KeyUnlock(ctx, k) fsm.options.EchoVault.KeyUnlock(ctx, k)
} }
// Set latest snapshot milliseconds // Set latest snapshot milliseconds
fsm.options.EchoVault.SetLatestSnapshot(data.LatestSnapshotMilliseconds) fsm.options.SetLatestSnapshotTime(data.LatestSnapshotMilliseconds)
return nil return nil
} }

View File

@@ -33,11 +33,14 @@ import (
) )
type Opts struct { type Opts struct {
Config config.Config Config config.Config
EchoVault types.EchoVault EchoVault types.EchoVault
GetState func() map[string]internal.KeyData GetState func() map[string]internal.KeyData
GetCommand func(command string) (types.Command, error) GetCommand func(command string) (types.Command, error)
DeleteKey func(ctx context.Context, key string) error DeleteKey func(ctx context.Context, key string) error
StartSnapshot func()
FinishSnapshot func()
SetLatestSnapshotTime func(msec int64)
} }
type Raft struct { type Raft struct {
@@ -109,11 +112,14 @@ func (r *Raft) RaftInit(ctx context.Context) {
raftServer, err := raft.NewRaft( raftServer, err := raft.NewRaft(
raftConfig, raftConfig,
NewFSM(FSMOpts{ NewFSM(FSMOpts{
Config: r.options.Config, Config: r.options.Config,
EchoVault: r.options.EchoVault, EchoVault: r.options.EchoVault,
GetState: r.options.GetState, GetState: r.options.GetState,
GetCommand: r.options.GetCommand, GetCommand: r.options.GetCommand,
DeleteKey: r.options.DeleteKey, DeleteKey: r.options.DeleteKey,
StartSnapshot: r.options.StartSnapshot,
FinishSnapshot: r.options.FinishSnapshot,
SetLatestSnapshotTime: r.options.SetLatestSnapshotTime,
}), }),
logStore, logStore,
stableStore, stableStore,

View File

@@ -89,24 +89,35 @@ type EchoVault struct {
aofEngine *aof.Engine // AOF engine for standalone mode aofEngine *aof.Engine // AOF engine for standalone mode
} }
// WithContext is an options that for the NewEchoVault function that allows you to
// configure a custom context object to be used in EchoVault. If you don't provide this
// option, EchoVault will create its own internal context object.
func WithContext(ctx context.Context) func(echovault *EchoVault) { func WithContext(ctx context.Context) func(echovault *EchoVault) {
return func(echovault *EchoVault) { return func(echovault *EchoVault) {
echovault.context = ctx echovault.context = ctx
} }
} }
// WithConfig is an option for the NewEchoVault function that allows you to pass a
// custom configuration to EchoVault. If not specified, EchoVault will use the default
// configuration from config.DefaultConfig().
func WithConfig(config config.Config) func(echovault *EchoVault) { func WithConfig(config config.Config) func(echovault *EchoVault) {
return func(echovault *EchoVault) { return func(echovault *EchoVault) {
echovault.config = config echovault.config = config
} }
} }
// WithCommands is an options for the NewEchoVault function that allows you to pass a
// list of commands that should be supported by your EchoVault instance. If you don't pass
// this option, EchoVault will start with no commands loaded.
func WithCommands(commands []types.Command) func(echovault *EchoVault) { func WithCommands(commands []types.Command) func(echovault *EchoVault) {
return func(echovault *EchoVault) { return func(echovault *EchoVault) {
echovault.commands = commands echovault.commands = commands
} }
} }
// NewEchoVault creates a new EchoVault instance.
// This functions accepts the WithContext, WithConfig and WithCommands options.
func NewEchoVault(options ...func(echovault *EchoVault)) (*EchoVault, error) { func NewEchoVault(options ...func(echovault *EchoVault)) (*EchoVault, error) {
echovault := &EchoVault{ echovault := &EchoVault{
context: context.Background(), context: context.Background(),
@@ -131,10 +142,13 @@ func NewEchoVault(options ...func(echovault *EchoVault)) (*EchoVault, error) {
if echovault.isInCluster() { if echovault.isInCluster() {
echovault.raft = raft.NewRaft(raft.Opts{ echovault.raft = raft.NewRaft(raft.Opts{
Config: echovault.config, Config: echovault.config,
EchoVault: echovault, EchoVault: echovault,
GetCommand: echovault.getCommand, GetCommand: echovault.getCommand,
DeleteKey: echovault.DeleteKey, DeleteKey: echovault.DeleteKey,
StartSnapshot: echovault.startSnapshot,
FinishSnapshot: echovault.finishSnapshot,
SetLatestSnapshotTime: echovault.setLatestSnapshot,
GetState: func() map[string]internal.KeyData { GetState: func() map[string]internal.KeyData {
state := make(map[string]internal.KeyData) state := make(map[string]internal.KeyData)
for k, v := range echovault.getState() { for k, v := range echovault.getState() {
@@ -160,10 +174,10 @@ func NewEchoVault(options ...func(echovault *EchoVault)) (*EchoVault, error) {
snapshot.WithDirectory(echovault.config.DataDir), snapshot.WithDirectory(echovault.config.DataDir),
snapshot.WithThreshold(echovault.config.SnapShotThreshold), snapshot.WithThreshold(echovault.config.SnapShotThreshold),
snapshot.WithInterval(echovault.config.SnapshotInterval), snapshot.WithInterval(echovault.config.SnapshotInterval),
snapshot.WithStartSnapshotFunc(echovault.StartSnapshot), snapshot.WithStartSnapshotFunc(echovault.startSnapshot),
snapshot.WithFinishSnapshotFunc(echovault.FinishSnapshot), snapshot.WithFinishSnapshotFunc(echovault.finishSnapshot),
snapshot.WithSetLatestSnapshotTimeFunc(echovault.SetLatestSnapshot), snapshot.WithSetLatestSnapshotTimeFunc(echovault.setLatestSnapshot),
snapshot.WithGetLatestSnapshotTimeFunc(echovault.GetLatestSnapshot), snapshot.WithGetLatestSnapshotTimeFunc(echovault.getLatestSnapshotTime),
snapshot.WithGetStateFunc(func() map[string]internal.KeyData { snapshot.WithGetStateFunc(func() map[string]internal.KeyData {
state := make(map[string]internal.KeyData) state := make(map[string]internal.KeyData)
for k, v := range echovault.getState() { for k, v := range echovault.getState() {
@@ -189,8 +203,8 @@ func NewEchoVault(options ...func(echovault *EchoVault)) (*EchoVault, error) {
echovault.aofEngine = aof.NewAOFEngine( echovault.aofEngine = aof.NewAOFEngine(
aof.WithDirectory(echovault.config.DataDir), aof.WithDirectory(echovault.config.DataDir),
aof.WithStrategy(echovault.config.AOFSyncStrategy), aof.WithStrategy(echovault.config.AOFSyncStrategy),
aof.WithStartRewriteFunc(echovault.StartRewriteAOF), aof.WithStartRewriteFunc(echovault.startRewriteAOF),
aof.WithFinishRewriteFunc(echovault.FinishRewriteAOF), aof.WithFinishRewriteFunc(echovault.finishRewriteAOF),
aof.WithGetStateFunc(func() map[string]internal.KeyData { aof.WithGetStateFunc(func() map[string]internal.KeyData {
state := make(map[string]internal.KeyData) state := make(map[string]internal.KeyData)
for k, v := range echovault.getState() { for k, v := range echovault.getState() {
@@ -416,10 +430,16 @@ func (server *EchoVault) handleConnection(conn net.Conn) {
} }
} }
// Start starts the EchoVault instance's TCP listener.
// This allows the instance to accept connections handle client commands over TCP.
//
// You can still use command functions like echovault.SET if you're embedding EchoVault in you application.
// However, if you'd like to also accept TCP request on the same instance, you must call this function.
func (server *EchoVault) Start() { func (server *EchoVault) Start() {
server.startTCP() server.startTCP()
} }
// TakeSnapshot triggers a snapshot when called.
func (server *EchoVault) TakeSnapshot() error { func (server *EchoVault) TakeSnapshot() error {
if server.snapshotInProgress.Load() { if server.snapshotInProgress.Load() {
return errors.New("snapshot already in progress") return errors.New("snapshot already in progress")
@@ -442,27 +462,27 @@ func (server *EchoVault) TakeSnapshot() error {
return nil return nil
} }
func (server *EchoVault) StartSnapshot() { func (server *EchoVault) startSnapshot() {
server.snapshotInProgress.Store(true) server.snapshotInProgress.Store(true)
} }
func (server *EchoVault) FinishSnapshot() { func (server *EchoVault) finishSnapshot() {
server.snapshotInProgress.Store(false) server.snapshotInProgress.Store(false)
} }
func (server *EchoVault) SetLatestSnapshot(msec int64) { func (server *EchoVault) setLatestSnapshot(msec int64) {
server.latestSnapshotMilliseconds.Store(msec) server.latestSnapshotMilliseconds.Store(msec)
} }
func (server *EchoVault) GetLatestSnapshot() int64 { func (server *EchoVault) getLatestSnapshotTime() int64 {
return server.latestSnapshotMilliseconds.Load() return server.latestSnapshotMilliseconds.Load()
} }
func (server *EchoVault) StartRewriteAOF() { func (server *EchoVault) startRewriteAOF() {
server.rewriteAOFInProgress.Store(true) server.rewriteAOFInProgress.Store(true)
} }
func (server *EchoVault) FinishRewriteAOF() { func (server *EchoVault) finishRewriteAOF() {
server.rewriteAOFInProgress.Store(false) server.rewriteAOFInProgress.Store(false)
} }

View File

@@ -37,11 +37,11 @@ type EchoVault interface {
GetACL() interface{} GetACL() interface{}
GetPubSub() interface{} GetPubSub() interface{}
TakeSnapshot() error TakeSnapshot() error
StartSnapshot()
FinishSnapshot()
SetLatestSnapshot(msec int64)
GetLatestSnapshot() int64
RewriteAOF() error RewriteAOF() error
//StartSnapshot()
//FinishSnapshot()
//SetLatestSnapshot(msec int64)
//GetLatestSnapshot() int64
} }
type KeyExtractionFunc func(cmd []string) ([]string, error) type KeyExtractionFunc func(cmd []string) ([]string, error)