From 04ddd46d2b5955cd9c5cce6ea6c7f53944d07b86 Mon Sep 17 00:00:00 2001 From: Kelvin Clement Mwinuka Date: Thu, 1 Feb 2024 04:08:34 +0800 Subject: [PATCH] Start goroutine in AOF engine to log queued commands --- src/server/aof/aof.go | 39 +++++++++++++++++++++++++++++---------- src/server/server.go | 6 +++++- 2 files changed, 34 insertions(+), 11 deletions(-) diff --git a/src/server/aof/aof.go b/src/server/aof/aof.go index c70c678..c66d5e1 100644 --- a/src/server/aof/aof.go +++ b/src/server/aof/aof.go @@ -1,6 +1,7 @@ package aof import ( + "context" "encoding/json" "github.com/echovault/echovault/src/utils" "log" @@ -20,18 +21,38 @@ type Opts struct { } type Engine struct { - options Opts - mut sync.Mutex + options Opts + mut sync.Mutex + logChan chan []byte + logCount uint64 } func NewAOFEngine(opts Opts) *Engine { return &Engine{ - options: opts, - mut: sync.Mutex{}, + options: opts, + mut: sync.Mutex{}, + logChan: make(chan []byte, 4096), + logCount: 0, } } -func (engine *Engine) LogCommand(command []byte, sync bool) error { +func (engine *Engine) Start(ctx context.Context) { + go func() { + for { + c := <-engine.logChan + if err := engine.LogCommand(c); err != nil { + log.Println(err) + continue + } + } + }() +} + +func (engine *Engine) QueueCommand(command []byte) { + engine.logChan <- command +} + +func (engine *Engine) LogCommand(command []byte) error { engine.mut.Lock() defer engine.mut.Unlock() @@ -60,10 +81,8 @@ func (engine *Engine) LogCommand(command []byte, sync bool) error { return err } - if sync { - if err = f.Sync(); err != nil { - log.Println(err) - } + if err = f.Sync(); err != nil { + log.Println(err) } return nil @@ -114,7 +133,7 @@ func (engine *Engine) RewriteLog() error { return nil } -func (engine *Engine) Restore() error { +func (engine *Engine) Restore(ctx context.Context) error { // Open snapshot file. // If snapshot file exists, set current state to the state in snapshot file. // Open AOF file. diff --git a/src/server/server.go b/src/server/server.go index bb1b6a1..fd9b364 100644 --- a/src/server/server.go +++ b/src/server/server.go @@ -176,7 +176,7 @@ func (server *Server) handleConnection(ctx context.Context, conn net.Conn) { log.Println(err) } if utils.IsWriteCommand(command, subCommand) { - // TODO: Queue successful write command instead of logging it directly + go server.AOFEngine.QueueCommand(message) } } server.StateMutationInProgress.Store(false) @@ -256,6 +256,10 @@ func (server *Server) Start(ctx context.Context) { StartRewriteAOF: server.StartRewriteAOF, FinishRewriteAOF: server.FinishRewriteAOF, }) + if err := server.AOFEngine.Restore(ctx); err != nil { + log.Println(err) + } + server.AOFEngine.Start(ctx) // Initialize and start standalone snapshot engine server.SnapshotEngine = snapshot.NewSnapshotEngine(snapshot.Opts{ Config: conf,