rename Persister receiver

This commit is contained in:
finley
2022-12-25 21:34:07 +08:00
parent 9752c7e7e9
commit 8089b357e4
4 changed files with 84 additions and 85 deletions

View File

@@ -49,38 +49,38 @@ type Persister struct {
// NewPersister creates a new aof.Persister // NewPersister creates a new aof.Persister
func NewPersister(db database.DBEngine, filename string, load bool, tmpDBMaker func() database.DBEngine) (*Persister, error) { func NewPersister(db database.DBEngine, filename string, load bool, tmpDBMaker func() database.DBEngine) (*Persister, error) {
handler := &Persister{} persister := &Persister{}
handler.aofFilename = filename persister.aofFilename = filename
handler.db = db persister.db = db
handler.tmpDBMaker = tmpDBMaker persister.tmpDBMaker = tmpDBMaker
if load { if load {
handler.LoadAof(0) persister.LoadAof(0)
} }
aofFile, err := os.OpenFile(handler.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600) aofFile, err := os.OpenFile(persister.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600)
if err != nil { if err != nil {
return nil, err return nil, err
} }
handler.aofFile = aofFile persister.aofFile = aofFile
handler.aofChan = make(chan *payload, aofQueueSize) persister.aofChan = make(chan *payload, aofQueueSize)
handler.aofFinished = make(chan struct{}) persister.aofFinished = make(chan struct{})
handler.listeners = make(map[Listener]struct{}) persister.listeners = make(map[Listener]struct{})
go func() { go func() {
handler.handleAof() persister.handleAof()
}() }()
return handler, nil return persister, nil
} }
// RemoveListener removes a listener from aof handler, so we can close the listener // RemoveListener removes a listener from aof handler, so we can close the listener
func (handler *Persister) RemoveListener(listener Listener) { func (persister *Persister) RemoveListener(listener Listener) {
handler.pausingAof.Lock() persister.pausingAof.Lock()
defer handler.pausingAof.Unlock() defer persister.pausingAof.Unlock()
delete(handler.listeners, listener) delete(persister.listeners, listener)
} }
// AddAof send command to aof goroutine through channel // AddAof send command to aof goroutine through channel
func (handler *Persister) AddAof(dbIndex int, cmdLine CmdLine) { func (persister *Persister) AddAof(dbIndex int, cmdLine CmdLine) {
if handler.aofChan != nil { if persister.aofChan != nil {
handler.aofChan <- &payload{ persister.aofChan <- &payload{
cmdLine: cmdLine, cmdLine: cmdLine,
dbIndex: dbIndex, dbIndex: dbIndex,
} }
@@ -88,51 +88,51 @@ func (handler *Persister) AddAof(dbIndex int, cmdLine CmdLine) {
} }
// handleAof listen aof channel and write into file // handleAof listen aof channel and write into file
func (handler *Persister) handleAof() { func (persister *Persister) handleAof() {
// serialized execution // serialized execution
var cmdLines []CmdLine var cmdLines []CmdLine
handler.currentDB = 0 persister.currentDB = 0
for p := range handler.aofChan { for p := range persister.aofChan {
cmdLines = cmdLines[:0] // reuse underlying array cmdLines = cmdLines[:0] // reuse underlying array
handler.pausingAof.RLock() // prevent other goroutines from pausing aof persister.pausingAof.RLock() // prevent other goroutines from pausing aof
if p.dbIndex != handler.currentDB { if p.dbIndex != persister.currentDB {
// select db // select db
selectCmd := utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex)) selectCmd := utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex))
cmdLines = append(cmdLines, selectCmd) cmdLines = append(cmdLines, selectCmd)
data := protocol.MakeMultiBulkReply(selectCmd).ToBytes() data := protocol.MakeMultiBulkReply(selectCmd).ToBytes()
_, err := handler.aofFile.Write(data) _, err := persister.aofFile.Write(data)
if err != nil { if err != nil {
logger.Warn(err) logger.Warn(err)
handler.pausingAof.RUnlock() persister.pausingAof.RUnlock()
continue // skip this command continue // skip this command
} }
handler.currentDB = p.dbIndex persister.currentDB = p.dbIndex
} }
data := protocol.MakeMultiBulkReply(p.cmdLine).ToBytes() data := protocol.MakeMultiBulkReply(p.cmdLine).ToBytes()
cmdLines = append(cmdLines, p.cmdLine) cmdLines = append(cmdLines, p.cmdLine)
_, err := handler.aofFile.Write(data) _, err := persister.aofFile.Write(data)
if err != nil { if err != nil {
logger.Warn(err) logger.Warn(err)
} }
handler.pausingAof.RUnlock() persister.pausingAof.RUnlock()
for listener := range handler.listeners { for listener := range persister.listeners {
listener.Callback(cmdLines) listener.Callback(cmdLines)
} }
} }
handler.aofFinished <- struct{}{} persister.aofFinished <- struct{}{}
} }
// LoadAof read aof file, can only be used before Persister.handleAof started // LoadAof read aof file, can only be used before Persister.handleAof started
func (handler *Persister) LoadAof(maxBytes int) { func (persister *Persister) LoadAof(maxBytes int) {
// handler.db.Exec may call handler.addAof // persister.db.Exec may call persister.addAof
// delete aofChan to prevent loaded commands back into aofChan // delete aofChan to prevent loaded commands back into aofChan
aofChan := handler.aofChan aofChan := persister.aofChan
handler.aofChan = nil persister.aofChan = nil
defer func(aofChan chan *payload) { defer func(aofChan chan *payload) {
handler.aofChan = aofChan persister.aofChan = aofChan
}(aofChan) }(aofChan)
file, err := os.Open(handler.aofFilename) file, err := os.Open(persister.aofFilename)
if err != nil { if err != nil {
if _, ok := err.(*os.PathError); ok { if _, ok := err.(*os.PathError); ok {
return return
@@ -167,7 +167,7 @@ func (handler *Persister) LoadAof(maxBytes int) {
logger.Error("require multi bulk protocol") logger.Error("require multi bulk protocol")
continue continue
} }
ret := handler.db.Exec(fakeConn, r.Args) ret := persister.db.Exec(fakeConn, r.Args)
if protocol.IsErrorReply(ret) { if protocol.IsErrorReply(ret) {
logger.Error("exec err", string(ret.ToBytes())) logger.Error("exec err", string(ret.ToBytes()))
} }
@@ -175,11 +175,11 @@ func (handler *Persister) LoadAof(maxBytes int) {
} }
// Close gracefully stops aof persistence procedure // Close gracefully stops aof persistence procedure
func (handler *Persister) Close() { func (persister *Persister) Close() {
if handler.aofFile != nil { if persister.aofFile != nil {
close(handler.aofChan) close(persister.aofChan)
<-handler.aofFinished // wait for aof finished <-persister.aofFinished // wait for aof finished
err := handler.aofFile.Close() err := persister.aofFile.Close()
if err != nil { if err != nil {
logger.Warn(err) logger.Warn(err)
} }

View File

@@ -19,12 +19,12 @@ import (
// todo: forbid concurrent rewrite // todo: forbid concurrent rewrite
// Rewrite2RDB rewrite aof data into rdb // Rewrite2RDB rewrite aof data into rdb
func (handler *Persister) Rewrite2RDB(rdbFilename string) error { func (persister *Persister) Rewrite2RDB(rdbFilename string) error {
ctx, err := handler.startRewrite2RDB(nil, nil) ctx, err := persister.startRewrite2RDB(nil, nil)
if err != nil { if err != nil {
return err return err
} }
err = handler.rewrite2RDB(ctx) err = persister.rewrite2RDB(ctx)
if err != nil { if err != nil {
return err return err
} }
@@ -42,12 +42,12 @@ func (handler *Persister) Rewrite2RDB(rdbFilename string) error {
// Rewrite2RDBForReplication asynchronously rewrite aof data into rdb and returns a channel to receive following data // Rewrite2RDBForReplication asynchronously rewrite aof data into rdb and returns a channel to receive following data
// parameter listener would receive following updates of rdb // parameter listener would receive following updates of rdb
// parameter hook allows you to do something during aof pausing // parameter hook allows you to do something during aof pausing
func (handler *Persister) Rewrite2RDBForReplication(rdbFilename string, listener Listener, hook func()) error { func (persister *Persister) Rewrite2RDBForReplication(rdbFilename string, listener Listener, hook func()) error {
ctx, err := handler.startRewrite2RDB(listener, hook) ctx, err := persister.startRewrite2RDB(listener, hook)
if err != nil { if err != nil {
return err return err
} }
err = handler.rewrite2RDB(ctx) err = persister.rewrite2RDB(ctx)
if err != nil { if err != nil {
return err return err
} }
@@ -62,18 +62,18 @@ func (handler *Persister) Rewrite2RDBForReplication(rdbFilename string, listener
return nil return nil
} }
func (handler *Persister) startRewrite2RDB(newListener Listener, hook func()) (*RewriteCtx, error) { func (persister *Persister) startRewrite2RDB(newListener Listener, hook func()) (*RewriteCtx, error) {
handler.pausingAof.Lock() // pausing aof persister.pausingAof.Lock() // pausing aof
defer handler.pausingAof.Unlock() defer persister.pausingAof.Unlock()
err := handler.aofFile.Sync() err := persister.aofFile.Sync()
if err != nil { if err != nil {
logger.Warn("fsync failed") logger.Warn("fsync failed")
return nil, err return nil, err
} }
// get current aof file size // get current aof file size
fileInfo, _ := os.Stat(handler.aofFilename) fileInfo, _ := os.Stat(persister.aofFilename)
filesize := fileInfo.Size() filesize := fileInfo.Size()
// create tmp file // create tmp file
file, err := ioutil.TempFile("", "*.aof") file, err := ioutil.TempFile("", "*.aof")
@@ -82,7 +82,7 @@ func (handler *Persister) startRewrite2RDB(newListener Listener, hook func()) (*
return nil, err return nil, err
} }
if newListener != nil { if newListener != nil {
handler.listeners[newListener] = struct{}{} persister.listeners[newListener] = struct{}{}
} }
if hook != nil { if hook != nil {
hook() hook()
@@ -93,9 +93,9 @@ func (handler *Persister) startRewrite2RDB(newListener Listener, hook func()) (*
}, nil }, nil
} }
func (handler *Persister) rewrite2RDB(ctx *RewriteCtx) error { func (persister *Persister) rewrite2RDB(ctx *RewriteCtx) error {
// load aof tmpFile // load aof tmpFile
tmpHandler := handler.newRewriteHandler() tmpHandler := persister.newRewriteHandler()
tmpHandler.LoadAof(int(ctx.fileSize)) tmpHandler.LoadAof(int(ctx.fileSize))
encoder := rdb.NewEncoder(ctx.tmpFile).EnableCompress() encoder := rdb.NewEncoder(ctx.tmpFile).EnableCompress()
err := encoder.WriteHeader() err := encoder.WriteHeader()

View File

@@ -13,10 +13,10 @@ import (
"time" "time"
) )
func (handler *Persister) newRewriteHandler() *Persister { func (persister *Persister) newRewriteHandler() *Persister {
h := &Persister{} h := &Persister{}
h.aofFilename = handler.aofFilename h.aofFilename = persister.aofFilename
h.db = handler.tmpDBMaker() h.db = persister.tmpDBMaker()
return h return h
} }
@@ -28,27 +28,27 @@ type RewriteCtx struct {
} }
// Rewrite carries out AOF rewrite // Rewrite carries out AOF rewrite
func (handler *Persister) Rewrite() error { func (persister *Persister) Rewrite() error {
ctx, err := handler.StartRewrite() ctx, err := persister.StartRewrite()
if err != nil { if err != nil {
return err return err
} }
err = handler.DoRewrite(ctx) err = persister.DoRewrite(ctx)
if err != nil { if err != nil {
return err return err
} }
handler.FinishRewrite(ctx) persister.FinishRewrite(ctx)
return nil return nil
} }
// DoRewrite actually rewrite aof file // DoRewrite actually rewrite aof file
// makes DoRewrite public for testing only, please use Rewrite instead // makes DoRewrite public for testing only, please use Rewrite instead
func (handler *Persister) DoRewrite(ctx *RewriteCtx) error { func (persister *Persister) DoRewrite(ctx *RewriteCtx) error {
tmpFile := ctx.tmpFile tmpFile := ctx.tmpFile
// load aof tmpFile // load aof tmpFile
tmpAof := handler.newRewriteHandler() tmpAof := persister.newRewriteHandler()
tmpAof.LoadAof(int(ctx.fileSize)) tmpAof.LoadAof(int(ctx.fileSize))
// rewrite aof tmpFile // rewrite aof tmpFile
@@ -78,18 +78,18 @@ func (handler *Persister) DoRewrite(ctx *RewriteCtx) error {
} }
// StartRewrite prepares rewrite procedure // StartRewrite prepares rewrite procedure
func (handler *Persister) StartRewrite() (*RewriteCtx, error) { func (persister *Persister) StartRewrite() (*RewriteCtx, error) {
handler.pausingAof.Lock() // pausing aof persister.pausingAof.Lock() // pausing aof
defer handler.pausingAof.Unlock() defer persister.pausingAof.Unlock()
err := handler.aofFile.Sync() err := persister.aofFile.Sync()
if err != nil { if err != nil {
logger.Warn("fsync failed") logger.Warn("fsync failed")
return nil, err return nil, err
} }
// get current aof file size // get current aof file size
fileInfo, _ := os.Stat(handler.aofFilename) fileInfo, _ := os.Stat(persister.aofFilename)
filesize := fileInfo.Size() filesize := fileInfo.Size()
// create tmp file // create tmp file
@@ -101,18 +101,18 @@ func (handler *Persister) StartRewrite() (*RewriteCtx, error) {
return &RewriteCtx{ return &RewriteCtx{
tmpFile: file, tmpFile: file,
fileSize: filesize, fileSize: filesize,
dbIdx: handler.currentDB, dbIdx: persister.currentDB,
}, nil }, nil
} }
// FinishRewrite finish rewrite procedure // FinishRewrite finish rewrite procedure
func (handler *Persister) FinishRewrite(ctx *RewriteCtx) { func (persister *Persister) FinishRewrite(ctx *RewriteCtx) {
handler.pausingAof.Lock() // pausing aof persister.pausingAof.Lock() // pausing aof
defer handler.pausingAof.Unlock() defer persister.pausingAof.Unlock()
tmpFile := ctx.tmpFile tmpFile := ctx.tmpFile
// write commands executed during rewriting to tmp file // write commands executed during rewriting to tmp file
src, err := os.Open(handler.aofFilename) src, err := os.Open(persister.aofFilename)
if err != nil { if err != nil {
logger.Error("open aofFilename failed: " + err.Error()) logger.Error("open aofFilename failed: " + err.Error())
return return
@@ -141,19 +141,19 @@ func (handler *Persister) FinishRewrite(ctx *RewriteCtx) {
} }
// replace current aof file by tmp file // replace current aof file by tmp file
_ = handler.aofFile.Close() _ = persister.aofFile.Close()
_ = os.Rename(tmpFile.Name(), handler.aofFilename) _ = os.Rename(tmpFile.Name(), persister.aofFilename)
// reopen aof file for further write // reopen aof file for further write
aofFile, err := os.OpenFile(handler.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600) aofFile, err := os.OpenFile(persister.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600)
if err != nil { if err != nil {
panic(err) panic(err)
} }
handler.aofFile = aofFile persister.aofFile = aofFile
// write select command again to ensure aof file has the same db index with handler.currentDB // write select command again to ensure aof file has the same db index with persister.currentDB
data = protocol.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(handler.currentDB))).ToBytes() data = protocol.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(persister.currentDB))).ToBytes()
_, err = handler.aofFile.Write(data) _, err = persister.aofFile.Write(data)
if err != nil { if err != nil {
panic(err) panic(err)
} }

View File

@@ -70,7 +70,6 @@ func (server *Server) execSlaveOf(c redis.Connection, args [][]byte) redis.Reply
atomic.StoreInt32(&server.role, slaveRole) atomic.StoreInt32(&server.role, slaveRole)
server.slaveStatus.masterHost = host server.slaveStatus.masterHost = host
server.slaveStatus.masterPort = port server.slaveStatus.masterPort = port
// use buffered channel in case receiver goroutine exited before controller send stop signal
atomic.AddInt32(&server.slaveStatus.configVersion, 1) atomic.AddInt32(&server.slaveStatus.configVersion, 1)
server.slaveStatus.mutex.Unlock() server.slaveStatus.mutex.Unlock()
go server.setupMaster() go server.setupMaster()