diff --git a/echovault/echovault.go b/echovault/echovault.go index 0f73c82..d4f9f92 100644 --- a/echovault/echovault.go +++ b/echovault/echovault.go @@ -256,7 +256,7 @@ func NewEchoVault(options ...func(echovault *EchoVault)) (*EchoVault, error) { }), ) // Set up standalone AOF engine - echovault.aofEngine = aof.NewAOFEngine( + aofEngine, err := aof.NewAOFEngine( aof.WithClock(echovault.clock), aof.WithDirectory(echovault.config.DataDir), aof.WithStrategy(echovault.config.AOFSyncStrategy), @@ -289,6 +289,10 @@ func NewEchoVault(options ...func(echovault *EchoVault)) (*EchoVault, error) { } }), ) + if err != nil { + return nil, err + } + echovault.aofEngine = aofEngine } // If eviction policy is not noeviction, start a goroutine to evict keys every 100 milliseconds. diff --git a/internal/aof/engine.go b/internal/aof/engine.go index e630c16..d23b171 100644 --- a/internal/aof/engine.go +++ b/internal/aof/engine.go @@ -107,7 +107,7 @@ func WithAppendReadWriter(rw logstore.AppendReadWriter) func(engine *Engine) { } } -func NewAOFEngine(options ...func(engine *Engine)) *Engine { +func NewAOFEngine(options ...func(engine *Engine)) (*Engine, error) { engine := &Engine{ clock: clock.NewClock(), syncStrategy: "everysec", @@ -129,22 +129,30 @@ func NewAOFEngine(options ...func(engine *Engine)) *Engine { } // Setup Preamble engine - engine.preambleStore = preamble.NewPreambleStore( + preambleStore, err := preamble.NewPreambleStore( preamble.WithClock(engine.clock), preamble.WithDirectory(engine.directory), preamble.WithReadWriter(engine.preambleRW), preamble.WithGetStateFunc(engine.getStateFunc), preamble.WithSetKeyDataFunc(engine.setKeyDataFunc), ) + if err != nil { + return nil, err + } + engine.preambleStore = preambleStore // Setup AOF log store engine - engine.appendStore = logstore.NewAppendStore( + appendStore, err := logstore.NewAppendStore( logstore.WithClock(engine.clock), logstore.WithDirectory(engine.directory), logstore.WithStrategy(engine.syncStrategy), logstore.WithReadWriter(engine.appendRW), logstore.WithHandleCommandFunc(engine.handleCommand), ) + if err != nil { + return nil, err + } + engine.appendStore = appendStore // 3. Start the goroutine to pick up queued commands in order to write them to the file. // LogCommand will get the open file handler from the struct top perform the AOF operation. @@ -157,7 +165,7 @@ func NewAOFEngine(options ...func(engine *Engine)) *Engine { } }() - return engine + return engine, nil } func (engine *Engine) QueueCommand(command []byte) { @@ -173,12 +181,12 @@ func (engine *Engine) RewriteLog() error { // Create AOF preamble if err := engine.preambleStore.CreatePreamble(); err != nil { - log.Println(fmt.Errorf("rewrite log -> create preamble error: %+v", err)) + return fmt.Errorf("rewrite log -> create preamble error: %+v", err) } // Truncate the AOF file. if err := engine.appendStore.Truncate(); err != nil { - log.Println(fmt.Errorf("rewrite log -> create aof error: %+v", err)) + return fmt.Errorf("rewrite log -> create aof error: %+v", err) } return nil @@ -186,10 +194,10 @@ func (engine *Engine) RewriteLog() error { func (engine *Engine) Restore() error { if err := engine.preambleStore.Restore(); err != nil { - log.Println(fmt.Errorf("restore aof -> restore preamble error: %+v", err)) + return fmt.Errorf("restore aof -> restore preamble error: %+v", err) } if err := engine.appendStore.Restore(); err != nil { - log.Println(fmt.Errorf("restore aof -> restore aof error: %+v", err)) + return fmt.Errorf("restore aof -> restore aof error: %+v", err) } return nil } diff --git a/internal/aof/engine_test.go b/internal/aof/engine_test.go new file mode 100644 index 0000000..5548a48 --- /dev/null +++ b/internal/aof/engine_test.go @@ -0,0 +1,163 @@ +// Copyright 2024 Kelvin Clement Mwinuka +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package aof_test + +import ( + "fmt" + "github.com/echovault/echovault/internal" + "github.com/echovault/echovault/internal/aof" + "github.com/echovault/echovault/internal/aof/log" + "github.com/echovault/echovault/internal/aof/preamble" + "github.com/echovault/echovault/internal/clock" + "os" + "sync/atomic" + "testing" + "time" +) + +func marshalRespCommand(command []string) []byte { + return []byte(fmt.Sprintf( + "*%d\r\n$%d\r\n%s\r\n$%d\r\n%s\r\n$%d\r\n%s", len(command), + len(command[0]), command[0], + len(command[1]), command[1], + len(command[2]), command[2], + )) +} + +func Test_AOFEngine(t *testing.T) { + strategy := "always" + directory := "./testdata" + + var rewriteInProgress atomic.Bool + startRewriteFunc := func() { + if rewriteInProgress.Load() { + t.Error("expected rewriteInProgress to be false, got true") + } + rewriteInProgress.Store(true) + } + finishRewriteFunc := func() { + if !rewriteInProgress.Load() { + t.Error("expected rewriteInProgress to be true, got false") + rewriteInProgress.Store(false) + } + } + + state := map[string]internal.KeyData{ + "key1": {Value: "value1", ExpireAt: clock.NewClock().Now().Add(10 * time.Second)}, + "key2": {Value: "value2", ExpireAt: clock.NewClock().Now().Add(-10 * time.Second)}, // Should be excluded on restore + "key3": {Value: "value3", ExpireAt: clock.NewClock().Now().Add(10 * time.Second)}, + } + restoredState := map[string]internal.KeyData{} + wantRestoredState := map[string]internal.KeyData{ + "key1": {Value: "value1", ExpireAt: clock.NewClock().Now().Add(10 * time.Second)}, + "key3": {Value: "value3", ExpireAt: clock.NewClock().Now().Add(10 * time.Second)}, + "key4": {Value: "value4", ExpireAt: time.Time{}}, + "key5": {Value: "value5", ExpireAt: time.Time{}}, + "key6": {Value: "value6", ExpireAt: time.Time{}}, + "key7": {Value: "value7", ExpireAt: time.Time{}}, + "key8": {Value: "value8", ExpireAt: time.Time{}}, + "key9": {Value: "value9", ExpireAt: time.Time{}}, + "key10": {Value: "value10", ExpireAt: time.Time{}}, + } + getStateFunc := func() map[string]internal.KeyData { + return state + } + setKeyDataFunc := func(key string, data internal.KeyData) { + restoredState[key] = data + } + handleCommandFunc := func(command []byte) { + cmd, err := internal.Decode(command) + if err != nil { + t.Error(err) + } + restoredState[cmd[1]] = internal.KeyData{Value: cmd[2], ExpireAt: time.Time{}} + } + + preambleReadWriter := func() preamble.PreambleReadWriter { + return nil + }() + appendReadWriter := func() log.AppendReadWriter { + return nil + }() + + engine, err := aof.NewAOFEngine( + aof.WithClock(clock.NewClock()), + aof.WithStrategy(strategy), + aof.WithDirectory(directory), + aof.WithStartRewriteFunc(startRewriteFunc), + aof.WithFinishRewriteFunc(finishRewriteFunc), + aof.WithGetStateFunc(getStateFunc), + aof.WithSetKeyDataFunc(setKeyDataFunc), + aof.WithHandleCommandFunc(handleCommandFunc), + aof.WithPreambleReadWriter(preambleReadWriter), + aof.WithAppendReadWriter(appendReadWriter), + ) + if err != nil { + t.Error(err) + } + + // Log some commands to mutate the state + preRewriteCommands := [][]string{ + {"SET", "key4", "value4"}, + {"SET", "key5", "value5"}, + {"SET", "key6", "value6"}, + } + for _, command := range preRewriteCommands { + state[command[1]] = internal.KeyData{Value: command[2], ExpireAt: time.Time{}} + engine.QueueCommand(marshalRespCommand(command)) + } + <-time.After(100 * time.Millisecond) + + // Trigger log rewrite + if err = engine.RewriteLog(); err != nil { + t.Error(err) + } + + // Log some mode commands + postRewriteCommands := [][]string{ + {"SET", "key7", "value7"}, + {"SET", "key8", "value8"}, + {"SET", "key9", "value9"}, + {"SET", "key10", "value10"}, + } + for _, command := range postRewriteCommands { + state[command[1]] = internal.KeyData{Value: command[2], ExpireAt: time.Time{}} + engine.QueueCommand(marshalRespCommand(command)) + } + <-time.After(100 * time.Millisecond) + + // Restore logs + if err = engine.Restore(); err != nil { + t.Error(err) + } + + if len(wantRestoredState) != len(restoredState) { + t.Errorf("expected restored state to be lenght %d, got %d", len(wantRestoredState), len(restoredState)) + for key, data := range restoredState { + want, ok := wantRestoredState[key] + if !ok { + t.Errorf("could not find key %s in expected state state", key) + } + if want.Value != data.Value { + t.Errorf("expected value %v for key %s, got %v", want.Value, key, data.Value) + } + if !want.ExpireAt.Equal(data.ExpireAt) { + t.Errorf("expected expiry time of %v for key %s, got %v", want.ExpireAt, key, data.ExpireAt) + } + } + } + + _ = os.RemoveAll(directory) +} diff --git a/internal/aof/log/store_test.go b/internal/aof/log/store_test.go index e5a3b56..92fa0cf 100644 --- a/internal/aof/log/store_test.go +++ b/internal/aof/log/store_test.go @@ -34,7 +34,7 @@ func marshalRespCommand(command []string) []byte { )) } -func Test_LogWithNoReadWriter(t *testing.T) { +func Test_AppendStore(t *testing.T) { tests := []struct { name string directory string @@ -44,7 +44,7 @@ func Test_LogWithNoReadWriter(t *testing.T) { }{ { name: "1. Not passing an AppendReadWriter to NewAppendStore should create a new append file", - directory: "./testdata/with_no_read_writer", + directory: "./testdata/log/with_no_read_writer", strategy: "always", commands: [][]string{ {"SET", "key1", "value1"}, @@ -55,7 +55,7 @@ func Test_LogWithNoReadWriter(t *testing.T) { }, { name: "2. Passing an existing AppendReadWriter to NewAppendStore should successfully append and restore", - directory: "./testdata/with_read_writer", + directory: "./testdata/log/with_read_writer", strategy: "always", commands: [][]string{ {"SET", "key1", "value1"}, @@ -77,7 +77,7 @@ func Test_LogWithNoReadWriter(t *testing.T) { }, { name: "3. Using everysec strategy should sync the AOF file after one second", - directory: "./testdata/with_everysec_strategy", + directory: "./testdata/log/with_everysec_strategy", strategy: "everysec", commands: [][]string{ {"SET", "key1", "value1"}, @@ -141,4 +141,6 @@ func Test_LogWithNoReadWriter(t *testing.T) { t.Error("timeout error") } } + + _ = os.RemoveAll("./testdata") } diff --git a/internal/aof/preamble/store.go b/internal/aof/preamble/store.go index 7c97359..4dacc7d 100644 --- a/internal/aof/preamble/store.go +++ b/internal/aof/preamble/store.go @@ -23,6 +23,7 @@ import ( "os" "path" "sync" + "time" ) type PreambleReadWriter interface { @@ -141,6 +142,11 @@ func (store *PreambleStore) Restore() error { return nil } + // Seek to the beginning of the file before beginning restore + if _, err := store.rw.Seek(0, 0); err != nil { + return fmt.Errorf("restore preamble: %v", err) + } + b, err := io.ReadAll(store.rw) if err != nil { return err @@ -173,6 +179,9 @@ func (store *PreambleStore) Close() error { func (store *PreambleStore) filterExpiredKeys(state map[string]internal.KeyData) map[string]internal.KeyData { var keysToDelete []string for k, v := range state { + if v.ExpireAt.Equal(time.Time{}) { + continue + } if v.ExpireAt.Before(store.clock.Now()) { keysToDelete = append(keysToDelete, k) } diff --git a/internal/aof/preamble/store_test.go b/internal/aof/preamble/store_test.go new file mode 100644 index 0000000..846cfea --- /dev/null +++ b/internal/aof/preamble/store_test.go @@ -0,0 +1,183 @@ +// Copyright 2024 Kelvin Clement Mwinuka +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package preamble_test + +import ( + "github.com/echovault/echovault/internal" + "github.com/echovault/echovault/internal/aof/preamble" + "github.com/echovault/echovault/internal/clock" + "os" + "path" + "testing" + "time" +) + +func Test_PreambleStore(t *testing.T) { + directory := "./testdata/preamble" + tests := []struct { + name string + directory string + state map[string]internal.KeyData + preambleReadWriter preamble.PreambleReadWriter + wantState map[string]internal.KeyData + }{ + { + name: "1. Preamble store with no preamble read writer passed should trigger one to be created upon initialization", + directory: directory, + state: map[string]internal.KeyData{ + "key1": { + Value: "value1", + ExpireAt: clock.NewClock().Now().Add(10 * time.Second), + }, + "key2": { + Value: "value2", + ExpireAt: clock.NewClock().Now().Add(10 * time.Second), + }, + "key3": { + Value: "value3", + ExpireAt: clock.NewClock().Now().Add(10 * time.Second), + }, + }, + preambleReadWriter: nil, + wantState: map[string]internal.KeyData{ + "key1": { + Value: "value1", + ExpireAt: clock.NewClock().Now().Add(10 * time.Second), + }, + "key2": { + Value: "value2", + ExpireAt: clock.NewClock().Now().Add(10 * time.Second), + }, + "key3": { + Value: "value3", + ExpireAt: clock.NewClock().Now().Add(10 * time.Second), + }, + }, + }, + { + name: "2. Pass a pre-existing preamble read writer to constructor", + directory: directory, + state: map[string]internal.KeyData{ + "key4": { + Value: "value4", + ExpireAt: clock.NewClock().Now().Add(10 * time.Second), + }, + "key5": { + Value: "value5", + ExpireAt: clock.NewClock().Now().Add(10 * time.Second), + }, + "key6": { + Value: "value6", + ExpireAt: clock.NewClock().Now().Add(10 * time.Second), + }, + }, + preambleReadWriter: func() preamble.PreambleReadWriter { + if err := os.MkdirAll(path.Join("./testdata/preamble", "aof"), os.ModePerm); err != nil { + t.Error(err) + } + f, err := os.OpenFile(path.Join("./testdata/preamble", "aof", "preamble.bin"), + os.O_RDWR|os.O_CREATE, os.ModePerm) + if err != nil { + t.Error(err) + } + return f + }(), + wantState: map[string]internal.KeyData{ + "key4": { + Value: "value4", + ExpireAt: clock.NewClock().Now().Add(10 * time.Second), + }, + "key5": { + Value: "value5", + ExpireAt: clock.NewClock().Now().Add(10 * time.Second), + }, + "key6": { + Value: "value6", + ExpireAt: clock.NewClock().Now().Add(10 * time.Second), + }, + }, + }, + { + name: "3. Skip expired keys when saving/loading state from preamble read writer", + directory: directory, + state: map[string]internal.KeyData{ + "key7": { + Value: "value7", + ExpireAt: clock.NewClock().Now().Add(10 * time.Second), + }, + "key8": { + Value: "value8", + ExpireAt: clock.NewClock().Now().Add(-10 * time.Second), + }, + "key9": { + Value: "value9", + ExpireAt: clock.NewClock().Now().Add(10 * time.Second), + }, + "key10": { + Value: "value10", + ExpireAt: clock.NewClock().Now().Add(-10 * time.Second), + }, + }, + preambleReadWriter: nil, + wantState: map[string]internal.KeyData{ + "key7": { + Value: "value7", + ExpireAt: clock.NewClock().Now().Add(10 * time.Second), + }, + "key9": { + Value: "value9", + ExpireAt: clock.NewClock().Now().Add(10 * time.Second), + }, + }, + }, + } + + for _, test := range tests { + options := []func(store *preamble.PreambleStore){ + preamble.WithClock(clock.NewClock()), + preamble.WithDirectory(test.directory), + preamble.WithGetStateFunc(func() map[string]internal.KeyData { + return test.state + }), + preamble.WithSetKeyDataFunc(func(key string, data internal.KeyData) { + entry, ok := test.wantState[key] + if !ok { + t.Errorf("could not find element: %v", key) + } + if entry.Value != data.Value { + t.Errorf("expected value %v for key %s, got %v", entry.Value, key, data.Value) + } + if !entry.ExpireAt.Equal(data.ExpireAt) { + t.Errorf("expected expireAt %v for key %s, got %v", entry.ExpireAt, key, data.ExpireAt) + } + }), + } + + store, err := preamble.NewPreambleStore(options...) + if err != nil { + t.Error(err) + } + + if err = store.CreatePreamble(); err != nil { + t.Error(err) + } + + if err = store.Restore(); err != nil { + t.Error(err) + } + } + + _ = os.RemoveAll("./testdata") +} diff --git a/internal/snapshot/snapshot_test.go b/internal/snapshot/snapshot_test.go new file mode 100644 index 0000000..eff8884 --- /dev/null +++ b/internal/snapshot/snapshot_test.go @@ -0,0 +1,15 @@ +// Copyright 2024 Kelvin Clement Mwinuka +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package snapshot_test