add AppendFsync

This commit is contained in:
finley
2022-12-25 22:50:57 +08:00
parent 8089b357e4
commit d399353394
8 changed files with 151 additions and 28 deletions

View File

@@ -1,6 +1,7 @@
package aof
import (
"context"
"github.com/hdt3213/godis/interface/database"
"github.com/hdt3213/godis/lib/logger"
"github.com/hdt3213/godis/lib/utils"
@@ -10,7 +11,9 @@ import (
"io"
"os"
"strconv"
"strings"
"sync"
"time"
)
// CmdLine is alias for [][]byte, represents a command line
@@ -20,9 +23,19 @@ const (
aofQueueSize = 1 << 16
)
const (
// FsyncAlways do fsync for every command
FsyncAlways = "always"
// FsyncEverySec do fsync every second
FsyncEverySec = "everysec"
// FsyncNo lets operating system decides when to do fsync
FsyncNo = "no"
)
type payload struct {
cmdLine CmdLine
dbIndex int
wg *sync.WaitGroup
}
// Listener will be called-back after receiving a aof payload
@@ -34,23 +47,27 @@ type Listener interface {
// Persister receive msgs from channel and write to AOF file
type Persister struct {
ctx context.Context
cancel context.CancelFunc
db database.DBEngine
tmpDBMaker func() database.DBEngine
aofChan chan *payload
aofFile *os.File
aofFilename string
// aof goroutine will send msg to main goroutine through this channel when aof tasks finished and ready to shutdown
aofFsync string
// aof goroutine will send msg to main goroutine through this channel when aof tasks finished and ready to shut down
aofFinished chan struct{}
// pause aof for start/finish aof rewrite progress
pausingAof sync.RWMutex
pausingAof sync.Mutex
currentDB int
listeners map[Listener]struct{}
}
// NewPersister creates a new aof.Persister
func NewPersister(db database.DBEngine, filename string, load bool, tmpDBMaker func() database.DBEngine) (*Persister, error) {
func NewPersister(db database.DBEngine, filename string, load bool, fsync string, tmpDBMaker func() database.DBEngine) (*Persister, error) {
persister := &Persister{}
persister.aofFilename = filename
persister.aofFsync = strings.ToLower(fsync)
persister.db = db
persister.tmpDBMaker = tmpDBMaker
if load {
@@ -65,8 +82,14 @@ func NewPersister(db database.DBEngine, filename string, load bool, tmpDBMaker f
persister.aofFinished = make(chan struct{})
persister.listeners = make(map[Listener]struct{})
go func() {
persister.handleAof()
persister.listenCmd()
}()
ctx, cancel := context.WithCancel(context.Background())
persister.ctx = ctx
persister.cancel = cancel
if persister.aofFsync == FsyncEverySec {
persister.fsyncEverySecond()
}
return persister, nil
}
@@ -77,24 +100,53 @@ func (persister *Persister) RemoveListener(listener Listener) {
delete(persister.listeners, listener)
}
// AddAof send command to aof goroutine through channel
func (persister *Persister) AddAof(dbIndex int, cmdLine CmdLine) {
if persister.aofChan != nil {
var wgPool = sync.Pool{
New: func() interface{} {
return &sync.WaitGroup{}
},
}
func getWg() *sync.WaitGroup {
return wgPool.Get().(*sync.WaitGroup)
}
func returnWg(wg *sync.WaitGroup) {
wgPool.Put(wg)
}
// SaveCmdLine send command to aof goroutine through channel
func (persister *Persister) SaveCmdLine(dbIndex int, cmdLine CmdLine) {
// aofChan will be set as nil temporarily during load aof see Persister.LoadAof
if persister.aofChan == nil {
return
}
if persister.aofFsync == FsyncAlways {
// use WaitGroup to wait for saving finished
wg := getWg()
defer returnWg(wg)
wg.Add(1)
persister.aofChan <- &payload{
cmdLine: cmdLine,
dbIndex: dbIndex,
wg: wg,
}
wg.Wait()
}
persister.aofChan <- &payload{
cmdLine: cmdLine,
dbIndex: dbIndex,
}
}
// handleAof listen aof channel and write into file
func (persister *Persister) handleAof() {
// listenCmd listen aof channel and write into file
func (persister *Persister) listenCmd() {
// serialized execution
var cmdLines []CmdLine
persister.currentDB = 0
for p := range persister.aofChan {
cmdLines = cmdLines[:0] // reuse underlying array
persister.pausingAof.RLock() // prevent other goroutines from pausing aof
cmdLines = cmdLines[:0] // reuse underlying array
persister.pausingAof.Lock() // prevent other goroutines from pausing aof
// ensure aof is in the right database
if p.dbIndex != persister.currentDB {
// select db
selectCmd := utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex))
@@ -103,26 +155,34 @@ func (persister *Persister) handleAof() {
_, err := persister.aofFile.Write(data)
if err != nil {
logger.Warn(err)
persister.pausingAof.RUnlock()
persister.pausingAof.Unlock()
continue // skip this command
}
persister.currentDB = p.dbIndex
}
// save command
data := protocol.MakeMultiBulkReply(p.cmdLine).ToBytes()
cmdLines = append(cmdLines, p.cmdLine)
_, err := persister.aofFile.Write(data)
if err != nil {
logger.Warn(err)
}
persister.pausingAof.RUnlock()
for listener := range persister.listeners {
listener.Callback(cmdLines)
}
if persister.aofFsync == FsyncAlways {
_ = persister.aofFile.Sync()
}
if p.wg != nil {
p.wg.Done()
}
persister.pausingAof.Unlock()
}
persister.aofFinished <- struct{}{}
}
// LoadAof read aof file, can only be used before Persister.handleAof started
// LoadAof read aof file, can only be used before Persister.listenCmd started
func (persister *Persister) LoadAof(maxBytes int) {
// persister.db.Exec may call persister.addAof
// delete aofChan to prevent loaded commands back into aofChan
@@ -184,4 +244,23 @@ func (persister *Persister) Close() {
logger.Warn(err)
}
}
persister.cancel()
}
func (persister *Persister) fsyncEverySecond() {
ticker := time.NewTicker(time.Second)
go func() {
for {
select {
case <-ticker.C:
persister.pausingAof.Lock()
if err := persister.aofFile.Sync(); err != nil {
logger.Errorf("fsync failed: %v", err)
}
persister.pausingAof.Unlock()
case <-persister.ctx.Done():
return
}
}
}()
}

View File

@@ -17,6 +17,7 @@ type ServerProperties struct {
Port int `cfg:"port"`
AppendOnly bool `cfg:"appendonly"`
AppendFilename string `cfg:"appendfilename"`
AppendFsync string `cfg:"appendfsync"`
MaxClients int `cfg:"maxclients"`
RequirePass string `cfg:"requirepass"`
Databases int `cfg:"databases"`

View File

@@ -79,26 +79,26 @@ func (server *Server) loadRDB(dec *core.Decoder) error {
})
}
func NewPersister(db database.DBEngine, filename string, load bool) (*aof.Persister, error) {
return aof.NewPersister(db, filename, load, func() database.DBEngine {
func NewPersister(db database.DBEngine, filename string, load bool, fsync string) (*aof.Persister, error) {
return aof.NewPersister(db, filename, load, fsync, func() database.DBEngine {
return MakeAuxiliaryServer()
})
}
func (server *Server) AddAof(dbIndex int, cmdLine CmdLine) {
if server.persister != nil {
server.persister.AddAof(dbIndex, cmdLine)
server.persister.SaveCmdLine(dbIndex, cmdLine)
}
}
func (server *Server) bindPersister(aofHandler *aof.Persister) {
server.persister = aofHandler
// bind AddAof
// bind SaveCmdLine
for _, db := range server.dbSet {
singleDB := db.Load().(*DB)
singleDB.addAof = func(line CmdLine) {
if config.Properties.AppendOnly { // config may be changed during runtime
server.persister.AddAof(singleDB.index, line)
server.persister.SaveCmdLine(singleDB.index, line)
}
}
}

View File

@@ -1,13 +1,16 @@
package database
import (
"github.com/hdt3213/godis/aof"
"github.com/hdt3213/godis/config"
"github.com/hdt3213/godis/lib/utils"
"github.com/hdt3213/godis/redis/connection"
"github.com/hdt3213/godis/redis/protocol/asserts"
"io/ioutil"
"path/filepath"
"runtime"
"testing"
"time"
)
func TestLoadRDB(t *testing.T) {
@@ -38,3 +41,40 @@ func TestLoadRDB(t *testing.T) {
result = rdbDB.Exec(conn, utils.ToCmdLine("Get", "str"))
asserts.AssertNullBulk(t, result)
}
func TestServerFsyncAlways(t *testing.T) {
aofFile, err := ioutil.TempFile("", "*.aof")
if err != nil {
t.Error(err)
return
}
config.Properties.AppendOnly = true
config.Properties.AppendFilename = aofFile.Name()
config.Properties.AppendFsync = aof.FsyncAlways
server := NewStandaloneServer()
conn := connection.NewFakeConn()
ret := server.Exec(conn, utils.ToCmdLine("set", "1", "1"))
asserts.AssertNotError(t, ret)
reader := NewStandaloneServer()
ret = reader.Exec(conn, utils.ToCmdLine("get", "1"))
asserts.AssertBulkReply(t, ret, "1")
}
func TestServerFsyncEverySec(t *testing.T) {
aofFile, err := ioutil.TempFile("", "*.aof")
if err != nil {
t.Error(err)
return
}
config.Properties.AppendOnly = true
config.Properties.AppendFilename = aofFile.Name()
config.Properties.AppendFsync = aof.FsyncEverySec
server := NewStandaloneServer()
conn := connection.NewFakeConn()
ret := server.Exec(conn, utils.ToCmdLine("set", "1", "1"))
asserts.AssertNotError(t, ret)
time.Sleep(1500 * time.Millisecond)
reader := NewStandaloneServer()
ret = reader.Exec(conn, utils.ToCmdLine("get", "1"))
asserts.AssertBulkReply(t, ret, "1")
}

View File

@@ -50,7 +50,7 @@ func TestReplicationMasterSide(t *testing.T) {
AppendFilename: aofFilename,
}
master := mockServer()
aofHandler, err := NewPersister(master, config.Properties.AppendFilename, true)
aofHandler, err := NewPersister(master, config.Properties.AppendFilename, true, config.Properties.AppendFsync)
if err != nil {
panic(err)
}
@@ -213,7 +213,7 @@ func TestReplicationMasterRewriteRDB(t *testing.T) {
AppendFilename: aofFilename,
}
master := mockServer()
aofHandler, err := NewPersister(master, config.Properties.AppendFilename, true)
aofHandler, err := NewPersister(master, config.Properties.AppendFilename, true, config.Properties.AppendFsync)
if err != nil {
panic(err)
}

View File

@@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"github.com/hdt3213/godis/aof"
"github.com/hdt3213/godis/config"
"github.com/hdt3213/godis/interface/redis"
"github.com/hdt3213/godis/lib/logger"
@@ -315,7 +316,7 @@ func makeRdbLoader(upgradeAof bool) (*Server, string, error) {
return nil, "", fmt.Errorf("create temp rdb failed: %v", err)
}
newAofFilename := newAofFile.Name()
aofHandler, err := NewPersister(rdbLoader, newAofFilename, false)
aofHandler, err := NewPersister(rdbLoader, newAofFilename, false, aof.FsyncNo)
if err != nil {
return nil, "", err
}
@@ -364,11 +365,11 @@ func (server *Server) loadMasterRDB(configVersion int32) error {
if err != nil {
return err
}
aofHandler, err := NewPersister(server, config.Properties.AppendFilename, false)
persister, err := NewPersister(server, config.Properties.AppendFilename, false, config.Properties.AppendFsync)
if err != nil {
return err
}
server.bindPersister(aofHandler)
server.bindPersister(persister)
}
return nil

View File

@@ -2,6 +2,7 @@ package database
import (
"bytes"
"github.com/hdt3213/godis/aof"
"github.com/hdt3213/godis/config"
"github.com/hdt3213/godis/lib/utils"
"github.com/hdt3213/godis/redis/client"
@@ -37,7 +38,7 @@ func TestReplicationSlaveSide(t *testing.T) {
t.Error(err)
return
}
aofHandler, err := NewPersister(server, config.Properties.AppendFilename, true)
aofHandler, err := NewPersister(server, config.Properties.AppendFilename, true, aof.FsyncNo)
if err != nil {
t.Error(err)
return
@@ -151,7 +152,7 @@ func TestReplicationSlaveSide(t *testing.T) {
// check slave aof file
aofLoader := MakeAuxiliaryServer()
aofHandler2, err := NewPersister(aofLoader, config.Properties.AppendFilename, true)
aofHandler2, err := NewPersister(aofLoader, config.Properties.AppendFilename, true, aof.FsyncNo)
aofLoader.bindPersister(aofHandler2)
ret = aofLoader.Exec(conn, utils.ToCmdLine("get", "zz"))
asserts.AssertNullBulk(t, ret)

View File

@@ -49,7 +49,8 @@ func NewStandaloneServer() *Server {
server.hub = pubsub.MakeHub()
validAof := false
if config.Properties.AppendOnly {
aofHandler, err := NewPersister(server, config.Properties.AppendFilename, true)
aofHandler, err := NewPersister(server,
config.Properties.AppendFilename, true, config.Properties.AppendFsync)
if err != nil {
panic(err)
}
@@ -221,7 +222,7 @@ func (server *Server) flushAll() redis.Reply {
server.flushDB(i)
}
if server.persister != nil {
server.persister.AddAof(0, utils.ToCmdLine("FlushAll"))
server.persister.SaveCmdLine(0, utils.ToCmdLine("FlushAll"))
}
return &protocol.OkReply{}
}