mirror of
https://github.com/HDT3213/godis.git
synced 2025-10-05 16:57:06 +08:00
load RDB file
This commit is contained in:
@@ -19,6 +19,7 @@ type ServerProperties struct {
|
|||||||
MaxClients int `cfg:"maxclients"`
|
MaxClients int `cfg:"maxclients"`
|
||||||
RequirePass string `cfg:"requirepass"`
|
RequirePass string `cfg:"requirepass"`
|
||||||
Databases int `cfg:"databases"`
|
Databases int `cfg:"databases"`
|
||||||
|
RDBFilename string `cfg:"dbfilename"`
|
||||||
|
|
||||||
Peers []string `cfg:"peers"`
|
Peers []string `cfg:"peers"`
|
||||||
Self string `cfg:"self"`
|
Self string `cfg:"self"`
|
||||||
|
@@ -39,6 +39,7 @@ func NewStandaloneServer() *MultiDB {
|
|||||||
mdb.dbSet[i] = singleDB
|
mdb.dbSet[i] = singleDB
|
||||||
}
|
}
|
||||||
mdb.hub = pubsub.MakeHub()
|
mdb.hub = pubsub.MakeHub()
|
||||||
|
validAof := false
|
||||||
if config.Properties.AppendOnly {
|
if config.Properties.AppendOnly {
|
||||||
aofHandler, err := aof.NewAOFHandler(mdb, func() database.EmbedDB {
|
aofHandler, err := aof.NewAOFHandler(mdb, func() database.EmbedDB {
|
||||||
return MakeBasicMultiDB()
|
return MakeBasicMultiDB()
|
||||||
@@ -54,6 +55,11 @@ func NewStandaloneServer() *MultiDB {
|
|||||||
mdb.aofHandler.AddAof(singleDB.index, line)
|
mdb.aofHandler.AddAof(singleDB.index, line)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
validAof = true
|
||||||
|
}
|
||||||
|
if config.Properties.RDBFilename != "" && !validAof {
|
||||||
|
// load rdb
|
||||||
|
loadRdb(mdb)
|
||||||
}
|
}
|
||||||
return mdb
|
return mdb
|
||||||
}
|
}
|
||||||
@@ -158,13 +164,16 @@ func (mdb *MultiDB) flushAll() redis.Reply {
|
|||||||
return &protocol.OkReply{}
|
return &protocol.OkReply{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (mdb *MultiDB) selectDB(dbIndex int) *DB {
|
||||||
|
if dbIndex >= len(mdb.dbSet) {
|
||||||
|
panic("ERR DB index is out of range")
|
||||||
|
}
|
||||||
|
return mdb.dbSet[dbIndex]
|
||||||
|
}
|
||||||
|
|
||||||
// ForEach traverses all the keys in the given database
|
// ForEach traverses all the keys in the given database
|
||||||
func (mdb *MultiDB) ForEach(dbIndex int, cb func(key string, data *database.DataEntity, expiration *time.Time) bool) {
|
func (mdb *MultiDB) ForEach(dbIndex int, cb func(key string, data *database.DataEntity, expiration *time.Time) bool) {
|
||||||
if dbIndex >= len(mdb.dbSet) {
|
mdb.selectDB(dbIndex).ForEach(cb)
|
||||||
return
|
|
||||||
}
|
|
||||||
db := mdb.dbSet[dbIndex]
|
|
||||||
db.ForEach(cb)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ExecMulti executes multi commands transaction Atomically and Isolated
|
// ExecMulti executes multi commands transaction Atomically and Isolated
|
||||||
@@ -178,29 +187,17 @@ func (mdb *MultiDB) ExecMulti(conn redis.Connection, watching map[string]uint32,
|
|||||||
|
|
||||||
// RWLocks lock keys for writing and reading
|
// RWLocks lock keys for writing and reading
|
||||||
func (mdb *MultiDB) RWLocks(dbIndex int, writeKeys []string, readKeys []string) {
|
func (mdb *MultiDB) RWLocks(dbIndex int, writeKeys []string, readKeys []string) {
|
||||||
if dbIndex >= len(mdb.dbSet) {
|
mdb.selectDB(dbIndex).RWLocks(writeKeys, readKeys)
|
||||||
panic("ERR DB index is out of range")
|
|
||||||
}
|
|
||||||
db := mdb.dbSet[dbIndex]
|
|
||||||
db.RWLocks(writeKeys, readKeys)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// RWUnLocks unlock keys for writing and reading
|
// RWUnLocks unlock keys for writing and reading
|
||||||
func (mdb *MultiDB) RWUnLocks(dbIndex int, writeKeys []string, readKeys []string) {
|
func (mdb *MultiDB) RWUnLocks(dbIndex int, writeKeys []string, readKeys []string) {
|
||||||
if dbIndex >= len(mdb.dbSet) {
|
mdb.selectDB(dbIndex).RWUnLocks(writeKeys, readKeys)
|
||||||
panic("ERR DB index is out of range")
|
|
||||||
}
|
|
||||||
db := mdb.dbSet[dbIndex]
|
|
||||||
db.RWUnLocks(writeKeys, readKeys)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetUndoLogs return rollback commands
|
// GetUndoLogs return rollback commands
|
||||||
func (mdb *MultiDB) GetUndoLogs(dbIndex int, cmdLine [][]byte) []CmdLine {
|
func (mdb *MultiDB) GetUndoLogs(dbIndex int, cmdLine [][]byte) []CmdLine {
|
||||||
if dbIndex >= len(mdb.dbSet) {
|
return mdb.selectDB(dbIndex).GetUndoLogs(cmdLine)
|
||||||
panic("ERR DB index is out of range")
|
|
||||||
}
|
|
||||||
db := mdb.dbSet[dbIndex]
|
|
||||||
return db.GetUndoLogs(cmdLine)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ExecWithLock executes normal commands, invoker should provide locks
|
// ExecWithLock executes normal commands, invoker should provide locks
|
||||||
|
65
database/rdb.go
Normal file
65
database/rdb.go
Normal file
@@ -0,0 +1,65 @@
|
|||||||
|
package database
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/hdt3213/godis/config"
|
||||||
|
"github.com/hdt3213/godis/datastruct/dict"
|
||||||
|
List "github.com/hdt3213/godis/datastruct/list"
|
||||||
|
SortedSet "github.com/hdt3213/godis/datastruct/sortedset"
|
||||||
|
"github.com/hdt3213/godis/interface/database"
|
||||||
|
"github.com/hdt3213/godis/lib/logger"
|
||||||
|
rdb "github.com/hdt3213/rdb/parser"
|
||||||
|
"os"
|
||||||
|
)
|
||||||
|
|
||||||
|
func loadRdb(mdb *MultiDB) {
|
||||||
|
rdbFile, err := os.Open(config.Properties.RDBFilename)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error("open rdb file failed")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
_ = rdbFile.Close()
|
||||||
|
}()
|
||||||
|
decoder := rdb.NewDecoder(rdbFile)
|
||||||
|
err = decoder.Parse(func(o rdb.RedisObject) bool {
|
||||||
|
db := mdb.selectDB(o.GetDBIndex())
|
||||||
|
switch o.GetType() {
|
||||||
|
case rdb.StringType:
|
||||||
|
str := o.(*rdb.StringObject)
|
||||||
|
db.PutEntity(o.GetKey(), &database.DataEntity{
|
||||||
|
Data: str.Value,
|
||||||
|
})
|
||||||
|
case rdb.ListType:
|
||||||
|
listObj := o.(*rdb.ListObject)
|
||||||
|
list := &List.LinkedList{}
|
||||||
|
for _, v := range listObj.Values {
|
||||||
|
list.Add(v)
|
||||||
|
}
|
||||||
|
db.PutEntity(o.GetKey(), &database.DataEntity{
|
||||||
|
Data: list,
|
||||||
|
})
|
||||||
|
case rdb.HashType:
|
||||||
|
hashObj := o.(*rdb.HashObject)
|
||||||
|
hash := dict.MakeSimple()
|
||||||
|
for k, v := range hashObj.Hash {
|
||||||
|
hash.Put(k, v)
|
||||||
|
}
|
||||||
|
db.PutEntity(o.GetKey(), &database.DataEntity{
|
||||||
|
Data: hash,
|
||||||
|
})
|
||||||
|
case rdb.ZSetType:
|
||||||
|
zsetObj := o.(*rdb.ZSetObject)
|
||||||
|
zSet := SortedSet.Make()
|
||||||
|
for _, e := range zsetObj.Entries {
|
||||||
|
zSet.Add(e.Member, e.Score)
|
||||||
|
}
|
||||||
|
db.PutEntity(o.GetKey(), &database.DataEntity{
|
||||||
|
Data: zSet,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
if o.GetExpiration() != nil {
|
||||||
|
db.Expire(o.GetKey(), *o.GetExpiration())
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
}
|
40
database/rdb_test.go
Normal file
40
database/rdb_test.go
Normal file
@@ -0,0 +1,40 @@
|
|||||||
|
package database
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/hdt3213/godis/config"
|
||||||
|
"github.com/hdt3213/godis/lib/utils"
|
||||||
|
"github.com/hdt3213/godis/redis/connection"
|
||||||
|
"github.com/hdt3213/godis/redis/protocol/asserts"
|
||||||
|
"path/filepath"
|
||||||
|
"runtime"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestLoadRDB(t *testing.T) {
|
||||||
|
_, b, _, _ := runtime.Caller(0)
|
||||||
|
projectRoot := filepath.Dir(filepath.Dir(b))
|
||||||
|
config.Properties = &config.ServerProperties{
|
||||||
|
AppendOnly: false,
|
||||||
|
RDBFilename: filepath.Join(projectRoot, "test.rdb"), // set working directory to project root
|
||||||
|
}
|
||||||
|
conn := &connection.FakeConn{}
|
||||||
|
rdbDB := NewStandaloneServer()
|
||||||
|
result := rdbDB.Exec(conn, utils.ToCmdLine("Get", "str"))
|
||||||
|
asserts.AssertBulkReply(t, result, "str")
|
||||||
|
result = rdbDB.Exec(conn, utils.ToCmdLine("TTL", "str"))
|
||||||
|
asserts.AssertIntReplyGreaterThan(t, result, 0)
|
||||||
|
result = rdbDB.Exec(conn, utils.ToCmdLine("LRange", "list", "0", "-1"))
|
||||||
|
asserts.AssertMultiBulkReply(t, result, []string{"1", "2", "3", "4"})
|
||||||
|
result = rdbDB.Exec(conn, utils.ToCmdLine("HGetAll", "hash"))
|
||||||
|
asserts.AssertMultiBulkReply(t, result, []string{"1", "1"})
|
||||||
|
result = rdbDB.Exec(conn, utils.ToCmdLine("ZRange", "zset", "0", "1", "WITHSCORES"))
|
||||||
|
asserts.AssertMultiBulkReply(t, result, []string{"1", "1"})
|
||||||
|
|
||||||
|
config.Properties = &config.ServerProperties{
|
||||||
|
AppendOnly: false,
|
||||||
|
RDBFilename: filepath.Join(projectRoot, "none", "test.rdb"), // set working directory to project root
|
||||||
|
}
|
||||||
|
rdbDB = NewStandaloneServer()
|
||||||
|
result = rdbDB.Exec(conn, utils.ToCmdLine("Get", "str"))
|
||||||
|
asserts.AssertNullBulk(t, result)
|
||||||
|
}
|
2
go.mod
2
go.mod
@@ -3,6 +3,8 @@ module github.com/hdt3213/godis
|
|||||||
go 1.16
|
go 1.16
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
github.com/emirpasic/gods v1.16.0 // indirect
|
||||||
|
github.com/hdt3213/rdb v1.0.0 // indirect
|
||||||
github.com/jolestar/go-commons-pool/v2 v2.1.1
|
github.com/jolestar/go-commons-pool/v2 v2.1.1
|
||||||
github.com/shopspring/decimal v1.2.0
|
github.com/shopspring/decimal v1.2.0
|
||||||
)
|
)
|
||||||
|
5
go.sum
5
go.sum
@@ -1,8 +1,13 @@
|
|||||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
|
github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o=
|
||||||
|
github.com/emirpasic/gods v1.16.0 h1:K8GFZcq7YD5BL7IuQULdIKMWxVmqiEBUBaN+v/Ku214=
|
||||||
|
github.com/emirpasic/gods v1.16.0/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
|
||||||
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
|
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
|
||||||
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
|
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
|
||||||
|
github.com/hdt3213/rdb v1.0.0 h1:rG8pRz6Y+2XtZw4C35rize3nXByClkFmwfM5ffj7sFs=
|
||||||
|
github.com/hdt3213/rdb v1.0.0/go.mod h1:m2CaP16oqYROIQMUUjB3WkqQWfDi/VebnHUDVRl4cIM=
|
||||||
github.com/jolestar/go-commons-pool/v2 v2.1.1 h1:KrbCEvx5KhwcHzLTWIE8SJJQL7zzNto5in+wnO9/gSA=
|
github.com/jolestar/go-commons-pool/v2 v2.1.1 h1:KrbCEvx5KhwcHzLTWIE8SJJQL7zzNto5in+wnO9/gSA=
|
||||||
github.com/jolestar/go-commons-pool/v2 v2.1.1/go.mod h1:kTOzcguO2zUoEd+BySdg7Xhk/YE0HEr2bAHdWDkhMXg=
|
github.com/jolestar/go-commons-pool/v2 v2.1.1/go.mod h1:kTOzcguO2zUoEd+BySdg7Xhk/YE0HEr2bAHdWDkhMXg=
|
||||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||||
|
@@ -2,5 +2,6 @@ bind 0.0.0.0
|
|||||||
port 6399
|
port 6399
|
||||||
maxclients 128
|
maxclients 128
|
||||||
|
|
||||||
appendonly yes
|
appendonly no
|
||||||
appendfilename appendonly.aof
|
appendfilename appendonly.aof
|
||||||
|
dbfilename test.rdb
|
||||||
|
Reference in New Issue
Block a user