package aof import ( "github.com/hdt3213/godis/config" "github.com/hdt3213/godis/interface/database" "github.com/hdt3213/godis/lib/logger" "github.com/hdt3213/godis/lib/utils" "github.com/hdt3213/godis/redis/protocol" "io" "io/ioutil" "os" "strconv" "time" ) func (persister *Persister) newRewriteHandler() *Persister { h := &Persister{} h.aofFilename = persister.aofFilename h.db = persister.tmpDBMaker() return h } // RewriteCtx holds context of an AOF rewriting procedure type RewriteCtx struct { tmpFile *os.File fileSize int64 dbIdx int // selected db index when startRewrite } // Rewrite carries out AOF rewrite func (persister *Persister) Rewrite() error { ctx, err := persister.StartRewrite() if err != nil { return err } err = persister.DoRewrite(ctx) if err != nil { return err } persister.FinishRewrite(ctx) return nil } // DoRewrite actually rewrite aof file // makes DoRewrite public for testing only, please use Rewrite instead func (persister *Persister) DoRewrite(ctx *RewriteCtx) error { tmpFile := ctx.tmpFile // load aof tmpFile tmpAof := persister.newRewriteHandler() tmpAof.LoadAof(int(ctx.fileSize)) // rewrite aof tmpFile for i := 0; i < config.Properties.Databases; i++ { // select db data := protocol.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(i))).ToBytes() _, err := tmpFile.Write(data) if err != nil { return err } // dump db tmpAof.db.ForEach(i, func(key string, entity *database.DataEntity, expiration *time.Time) bool { cmd := EntityToCmd(key, entity) if cmd != nil { _, _ = tmpFile.Write(cmd.ToBytes()) } if expiration != nil { cmd := MakeExpireCmd(key, *expiration) if cmd != nil { _, _ = tmpFile.Write(cmd.ToBytes()) } } return true }) } return nil } // StartRewrite prepares rewrite procedure func (persister *Persister) StartRewrite() (*RewriteCtx, error) { persister.pausingAof.Lock() // pausing aof defer persister.pausingAof.Unlock() err := persister.aofFile.Sync() if err != nil { logger.Warn("fsync failed") return nil, err } // get current aof file size fileInfo, _ := os.Stat(persister.aofFilename) filesize := fileInfo.Size() // create tmp file file, err := ioutil.TempFile("", "*.aof") if err != nil { logger.Warn("tmp file create failed") return nil, err } return &RewriteCtx{ tmpFile: file, fileSize: filesize, dbIdx: persister.currentDB, }, nil } // FinishRewrite finish rewrite procedure func (persister *Persister) FinishRewrite(ctx *RewriteCtx) { persister.pausingAof.Lock() // pausing aof defer persister.pausingAof.Unlock() tmpFile := ctx.tmpFile // write commands executed during rewriting to tmp file src, err := os.Open(persister.aofFilename) if err != nil { logger.Error("open aofFilename failed: " + err.Error()) return } defer func() { _ = src.Close() }() _, err = src.Seek(ctx.fileSize, 0) if err != nil { logger.Error("seek failed: " + err.Error()) return } // sync tmpFile's db index with online aofFile data := protocol.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(ctx.dbIdx))).ToBytes() _, err = tmpFile.Write(data) if err != nil { logger.Error("tmp file rewrite failed: " + err.Error()) return } // copy data _, err = io.Copy(tmpFile, src) if err != nil { logger.Error("copy aof filed failed: " + err.Error()) return } tmpFileName := tmpFile.Name() _ = tmpFile.Close() // replace current aof file by tmp file _ = persister.aofFile.Close() _ = os.Rename(tmpFileName, persister.aofFilename) // reopen aof file for further write aofFile, err := os.OpenFile(persister.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600) if err != nil { panic(err) } persister.aofFile = aofFile // write select command again to ensure aof file has the same db index with persister.currentDB data = protocol.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(persister.currentDB))).ToBytes() _, err = persister.aofFile.Write(data) if err != nil { panic(err) } }