diff --git a/aof/aof.go b/aof/aof.go index bfe02a1..5e171f6 100644 --- a/aof/aof.go +++ b/aof/aof.go @@ -2,18 +2,23 @@ package aof import ( "context" - "github.com/hdt3213/godis/interface/database" - "github.com/hdt3213/godis/lib/logger" - "github.com/hdt3213/godis/lib/utils" - "github.com/hdt3213/godis/redis/connection" - "github.com/hdt3213/godis/redis/parser" - "github.com/hdt3213/godis/redis/protocol" "io" "os" "strconv" "strings" "sync" "time" + + rdb "github.com/hdt3213/rdb/core" + + "github.com/hdt3213/godis/config" + + "github.com/hdt3213/godis/interface/database" + "github.com/hdt3213/godis/lib/logger" + "github.com/hdt3213/godis/lib/utils" + "github.com/hdt3213/godis/redis/connection" + "github.com/hdt3213/godis/redis/parser" + "github.com/hdt3213/godis/redis/protocol" ) // CmdLine is alias for [][]byte, represents a command line @@ -47,14 +52,18 @@ type Listener interface { // Persister receive msgs from channel and write to AOF file type Persister struct { - ctx context.Context - cancel context.CancelFunc - db database.DBEngine - tmpDBMaker func() database.DBEngine - aofChan chan *payload - aofFile *os.File + ctx context.Context + cancel context.CancelFunc + db database.DBEngine + tmpDBMaker func() database.DBEngine + // aofChan is the channel to receive aof payload(listenCmd will send payload to this channel) + aofChan chan *payload + // aofFile is the file handler of aof file + aofFile *os.File + // aofFilename is the path of aof file aofFilename string - aofFsync string + // aofFsync is the strategy of fsync + aofFsync string // aof goroutine will send msg to main goroutine through this channel when aof tasks finished and ready to shut down aofFinished chan struct{} // pause aof for start/finish aof rewrite progress @@ -73,6 +82,7 @@ func NewPersister(db database.DBEngine, filename string, load bool, fsync string persister.db = db persister.tmpDBMaker = tmpDBMaker persister.currentDB = 0 + // load aof file if needed if load { persister.LoadAof(0) } @@ -84,12 +94,14 @@ func NewPersister(db database.DBEngine, filename string, load bool, fsync string persister.aofChan = make(chan *payload, aofQueueSize) persister.aofFinished = make(chan struct{}) persister.listeners = make(map[Listener]struct{}) + // start aof goroutine to write aof file in background and fsync periodically if needed (see fsyncEverySecond) go func() { persister.listenCmd() }() ctx, cancel := context.WithCancel(context.Background()) persister.ctx = ctx persister.cancel = cancel + // fsync every second if needed if persister.aofFsync == FsyncEverySec { persister.fsyncEverySecond() } @@ -109,6 +121,7 @@ func (persister *Persister) SaveCmdLine(dbIndex int, cmdLine CmdLine) { if persister.aofChan == nil { return } + if persister.aofFsync == FsyncAlways { p := &payload{ cmdLine: cmdLine, @@ -117,10 +130,12 @@ func (persister *Persister) SaveCmdLine(dbIndex int, cmdLine CmdLine) { persister.writeAof(p) return } + persister.aofChan <- &payload{ cmdLine: cmdLine, dbIndex: dbIndex, } + } // listenCmd listen aof channel and write into file @@ -165,7 +180,7 @@ func (persister *Persister) writeAof(p *payload) { // LoadAof read aof file, can only be used before Persister.listenCmd started func (persister *Persister) LoadAof(maxBytes int) { - // persister.db.Exec may call persister.addAof + // persister.db.Exec may call persister.AddAof // delete aofChan to prevent loaded commands back into aofChan aofChan := persister.aofChan persister.aofChan = nil @@ -183,6 +198,17 @@ func (persister *Persister) LoadAof(maxBytes int) { } defer file.Close() + // load rdb preamble if needed + decoder := rdb.NewDecoder(file) + err = persister.db.LoadRDB(decoder) + if err != nil { + // no rdb preamble + file.Seek(0, io.SeekStart) + } else { + // has rdb preamble + _, _ = file.Seek(int64(decoder.GetReadCount())+1, io.SeekStart) + maxBytes = maxBytes - decoder.GetReadCount() + } var reader io.Reader if maxBytes > 0 { reader = io.LimitReader(file, int64(maxBytes)) @@ -235,6 +261,7 @@ func (persister *Persister) Close() { persister.cancel() } +// fsyncEverySecond fsync aof file every second func (persister *Persister) fsyncEverySecond() { ticker := time.NewTicker(time.Second) go func() { @@ -252,3 +279,34 @@ func (persister *Persister) fsyncEverySecond() { } }() } + +func (persister *Persister) generateAof(ctx *RewriteCtx) error { + // rewrite aof tmpFile + tmpFile := ctx.tmpFile + // load aof tmpFile + tmpAof := persister.newRewriteHandler() + tmpAof.LoadAof(int(ctx.fileSize)) + for i := 0; i < config.Properties.Databases; i++ { + // select db + data := protocol.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(i))).ToBytes() + _, err := tmpFile.Write(data) + if err != nil { + return err + } + // dump db + tmpAof.db.ForEach(i, func(key string, entity *database.DataEntity, expiration *time.Time) bool { + cmd := EntityToCmd(key, entity) + if cmd != nil { + _, _ = tmpFile.Write(cmd.ToBytes()) + } + if expiration != nil { + cmd := MakeExpireCmd(key, *expiration) + if cmd != nil { + _, _ = tmpFile.Write(cmd.ToBytes()) + } + } + return true + }) + } + return nil +} diff --git a/aof/rdb.go b/aof/rdb.go index 927be46..1dd04e1 100644 --- a/aof/rdb.go +++ b/aof/rdb.go @@ -1,6 +1,10 @@ package aof import ( + "os" + "strconv" + "time" + "github.com/hdt3213/godis/config" "github.com/hdt3213/godis/datastruct/dict" List "github.com/hdt3213/godis/datastruct/list" @@ -10,21 +14,17 @@ import ( "github.com/hdt3213/godis/lib/logger" rdb "github.com/hdt3213/rdb/encoder" "github.com/hdt3213/rdb/model" - "io/ioutil" - "os" - "strconv" - "time" ) // todo: forbid concurrent rewrite -// Rewrite2RDB rewrite aof data into rdb -func (persister *Persister) Rewrite2RDB(rdbFilename string) error { - ctx, err := persister.startRewrite2RDB(nil, nil) +// GenerateRDB generates rdb file from aof file +func (persister *Persister) GenerateRDB(rdbFilename string) error { + ctx, err := persister.startGenerateRDB(nil, nil) if err != nil { return err } - err = persister.rewrite2RDB(ctx) + err = persister.generateRDB(ctx) if err != nil { return err } @@ -39,15 +39,16 @@ func (persister *Persister) Rewrite2RDB(rdbFilename string) error { return nil } -// Rewrite2RDBForReplication asynchronously rewrite aof data into rdb and returns a channel to receive following data +// GenerateRDBForReplication asynchronously generates rdb file from aof file and returns a channel to receive following data // parameter listener would receive following updates of rdb // parameter hook allows you to do something during aof pausing -func (persister *Persister) Rewrite2RDBForReplication(rdbFilename string, listener Listener, hook func()) error { - ctx, err := persister.startRewrite2RDB(listener, hook) +func (persister *Persister) GenerateRDBForReplication(rdbFilename string, listener Listener, hook func()) error { + ctx, err := persister.startGenerateRDB(listener, hook) if err != nil { return err } - err = persister.rewrite2RDB(ctx) + + err = persister.generateRDB(ctx) if err != nil { return err } @@ -62,7 +63,7 @@ func (persister *Persister) Rewrite2RDBForReplication(rdbFilename string, listen return nil } -func (persister *Persister) startRewrite2RDB(newListener Listener, hook func()) (*RewriteCtx, error) { +func (persister *Persister) startGenerateRDB(newListener Listener, hook func()) (*RewriteCtx, error) { persister.pausingAof.Lock() // pausing aof defer persister.pausingAof.Unlock() @@ -76,7 +77,7 @@ func (persister *Persister) startRewrite2RDB(newListener Listener, hook func()) fileInfo, _ := os.Stat(persister.aofFilename) filesize := fileInfo.Size() // create tmp file - file, err := ioutil.TempFile("", "*.aof") + file, err := os.CreateTemp(config.GetTmpDir(), "*.aof") if err != nil { logger.Warn("tmp file create failed") return nil, err @@ -93,10 +94,12 @@ func (persister *Persister) startRewrite2RDB(newListener Listener, hook func()) }, nil } -func (persister *Persister) rewrite2RDB(ctx *RewriteCtx) error { +// generateRDB generates rdb file from aof file +func (persister *Persister) generateRDB(ctx *RewriteCtx) error { // load aof tmpFile tmpHandler := persister.newRewriteHandler() tmpHandler.LoadAof(int(ctx.fileSize)) + encoder := rdb.NewEncoder(ctx.tmpFile).EnableCompress() err := encoder.WriteHeader() if err != nil { @@ -108,6 +111,12 @@ func (persister *Persister) rewrite2RDB(ctx *RewriteCtx) error { "aof-preamble": "0", "ctime": strconv.FormatInt(time.Now().Unix(), 10), } + + // change aof preamble + if config.Properties.AofUseRdbPreamble { + auxMap["aof-preamble"] = "1" + } + for k, v := range auxMap { err := encoder.WriteAux(k, v) if err != nil { diff --git a/aof/rewrite.go b/aof/rewrite.go index 65377d7..8d3f762 100644 --- a/aof/rewrite.go +++ b/aof/rewrite.go @@ -1,16 +1,14 @@ package aof import ( + "io" + "os" + "strconv" + "github.com/hdt3213/godis/config" - "github.com/hdt3213/godis/interface/database" "github.com/hdt3213/godis/lib/logger" "github.com/hdt3213/godis/lib/utils" "github.com/hdt3213/godis/redis/protocol" - "io" - "io/ioutil" - "os" - "strconv" - "time" ) func (persister *Persister) newRewriteHandler() *Persister { @@ -22,7 +20,7 @@ func (persister *Persister) newRewriteHandler() *Persister { // RewriteCtx holds context of an AOF rewriting procedure type RewriteCtx struct { - tmpFile *os.File + tmpFile *os.File // tmpFile is the file handler of aof tmpFile fileSize int64 dbIdx int // selected db index when startRewrite } @@ -44,42 +42,22 @@ func (persister *Persister) Rewrite() error { // DoRewrite actually rewrite aof file // makes DoRewrite public for testing only, please use Rewrite instead -func (persister *Persister) DoRewrite(ctx *RewriteCtx) error { - tmpFile := ctx.tmpFile - - // load aof tmpFile - tmpAof := persister.newRewriteHandler() - tmpAof.LoadAof(int(ctx.fileSize)) - - // rewrite aof tmpFile - for i := 0; i < config.Properties.Databases; i++ { - // select db - data := protocol.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(i))).ToBytes() - _, err := tmpFile.Write(data) - if err != nil { - return err - } - // dump db - tmpAof.db.ForEach(i, func(key string, entity *database.DataEntity, expiration *time.Time) bool { - cmd := EntityToCmd(key, entity) - if cmd != nil { - _, _ = tmpFile.Write(cmd.ToBytes()) - } - if expiration != nil { - cmd := MakeExpireCmd(key, *expiration) - if cmd != nil { - _, _ = tmpFile.Write(cmd.ToBytes()) - } - } - return true - }) +func (persister *Persister) DoRewrite(ctx *RewriteCtx) (err error) { + // start rewrite + if !config.Properties.AofUseRdbPreamble { + logger.Info("generate aof preamble") + err = persister.generateAof(ctx) + } else { + logger.Info("generate rdb preamble") + err = persister.generateRDB(ctx) } - return nil + return err } // StartRewrite prepares rewrite procedure func (persister *Persister) StartRewrite() (*RewriteCtx, error) { - persister.pausingAof.Lock() // pausing aof + // pausing aof + persister.pausingAof.Lock() defer persister.pausingAof.Unlock() err := persister.aofFile.Sync() @@ -93,7 +71,7 @@ func (persister *Persister) StartRewrite() (*RewriteCtx, error) { filesize := fileInfo.Size() // create tmp file - file, err := ioutil.TempFile("", "*.aof") + file, err := os.CreateTemp(config.GetTmpDir(), "*.aof") if err != nil { logger.Warn("tmp file create failed") return nil, err @@ -109,42 +87,50 @@ func (persister *Persister) StartRewrite() (*RewriteCtx, error) { func (persister *Persister) FinishRewrite(ctx *RewriteCtx) { persister.pausingAof.Lock() // pausing aof defer persister.pausingAof.Unlock() - tmpFile := ctx.tmpFile - // write commands executed during rewriting to tmp file - src, err := os.Open(persister.aofFilename) - if err != nil { - logger.Error("open aofFilename failed: " + err.Error()) - return - } - defer func() { - _ = src.Close() + + // copy commands executed during rewriting to tmpFile + errOccurs := func() bool { + /* read write commands executed during rewriting */ + src, err := os.Open(persister.aofFilename) + if err != nil { + logger.Error("open aofFilename failed: " + err.Error()) + return true + } + defer func() { + _ = src.Close() + _ = tmpFile.Close() + }() + + _, err = src.Seek(ctx.fileSize, 0) + if err != nil { + logger.Error("seek failed: " + err.Error()) + return true + } + // sync tmpFile's db index with online aofFile + data := protocol.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(ctx.dbIdx))).ToBytes() + _, err = tmpFile.Write(data) + if err != nil { + logger.Error("tmp file rewrite failed: " + err.Error()) + return true + } + // copy data + _, err = io.Copy(tmpFile, src) + if err != nil { + logger.Error("copy aof filed failed: " + err.Error()) + return true + } + return false }() - _, err = src.Seek(ctx.fileSize, 0) - if err != nil { - logger.Error("seek failed: " + err.Error()) + if errOccurs { return } - // sync tmpFile's db index with online aofFile - data := protocol.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(ctx.dbIdx))).ToBytes() - _, err = tmpFile.Write(data) - if err != nil { - logger.Error("tmp file rewrite failed: " + err.Error()) - return - } - // copy data - _, err = io.Copy(tmpFile, src) - if err != nil { - logger.Error("copy aof filed failed: " + err.Error()) - return - } - tmpFileName := tmpFile.Name() - _ = tmpFile.Close() // replace current aof file by tmp file _ = persister.aofFile.Close() - _ = os.Rename(tmpFileName, persister.aofFilename) - + if err := os.Rename(tmpFile.Name(), persister.aofFilename); err != nil { + logger.Warn(err) + } // reopen aof file for further write aofFile, err := os.OpenFile(persister.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600) if err != nil { @@ -152,8 +138,9 @@ func (persister *Persister) FinishRewrite(ctx *RewriteCtx) { } persister.aofFile = aofFile - // write select command again to ensure aof file has the same db index with persister.currentDB - data = protocol.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(persister.currentDB))).ToBytes() + // write select command again to resume aof file selected db + // it should have the same db index with persister.currentDB + data := protocol.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(persister.currentDB))).ToBytes() _, err = persister.aofFile.Write(data) if err != nil { panic(err) diff --git a/cluster/cluster.go b/cluster/cluster.go index 1594d66..9d80038 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -3,6 +3,11 @@ package cluster import ( "fmt" + "runtime/debug" + "strings" + + "github.com/hdt3213/rdb/core" + "github.com/hdt3213/godis/config" database2 "github.com/hdt3213/godis/database" "github.com/hdt3213/godis/datastruct/dict" @@ -15,8 +20,6 @@ import ( "github.com/hdt3213/godis/lib/utils" "github.com/hdt3213/godis/redis/client" "github.com/hdt3213/godis/redis/protocol" - "runtime/debug" - "strings" ) type PeerPicker interface { @@ -51,8 +54,7 @@ var allowFastTransaction = true // MakeCluster creates and starts a node of cluster func MakeCluster() *Cluster { cluster := &Cluster{ - self: config.Properties.Self, - + self: config.Properties.Self, db: database2.NewStandaloneServer(), transactions: dict.MakeSimple(), peerPicker: consistenthash.New(replicas, nil), @@ -61,6 +63,7 @@ func MakeCluster() *Cluster { idGenerator: idgenerator.MakeGenerator(config.Properties.Self), relayImpl: defaultRelayImpl, } + contains := make(map[string]struct{}) nodes := make([]string, 0, len(config.Properties.Peers)+1) for _, peer := range config.Properties.Peers { @@ -178,3 +181,7 @@ func (cluster *Cluster) Exec(c redis.Connection, cmdLine [][]byte) (result redis func (cluster *Cluster) AfterClientClose(c redis.Connection) { cluster.db.AfterClientClose(c) } + +func (cluster *Cluster) LoadRDB(dec *core.Decoder) error { + return cluster.db.LoadRDB(dec) +} diff --git a/config/config.go b/config/config.go index ba323ca..b6dcc3e 100644 --- a/config/config.go +++ b/config/config.go @@ -2,7 +2,6 @@ package config import ( "bufio" - "github.com/hdt3213/godis/lib/utils" "io" "os" "path/filepath" @@ -11,6 +10,8 @@ import ( "strings" "time" + "github.com/hdt3213/godis/lib/utils" + "github.com/hdt3213/godis/lib/logger" ) @@ -25,9 +26,11 @@ type ServerProperties struct { RunID string `cfg:"runid"` // runID always different at every exec. Bind string `cfg:"bind"` Port int `cfg:"port"` + Dir string `cfg:"dir"` AppendOnly bool `cfg:"appendonly"` AppendFilename string `cfg:"appendfilename"` AppendFsync string `cfg:"appendfsync"` + AofUseRdbPreamble bool `cfg:"aof-use-rdb-preamble"` MaxClients int `cfg:"maxclients"` RequirePass string `cfg:"requirepass"` Databases int `cfg:"databases"` @@ -52,7 +55,6 @@ type ServerInfo struct { // Properties holds global config properties var Properties *ServerProperties - var EachTimeServerInfo *ServerInfo func init() { @@ -142,4 +144,11 @@ func SetupConfig(configFilename string) { return } Properties.CfPath = configFilePath + if Properties.Dir == "" { + Properties.Dir = "." + } +} + +func GetTmpDir() string { + return Properties.Dir + "/tmp" } diff --git a/database/aof_test.go b/database/aof_test.go index f187cd3..ace73cc 100644 --- a/database/aof_test.go +++ b/database/aof_test.go @@ -1,6 +1,15 @@ package database import ( + "io/ioutil" + "os" + "path" + "strconv" + "testing" + "time" + + "github.com/hdt3213/godis/aof" + "github.com/hdt3213/godis/config" "github.com/hdt3213/godis/interface/database" "github.com/hdt3213/godis/interface/redis" @@ -8,12 +17,6 @@ import ( "github.com/hdt3213/godis/redis/connection" "github.com/hdt3213/godis/redis/protocol" "github.com/hdt3213/godis/redis/protocol/asserts" - "io/ioutil" - "os" - "path" - "strconv" - "testing" - "time" ) func makeTestData(db database.DB, dbIndex int, prefix string, size int) { @@ -105,13 +108,16 @@ func TestAof(t *testing.T) { _ = os.Remove(aofFilename) }() config.Properties = &config.ServerProperties{ - AppendOnly: true, - AppendFilename: aofFilename, + AppendOnly: true, + AppendFilename: aofFilename, + AofUseRdbPreamble: false, + AppendFsync: aof.FsyncEverySec, } dbNum := 4 size := 10 var prefixes []string aofWriteDB := NewStandaloneServer() + // generate test data for i := 0; i < dbNum; i++ { prefix := utils.RandString(8) prefixes = append(prefixes, prefix) @@ -165,7 +171,7 @@ func TestRDB(t *testing.T) { } func TestRewriteAOF(t *testing.T) { - tmpFile, err := ioutil.TempFile("", "*.aof") + tmpFile, err := os.CreateTemp(config.GetTmpDir(), "*.aof") if err != nil { t.Error(err) return @@ -175,8 +181,10 @@ func TestRewriteAOF(t *testing.T) { _ = os.Remove(aofFilename) }() config.Properties = &config.ServerProperties{ - AppendOnly: true, - AppendFilename: aofFilename, + AppendOnly: true, + AppendFilename: aofFilename, + AofUseRdbPreamble: false, + AppendFsync: aof.FsyncEverySec, } aofWriteDB := NewStandaloneServer() size := 1 @@ -201,53 +209,79 @@ func TestRewriteAOF(t *testing.T) { // TestRewriteAOF2 tests execute commands during rewrite procedure func TestRewriteAOF2(t *testing.T) { - tmpFile, err := ioutil.TempFile("", "*.aof") + /* prepare */ + tmpFile, err := os.CreateTemp(config.GetTmpDir(), "*.aof") if err != nil { t.Error(err) return } aofFilename := tmpFile.Name() - defer func() { - _ = os.Remove(aofFilename) - }() config.Properties = &config.ServerProperties{ AppendOnly: true, AppendFilename: aofFilename, + // set Aof-use-rdb-preamble to true to make sure rewrite procedure + AppendFsync: aof.FsyncAlways, + AofUseRdbPreamble: true, } + + keySize1 := 100 + keySize2 := 250 + /* write data */ aofWriteDB := NewStandaloneServer() dbNum := 4 conn := connection.NewFakeConn() for i := 0; i < dbNum; i++ { conn.SelectDB(i) - key := strconv.Itoa(i) - aofWriteDB.Exec(conn, utils.ToCmdLine("SET", key, key)) + for j := 0; j < keySize1; j++ { + key := strconv.Itoa(j) + aofWriteDB.Exec(conn, utils.ToCmdLine("SET", key, key)) + } } + /* rewrite */ ctx, err := aofWriteDB.persister.StartRewrite() if err != nil { - t.Error(err) + t.Error(err, "start rewrite failed") return } - // add data during rewrite - for i := 0; i < dbNum; i++ { - conn.SelectDB(i) - key := "a" + strconv.Itoa(i) - aofWriteDB.Exec(conn, utils.ToCmdLine("SET", key, key)) + + /* add data during rewrite */ + ch := make(chan struct{}) + go func() { + for i := 0; i < dbNum; i++ { + conn.SelectDB(i) + for j := 0; j < keySize2; j++ { + key := "a" + strconv.Itoa(j) + aofWriteDB.Exec(conn, utils.ToCmdLine("SET", key, key)) + } + } + ch <- struct{}{} + }() + + doRewriteErr := aofWriteDB.persister.DoRewrite(ctx) + if doRewriteErr != nil { + t.Error(doRewriteErr, "do rewrite failed") + return } - aofWriteDB.persister.DoRewrite(ctx) aofWriteDB.persister.FinishRewrite(ctx) + <-ch + aofWriteDB.Close() // wait for aof finished - aofWriteDB.Close() // wait for aof finished - aofReadDB := NewStandaloneServer() // start new db and read aof file + // start new db and read aof file + aofReadDB := NewStandaloneServer() for i := 0; i < dbNum; i++ { conn.SelectDB(i) - key := strconv.Itoa(i) - ret := aofReadDB.Exec(conn, utils.ToCmdLine("GET", key)) - asserts.AssertBulkReply(t, ret, key) - key = "a" + strconv.Itoa(i) - ret = aofReadDB.Exec(conn, utils.ToCmdLine("GET", key)) - asserts.AssertBulkReply(t, ret, key) + for j := 0; j < keySize1; j++ { + key := strconv.Itoa(j) + ret := aofReadDB.Exec(conn, utils.ToCmdLine("GET", key)) + asserts.AssertBulkReply(t, ret, key) + } + for j := 0; j < keySize2; j++ { + key := "a" + strconv.Itoa(j) + ret := aofReadDB.Exec(conn, utils.ToCmdLine("GET", key)) + asserts.AssertBulkReply(t, ret, key) + } } aofReadDB.Close() } diff --git a/database/database.go b/database/database.go index 509039c..7f13cd2 100644 --- a/database/database.go +++ b/database/database.go @@ -2,6 +2,9 @@ package database import ( + "strings" + "time" + "github.com/hdt3213/godis/datastruct/dict" "github.com/hdt3213/godis/datastruct/lock" "github.com/hdt3213/godis/interface/database" @@ -9,8 +12,6 @@ import ( "github.com/hdt3213/godis/lib/logger" "github.com/hdt3213/godis/lib/timewheel" "github.com/hdt3213/godis/redis/protocol" - "strings" - "time" ) const ( @@ -32,6 +33,7 @@ type DB struct { // dict.Dict will ensure concurrent-safety of its method // use this mutex for complicated command only, eg. rpush, incr ... locker *lock.Locks + // addaof is used to add command to aof addAof func(CmdLine) } @@ -297,6 +299,7 @@ func (db *DB) ForEach(cb func(key string, data *database.DataEntity, expiration expireTime, _ := rawExpireTime.(time.Time) expiration = &expireTime } + return cb(key, entity, expiration) }) } diff --git a/database/persistence.go b/database/persistence.go index ed95714..6730c5f 100644 --- a/database/persistence.go +++ b/database/persistence.go @@ -2,6 +2,9 @@ package database import ( "fmt" + "os" + "sync/atomic" + "github.com/hdt3213/godis/aof" "github.com/hdt3213/godis/config" "github.com/hdt3213/godis/datastruct/dict" @@ -10,10 +13,9 @@ import ( "github.com/hdt3213/godis/interface/database" "github.com/hdt3213/rdb/core" rdb "github.com/hdt3213/rdb/parser" - "os" - "sync/atomic" ) +// loadRdbFile loads rdb file from disk func (server *Server) loadRdbFile() error { rdbFile, err := os.Open(config.Properties.RDBFilename) if err != nil { @@ -23,14 +25,15 @@ func (server *Server) loadRdbFile() error { _ = rdbFile.Close() }() decoder := rdb.NewDecoder(rdbFile) - err = server.loadRDB(decoder) + err = server.LoadRDB(decoder) if err != nil { return fmt.Errorf("dump rdb file failed " + err.Error()) } return nil } -func (server *Server) loadRDB(dec *core.Decoder) error { +// LoadRDB real implementation of loading rdb file +func (server *Server) LoadRDB(dec *core.Decoder) error { return dec.Parse(func(o rdb.RedisObject) bool { db := server.mustSelectDB(o.GetDBIndex()) var entity *database.DataEntity @@ -73,10 +76,12 @@ func (server *Server) loadRDB(dec *core.Decoder) error { if o.GetExpiration() != nil { db.Expire(o.GetKey(), *o.GetExpiration()) } + // add to aof db.addAof(aof.EntityToCmd(o.GetKey(), entity).Args) } return true }) + } func NewPersister(db database.DBEngine, filename string, load bool, fsync string) (*aof.Persister, error) { diff --git a/database/replication_master.go b/database/replication_master.go index 177ecce..c804115 100644 --- a/database/replication_master.go +++ b/database/replication_master.go @@ -3,11 +3,6 @@ package database import ( "errors" "fmt" - "github.com/hdt3213/godis/interface/redis" - "github.com/hdt3213/godis/lib/logger" - "github.com/hdt3213/godis/lib/sync/atomic" - "github.com/hdt3213/godis/lib/utils" - "github.com/hdt3213/godis/redis/protocol" "io" "io/ioutil" "os" @@ -15,6 +10,12 @@ import ( "strings" "sync" "time" + + "github.com/hdt3213/godis/interface/redis" + "github.com/hdt3213/godis/lib/logger" + "github.com/hdt3213/godis/lib/sync/atomic" + "github.com/hdt3213/godis/lib/utils" + "github.com/hdt3213/godis/redis/protocol" ) const ( @@ -85,6 +86,7 @@ type masterStatus struct { rewriting atomic.Boolean } +// bgSaveForReplication does bg-save and send rdb to waiting slaves func (server *Server) bgSaveForReplication() { go func() { defer func() { @@ -116,7 +118,7 @@ func (server *Server) saveForReplication() error { server.masterStatus.aofListener = aofListener server.masterStatus.mu.Unlock() - err = server.persister.Rewrite2RDBForReplication(rdbFilename, aofListener, nil) + err = server.persister.GenerateRDBForReplication(rdbFilename, aofListener, nil) if err != nil { return err } @@ -132,6 +134,7 @@ func (server *Server) saveForReplication() error { server.masterStatus.waitSlaves = nil server.masterStatus.mu.Unlock() + // send rdb to waiting slaves for slave := range waitSlaves { err = server.masterFullReSyncWithSlave(slave) if err != nil { @@ -161,7 +164,7 @@ func (server *Server) rewriteRDB() error { defer server.masterStatus.mu.Unlock() newBacklog.beginOffset = server.masterStatus.backlog.currentOffset } - err = server.persister.Rewrite2RDBForReplication(rdbFilename, aofListener, hook) + err = server.persister.GenerateRDBForReplication(rdbFilename, aofListener, hook) if err != nil { // wait rdb result return err } diff --git a/database/replication_master_test.go b/database/replication_master_test.go index b5574d6..e8436ef 100644 --- a/database/replication_master_test.go +++ b/database/replication_master_test.go @@ -2,13 +2,6 @@ package database import ( "bytes" - "github.com/hdt3213/godis/config" - "github.com/hdt3213/godis/lib/utils" - "github.com/hdt3213/godis/redis/connection" - "github.com/hdt3213/godis/redis/parser" - "github.com/hdt3213/godis/redis/protocol" - "github.com/hdt3213/godis/redis/protocol/asserts" - rdb "github.com/hdt3213/rdb/parser" "io/ioutil" "os" "path" @@ -17,6 +10,14 @@ import ( "sync/atomic" "testing" "time" + + "github.com/hdt3213/godis/config" + rdb "github.com/hdt3213/rdb/parser" + "github.com/hdt3213/godis/lib/utils" + "github.com/hdt3213/godis/redis/connection" + "github.com/hdt3213/godis/redis/parser" + "github.com/hdt3213/godis/redis/protocol" + "github.com/hdt3213/godis/redis/protocol/asserts" ) func mockServer() *Server { @@ -103,7 +104,7 @@ func TestReplicationMasterSide(t *testing.T) { } rdbDec := rdb.NewDecoder(bytes.NewReader(rdbReply.Arg)) - err = slave.loadRDB(rdbDec) + err = slave.LoadRDB(rdbDec) if err != nil { t.Error("import rdb failed: " + err.Error()) return @@ -276,7 +277,7 @@ func TestReplicationMasterRewriteRDB(t *testing.T) { } rdbDec := rdb.NewDecoder(bytes.NewReader(rdbReply.Arg)) - err = slave.loadRDB(rdbDec) + err = slave.LoadRDB(rdbDec) if err != nil { t.Error("import rdb failed: " + err.Error()) return diff --git a/database/replication_slave.go b/database/replication_slave.go index 5a3ffb3..d5bf735 100644 --- a/database/replication_slave.go +++ b/database/replication_slave.go @@ -5,15 +5,6 @@ import ( "context" "errors" "fmt" - "github.com/hdt3213/godis/aof" - "github.com/hdt3213/godis/config" - "github.com/hdt3213/godis/interface/redis" - "github.com/hdt3213/godis/lib/logger" - "github.com/hdt3213/godis/lib/utils" - "github.com/hdt3213/godis/redis/connection" - "github.com/hdt3213/godis/redis/parser" - "github.com/hdt3213/godis/redis/protocol" - rdb "github.com/hdt3213/rdb/parser" "io/ioutil" "net" "os" @@ -22,6 +13,16 @@ import ( "sync" "sync/atomic" "time" + + "github.com/hdt3213/godis/aof" + "github.com/hdt3213/godis/config" + "github.com/hdt3213/godis/interface/redis" + "github.com/hdt3213/godis/lib/logger" + rdb "github.com/hdt3213/rdb/parser" + "github.com/hdt3213/godis/lib/utils" + "github.com/hdt3213/godis/redis/connection" + "github.com/hdt3213/godis/redis/parser" + "github.com/hdt3213/godis/redis/protocol" ) const ( @@ -114,6 +115,7 @@ func (repl *slaveStatus) close() error { return nil } +// setupMaster connects to master and starts full sync func (server *Server) setupMaster() { defer func() { if err := recover(); err != nil { @@ -342,7 +344,7 @@ func (server *Server) loadMasterRDB(configVersion int32) error { if err != nil { return err } - err = rdbLoader.loadRDB(rdbDec) + err = rdbLoader.LoadRDB(rdbDec) if err != nil { return errors.New("dump rdb failed: " + err.Error()) } diff --git a/database/server.go b/database/server.go index 6289ea3..61a0512 100644 --- a/database/server.go +++ b/database/server.go @@ -2,6 +2,12 @@ package database import ( "fmt" + "runtime/debug" + "strconv" + "strings" + "sync/atomic" + "time" + "github.com/hdt3213/godis/aof" "github.com/hdt3213/godis/config" "github.com/hdt3213/godis/interface/database" @@ -10,11 +16,6 @@ import ( "github.com/hdt3213/godis/lib/utils" "github.com/hdt3213/godis/pubsub" "github.com/hdt3213/godis/redis/protocol" - "runtime/debug" - "strconv" - "strings" - "sync/atomic" - "time" ) var godisVersion = "1.2.8" // do not modify @@ -49,6 +50,7 @@ func NewStandaloneServer() *Server { server.dbSet[i] = holder } server.hub = pubsub.MakeHub() + // record aof validAof := false if config.Properties.AppendOnly { aofHandler, err := NewPersister(server, @@ -218,6 +220,7 @@ func (server *Server) execFlushDB(dbIndex int) redis.Reply { return server.flushDB(dbIndex) } +// flushDB flushes the selected database func (server *Server) flushDB(dbIndex int) redis.Reply { if dbIndex >= len(server.dbSet) || dbIndex < 0 { return protocol.MakeErrReply("ERR DB index is out of range") @@ -238,6 +241,7 @@ func (server *Server) loadDB(dbIndex int, newDB *DB) redis.Reply { return &protocol.OkReply{} } +// flushAll flushes all databases. func (server *Server) flushAll() redis.Reply { for i := range server.dbSet { server.flushDB(i) @@ -248,6 +252,7 @@ func (server *Server) flushAll() redis.Reply { return &protocol.OkReply{} } +// selectDB returns the database with the given index, or an error if the index is out of range. func (server *Server) selectDB(dbIndex int) (*DB, *protocol.StandardErrReply) { if dbIndex >= len(server.dbSet) || dbIndex < 0 { return nil, protocol.MakeErrReply("ERR DB index is out of range") @@ -255,6 +260,7 @@ func (server *Server) selectDB(dbIndex int) (*DB, *protocol.StandardErrReply) { return server.dbSet[dbIndex].Load().(*DB), nil } +// mustSelectDB is like selectDB, but panics if an error occurs. func (server *Server) mustSelectDB(dbIndex int) *DB { selectedDB, err := server.selectDB(dbIndex) if err != nil { @@ -325,7 +331,7 @@ func SaveRDB(db *Server, args [][]byte) redis.Reply { if rdbFilename == "" { rdbFilename = "dump.rdb" } - err := db.persister.Rewrite2RDB(rdbFilename) + err := db.persister.GenerateRDB(rdbFilename) if err != nil { return protocol.MakeErrReply(err.Error()) } @@ -347,7 +353,7 @@ func BGSaveRDB(db *Server, args [][]byte) redis.Reply { if rdbFilename == "" { rdbFilename = "dump.rdb" } - err := db.persister.Rewrite2RDB(rdbFilename) + err := db.persister.GenerateRDB(rdbFilename) if err != nil { logger.Error(err) } diff --git a/database/string.go b/database/string.go index e69a7de..e65b46a 100644 --- a/database/string.go +++ b/database/string.go @@ -53,7 +53,6 @@ func execGetEX(db *DB, args [][]byte) redis.Reply { key := string(args[0]) bytes, err := db.getAsString(key) ttl := unlimitedTTL - if err != nil { return err } diff --git a/go.mod b/go.mod index fee6653..9d8a086 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,6 @@ module github.com/hdt3213/godis go 1.17 require ( - github.com/hdt3213/rdb v1.0.5 + github.com/hdt3213/rdb v1.0.9 github.com/shopspring/decimal v1.2.0 ) diff --git a/go.sum b/go.sum index 7ce635b..69f4431 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,26 @@ -github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc= +github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= +github.com/bytedance/sonic v1.8.7/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U= +github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY= +github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ= -github.com/hdt3213/rdb v1.0.5 h1:toBvrixNWOlK26bHR1Amch/9+ioguL2jJT+uaMPYtJc= -github.com/hdt3213/rdb v1.0.5/go.mod h1:dLJXf6wM7ZExH+PuEzbzUubTtkH61ilfAtPSSQgfs4w= +github.com/hdt3213/rdb v1.0.9 h1:x9uiLpgpLSgyKWo8WwYSc5hMg0vglo+u5i5dTnJW33Y= +github.com/hdt3213/rdb v1.0.9/go.mod h1:A1RWBSb4QGdX8fNs2bSoWxkzcWlWGbCC7OgOTFhPG+k= +github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ= github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= +golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= diff --git a/interface/database/db.go b/interface/database/db.go index 52859a6..6de606b 100644 --- a/interface/database/db.go +++ b/interface/database/db.go @@ -1,8 +1,10 @@ package database import ( - "github.com/hdt3213/godis/interface/redis" "time" + + "github.com/hdt3213/godis/interface/redis" + "github.com/hdt3213/rdb/core" ) // CmdLine is alias for [][]byte, represents a command line @@ -13,6 +15,7 @@ type DB interface { Exec(client redis.Connection, cmdLine [][]byte) redis.Reply AfterClientClose(c redis.Connection) Close() + LoadRDB(dec *core.Decoder) error } // DBEngine is the embedding storage engine exposing more methods for complex application diff --git a/main.go b/main.go index d19f9fa..c0b6b5a 100644 --- a/main.go +++ b/main.go @@ -2,12 +2,13 @@ package main import ( "fmt" + "os" + "github.com/hdt3213/godis/config" "github.com/hdt3213/godis/lib/logger" "github.com/hdt3213/godis/lib/utils" RedisServer "github.com/hdt3213/godis/redis/server" "github.com/hdt3213/godis/tcp" - "os" ) var banner = ` @@ -50,7 +51,6 @@ func main() { } else { config.SetupConfig(configFilename) } - err := tcp.ListenAndServeWithSignal(&tcp.Config{ Address: fmt.Sprintf("%s:%d", config.Properties.Bind, config.Properties.Port), }, RedisServer.MakeHandler()) diff --git a/redis.conf b/redis.conf index c15e7d4..6e299ba 100644 --- a/redis.conf +++ b/redis.conf @@ -4,4 +4,7 @@ maxclients 128 appendonly yes appendfilename appendonly.aof +appendfsync everysec +aof-use-rdb-preamble yes + #dbfilename test.rdb diff --git a/redis/server/server.go b/redis/server/server.go index 3c81dbc..b00646c 100644 --- a/redis/server/server.go +++ b/redis/server/server.go @@ -6,6 +6,11 @@ package server import ( "context" + "io" + "net" + "strings" + "sync" + "github.com/hdt3213/godis/cluster" "github.com/hdt3213/godis/config" database2 "github.com/hdt3213/godis/database" @@ -15,10 +20,6 @@ import ( "github.com/hdt3213/godis/redis/connection" "github.com/hdt3213/godis/redis/parser" "github.com/hdt3213/godis/redis/protocol" - "io" - "net" - "strings" - "sync" ) var (