Files
redis-go/aof/rdb.go

196 lines
4.8 KiB
Go

package aof
import (
"os"
"strconv"
"time"
"github.com/hdt3213/godis/config"
"github.com/hdt3213/godis/datastruct/dict"
List "github.com/hdt3213/godis/datastruct/list"
"github.com/hdt3213/godis/datastruct/set"
SortedSet "github.com/hdt3213/godis/datastruct/sortedset"
"github.com/hdt3213/godis/interface/database"
"github.com/hdt3213/godis/lib/logger"
rdb "github.com/hdt3213/rdb/encoder"
"github.com/hdt3213/rdb/model"
)
// todo: forbid concurrent rewrite
// GenerateRDB generates rdb file from aof file
func (persister *Persister) GenerateRDB(rdbFilename string) error {
ctx, err := persister.startGenerateRDB(nil, nil)
if err != nil {
return err
}
err = persister.generateRDB(ctx)
if err != nil {
return err
}
err = ctx.tmpFile.Close()
if err != nil {
return err
}
err = os.Rename(ctx.tmpFile.Name(), rdbFilename)
if err != nil {
return err
}
return nil
}
// GenerateRDBForReplication asynchronously generates rdb file from aof file and returns a channel to receive following data
// parameter listener would receive following updates of rdb
// parameter hook allows you to do something during aof pausing
func (persister *Persister) GenerateRDBForReplication(rdbFilename string, listener Listener, hook func()) error {
ctx, err := persister.startGenerateRDB(listener, hook)
if err != nil {
return err
}
err = persister.generateRDB(ctx)
if err != nil {
return err
}
err = ctx.tmpFile.Close()
if err != nil {
return err
}
err = os.Rename(ctx.tmpFile.Name(), rdbFilename)
if err != nil {
return err
}
return nil
}
func (persister *Persister) startGenerateRDB(newListener Listener, hook func()) (*RewriteCtx, error) {
persister.pausingAof.Lock() // pausing aof
defer persister.pausingAof.Unlock()
err := persister.aofFile.Sync()
if err != nil {
logger.Warn("fsync failed")
return nil, err
}
// get current aof file size
fileInfo, _ := os.Stat(persister.aofFilename)
filesize := fileInfo.Size()
// create tmp file
file, err := os.CreateTemp(config.GetTmpDir(), "*.aof")
if err != nil {
logger.Warn("tmp file create failed")
return nil, err
}
if newListener != nil {
persister.listeners[newListener] = struct{}{}
}
if hook != nil {
hook()
}
return &RewriteCtx{
tmpFile: file,
fileSize: filesize,
}, nil
}
// generateRDB generates rdb file from aof file
func (persister *Persister) generateRDB(ctx *RewriteCtx) error {
// load aof tmpFile
tmpHandler := persister.newRewriteHandler()
tmpHandler.LoadAof(int(ctx.fileSize))
encoder := rdb.NewEncoder(ctx.tmpFile).EnableCompress()
err := encoder.WriteHeader()
if err != nil {
return err
}
auxMap := map[string]string{
"redis-ver": "6.0.0",
"redis-bits": "64",
"aof-preamble": "0",
"ctime": strconv.FormatInt(time.Now().Unix(), 10),
}
// change aof preamble
if config.Properties.AofUseRdbPreamble {
auxMap["aof-preamble"] = "1"
}
for k, v := range auxMap {
err := encoder.WriteAux(k, v)
if err != nil {
return err
}
}
for i := 0; i < config.Properties.Databases; i++ {
keyCount, ttlCount := tmpHandler.db.GetDBSize(i)
if keyCount == 0 {
continue
}
err = encoder.WriteDBHeader(uint(i), uint64(keyCount), uint64(ttlCount))
if err != nil {
return err
}
// dump db
var err2 error
tmpHandler.db.ForEach(i, func(key string, entity *database.DataEntity, expiration *time.Time) bool {
var opts []interface{}
if expiration != nil {
opts = append(opts, rdb.WithTTL(uint64(expiration.UnixNano()/1e6)))
}
switch obj := entity.Data.(type) {
case []byte:
err = encoder.WriteStringObject(key, obj, opts...)
case List.List:
vals := make([][]byte, 0, obj.Len())
obj.ForEach(func(i int, v interface{}) bool {
bytes, _ := v.([]byte)
vals = append(vals, bytes)
return true
})
err = encoder.WriteListObject(key, vals, opts...)
case *set.Set:
vals := make([][]byte, 0, obj.Len())
obj.ForEach(func(m string) bool {
vals = append(vals, []byte(m))
return true
})
err = encoder.WriteSetObject(key, vals, opts...)
case dict.Dict:
hash := make(map[string][]byte)
obj.ForEach(func(key string, val interface{}) bool {
bytes, _ := val.([]byte)
hash[key] = bytes
return true
})
err = encoder.WriteHashMapObject(key, hash, opts...)
case *SortedSet.SortedSet:
var entries []*model.ZSetEntry
obj.ForEachByRank(int64(0), obj.Len(), true, func(element *SortedSet.Element) bool {
entries = append(entries, &model.ZSetEntry{
Member: element.Member,
Score: element.Score,
})
return true
})
err = encoder.WriteZSetObject(key, entries, opts...)
}
if err != nil {
err2 = err
return false
}
return true
})
if err2 != nil {
return err2
}
}
err = encoder.WriteEnd()
if err != nil {
return err
}
return nil
}