mirror of
https://github.com/HDT3213/godis.git
synced 2025-10-13 04:23:54 +08:00
bug fix: aof rewrite
This commit is contained in:
@@ -60,7 +60,7 @@ func trim(msg []byte) string {
|
||||
}
|
||||
|
||||
// read aof file
|
||||
func (db *DB) loadAof() {
|
||||
func (db *DB) loadAof(maxBytes int) {
|
||||
file, err := os.Open(db.aofFilename)
|
||||
if err != nil {
|
||||
if _, ok := err.(*os.PathError); ok {
|
||||
@@ -78,7 +78,11 @@ func (db *DB) loadAof() {
|
||||
var args [][]byte
|
||||
processing := false
|
||||
var msg []byte
|
||||
readBytes := 0
|
||||
for {
|
||||
if maxBytes != 0 && readBytes >= maxBytes {
|
||||
break
|
||||
}
|
||||
if fixedLen == 0 {
|
||||
msg, err = reader.ReadBytes('\n')
|
||||
if err == io.EOF {
|
||||
@@ -88,9 +92,10 @@ func (db *DB) loadAof() {
|
||||
logger.Warn("invalid format: line should end with \\r\\n")
|
||||
return
|
||||
}
|
||||
readBytes += len(msg)
|
||||
} else {
|
||||
msg = make([]byte, fixedLen+2)
|
||||
_, err = io.ReadFull(reader, msg)
|
||||
n, err := io.ReadFull(reader, msg)
|
||||
if err == io.EOF {
|
||||
return
|
||||
}
|
||||
@@ -99,6 +104,7 @@ func (db *DB) loadAof() {
|
||||
return
|
||||
}
|
||||
fixedLen = 0
|
||||
readBytes += n
|
||||
}
|
||||
if err != nil {
|
||||
logger.Warn(err)
|
||||
@@ -161,7 +167,7 @@ func (db *DB) loadAof() {
|
||||
|
||||
/*-- aof rewrite --*/
|
||||
func (db *DB) aofRewrite() {
|
||||
file, err := db.startRewrite()
|
||||
file, fileSize, err := db.startRewrite()
|
||||
if err != nil {
|
||||
logger.Warn(err)
|
||||
return
|
||||
@@ -176,7 +182,7 @@ func (db *DB) aofRewrite() {
|
||||
|
||||
aofFilename: db.aofFilename,
|
||||
}
|
||||
tmpDB.loadAof()
|
||||
tmpDB.loadAof(int(fileSize))
|
||||
|
||||
// rewrite aof file
|
||||
tmpDB.Data.ForEach(func(key string, raw interface{}) bool {
|
||||
@@ -285,20 +291,24 @@ func persistZSet(key string, zset *SortedSet.SortedSet) *reply.MultiBulkReply {
|
||||
return reply.MakeMultiBulkReply(args)
|
||||
}
|
||||
|
||||
func (db *DB) startRewrite() (*os.File, error) {
|
||||
func (db *DB) startRewrite() (*os.File, int64, error) {
|
||||
db.pausingAof.Lock() // pausing aof
|
||||
defer db.pausingAof.Unlock()
|
||||
|
||||
// create rewrite channel
|
||||
db.aofRewriteChan = make(chan *reply.MultiBulkReply, aofQueueSize)
|
||||
|
||||
// get current aof file size
|
||||
fileInfo, _ := os.Stat(db.aofFilename)
|
||||
filesize := fileInfo.Size()
|
||||
|
||||
// create tmp file
|
||||
file, err := ioutil.TempFile("", "aof")
|
||||
if err != nil {
|
||||
logger.Warn("tmp file create failed")
|
||||
return nil, err
|
||||
return nil, 0, err
|
||||
}
|
||||
return file, nil
|
||||
return file, filesize, nil
|
||||
}
|
||||
|
||||
func (db *DB) finishRewrite(tmpFile *os.File) {
|
||||
|
@@ -82,7 +82,7 @@ func MakeDB() *DB {
|
||||
|
||||
if config.Properties.AppendOnly {
|
||||
db.aofFilename = config.Properties.AppendFilename
|
||||
db.loadAof()
|
||||
db.loadAof(0)
|
||||
aofFile, err := os.OpenFile(db.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600)
|
||||
if err != nil {
|
||||
logger.Warn(err)
|
||||
|
Reference in New Issue
Block a user