re-generate aof file during load database from replication master; Allow master-slave role switching

This commit is contained in:
finley
2022-12-08 03:49:23 +08:00
parent a5908f5ea4
commit 0d96ad295d
11 changed files with 190 additions and 56 deletions

1
.gitignore vendored
View File

@@ -20,4 +20,5 @@ logs
# jetbrains
.idea
*.iml
redis2.conf

View File

@@ -1,7 +1,6 @@
package aof
import (
"github.com/hdt3213/godis/config"
"github.com/hdt3213/godis/interface/database"
"github.com/hdt3213/godis/lib/logger"
"github.com/hdt3213/godis/lib/utils"
@@ -49,12 +48,14 @@ type Handler struct {
}
// NewAOFHandler creates a new aof.Handler
func NewAOFHandler(db database.EmbedDB, tmpDBMaker func() database.EmbedDB) (*Handler, error) {
func NewAOFHandler(db database.EmbedDB, filename string, load bool, tmpDBMaker func() database.EmbedDB) (*Handler, error) {
handler := &Handler{}
handler.aofFilename = config.Properties.AppendFilename
handler.aofFilename = filename
handler.db = db
handler.tmpDBMaker = tmpDBMaker
handler.LoadAof(0)
if load {
handler.LoadAof(0)
}
aofFile, err := os.OpenFile(handler.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600)
if err != nil {
return nil, err
@@ -78,7 +79,7 @@ func (handler *Handler) RemoveListener(listener Listener) {
// AddAof send command to aof goroutine through channel
func (handler *Handler) AddAof(dbIndex int, cmdLine CmdLine) {
if config.Properties.AppendOnly && handler.aofChan != nil {
if handler.aofChan != nil {
handler.aofChan <- &payload{
cmdLine: cmdLine,
dbIndex: dbIndex,
@@ -121,9 +122,10 @@ func (handler *Handler) handleAof() {
handler.aofFinished <- struct{}{}
}
// LoadAof read aof file
// LoadAof read aof file, can only be used before Handler.handleAof started
func (handler *Handler) LoadAof(maxBytes int) {
// delete aofChan to prevent write again
// handler.db.Exec may call handler.addAof
// delete aofChan to prevent loaded commands back into aofChan
aofChan := handler.aofChan
handler.aofChan = nil
defer func(aofChan chan *payload) {

View File

@@ -49,7 +49,11 @@ func NewStandaloneServer() *MultiDB {
mdb.hub = pubsub.MakeHub()
validAof := false
if config.Properties.AppendOnly {
mdb.initAof()
aofHandler, err := NewAofHandler(mdb, config.Properties.AppendFilename, true)
if err != nil {
panic(err)
}
mdb.bindAofHandler(aofHandler)
validAof = true
}
if config.Properties.RDBFilename != "" && !validAof {
@@ -57,24 +61,33 @@ func NewStandaloneServer() *MultiDB {
loadRdbFile(mdb)
}
mdb.slaveStatus = initReplSlaveStatus()
mdb.startAsMaster()
mdb.initMaster()
mdb.startReplCron()
mdb.role = masterRole // The initialization process does not require atomicity
return mdb
}
func (mdb *MultiDB) initAof() {
aofHandler, err := aof.NewAOFHandler(mdb, func() database.EmbedDB {
func NewAofHandler(db database.EmbedDB, filename string, load bool) (*aof.Handler, error) {
return aof.NewAOFHandler(db, filename, load, func() database.EmbedDB {
return MakeBasicMultiDB()
})
if err != nil {
panic(err)
}
func (mdb *MultiDB) AddAof(dbIndex int, cmdLine CmdLine) {
if mdb.aofHandler != nil {
mdb.aofHandler.AddAof(dbIndex, cmdLine)
}
}
func (mdb *MultiDB) bindAofHandler(aofHandler *aof.Handler) {
mdb.aofHandler = aofHandler
// bind AddAof
for _, db := range mdb.dbSet {
singleDB := db.Load().(*DB)
singleDB.addAof = func(line CmdLine) {
mdb.aofHandler.AddAof(singleDB.index, line)
if config.Properties.AppendOnly { // config may be changed during runtime
mdb.aofHandler.AddAof(singleDB.index, line)
}
}
}
}
@@ -198,6 +211,7 @@ func (mdb *MultiDB) Close() {
if mdb.aofHandler != nil {
mdb.aofHandler.Close()
}
mdb.stopMaster()
}
func execSelect(c redis.Connection, mdb *MultiDB, args [][]byte) redis.Reply {
@@ -356,16 +370,11 @@ func (mdb *MultiDB) GetDBSize(dbIndex int) (int, int) {
}
func (mdb *MultiDB) startReplCron() {
go func() {
defer func() {
if err := recover(); err != nil {
logger.Error("panic", err)
}
}()
go func(mdb *MultiDB) {
ticker := time.Tick(time.Second * 10)
for range ticker {
mdb.slaveCron()
mdb.masterCron()
}
}()
}(mdb)
}

View File

@@ -371,7 +371,7 @@ func execCopy(mdb *MultiDB, conn redis.Connection, args [][]byte) redis.Reply {
expire := raw.(time.Time)
destDB.Expire(destKey, expire)
}
mdb.aofHandler.AddAof(conn.GetDBIndex(), utils.ToCmdLine3("copy", args...))
mdb.AddAof(conn.GetDBIndex(), utils.ToCmdLine3("copy", args...))
return protocol.MakeIntReply(1)
}

View File

@@ -1,6 +1,7 @@
package database
import (
"github.com/hdt3213/godis/aof"
"github.com/hdt3213/godis/config"
"github.com/hdt3213/godis/datastruct/dict"
List "github.com/hdt3213/godis/datastruct/list"
@@ -32,42 +33,47 @@ func loadRdbFile(mdb *MultiDB) {
func importRDB(dec *core.Decoder, mdb *MultiDB) error {
return dec.Parse(func(o rdb.RedisObject) bool {
db := mdb.mustSelectDB(o.GetDBIndex())
var entity *database.DataEntity
switch o.GetType() {
case rdb.StringType:
str := o.(*rdb.StringObject)
db.PutEntity(o.GetKey(), &database.DataEntity{
entity = &database.DataEntity{
Data: str.Value,
})
}
case rdb.ListType:
listObj := o.(*rdb.ListObject)
list := List.NewQuickList()
for _, v := range listObj.Values {
list.Add(v)
}
db.PutEntity(o.GetKey(), &database.DataEntity{
entity = &database.DataEntity{
Data: list,
})
}
case rdb.HashType:
hashObj := o.(*rdb.HashObject)
hash := dict.MakeSimple()
for k, v := range hashObj.Hash {
hash.Put(k, v)
}
db.PutEntity(o.GetKey(), &database.DataEntity{
entity = &database.DataEntity{
Data: hash,
})
}
case rdb.ZSetType:
zsetObj := o.(*rdb.ZSetObject)
zSet := SortedSet.Make()
for _, e := range zsetObj.Entries {
zSet.Add(e.Member, e.Score)
}
db.PutEntity(o.GetKey(), &database.DataEntity{
entity = &database.DataEntity{
Data: zSet,
})
}
}
if o.GetExpiration() != nil {
db.Expire(o.GetKey(), *o.GetExpiration())
if entity != nil {
db.PutEntity(o.GetKey(), entity)
if o.GetExpiration() != nil {
db.Expire(o.GetKey(), *o.GetExpiration())
}
db.addAof(aof.EntityToCmd(o.GetKey(), entity).Args)
}
return true
})

View File

@@ -168,6 +168,8 @@ func (mdb *MultiDB) rewriteRDB() error {
mdb.masterStatus.mu.Lock()
mdb.masterStatus.rdbFilename = rdbFilename
mdb.masterStatus.backlog = newBacklog
mdb.aofHandler.RemoveListener(mdb.masterStatus.aofListener)
mdb.masterStatus.aofListener = aofListener
mdb.masterStatus.mu.Unlock()
// It is ok to know that new backlog is ready later, so we change readyToSend without sync
// But setting readyToSend=true must after new backlog is really ready (that means master.mu.Unlock)
@@ -353,6 +355,7 @@ func (mdb *MultiDB) removeSlave(slave *slaveClient) {
delete(mdb.masterStatus.slaveMap, slave.conn)
delete(mdb.masterStatus.waitSlaves, slave)
delete(mdb.masterStatus.onlineSlaves, slave)
logger.Info("disconnect with slave " + slave.conn.Name())
}
func (mdb *MultiDB) setSlaveOnline(slave *slaveClient, currentOffset int64) {
@@ -414,7 +417,7 @@ func (listener *replAofListener) Callback(cmdLines []CmdLine) {
}
}
func (mdb *MultiDB) startAsMaster() {
func (mdb *MultiDB) initMaster() {
mdb.masterStatus = &masterStatus{
mu: sync.RWMutex{},
replId: utils.RandHexString(40),
@@ -426,3 +429,27 @@ func (mdb *MultiDB) startAsMaster() {
rdbFilename: "",
}
}
func (mdb *MultiDB) stopMaster() {
mdb.masterStatus.mu.Lock()
defer mdb.masterStatus.mu.Unlock()
// disconnect with slave
for _, slave := range mdb.masterStatus.slaveMap {
_ = slave.conn.Close()
delete(mdb.masterStatus.slaveMap, slave.conn)
delete(mdb.masterStatus.waitSlaves, slave)
delete(mdb.masterStatus.onlineSlaves, slave)
}
// clean master status
mdb.aofHandler.RemoveListener(mdb.masterStatus.aofListener)
_ = os.Remove(mdb.masterStatus.rdbFilename)
mdb.masterStatus.rdbFilename = ""
mdb.masterStatus.replId = ""
mdb.masterStatus.backlog = nil
mdb.masterStatus.slaveMap = make(map[redis.Connection]*slaveClient)
mdb.masterStatus.waitSlaves = make(map[*slaveClient]struct{})
mdb.masterStatus.onlineSlaves = make(map[*slaveClient]struct{})
mdb.masterStatus.bgSaveState = bgSaveIdle
}

View File

@@ -30,6 +30,7 @@ func mockServer() *MultiDB {
server.dbSet[i] = holder
}
server.slaveStatus = initReplSlaveStatus()
server.initMaster()
return server
}
@@ -49,8 +50,11 @@ func TestReplicationMasterSide(t *testing.T) {
AppendFilename: aofFilename,
}
master := mockServer()
master.initAof()
master.startAsMaster()
aofHandler, err := NewAofHandler(master, config.Properties.AppendFilename, true)
if err != nil {
panic(err)
}
master.bindAofHandler(aofHandler)
slave := mockServer()
replConn := connection.NewFakeConn()
@@ -209,8 +213,11 @@ func TestReplicationMasterRewriteRDB(t *testing.T) {
AppendFilename: aofFilename,
}
master := mockServer()
master.initAof()
master.startAsMaster()
aofHandler, err := NewAofHandler(master, config.Properties.AppendFilename, true)
if err != nil {
panic(err)
}
master.bindAofHandler(aofHandler)
masterConn := connection.NewFakeConn()
resp := master.Exec(masterConn, utils.ToCmdLine("SET", "a", "a"))

View File

@@ -13,7 +13,9 @@ import (
"github.com/hdt3213/godis/redis/parser"
"github.com/hdt3213/godis/redis/protocol"
rdb "github.com/hdt3213/rdb/parser"
"io/ioutil"
"net"
"os"
"strconv"
"strings"
"sync"
@@ -83,6 +85,7 @@ func (mdb *MultiDB) slaveOfNone() {
mdb.slaveStatus.replId = ""
mdb.slaveStatus.replOffset = -1
mdb.slaveStatus.stopSlaveWithMutex()
mdb.role = masterRole
}
// stopSlaveWithMutex stops in-progress connectWithMaster/fullSync/receiveAOF
@@ -302,6 +305,25 @@ func (mdb *MultiDB) psyncHandshake() (bool, error) {
return isFullReSync, nil
}
func makeRdbLoader(upgradeAof bool) (*MultiDB, string, error) {
rdbLoader := MakeBasicMultiDB()
if !upgradeAof {
return rdbLoader, "", nil
}
// make aof handler to generate new aof file during loading rdb
newAofFile, err := ioutil.TempFile("", "*.aof")
if err != nil {
return nil, "", fmt.Errorf("create temp rdb failed: %v", err)
}
newAofFilename := newAofFile.Name()
aofHandler, err := NewAofHandler(rdbLoader, newAofFilename, false)
if err != nil {
return nil, "", err
}
rdbLoader.bindAofHandler(aofHandler)
return rdbLoader, newAofFilename, nil
}
// loadMasterRDB downloads rdb after handshake has been done
func (mdb *MultiDB) loadMasterRDB(configVersion int32) error {
rdbPayload := <-mdb.slaveStatus.masterChan
@@ -315,8 +337,12 @@ func (mdb *MultiDB) loadMasterRDB(configVersion int32) error {
logger.Info(fmt.Sprintf("receive %d bytes of rdb from master", len(rdbReply.Arg)))
rdbDec := rdb.NewDecoder(bytes.NewReader(rdbReply.Arg))
rdbHolder := MakeBasicMultiDB()
err := importRDB(rdbDec, rdbHolder)
rdbLoader, newAofFilename, err := makeRdbLoader(config.Properties.AppendOnly)
if err != nil {
return err
}
err = importRDB(rdbDec, rdbLoader)
if err != nil {
return errors.New("dump rdb failed: " + err.Error())
}
@@ -327,12 +353,25 @@ func (mdb *MultiDB) loadMasterRDB(configVersion int32) error {
// slaveStatus conf changed during connecting and waiting mutex
return configChangedErr
}
for i, h := range rdbHolder.dbSet {
for i, h := range rdbLoader.dbSet {
newDB := h.Load().(*DB)
mdb.loadDB(i, newDB)
}
// fixme: update aof file
if config.Properties.AppendOnly {
// use new aof file
mdb.aofHandler.Close()
err = os.Rename(newAofFilename, config.Properties.AppendFilename)
if err != nil {
return err
}
aofHandler, err := NewAofHandler(mdb, config.Properties.AppendFilename, false)
if err != nil {
return err
}
mdb.bindAofHandler(aofHandler)
}
return nil
}

View File

@@ -8,29 +8,54 @@ import (
"github.com/hdt3213/godis/redis/connection"
"github.com/hdt3213/godis/redis/protocol"
"github.com/hdt3213/godis/redis/protocol/asserts"
"io/ioutil"
"os"
"path"
"testing"
"time"
)
func TestReplicationSlaveSide(t *testing.T) {
mdb := mockServer()
tmpDir, err := ioutil.TempDir("", "godis")
if err != nil {
t.Error(err)
return
}
aofFilename := path.Join(tmpDir, "a.aof")
defer func() {
_ = os.Remove(aofFilename)
}()
config.Properties = &config.ServerProperties{
Databases: 16,
AppendOnly: true,
AppendFilename: aofFilename,
}
conn := connection.NewFakeConn()
server := mockServer()
masterCli, err := client.MakeClient("127.0.0.1:6379")
if err != nil {
t.Error(err)
return
}
aofHandler, err := NewAofHandler(server, config.Properties.AppendFilename, true)
if err != nil {
t.Error(err)
return
}
server.bindAofHandler(aofHandler)
server.Exec(conn, utils.ToCmdLine("set", "zz", "zz"))
masterCli.Start()
// sync with master
ret := masterCli.Send(utils.ToCmdLine("set", "1", "1"))
asserts.AssertStatusReply(t, ret, "OK")
conn := connection.NewFakeConn()
ret = mdb.Exec(conn, utils.ToCmdLine("SLAVEOF", "127.0.0.1", "6379"))
ret = server.Exec(conn, utils.ToCmdLine("SLAVEOF", "127.0.0.1", "6379"))
asserts.AssertStatusReply(t, ret, "OK")
success := false
for i := 0; i < 30; i++ {
// wait for sync
time.Sleep(time.Second)
ret = mdb.Exec(conn, utils.ToCmdLine("GET", "1"))
ret = server.Exec(conn, utils.ToCmdLine("GET", "1"))
bulkRet, ok := ret.(*protocol.BulkReply)
if ok {
if bytes.Equal(bulkRet.Arg, []byte("1")) {
@@ -51,7 +76,7 @@ func TestReplicationSlaveSide(t *testing.T) {
for i := 0; i < 10; i++ {
// wait for sync
time.Sleep(time.Second)
ret = mdb.Exec(conn, utils.ToCmdLine("GET", "1"))
ret = server.Exec(conn, utils.ToCmdLine("GET", "1"))
bulkRet, ok := ret.(*protocol.BulkReply)
if ok {
if bytes.Equal(bulkRet.Arg, []byte("2")) {
@@ -64,7 +89,7 @@ func TestReplicationSlaveSide(t *testing.T) {
t.Error("sync failed")
return
}
err = mdb.slaveStatus.sendAck2Master()
err = server.slaveStatus.sendAck2Master()
if err != nil {
t.Error(err)
return
@@ -73,9 +98,9 @@ func TestReplicationSlaveSide(t *testing.T) {
// test reconnect
config.Properties.ReplTimeout = 1
_ = mdb.slaveStatus.masterConn.Close()
mdb.slaveStatus.lastRecvTime = time.Now().Add(-time.Hour) // mock timeout
mdb.slaveCron()
_ = server.slaveStatus.masterConn.Close()
server.slaveStatus.lastRecvTime = time.Now().Add(-time.Hour) // mock timeout
server.slaveCron()
time.Sleep(3 * time.Second)
ret = masterCli.Send(utils.ToCmdLine("set", "1", "3"))
asserts.AssertStatusReply(t, ret, "OK")
@@ -83,7 +108,7 @@ func TestReplicationSlaveSide(t *testing.T) {
for i := 0; i < 10; i++ {
// wait for sync
time.Sleep(time.Second)
ret = mdb.Exec(conn, utils.ToCmdLine("GET", "1"))
ret = server.Exec(conn, utils.ToCmdLine("GET", "1"))
bulkRet, ok := ret.(*protocol.BulkReply)
if ok {
if bytes.Equal(bulkRet.Arg, []byte("3")) {
@@ -98,19 +123,19 @@ func TestReplicationSlaveSide(t *testing.T) {
}
// test slave of no one
ret = mdb.Exec(conn, utils.ToCmdLine("SLAVEOF", "NO", "ONE"))
ret = server.Exec(conn, utils.ToCmdLine("SLAVEOF", "NO", "ONE"))
asserts.AssertStatusReply(t, ret, "OK")
ret = masterCli.Send(utils.ToCmdLine("set", "1", "4"))
asserts.AssertStatusReply(t, ret, "OK")
ret = mdb.Exec(conn, utils.ToCmdLine("GET", "1"))
ret = server.Exec(conn, utils.ToCmdLine("GET", "1"))
asserts.AssertBulkReply(t, ret, "3")
ret = mdb.Exec(conn, utils.ToCmdLine("SLAVEOF", "127.0.0.1", "6379"))
ret = server.Exec(conn, utils.ToCmdLine("SLAVEOF", "127.0.0.1", "6379"))
asserts.AssertStatusReply(t, ret, "OK")
success = false
for i := 0; i < 30; i++ {
// wait for sync
time.Sleep(time.Second)
ret = mdb.Exec(conn, utils.ToCmdLine("GET", "1"))
ret = server.Exec(conn, utils.ToCmdLine("GET", "1"))
bulkRet, ok := ret.(*protocol.BulkReply)
if ok {
if bytes.Equal(bulkRet.Arg, []byte("4")) {
@@ -124,7 +149,16 @@ func TestReplicationSlaveSide(t *testing.T) {
return
}
err = mdb.slaveStatus.close()
// check slave aof file
aofLoader := MakeBasicMultiDB()
aofHandler2, err := NewAofHandler(aofLoader, config.Properties.AppendFilename, true)
aofLoader.bindAofHandler(aofHandler2)
ret = aofLoader.Exec(conn, utils.ToCmdLine("get", "zz"))
asserts.AssertNullBulk(t, ret)
ret = aofLoader.Exec(conn, utils.ToCmdLine("get", "1"))
asserts.AssertBulkReply(t, ret, "4")
err = server.slaveStatus.close()
if err != nil {
t.Error("cannot close")
}

View File

@@ -31,4 +31,6 @@ type Connection interface {
SetMaster()
IsMaster() bool
Name() string
}

View File

@@ -94,6 +94,13 @@ func (c *Connection) Write(b []byte) (int, error) {
return c.conn.Write(b)
}
func (c *Connection) Name() string {
if c.conn != nil {
return c.conn.RemoteAddr().String()
}
return ""
}
// Subscribe add current connection into subscribers of the given channel
func (c *Connection) Subscribe(channel string) {
c.mu.Lock()