Return error from preamble restore method instead of just logging it. Implemented test for AOF engine persist and restore flow.

This commit is contained in:
Kelvin Clement Mwinuka
2024-05-05 01:30:04 +08:00
parent 915c34d9f6
commit baf5f3d5da
7 changed files with 397 additions and 13 deletions

View File

@@ -107,7 +107,7 @@ func WithAppendReadWriter(rw logstore.AppendReadWriter) func(engine *Engine) {
}
}
func NewAOFEngine(options ...func(engine *Engine)) *Engine {
func NewAOFEngine(options ...func(engine *Engine)) (*Engine, error) {
engine := &Engine{
clock: clock.NewClock(),
syncStrategy: "everysec",
@@ -129,22 +129,30 @@ func NewAOFEngine(options ...func(engine *Engine)) *Engine {
}
// Setup Preamble engine
engine.preambleStore = preamble.NewPreambleStore(
preambleStore, err := preamble.NewPreambleStore(
preamble.WithClock(engine.clock),
preamble.WithDirectory(engine.directory),
preamble.WithReadWriter(engine.preambleRW),
preamble.WithGetStateFunc(engine.getStateFunc),
preamble.WithSetKeyDataFunc(engine.setKeyDataFunc),
)
if err != nil {
return nil, err
}
engine.preambleStore = preambleStore
// Setup AOF log store engine
engine.appendStore = logstore.NewAppendStore(
appendStore, err := logstore.NewAppendStore(
logstore.WithClock(engine.clock),
logstore.WithDirectory(engine.directory),
logstore.WithStrategy(engine.syncStrategy),
logstore.WithReadWriter(engine.appendRW),
logstore.WithHandleCommandFunc(engine.handleCommand),
)
if err != nil {
return nil, err
}
engine.appendStore = appendStore
// 3. Start the goroutine to pick up queued commands in order to write them to the file.
// LogCommand will get the open file handler from the struct top perform the AOF operation.
@@ -157,7 +165,7 @@ func NewAOFEngine(options ...func(engine *Engine)) *Engine {
}
}()
return engine
return engine, nil
}
func (engine *Engine) QueueCommand(command []byte) {
@@ -173,12 +181,12 @@ func (engine *Engine) RewriteLog() error {
// Create AOF preamble
if err := engine.preambleStore.CreatePreamble(); err != nil {
log.Println(fmt.Errorf("rewrite log -> create preamble error: %+v", err))
return fmt.Errorf("rewrite log -> create preamble error: %+v", err)
}
// Truncate the AOF file.
if err := engine.appendStore.Truncate(); err != nil {
log.Println(fmt.Errorf("rewrite log -> create aof error: %+v", err))
return fmt.Errorf("rewrite log -> create aof error: %+v", err)
}
return nil
@@ -186,10 +194,10 @@ func (engine *Engine) RewriteLog() error {
func (engine *Engine) Restore() error {
if err := engine.preambleStore.Restore(); err != nil {
log.Println(fmt.Errorf("restore aof -> restore preamble error: %+v", err))
return fmt.Errorf("restore aof -> restore preamble error: %+v", err)
}
if err := engine.appendStore.Restore(); err != nil {
log.Println(fmt.Errorf("restore aof -> restore aof error: %+v", err))
return fmt.Errorf("restore aof -> restore aof error: %+v", err)
}
return nil
}