From d39935339459edfd7e9d07ec17d4b12aa3265e94 Mon Sep 17 00:00:00 2001 From: finley Date: Sun, 25 Dec 2022 22:50:57 +0800 Subject: [PATCH] add AppendFsync --- aof/aof.go | 107 ++++++++++++++++++++++++---- config/config.go | 1 + database/persistence.go | 10 +-- database/persistence_test.go | 40 +++++++++++ database/replication_master_test.go | 4 +- database/replication_slave.go | 7 +- database/replication_slave_test.go | 5 +- database/server.go | 5 +- 8 files changed, 151 insertions(+), 28 deletions(-) diff --git a/aof/aof.go b/aof/aof.go index ba422aa..e72e840 100644 --- a/aof/aof.go +++ b/aof/aof.go @@ -1,6 +1,7 @@ package aof import ( + "context" "github.com/hdt3213/godis/interface/database" "github.com/hdt3213/godis/lib/logger" "github.com/hdt3213/godis/lib/utils" @@ -10,7 +11,9 @@ import ( "io" "os" "strconv" + "strings" "sync" + "time" ) // CmdLine is alias for [][]byte, represents a command line @@ -20,9 +23,19 @@ const ( aofQueueSize = 1 << 16 ) +const ( + // FsyncAlways do fsync for every command + FsyncAlways = "always" + // FsyncEverySec do fsync every second + FsyncEverySec = "everysec" + // FsyncNo lets operating system decides when to do fsync + FsyncNo = "no" +) + type payload struct { cmdLine CmdLine dbIndex int + wg *sync.WaitGroup } // Listener will be called-back after receiving a aof payload @@ -34,23 +47,27 @@ type Listener interface { // Persister receive msgs from channel and write to AOF file type Persister struct { + ctx context.Context + cancel context.CancelFunc db database.DBEngine tmpDBMaker func() database.DBEngine aofChan chan *payload aofFile *os.File aofFilename string - // aof goroutine will send msg to main goroutine through this channel when aof tasks finished and ready to shutdown + aofFsync string + // aof goroutine will send msg to main goroutine through this channel when aof tasks finished and ready to shut down aofFinished chan struct{} // pause aof for start/finish aof rewrite progress - pausingAof sync.RWMutex + pausingAof sync.Mutex currentDB int listeners map[Listener]struct{} } // NewPersister creates a new aof.Persister -func NewPersister(db database.DBEngine, filename string, load bool, tmpDBMaker func() database.DBEngine) (*Persister, error) { +func NewPersister(db database.DBEngine, filename string, load bool, fsync string, tmpDBMaker func() database.DBEngine) (*Persister, error) { persister := &Persister{} persister.aofFilename = filename + persister.aofFsync = strings.ToLower(fsync) persister.db = db persister.tmpDBMaker = tmpDBMaker if load { @@ -65,8 +82,14 @@ func NewPersister(db database.DBEngine, filename string, load bool, tmpDBMaker f persister.aofFinished = make(chan struct{}) persister.listeners = make(map[Listener]struct{}) go func() { - persister.handleAof() + persister.listenCmd() }() + ctx, cancel := context.WithCancel(context.Background()) + persister.ctx = ctx + persister.cancel = cancel + if persister.aofFsync == FsyncEverySec { + persister.fsyncEverySecond() + } return persister, nil } @@ -77,24 +100,53 @@ func (persister *Persister) RemoveListener(listener Listener) { delete(persister.listeners, listener) } -// AddAof send command to aof goroutine through channel -func (persister *Persister) AddAof(dbIndex int, cmdLine CmdLine) { - if persister.aofChan != nil { +var wgPool = sync.Pool{ + New: func() interface{} { + return &sync.WaitGroup{} + }, +} + +func getWg() *sync.WaitGroup { + return wgPool.Get().(*sync.WaitGroup) +} + +func returnWg(wg *sync.WaitGroup) { + wgPool.Put(wg) +} + +// SaveCmdLine send command to aof goroutine through channel +func (persister *Persister) SaveCmdLine(dbIndex int, cmdLine CmdLine) { + // aofChan will be set as nil temporarily during load aof see Persister.LoadAof + if persister.aofChan == nil { + return + } + if persister.aofFsync == FsyncAlways { + // use WaitGroup to wait for saving finished + wg := getWg() + defer returnWg(wg) + wg.Add(1) persister.aofChan <- &payload{ cmdLine: cmdLine, dbIndex: dbIndex, + wg: wg, } + wg.Wait() + } + persister.aofChan <- &payload{ + cmdLine: cmdLine, + dbIndex: dbIndex, } } -// handleAof listen aof channel and write into file -func (persister *Persister) handleAof() { +// listenCmd listen aof channel and write into file +func (persister *Persister) listenCmd() { // serialized execution var cmdLines []CmdLine persister.currentDB = 0 for p := range persister.aofChan { - cmdLines = cmdLines[:0] // reuse underlying array - persister.pausingAof.RLock() // prevent other goroutines from pausing aof + cmdLines = cmdLines[:0] // reuse underlying array + persister.pausingAof.Lock() // prevent other goroutines from pausing aof + // ensure aof is in the right database if p.dbIndex != persister.currentDB { // select db selectCmd := utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex)) @@ -103,26 +155,34 @@ func (persister *Persister) handleAof() { _, err := persister.aofFile.Write(data) if err != nil { logger.Warn(err) - persister.pausingAof.RUnlock() + persister.pausingAof.Unlock() continue // skip this command } persister.currentDB = p.dbIndex } + // save command data := protocol.MakeMultiBulkReply(p.cmdLine).ToBytes() cmdLines = append(cmdLines, p.cmdLine) _, err := persister.aofFile.Write(data) if err != nil { logger.Warn(err) } - persister.pausingAof.RUnlock() for listener := range persister.listeners { listener.Callback(cmdLines) } + if persister.aofFsync == FsyncAlways { + _ = persister.aofFile.Sync() + } + if p.wg != nil { + p.wg.Done() + } + persister.pausingAof.Unlock() + } persister.aofFinished <- struct{}{} } -// LoadAof read aof file, can only be used before Persister.handleAof started +// LoadAof read aof file, can only be used before Persister.listenCmd started func (persister *Persister) LoadAof(maxBytes int) { // persister.db.Exec may call persister.addAof // delete aofChan to prevent loaded commands back into aofChan @@ -184,4 +244,23 @@ func (persister *Persister) Close() { logger.Warn(err) } } + persister.cancel() +} + +func (persister *Persister) fsyncEverySecond() { + ticker := time.NewTicker(time.Second) + go func() { + for { + select { + case <-ticker.C: + persister.pausingAof.Lock() + if err := persister.aofFile.Sync(); err != nil { + logger.Errorf("fsync failed: %v", err) + } + persister.pausingAof.Unlock() + case <-persister.ctx.Done(): + return + } + } + }() } diff --git a/config/config.go b/config/config.go index b21282b..1c1bfa3 100644 --- a/config/config.go +++ b/config/config.go @@ -17,6 +17,7 @@ type ServerProperties struct { Port int `cfg:"port"` AppendOnly bool `cfg:"appendonly"` AppendFilename string `cfg:"appendfilename"` + AppendFsync string `cfg:"appendfsync"` MaxClients int `cfg:"maxclients"` RequirePass string `cfg:"requirepass"` Databases int `cfg:"databases"` diff --git a/database/persistence.go b/database/persistence.go index 4ad30ef..ed95714 100644 --- a/database/persistence.go +++ b/database/persistence.go @@ -79,26 +79,26 @@ func (server *Server) loadRDB(dec *core.Decoder) error { }) } -func NewPersister(db database.DBEngine, filename string, load bool) (*aof.Persister, error) { - return aof.NewPersister(db, filename, load, func() database.DBEngine { +func NewPersister(db database.DBEngine, filename string, load bool, fsync string) (*aof.Persister, error) { + return aof.NewPersister(db, filename, load, fsync, func() database.DBEngine { return MakeAuxiliaryServer() }) } func (server *Server) AddAof(dbIndex int, cmdLine CmdLine) { if server.persister != nil { - server.persister.AddAof(dbIndex, cmdLine) + server.persister.SaveCmdLine(dbIndex, cmdLine) } } func (server *Server) bindPersister(aofHandler *aof.Persister) { server.persister = aofHandler - // bind AddAof + // bind SaveCmdLine for _, db := range server.dbSet { singleDB := db.Load().(*DB) singleDB.addAof = func(line CmdLine) { if config.Properties.AppendOnly { // config may be changed during runtime - server.persister.AddAof(singleDB.index, line) + server.persister.SaveCmdLine(singleDB.index, line) } } } diff --git a/database/persistence_test.go b/database/persistence_test.go index 8130959..d8421d7 100644 --- a/database/persistence_test.go +++ b/database/persistence_test.go @@ -1,13 +1,16 @@ package database import ( + "github.com/hdt3213/godis/aof" "github.com/hdt3213/godis/config" "github.com/hdt3213/godis/lib/utils" "github.com/hdt3213/godis/redis/connection" "github.com/hdt3213/godis/redis/protocol/asserts" + "io/ioutil" "path/filepath" "runtime" "testing" + "time" ) func TestLoadRDB(t *testing.T) { @@ -38,3 +41,40 @@ func TestLoadRDB(t *testing.T) { result = rdbDB.Exec(conn, utils.ToCmdLine("Get", "str")) asserts.AssertNullBulk(t, result) } + +func TestServerFsyncAlways(t *testing.T) { + aofFile, err := ioutil.TempFile("", "*.aof") + if err != nil { + t.Error(err) + return + } + config.Properties.AppendOnly = true + config.Properties.AppendFilename = aofFile.Name() + config.Properties.AppendFsync = aof.FsyncAlways + server := NewStandaloneServer() + conn := connection.NewFakeConn() + ret := server.Exec(conn, utils.ToCmdLine("set", "1", "1")) + asserts.AssertNotError(t, ret) + reader := NewStandaloneServer() + ret = reader.Exec(conn, utils.ToCmdLine("get", "1")) + asserts.AssertBulkReply(t, ret, "1") +} + +func TestServerFsyncEverySec(t *testing.T) { + aofFile, err := ioutil.TempFile("", "*.aof") + if err != nil { + t.Error(err) + return + } + config.Properties.AppendOnly = true + config.Properties.AppendFilename = aofFile.Name() + config.Properties.AppendFsync = aof.FsyncEverySec + server := NewStandaloneServer() + conn := connection.NewFakeConn() + ret := server.Exec(conn, utils.ToCmdLine("set", "1", "1")) + asserts.AssertNotError(t, ret) + time.Sleep(1500 * time.Millisecond) + reader := NewStandaloneServer() + ret = reader.Exec(conn, utils.ToCmdLine("get", "1")) + asserts.AssertBulkReply(t, ret, "1") +} diff --git a/database/replication_master_test.go b/database/replication_master_test.go index e21adcc..b5574d6 100644 --- a/database/replication_master_test.go +++ b/database/replication_master_test.go @@ -50,7 +50,7 @@ func TestReplicationMasterSide(t *testing.T) { AppendFilename: aofFilename, } master := mockServer() - aofHandler, err := NewPersister(master, config.Properties.AppendFilename, true) + aofHandler, err := NewPersister(master, config.Properties.AppendFilename, true, config.Properties.AppendFsync) if err != nil { panic(err) } @@ -213,7 +213,7 @@ func TestReplicationMasterRewriteRDB(t *testing.T) { AppendFilename: aofFilename, } master := mockServer() - aofHandler, err := NewPersister(master, config.Properties.AppendFilename, true) + aofHandler, err := NewPersister(master, config.Properties.AppendFilename, true, config.Properties.AppendFsync) if err != nil { panic(err) } diff --git a/database/replication_slave.go b/database/replication_slave.go index 02a6df2..5a3ffb3 100644 --- a/database/replication_slave.go +++ b/database/replication_slave.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "github.com/hdt3213/godis/aof" "github.com/hdt3213/godis/config" "github.com/hdt3213/godis/interface/redis" "github.com/hdt3213/godis/lib/logger" @@ -315,7 +316,7 @@ func makeRdbLoader(upgradeAof bool) (*Server, string, error) { return nil, "", fmt.Errorf("create temp rdb failed: %v", err) } newAofFilename := newAofFile.Name() - aofHandler, err := NewPersister(rdbLoader, newAofFilename, false) + aofHandler, err := NewPersister(rdbLoader, newAofFilename, false, aof.FsyncNo) if err != nil { return nil, "", err } @@ -364,11 +365,11 @@ func (server *Server) loadMasterRDB(configVersion int32) error { if err != nil { return err } - aofHandler, err := NewPersister(server, config.Properties.AppendFilename, false) + persister, err := NewPersister(server, config.Properties.AppendFilename, false, config.Properties.AppendFsync) if err != nil { return err } - server.bindPersister(aofHandler) + server.bindPersister(persister) } return nil diff --git a/database/replication_slave_test.go b/database/replication_slave_test.go index 821ad9a..2bf07f7 100644 --- a/database/replication_slave_test.go +++ b/database/replication_slave_test.go @@ -2,6 +2,7 @@ package database import ( "bytes" + "github.com/hdt3213/godis/aof" "github.com/hdt3213/godis/config" "github.com/hdt3213/godis/lib/utils" "github.com/hdt3213/godis/redis/client" @@ -37,7 +38,7 @@ func TestReplicationSlaveSide(t *testing.T) { t.Error(err) return } - aofHandler, err := NewPersister(server, config.Properties.AppendFilename, true) + aofHandler, err := NewPersister(server, config.Properties.AppendFilename, true, aof.FsyncNo) if err != nil { t.Error(err) return @@ -151,7 +152,7 @@ func TestReplicationSlaveSide(t *testing.T) { // check slave aof file aofLoader := MakeAuxiliaryServer() - aofHandler2, err := NewPersister(aofLoader, config.Properties.AppendFilename, true) + aofHandler2, err := NewPersister(aofLoader, config.Properties.AppendFilename, true, aof.FsyncNo) aofLoader.bindPersister(aofHandler2) ret = aofLoader.Exec(conn, utils.ToCmdLine("get", "zz")) asserts.AssertNullBulk(t, ret) diff --git a/database/server.go b/database/server.go index c686ddb..d11f41a 100644 --- a/database/server.go +++ b/database/server.go @@ -49,7 +49,8 @@ func NewStandaloneServer() *Server { server.hub = pubsub.MakeHub() validAof := false if config.Properties.AppendOnly { - aofHandler, err := NewPersister(server, config.Properties.AppendFilename, true) + aofHandler, err := NewPersister(server, + config.Properties.AppendFilename, true, config.Properties.AppendFsync) if err != nil { panic(err) } @@ -221,7 +222,7 @@ func (server *Server) flushAll() redis.Reply { server.flushDB(i) } if server.persister != nil { - server.persister.AddAof(0, utils.ToCmdLine("FlushAll")) + server.persister.SaveCmdLine(0, utils.ToCmdLine("FlushAll")) } return &protocol.OkReply{} }