GetState receiver function now checks wether there's an active copy or state mutation before proceeding with the copy.

Handling of write commands is delayed until state copy is complete.
This commit is contained in:
Kelvin Clement Mwinuka
2024-01-31 01:55:51 +08:00
parent 25b2cb7154
commit ce0eabf865
6 changed files with 72 additions and 26 deletions

View File

@@ -16,7 +16,6 @@ import (
"log"
"net"
"os"
"slices"
"sync"
"sync/atomic"
"time"
@@ -42,6 +41,9 @@ type Server struct {
PubSub *pubsub.PubSub
SnapshotInProgress atomic.Bool
RewriteAOFInProgress atomic.Bool
StateCopyInProgress atomic.Bool
StateMutationInProgress atomic.Bool
LatestSnapshotMilliseconds atomic.Int64 // Unix epoch in milliseconds
SnapshotEngine *snapshot.Engine
AOFEngine *aof.Engine
@@ -154,6 +156,16 @@ func (server *Server) handleConnection(ctx context.Context, conn net.Conn) {
continue
}
// If we're not in cluster mode and command/subcommand is a write command, wait for state copy to finish.
if utils.IsWriteCommand(command, subCommand) {
for {
if !server.StateCopyInProgress.Load() {
server.StateMutationInProgress.Store(true)
break
}
}
}
if !server.IsInCluster() || !synchronize {
if res, err := handler(ctx, cmd, server, &conn); err != nil {
if _, err := w.Write([]byte(fmt.Sprintf("-%s\r\n\r\n", err.Error()))); err != nil {
@@ -163,14 +175,11 @@ func (server *Server) handleConnection(ctx context.Context, conn net.Conn) {
if _, err := w.Write(res); err != nil {
log.Println(err)
}
if slices.Contains(append(command.Categories, subCommand.Categories...), utils.WriteCategory) {
// Log successful write command
err := server.AOFEngine.LogCommand(message) // TODO: Handle error
if err != nil {
log.Println(err)
}
if utils.IsWriteCommand(command, subCommand) {
// TODO: Queue successful write command instead of logging it directly
}
}
server.StateMutationInProgress.Store(false)
continue
}
@@ -242,8 +251,10 @@ func (server *Server) Start(ctx context.Context) {
} else {
// Initialize standalone AOF engine
server.AOFEngine = aof.NewAOFEngine(aof.Opts{
Config: conf,
GetState: server.GetState,
Config: conf,
GetState: server.GetState,
StartRewriteAOF: server.StartRewriteAOF,
FinishRewriteAOF: server.FinishRewriteAOF,
})
// Initialize and start standalone snapshot engine
server.SnapshotEngine = snapshot.NewSnapshotEngine(snapshot.Opts{
@@ -288,6 +299,14 @@ func (server *Server) TakeSnapshot() error {
return nil
}
func (server *Server) StartSnapshot() {
server.SnapshotInProgress.Store(true)
}
func (server *Server) FinishSnapshot() {
server.SnapshotInProgress.Store(false)
}
func (server *Server) SetLatestSnapshot(msec int64) {
server.LatestSnapshotMilliseconds.Store(msec)
}
@@ -296,9 +315,24 @@ func (server *Server) GetLatestSnapshot() int64 {
return server.LatestSnapshotMilliseconds.Load()
}
func (server *Server) StartRewriteAOF() {
server.RewriteAOFInProgress.Store(true)
}
func (server *Server) FinishRewriteAOF() {
server.RewriteAOFInProgress.Store(false)
}
func (server *Server) RewriteAOF() error {
// TODO: Make this concurrent
return server.AOFEngine.RewriteLog()
if server.RewriteAOFInProgress.Load() {
return errors.New("aof rewrite in progress")
}
go func() {
if err := server.AOFEngine.RewriteLog(); err != nil {
log.Println(err)
}
}()
return nil
}
func (server *Server) ShutDown(ctx context.Context) {