Track src/aof folder but ignore aof subfolders in modules folder

This commit is contained in:
Kelvin Clement Mwinuka
2024-03-05 22:52:32 +08:00
parent 5c347cb7de
commit f23bbd481e
4 changed files with 493 additions and 1 deletions

2
.gitignore vendored
View File

@@ -5,4 +5,4 @@ volumes
/coverage/
dist/
aof/
src/modules/*/aof

167
src/aof/engine.go Normal file
View File

@@ -0,0 +1,167 @@
package aof
import (
"fmt"
logstore "github.com/echovault/echovault/src/aof/log"
"github.com/echovault/echovault/src/aof/preamble"
"log"
"sync"
)
// This package handles AOF logging in standalone mode only.
// Logging in clusters is handled in the raft layer.
type Engine struct {
syncStrategy string
directory string
preambleRW preamble.PreambleReadWriter
appendRW logstore.AppendReadWriter
mut sync.Mutex
logChan chan []byte
logCount uint64
preambleStore *preamble.PreambleStore
appendStore *logstore.AppendStore
startRewrite func()
finishRewrite func()
getState func() map[string]interface{}
setValue func(key string, value interface{})
handleCommand func(command []byte)
}
func WithStrategy(strategy string) func(engine *Engine) {
return func(engine *Engine) {
engine.syncStrategy = strategy
}
}
func WithDirectory(directory string) func(engine *Engine) {
return func(engine *Engine) {
engine.directory = directory
}
}
func WithStartRewriteFunc(f func()) func(engine *Engine) {
return func(engine *Engine) {
engine.startRewrite = f
}
}
func WithFinishRewriteFunc(f func()) func(engine *Engine) {
return func(engine *Engine) {
engine.finishRewrite = f
}
}
func WithGetStateFunc(f func() map[string]interface{}) func(engine *Engine) {
return func(engine *Engine) {
engine.getState = f
}
}
func WithSetValueFunc(f func(key string, value interface{})) func(engine *Engine) {
return func(engine *Engine) {
engine.setValue = f
}
}
func WithHandleCommandFunc(f func(command []byte)) func(engine *Engine) {
return func(engine *Engine) {
engine.handleCommand = f
}
}
func WithPreambleReadWriter(rw preamble.PreambleReadWriter) func(engine *Engine) {
return func(engine *Engine) {
engine.preambleRW = rw
}
}
func WithAppendReadWriter(rw logstore.AppendReadWriter) func(engine *Engine) {
return func(engine *Engine) {
engine.appendRW = rw
}
}
func NewAOFEngine(options ...func(engine *Engine)) *Engine {
engine := &Engine{
syncStrategy: "everysec",
directory: "",
mut: sync.Mutex{},
logChan: make(chan []byte, 4096),
logCount: 0,
startRewrite: func() {},
finishRewrite: func() {},
getState: func() map[string]interface{} { return nil },
setValue: func(key string, value interface{}) {},
handleCommand: func(command []byte) {},
}
for _, option := range options {
option(engine)
}
// Setup Preamble engine
engine.preambleStore = preamble.NewPreambleStore(
preamble.WithDirectory(engine.directory),
preamble.WithReadWriter(engine.preambleRW),
preamble.WithGetStateFunc(engine.getState),
preamble.WithSetValueFunc(engine.setValue),
)
// Setup AOF log store engine
engine.appendStore = logstore.NewAppendStore(
logstore.WithDirectory(engine.directory),
logstore.WithStrategy(engine.syncStrategy),
logstore.WithReadWriter(engine.appendRW),
logstore.WithHandleCommandFunc(engine.handleCommand),
)
// 3. Start the goroutine to pick up queued commands in order to write them to the file.
// LogCommand will get the open file handler from the struct top perform the AOF operation.
go func() {
for {
c := <-engine.logChan
if err := engine.appendStore.Write(c); err != nil {
log.Println(fmt.Errorf("new aof engine error: %+v", err))
}
}
}()
return engine
}
func (engine *Engine) QueueCommand(command []byte) {
engine.logChan <- command
}
func (engine *Engine) RewriteLog() error {
engine.mut.Lock()
defer engine.mut.Unlock()
engine.startRewrite()
defer engine.finishRewrite()
// Create AOF preamble
if err := engine.preambleStore.CreatePreamble(); err != nil {
log.Println(fmt.Errorf("rewrite log -> create preamble error: %+v", err))
}
// Truncate the AOF file.
if err := engine.appendStore.Truncate(); err != nil {
log.Println(fmt.Errorf("rewrite log -> create aof error: %+v", err))
}
return nil
}
func (engine *Engine) Restore() error {
if err := engine.preambleStore.Restore(); err != nil {
log.Println(fmt.Errorf("restore aof -> restore preamble error: %+v", err))
}
if err := engine.appendStore.Restore(); err != nil {
log.Println(fmt.Errorf("restore aof -> restore aof error: %+v", err))
}
return nil
}

176
src/aof/log/store.go Normal file
View File

@@ -0,0 +1,176 @@
package log
import (
"bufio"
"bytes"
"errors"
"fmt"
"io"
"log"
"os"
"path"
"strings"
"sync"
"time"
)
type AppendReadWriter interface {
io.ReadWriteSeeker
io.Closer
Truncate(size int64) error
Sync() error
}
type AppendStore struct {
strategy string // Append file sync strategy. Can only be "always", "everysec", or "no
mut sync.Mutex // Store mutex
rw AppendReadWriter // The ReadWriter used to persist and load the log
directory string // The directory for the AOF file if we must create one
handleCommand func(command []byte) // Function to handle command read from AOF log after restore
}
func WithStrategy(strategy string) func(store *AppendStore) {
return func(store *AppendStore) {
store.strategy = strategy
}
}
func WithReadWriter(rw AppendReadWriter) func(store *AppendStore) {
return func(store *AppendStore) {
store.rw = rw
}
}
func WithDirectory(directory string) func(store *AppendStore) {
return func(store *AppendStore) {
store.directory = directory
}
}
func WithHandleCommandFunc(f func(command []byte)) func(store *AppendStore) {
return func(store *AppendStore) {
store.handleCommand = f
}
}
func NewAppendStore(options ...func(store *AppendStore)) *AppendStore {
store := &AppendStore{
directory: "",
strategy: "everysec",
rw: nil,
mut: sync.Mutex{},
handleCommand: func(command []byte) {
// No-Op
},
}
for _, option := range options {
option(store)
}
// If rw is nil, use a default file at the provided directory
if store.rw == nil {
// Create the directory if it does not exist
err := os.MkdirAll(path.Join(store.directory, "aof"), os.ModePerm)
if err != nil {
log.Println(fmt.Errorf("new append store -> mkdir error: %+v", err))
}
f, err := os.OpenFile(path.Join(store.directory, "aof", "log.aof"), os.O_RDWR|os.O_CREATE|os.O_APPEND, os.ModePerm)
if err != nil {
log.Println(fmt.Errorf("new append store -> open file error: %+v", err))
}
store.rw = f
}
// Start another goroutine that takes handles syncing the content to the file system.
// No need to start this goroutine if sync strategy is anything other than 'everysec'.
if strings.EqualFold(store.strategy, "everysec") {
go func() {
for {
if err := store.Sync(); err != nil {
log.Println(fmt.Errorf("new append store error: %+v", err))
break
}
<-time.After(1 * time.Second)
}
}()
}
return store
}
func (store *AppendStore) Write(command []byte) error {
store.mut.Lock()
defer store.mut.Unlock()
// Add new line before writing to AOF file.
out := append(command, []byte("\r\n")...)
if _, err := store.rw.Write(out); err != nil {
return err
}
if strings.EqualFold(store.strategy, "always") {
if err := store.Sync(); err != nil {
return err
}
}
return nil
}
func (store *AppendStore) Sync() error {
store.mut.Lock()
store.mut.Unlock()
return store.rw.Sync()
}
func (store *AppendStore) Restore() error {
store.mut.Lock()
defer store.mut.Unlock()
buf := bufio.NewReader(store.rw)
var commands [][]byte
var line []byte
for {
b, _, err := buf.ReadLine()
if err != nil && errors.Is(err, io.EOF) {
break
} else if err != nil {
return err
}
if len(b) <= 0 {
line = append(line, []byte("\r\n\r\n")...)
commands = append(commands, line)
line = []byte{}
continue
}
if len(line) > 0 {
line = append(line, append([]byte("\r\n"), bytes.TrimLeft(b, "\x00")...)...)
continue
}
line = append(line, bytes.TrimLeft(b, "\x00")...)
}
for _, c := range commands {
store.handleCommand(c)
}
return nil
}
func (store *AppendStore) Truncate() error {
store.mut.Lock()
defer store.mut.Unlock()
if err := store.rw.Truncate(0); err != nil {
return err
}
// Seek to the beginning of the file after truncating
if _, err := store.rw.Seek(0, 0); err != nil {
return err
}
return nil
}
func (store *AppendStore) Close() error {
store.mut.Lock()
defer store.mut.Unlock()
return store.rw.Close()
}

149
src/aof/preamble/store.go Normal file
View File

@@ -0,0 +1,149 @@
package preamble
import (
"encoding/json"
"fmt"
"io"
"log"
"os"
"path"
"sync"
)
type PreambleReadWriter interface {
io.ReadWriteSeeker
io.Closer
Truncate(size int64) error
Sync() error
}
type PreambleStore struct {
rw PreambleReadWriter
mut sync.Mutex
directory string
getState func() map[string]interface{}
setValue func(key string, value interface{})
}
func WithReadWriter(rw PreambleReadWriter) func(store *PreambleStore) {
return func(store *PreambleStore) {
store.rw = rw
}
}
func WithGetStateFunc(f func() map[string]interface{}) func(store *PreambleStore) {
return func(store *PreambleStore) {
store.getState = f
}
}
func WithSetValueFunc(f func(key string, value interface{})) func(store *PreambleStore) {
return func(store *PreambleStore) {
store.setValue = f
}
}
func WithDirectory(directory string) func(store *PreambleStore) {
return func(store *PreambleStore) {
store.directory = directory
}
}
func NewPreambleStore(options ...func(store *PreambleStore)) *PreambleStore {
store := &PreambleStore{
rw: nil,
mut: sync.Mutex{},
directory: "",
getState: func() map[string]interface{} {
// No-Op by default
return nil
},
setValue: func(key string, value interface{}) {
// No-Op by default
},
}
for _, option := range options {
option(store)
}
// If rw is nil, create the default
if store.rw == nil {
err := os.MkdirAll(path.Join(store.directory, "aof"), os.ModePerm)
if err != nil {
log.Println(fmt.Errorf("new preamle store -> mkdir error: %+v", err))
}
f, err := os.OpenFile(path.Join(store.directory, "aof", "preamble.bin"), os.O_RDWR|os.O_CREATE, os.ModePerm)
if err != nil {
log.Println(fmt.Errorf("new preamble store -> open file error: %+v", err))
}
store.rw = f
}
return store
}
func (store *PreambleStore) CreatePreamble() error {
store.mut.Lock()
store.mut.Unlock()
// Get current state.
state := store.getState()
o, err := json.Marshal(state)
if err != nil {
return err
}
// Truncate the preamble first
if err = store.rw.Truncate(0); err != nil {
return err
}
// Seek to the beginning of the file after truncating
if _, err = store.rw.Seek(0, 0); err != nil {
return err
}
if _, err = store.rw.Write(o); err != nil {
return err
}
// Sync the changes
if err = store.rw.Sync(); err != nil {
return err
}
return nil
}
func (store *PreambleStore) Restore() error {
if store.rw == nil {
return nil
}
b, err := io.ReadAll(store.rw)
if err != nil {
return err
}
if len(b) <= 0 {
return nil
}
state := make(map[string]interface{})
if err = json.Unmarshal(b, &state); err != nil {
return err
}
for key, value := range state {
store.setValue(key, value)
}
return nil
}
func (store *PreambleStore) Close() error {
store.mut.Lock()
defer store.mut.Unlock()
return store.rw.Close()
}