mirror of
https://github.com/EchoVault/SugarDB.git
synced 2025-09-28 21:02:16 +08:00
201 lines
5.5 KiB
Go
201 lines
5.5 KiB
Go
// Copyright 2024 Kelvin Clement Mwinuka
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
// Package aof handles AOF logging in standalone mode only.
|
|
// Logging in replication clusters is handled in the raft layer.
|
|
package aof
|
|
|
|
import (
|
|
"fmt"
|
|
"github.com/echovault/sugardb/internal"
|
|
logstore "github.com/echovault/sugardb/internal/aof/log"
|
|
"github.com/echovault/sugardb/internal/aof/preamble"
|
|
"github.com/echovault/sugardb/internal/clock"
|
|
"log"
|
|
"sync"
|
|
)
|
|
|
|
type Engine struct {
|
|
clock clock.Clock
|
|
syncStrategy string
|
|
directory string
|
|
preambleRW preamble.ReadWriter
|
|
appendRW logstore.ReadWriter
|
|
|
|
mut sync.Mutex
|
|
logCount uint64
|
|
preambleStore *preamble.Store
|
|
appendStore *logstore.Store
|
|
|
|
startRewriteFunc func()
|
|
finishRewriteFunc func()
|
|
getStateFunc func() map[int]map[string]internal.KeyData
|
|
setKeyDataFunc func(database int, key string, data internal.KeyData)
|
|
handleCommand func(database int, command []byte)
|
|
}
|
|
|
|
func WithClock(clock clock.Clock) func(engine *Engine) {
|
|
return func(engine *Engine) {
|
|
engine.clock = clock
|
|
}
|
|
}
|
|
|
|
func WithStrategy(strategy string) func(engine *Engine) {
|
|
return func(engine *Engine) {
|
|
engine.syncStrategy = strategy
|
|
}
|
|
}
|
|
|
|
func WithDirectory(directory string) func(engine *Engine) {
|
|
return func(engine *Engine) {
|
|
engine.directory = directory
|
|
}
|
|
}
|
|
|
|
func WithStartRewriteFunc(f func()) func(engine *Engine) {
|
|
return func(engine *Engine) {
|
|
engine.startRewriteFunc = f
|
|
}
|
|
}
|
|
|
|
func WithFinishRewriteFunc(f func()) func(engine *Engine) {
|
|
return func(engine *Engine) {
|
|
engine.finishRewriteFunc = f
|
|
}
|
|
}
|
|
|
|
func WithGetStateFunc(f func() map[int]map[string]internal.KeyData) func(engine *Engine) {
|
|
return func(engine *Engine) {
|
|
engine.getStateFunc = f
|
|
}
|
|
}
|
|
|
|
func WithSetKeyDataFunc(f func(database int, key string, data internal.KeyData)) func(engine *Engine) {
|
|
return func(engine *Engine) {
|
|
engine.setKeyDataFunc = f
|
|
}
|
|
}
|
|
|
|
func WithHandleCommandFunc(f func(database int, command []byte)) func(engine *Engine) {
|
|
return func(engine *Engine) {
|
|
engine.handleCommand = f
|
|
}
|
|
}
|
|
|
|
func WithPreambleReadWriter(rw preamble.ReadWriter) func(engine *Engine) {
|
|
return func(engine *Engine) {
|
|
engine.preambleRW = rw
|
|
}
|
|
}
|
|
|
|
func WithAppendReadWriter(rw logstore.ReadWriter) func(engine *Engine) {
|
|
return func(engine *Engine) {
|
|
engine.appendRW = rw
|
|
}
|
|
}
|
|
|
|
func NewAOFEngine(options ...func(engine *Engine)) (*Engine, error) {
|
|
engine := &Engine{
|
|
clock: clock.NewClock(),
|
|
syncStrategy: "everysec",
|
|
directory: "",
|
|
mut: sync.Mutex{},
|
|
logCount: 0,
|
|
startRewriteFunc: func() {},
|
|
finishRewriteFunc: func() {},
|
|
getStateFunc: func() map[int]map[string]internal.KeyData { return nil },
|
|
setKeyDataFunc: func(database int, key string, data internal.KeyData) {},
|
|
handleCommand: func(database int, command []byte) {},
|
|
}
|
|
|
|
// Setup AOFEngine options first as these options are used
|
|
// when setting up the PreambleStore and AppendStore
|
|
for _, option := range options {
|
|
option(engine)
|
|
}
|
|
|
|
// Setup Preamble engine
|
|
preambleStore, err := preamble.NewPreambleStore(
|
|
preamble.WithClock(engine.clock),
|
|
preamble.WithDirectory(engine.directory),
|
|
preamble.WithReadWriter(engine.preambleRW),
|
|
preamble.WithGetStateFunc(engine.getStateFunc),
|
|
preamble.WithSetKeyDataFunc(engine.setKeyDataFunc),
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
engine.preambleStore = preambleStore
|
|
|
|
// Setup AOF log store engine
|
|
appendStore, err := logstore.NewAppendStore(
|
|
logstore.WithClock(engine.clock),
|
|
logstore.WithDirectory(engine.directory),
|
|
logstore.WithStrategy(engine.syncStrategy),
|
|
logstore.WithReadWriter(engine.appendRW),
|
|
logstore.WithHandleCommandFunc(engine.handleCommand),
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
engine.appendStore = appendStore
|
|
|
|
return engine, nil
|
|
}
|
|
|
|
func (engine *Engine) LogCommand(database int, command []byte) {
|
|
if err := engine.appendStore.Write(database, command); err != nil {
|
|
log.Printf("log command error: %+v\n", err)
|
|
}
|
|
}
|
|
|
|
func (engine *Engine) RewriteLog() error {
|
|
engine.mut.Lock()
|
|
defer engine.mut.Unlock()
|
|
|
|
engine.startRewriteFunc()
|
|
defer engine.finishRewriteFunc()
|
|
|
|
// Create AOF preamble.
|
|
if err := engine.preambleStore.CreatePreamble(); err != nil {
|
|
return fmt.Errorf("rewrite log error: create preamble error: %+v", err)
|
|
}
|
|
|
|
// Truncate the AOF file.
|
|
if err := engine.appendStore.Truncate(); err != nil {
|
|
return fmt.Errorf("rewrite log error: create aof error: %+v", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (engine *Engine) Restore() error {
|
|
if err := engine.preambleStore.Restore(); err != nil {
|
|
return fmt.Errorf("restore aof error: restore preamble error: %+v", err)
|
|
}
|
|
if err := engine.appendStore.Restore(); err != nil {
|
|
return fmt.Errorf("restore aof error: restore aof error: %+v", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (engine *Engine) Close() {
|
|
if err := engine.preambleStore.Close(); err != nil {
|
|
log.Printf("close preamble store error: %+v\n", engine)
|
|
}
|
|
if err := engine.appendStore.Close(); err != nil {
|
|
log.Printf("close append store error: %+v\n", engine)
|
|
}
|
|
}
|