diff --git a/.gitignore b/.gitignore index 70a6c30..50ad5e2 100644 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,4 @@ volumes /coverage/ dist/ -aof/ +src/modules/*/aof \ No newline at end of file diff --git a/src/aof/engine.go b/src/aof/engine.go new file mode 100644 index 0000000..43f8a58 --- /dev/null +++ b/src/aof/engine.go @@ -0,0 +1,167 @@ +package aof + +import ( + "fmt" + logstore "github.com/echovault/echovault/src/aof/log" + "github.com/echovault/echovault/src/aof/preamble" + "log" + "sync" +) + +// This package handles AOF logging in standalone mode only. +// Logging in clusters is handled in the raft layer. + +type Engine struct { + syncStrategy string + directory string + preambleRW preamble.PreambleReadWriter + appendRW logstore.AppendReadWriter + + mut sync.Mutex + logChan chan []byte + logCount uint64 + preambleStore *preamble.PreambleStore + appendStore *logstore.AppendStore + + startRewrite func() + finishRewrite func() + getState func() map[string]interface{} + setValue func(key string, value interface{}) + handleCommand func(command []byte) +} + +func WithStrategy(strategy string) func(engine *Engine) { + return func(engine *Engine) { + engine.syncStrategy = strategy + } +} + +func WithDirectory(directory string) func(engine *Engine) { + return func(engine *Engine) { + engine.directory = directory + } +} + +func WithStartRewriteFunc(f func()) func(engine *Engine) { + return func(engine *Engine) { + engine.startRewrite = f + } +} + +func WithFinishRewriteFunc(f func()) func(engine *Engine) { + return func(engine *Engine) { + engine.finishRewrite = f + } +} + +func WithGetStateFunc(f func() map[string]interface{}) func(engine *Engine) { + return func(engine *Engine) { + engine.getState = f + } +} + +func WithSetValueFunc(f func(key string, value interface{})) func(engine *Engine) { + return func(engine *Engine) { + engine.setValue = f + } +} + +func WithHandleCommandFunc(f func(command []byte)) func(engine *Engine) { + return func(engine *Engine) { + engine.handleCommand = f + } +} + +func WithPreambleReadWriter(rw preamble.PreambleReadWriter) func(engine *Engine) { + return func(engine *Engine) { + engine.preambleRW = rw + } +} + +func WithAppendReadWriter(rw logstore.AppendReadWriter) func(engine *Engine) { + return func(engine *Engine) { + engine.appendRW = rw + } +} + +func NewAOFEngine(options ...func(engine *Engine)) *Engine { + engine := &Engine{ + syncStrategy: "everysec", + directory: "", + mut: sync.Mutex{}, + logChan: make(chan []byte, 4096), + logCount: 0, + startRewrite: func() {}, + finishRewrite: func() {}, + getState: func() map[string]interface{} { return nil }, + setValue: func(key string, value interface{}) {}, + handleCommand: func(command []byte) {}, + } + + for _, option := range options { + option(engine) + } + + // Setup Preamble engine + engine.preambleStore = preamble.NewPreambleStore( + preamble.WithDirectory(engine.directory), + preamble.WithReadWriter(engine.preambleRW), + preamble.WithGetStateFunc(engine.getState), + preamble.WithSetValueFunc(engine.setValue), + ) + + // Setup AOF log store engine + engine.appendStore = logstore.NewAppendStore( + logstore.WithDirectory(engine.directory), + logstore.WithStrategy(engine.syncStrategy), + logstore.WithReadWriter(engine.appendRW), + logstore.WithHandleCommandFunc(engine.handleCommand), + ) + + // 3. Start the goroutine to pick up queued commands in order to write them to the file. + // LogCommand will get the open file handler from the struct top perform the AOF operation. + go func() { + for { + c := <-engine.logChan + if err := engine.appendStore.Write(c); err != nil { + log.Println(fmt.Errorf("new aof engine error: %+v", err)) + } + } + }() + + return engine +} + +func (engine *Engine) QueueCommand(command []byte) { + engine.logChan <- command +} + +func (engine *Engine) RewriteLog() error { + engine.mut.Lock() + defer engine.mut.Unlock() + + engine.startRewrite() + defer engine.finishRewrite() + + // Create AOF preamble + if err := engine.preambleStore.CreatePreamble(); err != nil { + log.Println(fmt.Errorf("rewrite log -> create preamble error: %+v", err)) + } + + // Truncate the AOF file. + if err := engine.appendStore.Truncate(); err != nil { + log.Println(fmt.Errorf("rewrite log -> create aof error: %+v", err)) + } + + return nil +} + +func (engine *Engine) Restore() error { + if err := engine.preambleStore.Restore(); err != nil { + log.Println(fmt.Errorf("restore aof -> restore preamble error: %+v", err)) + } + if err := engine.appendStore.Restore(); err != nil { + log.Println(fmt.Errorf("restore aof -> restore aof error: %+v", err)) + } + return nil +} diff --git a/src/aof/log/store.go b/src/aof/log/store.go new file mode 100644 index 0000000..a0830e4 --- /dev/null +++ b/src/aof/log/store.go @@ -0,0 +1,176 @@ +package log + +import ( + "bufio" + "bytes" + "errors" + "fmt" + "io" + "log" + "os" + "path" + "strings" + "sync" + "time" +) + +type AppendReadWriter interface { + io.ReadWriteSeeker + io.Closer + Truncate(size int64) error + Sync() error +} + +type AppendStore struct { + strategy string // Append file sync strategy. Can only be "always", "everysec", or "no + mut sync.Mutex // Store mutex + rw AppendReadWriter // The ReadWriter used to persist and load the log + directory string // The directory for the AOF file if we must create one + handleCommand func(command []byte) // Function to handle command read from AOF log after restore +} + +func WithStrategy(strategy string) func(store *AppendStore) { + return func(store *AppendStore) { + store.strategy = strategy + } +} + +func WithReadWriter(rw AppendReadWriter) func(store *AppendStore) { + return func(store *AppendStore) { + store.rw = rw + } +} + +func WithDirectory(directory string) func(store *AppendStore) { + return func(store *AppendStore) { + store.directory = directory + } +} + +func WithHandleCommandFunc(f func(command []byte)) func(store *AppendStore) { + return func(store *AppendStore) { + store.handleCommand = f + } +} + +func NewAppendStore(options ...func(store *AppendStore)) *AppendStore { + store := &AppendStore{ + directory: "", + strategy: "everysec", + rw: nil, + mut: sync.Mutex{}, + handleCommand: func(command []byte) { + // No-Op + }, + } + + for _, option := range options { + option(store) + } + + // If rw is nil, use a default file at the provided directory + if store.rw == nil { + // Create the directory if it does not exist + err := os.MkdirAll(path.Join(store.directory, "aof"), os.ModePerm) + if err != nil { + log.Println(fmt.Errorf("new append store -> mkdir error: %+v", err)) + } + f, err := os.OpenFile(path.Join(store.directory, "aof", "log.aof"), os.O_RDWR|os.O_CREATE|os.O_APPEND, os.ModePerm) + if err != nil { + log.Println(fmt.Errorf("new append store -> open file error: %+v", err)) + } + store.rw = f + } + + // Start another goroutine that takes handles syncing the content to the file system. + // No need to start this goroutine if sync strategy is anything other than 'everysec'. + if strings.EqualFold(store.strategy, "everysec") { + go func() { + for { + if err := store.Sync(); err != nil { + log.Println(fmt.Errorf("new append store error: %+v", err)) + break + } + <-time.After(1 * time.Second) + } + }() + } + return store +} + +func (store *AppendStore) Write(command []byte) error { + store.mut.Lock() + defer store.mut.Unlock() + // Add new line before writing to AOF file. + out := append(command, []byte("\r\n")...) + if _, err := store.rw.Write(out); err != nil { + return err + } + if strings.EqualFold(store.strategy, "always") { + if err := store.Sync(); err != nil { + return err + } + } + return nil +} + +func (store *AppendStore) Sync() error { + store.mut.Lock() + store.mut.Unlock() + return store.rw.Sync() +} + +func (store *AppendStore) Restore() error { + store.mut.Lock() + defer store.mut.Unlock() + + buf := bufio.NewReader(store.rw) + + var commands [][]byte + var line []byte + + for { + b, _, err := buf.ReadLine() + if err != nil && errors.Is(err, io.EOF) { + break + } else if err != nil { + return err + } + if len(b) <= 0 { + line = append(line, []byte("\r\n\r\n")...) + commands = append(commands, line) + line = []byte{} + continue + } + if len(line) > 0 { + line = append(line, append([]byte("\r\n"), bytes.TrimLeft(b, "\x00")...)...) + continue + } + line = append(line, bytes.TrimLeft(b, "\x00")...) + } + + for _, c := range commands { + store.handleCommand(c) + } + + return nil +} + +func (store *AppendStore) Truncate() error { + store.mut.Lock() + defer store.mut.Unlock() + if err := store.rw.Truncate(0); err != nil { + return err + } + // Seek to the beginning of the file after truncating + if _, err := store.rw.Seek(0, 0); err != nil { + return err + } + return nil +} + +func (store *AppendStore) Close() error { + store.mut.Lock() + defer store.mut.Unlock() + return store.rw.Close() +} diff --git a/src/aof/preamble/store.go b/src/aof/preamble/store.go new file mode 100644 index 0000000..08f2265 --- /dev/null +++ b/src/aof/preamble/store.go @@ -0,0 +1,149 @@ +package preamble + +import ( + "encoding/json" + "fmt" + "io" + "log" + "os" + "path" + "sync" +) + +type PreambleReadWriter interface { + io.ReadWriteSeeker + io.Closer + Truncate(size int64) error + Sync() error +} + +type PreambleStore struct { + rw PreambleReadWriter + mut sync.Mutex + directory string + getState func() map[string]interface{} + setValue func(key string, value interface{}) +} + +func WithReadWriter(rw PreambleReadWriter) func(store *PreambleStore) { + return func(store *PreambleStore) { + store.rw = rw + } +} + +func WithGetStateFunc(f func() map[string]interface{}) func(store *PreambleStore) { + return func(store *PreambleStore) { + store.getState = f + } +} + +func WithSetValueFunc(f func(key string, value interface{})) func(store *PreambleStore) { + return func(store *PreambleStore) { + store.setValue = f + } +} + +func WithDirectory(directory string) func(store *PreambleStore) { + return func(store *PreambleStore) { + store.directory = directory + } +} + +func NewPreambleStore(options ...func(store *PreambleStore)) *PreambleStore { + store := &PreambleStore{ + rw: nil, + mut: sync.Mutex{}, + directory: "", + getState: func() map[string]interface{} { + // No-Op by default + return nil + }, + setValue: func(key string, value interface{}) { + // No-Op by default + }, + } + + for _, option := range options { + option(store) + } + + // If rw is nil, create the default + if store.rw == nil { + err := os.MkdirAll(path.Join(store.directory, "aof"), os.ModePerm) + if err != nil { + log.Println(fmt.Errorf("new preamle store -> mkdir error: %+v", err)) + } + f, err := os.OpenFile(path.Join(store.directory, "aof", "preamble.bin"), os.O_RDWR|os.O_CREATE, os.ModePerm) + if err != nil { + log.Println(fmt.Errorf("new preamble store -> open file error: %+v", err)) + } + store.rw = f + } + + return store +} + +func (store *PreambleStore) CreatePreamble() error { + store.mut.Lock() + store.mut.Unlock() + + // Get current state. + state := store.getState() + o, err := json.Marshal(state) + if err != nil { + return err + } + + // Truncate the preamble first + if err = store.rw.Truncate(0); err != nil { + return err + } + // Seek to the beginning of the file after truncating + if _, err = store.rw.Seek(0, 0); err != nil { + return err + } + + if _, err = store.rw.Write(o); err != nil { + return err + } + + // Sync the changes + if err = store.rw.Sync(); err != nil { + return err + } + + return nil +} + +func (store *PreambleStore) Restore() error { + if store.rw == nil { + return nil + } + + b, err := io.ReadAll(store.rw) + if err != nil { + return err + } + + if len(b) <= 0 { + return nil + } + + state := make(map[string]interface{}) + + if err = json.Unmarshal(b, &state); err != nil { + return err + } + + for key, value := range state { + store.setValue(key, value) + } + + return nil +} + +func (store *PreambleStore) Close() error { + store.mut.Lock() + defer store.mut.Unlock() + return store.rw.Close() +}