Implemented RewriteLog functionality and defined handler for REWRITE AOF command.

This commit is contained in:
Kelvin Clement Mwinuka
2024-01-30 23:33:24 +08:00
parent dff80940cd
commit 25b2cb7154
4 changed files with 47 additions and 4 deletions

View File

@@ -71,7 +71,10 @@ func NewModule() Plugin {
return []string{}, nil return []string{}, nil
}, },
HandlerFunc: func(ctx context.Context, cmd []string, server utils.Server, conn *net.Conn) ([]byte, error) { HandlerFunc: func(ctx context.Context, cmd []string, server utils.Server, conn *net.Conn) ([]byte, error) {
return nil, errors.New("BGREWRITEAOF command not implemented") if err := server.RewriteAOF(); err != nil {
return nil, err
}
return []byte(utils.OK_RESPONSE), nil
}, },
}, },
}, },

View File

@@ -1,6 +1,7 @@
package aof package aof
import ( import (
"encoding/json"
"github.com/echovault/echovault/src/utils" "github.com/echovault/echovault/src/utils"
"log" "log"
"os" "os"
@@ -12,7 +13,8 @@ import (
// Logging in clusters is handled in the raft layer. // Logging in clusters is handled in the raft layer.
type Opts struct { type Opts struct {
Config utils.Config Config utils.Config
GetState func() map[string]interface{}
} }
type Engine struct { type Engine struct {
@@ -60,10 +62,41 @@ func (engine *Engine) LogCommand(command []byte) error {
} }
func (engine *Engine) RewriteLog() error { func (engine *Engine) RewriteLog() error {
engine.mut.Lock()
defer engine.mut.Unlock()
// Get current state. // Get current state.
state := engine.options.GetState()
o, err := json.Marshal(state)
if err != nil {
return err
}
// Replace snapshot contents file with current state. // Replace snapshot contents file with current state.
// Close snapshot file. sf, err := os.Create(path.Join(engine.options.Config.DataDir, "aof", "snapshot.bin"))
if err != nil {
return err
}
defer func() {
if err = sf.Close(); err != nil {
log.Println(err)
}
}()
if _, err = sf.Write(o); err != nil {
return err
}
// Replace aof file with empty file. // Replace aof file with empty file.
aof, err := os.Create(path.Join(engine.options.Config.DataDir, "aof", "log.aof"))
if err != nil {
return err
}
defer func() {
if err = aof.Close(); err != nil {
log.Println(err)
}
}()
return nil return nil
} }

View File

@@ -242,7 +242,8 @@ func (server *Server) Start(ctx context.Context) {
} else { } else {
// Initialize standalone AOF engine // Initialize standalone AOF engine
server.AOFEngine = aof.NewAOFEngine(aof.Opts{ server.AOFEngine = aof.NewAOFEngine(aof.Opts{
Config: conf, Config: conf,
GetState: server.GetState,
}) })
// Initialize and start standalone snapshot engine // Initialize and start standalone snapshot engine
server.SnapshotEngine = snapshot.NewSnapshotEngine(snapshot.Opts{ server.SnapshotEngine = snapshot.NewSnapshotEngine(snapshot.Opts{
@@ -295,6 +296,11 @@ func (server *Server) GetLatestSnapshot() int64 {
return server.LatestSnapshotMilliseconds.Load() return server.LatestSnapshotMilliseconds.Load()
} }
func (server *Server) RewriteAOF() error {
// TODO: Make this concurrent
return server.AOFEngine.RewriteLog()
}
func (server *Server) ShutDown(ctx context.Context) { func (server *Server) ShutDown(ctx context.Context) {
if server.IsInCluster() { if server.IsInCluster() {
server.raft.RaftShutdown(ctx) server.raft.RaftShutdown(ctx)

View File

@@ -23,6 +23,7 @@ type Server interface {
FinishSnapshot() FinishSnapshot()
SetLatestSnapshot(msec int64) SetLatestSnapshot(msec int64)
GetLatestSnapshot() int64 GetLatestSnapshot() int64
RewriteAOF() error
} }
type ContextServerID string type ContextServerID string