diff --git a/.gitignore b/.gitignore index 0e04dbc..6bac98b 100644 --- a/.gitignore +++ b/.gitignore @@ -20,4 +20,5 @@ logs # jetbrains .idea *.iml +redis2.conf diff --git a/aof/aof.go b/aof/aof.go index 7a3d33c..415cb01 100644 --- a/aof/aof.go +++ b/aof/aof.go @@ -1,7 +1,6 @@ package aof import ( - "github.com/hdt3213/godis/config" "github.com/hdt3213/godis/interface/database" "github.com/hdt3213/godis/lib/logger" "github.com/hdt3213/godis/lib/utils" @@ -49,12 +48,14 @@ type Handler struct { } // NewAOFHandler creates a new aof.Handler -func NewAOFHandler(db database.EmbedDB, tmpDBMaker func() database.EmbedDB) (*Handler, error) { +func NewAOFHandler(db database.EmbedDB, filename string, load bool, tmpDBMaker func() database.EmbedDB) (*Handler, error) { handler := &Handler{} - handler.aofFilename = config.Properties.AppendFilename + handler.aofFilename = filename handler.db = db handler.tmpDBMaker = tmpDBMaker - handler.LoadAof(0) + if load { + handler.LoadAof(0) + } aofFile, err := os.OpenFile(handler.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600) if err != nil { return nil, err @@ -78,7 +79,7 @@ func (handler *Handler) RemoveListener(listener Listener) { // AddAof send command to aof goroutine through channel func (handler *Handler) AddAof(dbIndex int, cmdLine CmdLine) { - if config.Properties.AppendOnly && handler.aofChan != nil { + if handler.aofChan != nil { handler.aofChan <- &payload{ cmdLine: cmdLine, dbIndex: dbIndex, @@ -121,9 +122,10 @@ func (handler *Handler) handleAof() { handler.aofFinished <- struct{}{} } -// LoadAof read aof file +// LoadAof read aof file, can only be used before Handler.handleAof started func (handler *Handler) LoadAof(maxBytes int) { - // delete aofChan to prevent write again + // handler.db.Exec may call handler.addAof + // delete aofChan to prevent loaded commands back into aofChan aofChan := handler.aofChan handler.aofChan = nil defer func(aofChan chan *payload) { diff --git a/database/database.go b/database/database.go index 24f7b2f..2f61c0f 100644 --- a/database/database.go +++ b/database/database.go @@ -49,7 +49,11 @@ func NewStandaloneServer() *MultiDB { mdb.hub = pubsub.MakeHub() validAof := false if config.Properties.AppendOnly { - mdb.initAof() + aofHandler, err := NewAofHandler(mdb, config.Properties.AppendFilename, true) + if err != nil { + panic(err) + } + mdb.bindAofHandler(aofHandler) validAof = true } if config.Properties.RDBFilename != "" && !validAof { @@ -57,24 +61,33 @@ func NewStandaloneServer() *MultiDB { loadRdbFile(mdb) } mdb.slaveStatus = initReplSlaveStatus() - mdb.startAsMaster() + mdb.initMaster() mdb.startReplCron() mdb.role = masterRole // The initialization process does not require atomicity return mdb } -func (mdb *MultiDB) initAof() { - aofHandler, err := aof.NewAOFHandler(mdb, func() database.EmbedDB { +func NewAofHandler(db database.EmbedDB, filename string, load bool) (*aof.Handler, error) { + return aof.NewAOFHandler(db, filename, load, func() database.EmbedDB { return MakeBasicMultiDB() }) - if err != nil { - panic(err) +} + +func (mdb *MultiDB) AddAof(dbIndex int, cmdLine CmdLine) { + if mdb.aofHandler != nil { + mdb.aofHandler.AddAof(dbIndex, cmdLine) } +} + +func (mdb *MultiDB) bindAofHandler(aofHandler *aof.Handler) { mdb.aofHandler = aofHandler + // bind AddAof for _, db := range mdb.dbSet { singleDB := db.Load().(*DB) singleDB.addAof = func(line CmdLine) { - mdb.aofHandler.AddAof(singleDB.index, line) + if config.Properties.AppendOnly { // config may be changed during runtime + mdb.aofHandler.AddAof(singleDB.index, line) + } } } } @@ -198,6 +211,7 @@ func (mdb *MultiDB) Close() { if mdb.aofHandler != nil { mdb.aofHandler.Close() } + mdb.stopMaster() } func execSelect(c redis.Connection, mdb *MultiDB, args [][]byte) redis.Reply { @@ -356,16 +370,11 @@ func (mdb *MultiDB) GetDBSize(dbIndex int) (int, int) { } func (mdb *MultiDB) startReplCron() { - go func() { - defer func() { - if err := recover(); err != nil { - logger.Error("panic", err) - } - }() + go func(mdb *MultiDB) { ticker := time.Tick(time.Second * 10) for range ticker { mdb.slaveCron() mdb.masterCron() } - }() + }(mdb) } diff --git a/database/keys.go b/database/keys.go index 8896ab7..9c7f419 100644 --- a/database/keys.go +++ b/database/keys.go @@ -371,7 +371,7 @@ func execCopy(mdb *MultiDB, conn redis.Connection, args [][]byte) redis.Reply { expire := raw.(time.Time) destDB.Expire(destKey, expire) } - mdb.aofHandler.AddAof(conn.GetDBIndex(), utils.ToCmdLine3("copy", args...)) + mdb.AddAof(conn.GetDBIndex(), utils.ToCmdLine3("copy", args...)) return protocol.MakeIntReply(1) } diff --git a/database/rdb.go b/database/rdb.go index 969e3ba..12f0ee0 100644 --- a/database/rdb.go +++ b/database/rdb.go @@ -1,6 +1,7 @@ package database import ( + "github.com/hdt3213/godis/aof" "github.com/hdt3213/godis/config" "github.com/hdt3213/godis/datastruct/dict" List "github.com/hdt3213/godis/datastruct/list" @@ -32,42 +33,47 @@ func loadRdbFile(mdb *MultiDB) { func importRDB(dec *core.Decoder, mdb *MultiDB) error { return dec.Parse(func(o rdb.RedisObject) bool { db := mdb.mustSelectDB(o.GetDBIndex()) + var entity *database.DataEntity switch o.GetType() { case rdb.StringType: str := o.(*rdb.StringObject) - db.PutEntity(o.GetKey(), &database.DataEntity{ + entity = &database.DataEntity{ Data: str.Value, - }) + } case rdb.ListType: listObj := o.(*rdb.ListObject) list := List.NewQuickList() for _, v := range listObj.Values { list.Add(v) } - db.PutEntity(o.GetKey(), &database.DataEntity{ + entity = &database.DataEntity{ Data: list, - }) + } case rdb.HashType: hashObj := o.(*rdb.HashObject) hash := dict.MakeSimple() for k, v := range hashObj.Hash { hash.Put(k, v) } - db.PutEntity(o.GetKey(), &database.DataEntity{ + entity = &database.DataEntity{ Data: hash, - }) + } case rdb.ZSetType: zsetObj := o.(*rdb.ZSetObject) zSet := SortedSet.Make() for _, e := range zsetObj.Entries { zSet.Add(e.Member, e.Score) } - db.PutEntity(o.GetKey(), &database.DataEntity{ + entity = &database.DataEntity{ Data: zSet, - }) + } } - if o.GetExpiration() != nil { - db.Expire(o.GetKey(), *o.GetExpiration()) + if entity != nil { + db.PutEntity(o.GetKey(), entity) + if o.GetExpiration() != nil { + db.Expire(o.GetKey(), *o.GetExpiration()) + } + db.addAof(aof.EntityToCmd(o.GetKey(), entity).Args) } return true }) diff --git a/database/replication_master.go b/database/replication_master.go index 06208fe..c4c8d21 100644 --- a/database/replication_master.go +++ b/database/replication_master.go @@ -168,6 +168,8 @@ func (mdb *MultiDB) rewriteRDB() error { mdb.masterStatus.mu.Lock() mdb.masterStatus.rdbFilename = rdbFilename mdb.masterStatus.backlog = newBacklog + mdb.aofHandler.RemoveListener(mdb.masterStatus.aofListener) + mdb.masterStatus.aofListener = aofListener mdb.masterStatus.mu.Unlock() // It is ok to know that new backlog is ready later, so we change readyToSend without sync // But setting readyToSend=true must after new backlog is really ready (that means master.mu.Unlock) @@ -353,6 +355,7 @@ func (mdb *MultiDB) removeSlave(slave *slaveClient) { delete(mdb.masterStatus.slaveMap, slave.conn) delete(mdb.masterStatus.waitSlaves, slave) delete(mdb.masterStatus.onlineSlaves, slave) + logger.Info("disconnect with slave " + slave.conn.Name()) } func (mdb *MultiDB) setSlaveOnline(slave *slaveClient, currentOffset int64) { @@ -414,7 +417,7 @@ func (listener *replAofListener) Callback(cmdLines []CmdLine) { } } -func (mdb *MultiDB) startAsMaster() { +func (mdb *MultiDB) initMaster() { mdb.masterStatus = &masterStatus{ mu: sync.RWMutex{}, replId: utils.RandHexString(40), @@ -426,3 +429,27 @@ func (mdb *MultiDB) startAsMaster() { rdbFilename: "", } } + +func (mdb *MultiDB) stopMaster() { + mdb.masterStatus.mu.Lock() + defer mdb.masterStatus.mu.Unlock() + + // disconnect with slave + for _, slave := range mdb.masterStatus.slaveMap { + _ = slave.conn.Close() + delete(mdb.masterStatus.slaveMap, slave.conn) + delete(mdb.masterStatus.waitSlaves, slave) + delete(mdb.masterStatus.onlineSlaves, slave) + } + + // clean master status + mdb.aofHandler.RemoveListener(mdb.masterStatus.aofListener) + _ = os.Remove(mdb.masterStatus.rdbFilename) + mdb.masterStatus.rdbFilename = "" + mdb.masterStatus.replId = "" + mdb.masterStatus.backlog = nil + mdb.masterStatus.slaveMap = make(map[redis.Connection]*slaveClient) + mdb.masterStatus.waitSlaves = make(map[*slaveClient]struct{}) + mdb.masterStatus.onlineSlaves = make(map[*slaveClient]struct{}) + mdb.masterStatus.bgSaveState = bgSaveIdle +} diff --git a/database/replication_master_test.go b/database/replication_master_test.go index 3e0c344..a0732d7 100644 --- a/database/replication_master_test.go +++ b/database/replication_master_test.go @@ -30,6 +30,7 @@ func mockServer() *MultiDB { server.dbSet[i] = holder } server.slaveStatus = initReplSlaveStatus() + server.initMaster() return server } @@ -49,8 +50,11 @@ func TestReplicationMasterSide(t *testing.T) { AppendFilename: aofFilename, } master := mockServer() - master.initAof() - master.startAsMaster() + aofHandler, err := NewAofHandler(master, config.Properties.AppendFilename, true) + if err != nil { + panic(err) + } + master.bindAofHandler(aofHandler) slave := mockServer() replConn := connection.NewFakeConn() @@ -209,8 +213,11 @@ func TestReplicationMasterRewriteRDB(t *testing.T) { AppendFilename: aofFilename, } master := mockServer() - master.initAof() - master.startAsMaster() + aofHandler, err := NewAofHandler(master, config.Properties.AppendFilename, true) + if err != nil { + panic(err) + } + master.bindAofHandler(aofHandler) masterConn := connection.NewFakeConn() resp := master.Exec(masterConn, utils.ToCmdLine("SET", "a", "a")) diff --git a/database/replication_slave.go b/database/replication_slave.go index d02f66c..94c9f42 100644 --- a/database/replication_slave.go +++ b/database/replication_slave.go @@ -13,7 +13,9 @@ import ( "github.com/hdt3213/godis/redis/parser" "github.com/hdt3213/godis/redis/protocol" rdb "github.com/hdt3213/rdb/parser" + "io/ioutil" "net" + "os" "strconv" "strings" "sync" @@ -83,6 +85,7 @@ func (mdb *MultiDB) slaveOfNone() { mdb.slaveStatus.replId = "" mdb.slaveStatus.replOffset = -1 mdb.slaveStatus.stopSlaveWithMutex() + mdb.role = masterRole } // stopSlaveWithMutex stops in-progress connectWithMaster/fullSync/receiveAOF @@ -302,6 +305,25 @@ func (mdb *MultiDB) psyncHandshake() (bool, error) { return isFullReSync, nil } +func makeRdbLoader(upgradeAof bool) (*MultiDB, string, error) { + rdbLoader := MakeBasicMultiDB() + if !upgradeAof { + return rdbLoader, "", nil + } + // make aof handler to generate new aof file during loading rdb + newAofFile, err := ioutil.TempFile("", "*.aof") + if err != nil { + return nil, "", fmt.Errorf("create temp rdb failed: %v", err) + } + newAofFilename := newAofFile.Name() + aofHandler, err := NewAofHandler(rdbLoader, newAofFilename, false) + if err != nil { + return nil, "", err + } + rdbLoader.bindAofHandler(aofHandler) + return rdbLoader, newAofFilename, nil +} + // loadMasterRDB downloads rdb after handshake has been done func (mdb *MultiDB) loadMasterRDB(configVersion int32) error { rdbPayload := <-mdb.slaveStatus.masterChan @@ -315,8 +337,12 @@ func (mdb *MultiDB) loadMasterRDB(configVersion int32) error { logger.Info(fmt.Sprintf("receive %d bytes of rdb from master", len(rdbReply.Arg))) rdbDec := rdb.NewDecoder(bytes.NewReader(rdbReply.Arg)) - rdbHolder := MakeBasicMultiDB() - err := importRDB(rdbDec, rdbHolder) + + rdbLoader, newAofFilename, err := makeRdbLoader(config.Properties.AppendOnly) + if err != nil { + return err + } + err = importRDB(rdbDec, rdbLoader) if err != nil { return errors.New("dump rdb failed: " + err.Error()) } @@ -327,12 +353,25 @@ func (mdb *MultiDB) loadMasterRDB(configVersion int32) error { // slaveStatus conf changed during connecting and waiting mutex return configChangedErr } - for i, h := range rdbHolder.dbSet { + for i, h := range rdbLoader.dbSet { newDB := h.Load().(*DB) mdb.loadDB(i, newDB) } - // fixme: update aof file + if config.Properties.AppendOnly { + // use new aof file + mdb.aofHandler.Close() + err = os.Rename(newAofFilename, config.Properties.AppendFilename) + if err != nil { + return err + } + aofHandler, err := NewAofHandler(mdb, config.Properties.AppendFilename, false) + if err != nil { + return err + } + mdb.bindAofHandler(aofHandler) + } + return nil } diff --git a/database/replication_slave_test.go b/database/replication_slave_test.go index 107e919..f25d9e3 100644 --- a/database/replication_slave_test.go +++ b/database/replication_slave_test.go @@ -8,29 +8,54 @@ import ( "github.com/hdt3213/godis/redis/connection" "github.com/hdt3213/godis/redis/protocol" "github.com/hdt3213/godis/redis/protocol/asserts" + "io/ioutil" + "os" + "path" "testing" "time" ) func TestReplicationSlaveSide(t *testing.T) { - mdb := mockServer() + tmpDir, err := ioutil.TempDir("", "godis") + if err != nil { + t.Error(err) + return + } + aofFilename := path.Join(tmpDir, "a.aof") + defer func() { + _ = os.Remove(aofFilename) + }() + config.Properties = &config.ServerProperties{ + Databases: 16, + AppendOnly: true, + AppendFilename: aofFilename, + } + conn := connection.NewFakeConn() + server := mockServer() masterCli, err := client.MakeClient("127.0.0.1:6379") if err != nil { t.Error(err) + return } + aofHandler, err := NewAofHandler(server, config.Properties.AppendFilename, true) + if err != nil { + t.Error(err) + return + } + server.bindAofHandler(aofHandler) + server.Exec(conn, utils.ToCmdLine("set", "zz", "zz")) masterCli.Start() // sync with master ret := masterCli.Send(utils.ToCmdLine("set", "1", "1")) asserts.AssertStatusReply(t, ret, "OK") - conn := connection.NewFakeConn() - ret = mdb.Exec(conn, utils.ToCmdLine("SLAVEOF", "127.0.0.1", "6379")) + ret = server.Exec(conn, utils.ToCmdLine("SLAVEOF", "127.0.0.1", "6379")) asserts.AssertStatusReply(t, ret, "OK") success := false for i := 0; i < 30; i++ { // wait for sync time.Sleep(time.Second) - ret = mdb.Exec(conn, utils.ToCmdLine("GET", "1")) + ret = server.Exec(conn, utils.ToCmdLine("GET", "1")) bulkRet, ok := ret.(*protocol.BulkReply) if ok { if bytes.Equal(bulkRet.Arg, []byte("1")) { @@ -51,7 +76,7 @@ func TestReplicationSlaveSide(t *testing.T) { for i := 0; i < 10; i++ { // wait for sync time.Sleep(time.Second) - ret = mdb.Exec(conn, utils.ToCmdLine("GET", "1")) + ret = server.Exec(conn, utils.ToCmdLine("GET", "1")) bulkRet, ok := ret.(*protocol.BulkReply) if ok { if bytes.Equal(bulkRet.Arg, []byte("2")) { @@ -64,7 +89,7 @@ func TestReplicationSlaveSide(t *testing.T) { t.Error("sync failed") return } - err = mdb.slaveStatus.sendAck2Master() + err = server.slaveStatus.sendAck2Master() if err != nil { t.Error(err) return @@ -73,9 +98,9 @@ func TestReplicationSlaveSide(t *testing.T) { // test reconnect config.Properties.ReplTimeout = 1 - _ = mdb.slaveStatus.masterConn.Close() - mdb.slaveStatus.lastRecvTime = time.Now().Add(-time.Hour) // mock timeout - mdb.slaveCron() + _ = server.slaveStatus.masterConn.Close() + server.slaveStatus.lastRecvTime = time.Now().Add(-time.Hour) // mock timeout + server.slaveCron() time.Sleep(3 * time.Second) ret = masterCli.Send(utils.ToCmdLine("set", "1", "3")) asserts.AssertStatusReply(t, ret, "OK") @@ -83,7 +108,7 @@ func TestReplicationSlaveSide(t *testing.T) { for i := 0; i < 10; i++ { // wait for sync time.Sleep(time.Second) - ret = mdb.Exec(conn, utils.ToCmdLine("GET", "1")) + ret = server.Exec(conn, utils.ToCmdLine("GET", "1")) bulkRet, ok := ret.(*protocol.BulkReply) if ok { if bytes.Equal(bulkRet.Arg, []byte("3")) { @@ -98,19 +123,19 @@ func TestReplicationSlaveSide(t *testing.T) { } // test slave of no one - ret = mdb.Exec(conn, utils.ToCmdLine("SLAVEOF", "NO", "ONE")) + ret = server.Exec(conn, utils.ToCmdLine("SLAVEOF", "NO", "ONE")) asserts.AssertStatusReply(t, ret, "OK") ret = masterCli.Send(utils.ToCmdLine("set", "1", "4")) asserts.AssertStatusReply(t, ret, "OK") - ret = mdb.Exec(conn, utils.ToCmdLine("GET", "1")) + ret = server.Exec(conn, utils.ToCmdLine("GET", "1")) asserts.AssertBulkReply(t, ret, "3") - ret = mdb.Exec(conn, utils.ToCmdLine("SLAVEOF", "127.0.0.1", "6379")) + ret = server.Exec(conn, utils.ToCmdLine("SLAVEOF", "127.0.0.1", "6379")) asserts.AssertStatusReply(t, ret, "OK") success = false for i := 0; i < 30; i++ { // wait for sync time.Sleep(time.Second) - ret = mdb.Exec(conn, utils.ToCmdLine("GET", "1")) + ret = server.Exec(conn, utils.ToCmdLine("GET", "1")) bulkRet, ok := ret.(*protocol.BulkReply) if ok { if bytes.Equal(bulkRet.Arg, []byte("4")) { @@ -124,7 +149,16 @@ func TestReplicationSlaveSide(t *testing.T) { return } - err = mdb.slaveStatus.close() + // check slave aof file + aofLoader := MakeBasicMultiDB() + aofHandler2, err := NewAofHandler(aofLoader, config.Properties.AppendFilename, true) + aofLoader.bindAofHandler(aofHandler2) + ret = aofLoader.Exec(conn, utils.ToCmdLine("get", "zz")) + asserts.AssertNullBulk(t, ret) + ret = aofLoader.Exec(conn, utils.ToCmdLine("get", "1")) + asserts.AssertBulkReply(t, ret, "4") + + err = server.slaveStatus.close() if err != nil { t.Error("cannot close") } diff --git a/interface/redis/conn.go b/interface/redis/conn.go index 785538f..0f8f920 100644 --- a/interface/redis/conn.go +++ b/interface/redis/conn.go @@ -31,4 +31,6 @@ type Connection interface { SetMaster() IsMaster() bool + + Name() string } diff --git a/redis/connection/conn.go b/redis/connection/conn.go index 5f52383..0292f42 100644 --- a/redis/connection/conn.go +++ b/redis/connection/conn.go @@ -94,6 +94,13 @@ func (c *Connection) Write(b []byte) (int, error) { return c.conn.Write(b) } +func (c *Connection) Name() string { + if c.conn != nil { + return c.conn.RemoteAddr().String() + } + return "" +} + // Subscribe add current connection into subscribers of the given channel func (c *Connection) Subscribe(channel string) { c.mu.Lock()