Start goroutine in AOF engine to log queued commands

This commit is contained in:
Kelvin Clement Mwinuka
2024-02-01 04:08:34 +08:00
parent 58ecde9d92
commit 04ddd46d2b
2 changed files with 34 additions and 11 deletions

View File

@@ -1,6 +1,7 @@
package aof
import (
"context"
"encoding/json"
"github.com/echovault/echovault/src/utils"
"log"
@@ -20,18 +21,38 @@ type Opts struct {
}
type Engine struct {
options Opts
mut sync.Mutex
options Opts
mut sync.Mutex
logChan chan []byte
logCount uint64
}
func NewAOFEngine(opts Opts) *Engine {
return &Engine{
options: opts,
mut: sync.Mutex{},
options: opts,
mut: sync.Mutex{},
logChan: make(chan []byte, 4096),
logCount: 0,
}
}
func (engine *Engine) LogCommand(command []byte, sync bool) error {
func (engine *Engine) Start(ctx context.Context) {
go func() {
for {
c := <-engine.logChan
if err := engine.LogCommand(c); err != nil {
log.Println(err)
continue
}
}
}()
}
func (engine *Engine) QueueCommand(command []byte) {
engine.logChan <- command
}
func (engine *Engine) LogCommand(command []byte) error {
engine.mut.Lock()
defer engine.mut.Unlock()
@@ -60,10 +81,8 @@ func (engine *Engine) LogCommand(command []byte, sync bool) error {
return err
}
if sync {
if err = f.Sync(); err != nil {
log.Println(err)
}
if err = f.Sync(); err != nil {
log.Println(err)
}
return nil
@@ -114,7 +133,7 @@ func (engine *Engine) RewriteLog() error {
return nil
}
func (engine *Engine) Restore() error {
func (engine *Engine) Restore(ctx context.Context) error {
// Open snapshot file.
// If snapshot file exists, set current state to the state in snapshot file.
// Open AOF file.

View File

@@ -176,7 +176,7 @@ func (server *Server) handleConnection(ctx context.Context, conn net.Conn) {
log.Println(err)
}
if utils.IsWriteCommand(command, subCommand) {
// TODO: Queue successful write command instead of logging it directly
go server.AOFEngine.QueueCommand(message)
}
}
server.StateMutationInProgress.Store(false)
@@ -256,6 +256,10 @@ func (server *Server) Start(ctx context.Context) {
StartRewriteAOF: server.StartRewriteAOF,
FinishRewriteAOF: server.FinishRewriteAOF,
})
if err := server.AOFEngine.Restore(ctx); err != nil {
log.Println(err)
}
server.AOFEngine.Start(ctx)
// Initialize and start standalone snapshot engine
server.SnapshotEngine = snapshot.NewSnapshotEngine(snapshot.Opts{
Config: conf,