Files
redis-go/aof/rewrite.go
no sugar 11c0385241 bug fix: FlushDB command is not recorded by AOF (#142)
* bug fix: FlushDB command is not recorded by AOF

* bug fix: Fix an error occurred while rewriting aof rename, windows.ERROR_SHARING_VIOLATION

* bug fix: Fix an error occurred while rewriting aof rename, windows.ERROR_SHARING_VIOLATION

* change the flushDB procedures

* The savecmdline flushdb in the method is incorrectly selected, causing the testof error
2023-03-19 17:36:56 +08:00

162 lines
4.0 KiB
Go

package aof
import (
"github.com/hdt3213/godis/config"
"github.com/hdt3213/godis/interface/database"
"github.com/hdt3213/godis/lib/logger"
"github.com/hdt3213/godis/lib/utils"
"github.com/hdt3213/godis/redis/protocol"
"io"
"io/ioutil"
"os"
"strconv"
"time"
)
func (persister *Persister) newRewriteHandler() *Persister {
h := &Persister{}
h.aofFilename = persister.aofFilename
h.db = persister.tmpDBMaker()
return h
}
// RewriteCtx holds context of an AOF rewriting procedure
type RewriteCtx struct {
tmpFile *os.File
fileSize int64
dbIdx int // selected db index when startRewrite
}
// Rewrite carries out AOF rewrite
func (persister *Persister) Rewrite() error {
ctx, err := persister.StartRewrite()
if err != nil {
return err
}
err = persister.DoRewrite(ctx)
if err != nil {
return err
}
persister.FinishRewrite(ctx)
return nil
}
// DoRewrite actually rewrite aof file
// makes DoRewrite public for testing only, please use Rewrite instead
func (persister *Persister) DoRewrite(ctx *RewriteCtx) error {
tmpFile := ctx.tmpFile
// load aof tmpFile
tmpAof := persister.newRewriteHandler()
tmpAof.LoadAof(int(ctx.fileSize))
// rewrite aof tmpFile
for i := 0; i < config.Properties.Databases; i++ {
// select db
data := protocol.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(i))).ToBytes()
_, err := tmpFile.Write(data)
if err != nil {
return err
}
// dump db
tmpAof.db.ForEach(i, func(key string, entity *database.DataEntity, expiration *time.Time) bool {
cmd := EntityToCmd(key, entity)
if cmd != nil {
_, _ = tmpFile.Write(cmd.ToBytes())
}
if expiration != nil {
cmd := MakeExpireCmd(key, *expiration)
if cmd != nil {
_, _ = tmpFile.Write(cmd.ToBytes())
}
}
return true
})
}
return nil
}
// StartRewrite prepares rewrite procedure
func (persister *Persister) StartRewrite() (*RewriteCtx, error) {
persister.pausingAof.Lock() // pausing aof
defer persister.pausingAof.Unlock()
err := persister.aofFile.Sync()
if err != nil {
logger.Warn("fsync failed")
return nil, err
}
// get current aof file size
fileInfo, _ := os.Stat(persister.aofFilename)
filesize := fileInfo.Size()
// create tmp file
file, err := ioutil.TempFile("", "*.aof")
if err != nil {
logger.Warn("tmp file create failed")
return nil, err
}
return &RewriteCtx{
tmpFile: file,
fileSize: filesize,
dbIdx: persister.currentDB,
}, nil
}
// FinishRewrite finish rewrite procedure
func (persister *Persister) FinishRewrite(ctx *RewriteCtx) {
persister.pausingAof.Lock() // pausing aof
defer persister.pausingAof.Unlock()
tmpFile := ctx.tmpFile
// write commands executed during rewriting to tmp file
src, err := os.Open(persister.aofFilename)
if err != nil {
logger.Error("open aofFilename failed: " + err.Error())
return
}
defer func() {
_ = src.Close()
}()
_, err = src.Seek(ctx.fileSize, 0)
if err != nil {
logger.Error("seek failed: " + err.Error())
return
}
// sync tmpFile's db index with online aofFile
data := protocol.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(ctx.dbIdx))).ToBytes()
_, err = tmpFile.Write(data)
if err != nil {
logger.Error("tmp file rewrite failed: " + err.Error())
return
}
// copy data
_, err = io.Copy(tmpFile, src)
if err != nil {
logger.Error("copy aof filed failed: " + err.Error())
return
}
tmpFileName := tmpFile.Name()
_ = tmpFile.Close()
// replace current aof file by tmp file
_ = persister.aofFile.Close()
_ = os.Rename(tmpFileName, persister.aofFilename)
// reopen aof file for further write
aofFile, err := os.OpenFile(persister.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600)
if err != nil {
panic(err)
}
persister.aofFile = aofFile
// write select command again to ensure aof file has the same db index with persister.currentDB
data = protocol.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(persister.currentDB))).ToBytes()
_, err = persister.aofFile.Write(data)
if err != nil {
panic(err)
}
}