From 8089b357e4c45274228e76297cf8abea88aeaabe Mon Sep 17 00:00:00 2001 From: finley Date: Sun, 25 Dec 2022 21:34:07 +0800 Subject: [PATCH] rename Persister receiver --- aof/aof.go | 88 +++++++++++++++++------------------ aof/rdb.go | 28 +++++------ aof/rewrite.go | 52 ++++++++++----------- database/replication_slave.go | 1 - 4 files changed, 84 insertions(+), 85 deletions(-) diff --git a/aof/aof.go b/aof/aof.go index 7c8fe87..ba422aa 100644 --- a/aof/aof.go +++ b/aof/aof.go @@ -49,38 +49,38 @@ type Persister struct { // NewPersister creates a new aof.Persister func NewPersister(db database.DBEngine, filename string, load bool, tmpDBMaker func() database.DBEngine) (*Persister, error) { - handler := &Persister{} - handler.aofFilename = filename - handler.db = db - handler.tmpDBMaker = tmpDBMaker + persister := &Persister{} + persister.aofFilename = filename + persister.db = db + persister.tmpDBMaker = tmpDBMaker if load { - handler.LoadAof(0) + persister.LoadAof(0) } - aofFile, err := os.OpenFile(handler.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600) + aofFile, err := os.OpenFile(persister.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600) if err != nil { return nil, err } - handler.aofFile = aofFile - handler.aofChan = make(chan *payload, aofQueueSize) - handler.aofFinished = make(chan struct{}) - handler.listeners = make(map[Listener]struct{}) + persister.aofFile = aofFile + persister.aofChan = make(chan *payload, aofQueueSize) + persister.aofFinished = make(chan struct{}) + persister.listeners = make(map[Listener]struct{}) go func() { - handler.handleAof() + persister.handleAof() }() - return handler, nil + return persister, nil } // RemoveListener removes a listener from aof handler, so we can close the listener -func (handler *Persister) RemoveListener(listener Listener) { - handler.pausingAof.Lock() - defer handler.pausingAof.Unlock() - delete(handler.listeners, listener) +func (persister *Persister) RemoveListener(listener Listener) { + persister.pausingAof.Lock() + defer persister.pausingAof.Unlock() + delete(persister.listeners, listener) } // AddAof send command to aof goroutine through channel -func (handler *Persister) AddAof(dbIndex int, cmdLine CmdLine) { - if handler.aofChan != nil { - handler.aofChan <- &payload{ +func (persister *Persister) AddAof(dbIndex int, cmdLine CmdLine) { + if persister.aofChan != nil { + persister.aofChan <- &payload{ cmdLine: cmdLine, dbIndex: dbIndex, } @@ -88,51 +88,51 @@ func (handler *Persister) AddAof(dbIndex int, cmdLine CmdLine) { } // handleAof listen aof channel and write into file -func (handler *Persister) handleAof() { +func (persister *Persister) handleAof() { // serialized execution var cmdLines []CmdLine - handler.currentDB = 0 - for p := range handler.aofChan { - cmdLines = cmdLines[:0] // reuse underlying array - handler.pausingAof.RLock() // prevent other goroutines from pausing aof - if p.dbIndex != handler.currentDB { + persister.currentDB = 0 + for p := range persister.aofChan { + cmdLines = cmdLines[:0] // reuse underlying array + persister.pausingAof.RLock() // prevent other goroutines from pausing aof + if p.dbIndex != persister.currentDB { // select db selectCmd := utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex)) cmdLines = append(cmdLines, selectCmd) data := protocol.MakeMultiBulkReply(selectCmd).ToBytes() - _, err := handler.aofFile.Write(data) + _, err := persister.aofFile.Write(data) if err != nil { logger.Warn(err) - handler.pausingAof.RUnlock() + persister.pausingAof.RUnlock() continue // skip this command } - handler.currentDB = p.dbIndex + persister.currentDB = p.dbIndex } data := protocol.MakeMultiBulkReply(p.cmdLine).ToBytes() cmdLines = append(cmdLines, p.cmdLine) - _, err := handler.aofFile.Write(data) + _, err := persister.aofFile.Write(data) if err != nil { logger.Warn(err) } - handler.pausingAof.RUnlock() - for listener := range handler.listeners { + persister.pausingAof.RUnlock() + for listener := range persister.listeners { listener.Callback(cmdLines) } } - handler.aofFinished <- struct{}{} + persister.aofFinished <- struct{}{} } // LoadAof read aof file, can only be used before Persister.handleAof started -func (handler *Persister) LoadAof(maxBytes int) { - // handler.db.Exec may call handler.addAof +func (persister *Persister) LoadAof(maxBytes int) { + // persister.db.Exec may call persister.addAof // delete aofChan to prevent loaded commands back into aofChan - aofChan := handler.aofChan - handler.aofChan = nil + aofChan := persister.aofChan + persister.aofChan = nil defer func(aofChan chan *payload) { - handler.aofChan = aofChan + persister.aofChan = aofChan }(aofChan) - file, err := os.Open(handler.aofFilename) + file, err := os.Open(persister.aofFilename) if err != nil { if _, ok := err.(*os.PathError); ok { return @@ -167,7 +167,7 @@ func (handler *Persister) LoadAof(maxBytes int) { logger.Error("require multi bulk protocol") continue } - ret := handler.db.Exec(fakeConn, r.Args) + ret := persister.db.Exec(fakeConn, r.Args) if protocol.IsErrorReply(ret) { logger.Error("exec err", string(ret.ToBytes())) } @@ -175,11 +175,11 @@ func (handler *Persister) LoadAof(maxBytes int) { } // Close gracefully stops aof persistence procedure -func (handler *Persister) Close() { - if handler.aofFile != nil { - close(handler.aofChan) - <-handler.aofFinished // wait for aof finished - err := handler.aofFile.Close() +func (persister *Persister) Close() { + if persister.aofFile != nil { + close(persister.aofChan) + <-persister.aofFinished // wait for aof finished + err := persister.aofFile.Close() if err != nil { logger.Warn(err) } diff --git a/aof/rdb.go b/aof/rdb.go index d453d31..927be46 100644 --- a/aof/rdb.go +++ b/aof/rdb.go @@ -19,12 +19,12 @@ import ( // todo: forbid concurrent rewrite // Rewrite2RDB rewrite aof data into rdb -func (handler *Persister) Rewrite2RDB(rdbFilename string) error { - ctx, err := handler.startRewrite2RDB(nil, nil) +func (persister *Persister) Rewrite2RDB(rdbFilename string) error { + ctx, err := persister.startRewrite2RDB(nil, nil) if err != nil { return err } - err = handler.rewrite2RDB(ctx) + err = persister.rewrite2RDB(ctx) if err != nil { return err } @@ -42,12 +42,12 @@ func (handler *Persister) Rewrite2RDB(rdbFilename string) error { // Rewrite2RDBForReplication asynchronously rewrite aof data into rdb and returns a channel to receive following data // parameter listener would receive following updates of rdb // parameter hook allows you to do something during aof pausing -func (handler *Persister) Rewrite2RDBForReplication(rdbFilename string, listener Listener, hook func()) error { - ctx, err := handler.startRewrite2RDB(listener, hook) +func (persister *Persister) Rewrite2RDBForReplication(rdbFilename string, listener Listener, hook func()) error { + ctx, err := persister.startRewrite2RDB(listener, hook) if err != nil { return err } - err = handler.rewrite2RDB(ctx) + err = persister.rewrite2RDB(ctx) if err != nil { return err } @@ -62,18 +62,18 @@ func (handler *Persister) Rewrite2RDBForReplication(rdbFilename string, listener return nil } -func (handler *Persister) startRewrite2RDB(newListener Listener, hook func()) (*RewriteCtx, error) { - handler.pausingAof.Lock() // pausing aof - defer handler.pausingAof.Unlock() +func (persister *Persister) startRewrite2RDB(newListener Listener, hook func()) (*RewriteCtx, error) { + persister.pausingAof.Lock() // pausing aof + defer persister.pausingAof.Unlock() - err := handler.aofFile.Sync() + err := persister.aofFile.Sync() if err != nil { logger.Warn("fsync failed") return nil, err } // get current aof file size - fileInfo, _ := os.Stat(handler.aofFilename) + fileInfo, _ := os.Stat(persister.aofFilename) filesize := fileInfo.Size() // create tmp file file, err := ioutil.TempFile("", "*.aof") @@ -82,7 +82,7 @@ func (handler *Persister) startRewrite2RDB(newListener Listener, hook func()) (* return nil, err } if newListener != nil { - handler.listeners[newListener] = struct{}{} + persister.listeners[newListener] = struct{}{} } if hook != nil { hook() @@ -93,9 +93,9 @@ func (handler *Persister) startRewrite2RDB(newListener Listener, hook func()) (* }, nil } -func (handler *Persister) rewrite2RDB(ctx *RewriteCtx) error { +func (persister *Persister) rewrite2RDB(ctx *RewriteCtx) error { // load aof tmpFile - tmpHandler := handler.newRewriteHandler() + tmpHandler := persister.newRewriteHandler() tmpHandler.LoadAof(int(ctx.fileSize)) encoder := rdb.NewEncoder(ctx.tmpFile).EnableCompress() err := encoder.WriteHeader() diff --git a/aof/rewrite.go b/aof/rewrite.go index 72bdd5e..941c15b 100644 --- a/aof/rewrite.go +++ b/aof/rewrite.go @@ -13,10 +13,10 @@ import ( "time" ) -func (handler *Persister) newRewriteHandler() *Persister { +func (persister *Persister) newRewriteHandler() *Persister { h := &Persister{} - h.aofFilename = handler.aofFilename - h.db = handler.tmpDBMaker() + h.aofFilename = persister.aofFilename + h.db = persister.tmpDBMaker() return h } @@ -28,27 +28,27 @@ type RewriteCtx struct { } // Rewrite carries out AOF rewrite -func (handler *Persister) Rewrite() error { - ctx, err := handler.StartRewrite() +func (persister *Persister) Rewrite() error { + ctx, err := persister.StartRewrite() if err != nil { return err } - err = handler.DoRewrite(ctx) + err = persister.DoRewrite(ctx) if err != nil { return err } - handler.FinishRewrite(ctx) + persister.FinishRewrite(ctx) return nil } // DoRewrite actually rewrite aof file // makes DoRewrite public for testing only, please use Rewrite instead -func (handler *Persister) DoRewrite(ctx *RewriteCtx) error { +func (persister *Persister) DoRewrite(ctx *RewriteCtx) error { tmpFile := ctx.tmpFile // load aof tmpFile - tmpAof := handler.newRewriteHandler() + tmpAof := persister.newRewriteHandler() tmpAof.LoadAof(int(ctx.fileSize)) // rewrite aof tmpFile @@ -78,18 +78,18 @@ func (handler *Persister) DoRewrite(ctx *RewriteCtx) error { } // StartRewrite prepares rewrite procedure -func (handler *Persister) StartRewrite() (*RewriteCtx, error) { - handler.pausingAof.Lock() // pausing aof - defer handler.pausingAof.Unlock() +func (persister *Persister) StartRewrite() (*RewriteCtx, error) { + persister.pausingAof.Lock() // pausing aof + defer persister.pausingAof.Unlock() - err := handler.aofFile.Sync() + err := persister.aofFile.Sync() if err != nil { logger.Warn("fsync failed") return nil, err } // get current aof file size - fileInfo, _ := os.Stat(handler.aofFilename) + fileInfo, _ := os.Stat(persister.aofFilename) filesize := fileInfo.Size() // create tmp file @@ -101,18 +101,18 @@ func (handler *Persister) StartRewrite() (*RewriteCtx, error) { return &RewriteCtx{ tmpFile: file, fileSize: filesize, - dbIdx: handler.currentDB, + dbIdx: persister.currentDB, }, nil } // FinishRewrite finish rewrite procedure -func (handler *Persister) FinishRewrite(ctx *RewriteCtx) { - handler.pausingAof.Lock() // pausing aof - defer handler.pausingAof.Unlock() +func (persister *Persister) FinishRewrite(ctx *RewriteCtx) { + persister.pausingAof.Lock() // pausing aof + defer persister.pausingAof.Unlock() tmpFile := ctx.tmpFile // write commands executed during rewriting to tmp file - src, err := os.Open(handler.aofFilename) + src, err := os.Open(persister.aofFilename) if err != nil { logger.Error("open aofFilename failed: " + err.Error()) return @@ -141,19 +141,19 @@ func (handler *Persister) FinishRewrite(ctx *RewriteCtx) { } // replace current aof file by tmp file - _ = handler.aofFile.Close() - _ = os.Rename(tmpFile.Name(), handler.aofFilename) + _ = persister.aofFile.Close() + _ = os.Rename(tmpFile.Name(), persister.aofFilename) // reopen aof file for further write - aofFile, err := os.OpenFile(handler.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600) + aofFile, err := os.OpenFile(persister.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600) if err != nil { panic(err) } - handler.aofFile = aofFile + persister.aofFile = aofFile - // write select command again to ensure aof file has the same db index with handler.currentDB - data = protocol.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(handler.currentDB))).ToBytes() - _, err = handler.aofFile.Write(data) + // write select command again to ensure aof file has the same db index with persister.currentDB + data = protocol.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(persister.currentDB))).ToBytes() + _, err = persister.aofFile.Write(data) if err != nil { panic(err) } diff --git a/database/replication_slave.go b/database/replication_slave.go index a0afb60..02a6df2 100644 --- a/database/replication_slave.go +++ b/database/replication_slave.go @@ -70,7 +70,6 @@ func (server *Server) execSlaveOf(c redis.Connection, args [][]byte) redis.Reply atomic.StoreInt32(&server.role, slaveRole) server.slaveStatus.masterHost = host server.slaveStatus.masterPort = port - // use buffered channel in case receiver goroutine exited before controller send stop signal atomic.AddInt32(&server.slaveStatus.configVersion, 1) server.slaveStatus.mutex.Unlock() go server.setupMaster()