diff --git a/aof/aof.go b/aof/aof.go index 8d58acb..7a3d33c 100644 --- a/aof/aof.go +++ b/aof/aof.go @@ -26,9 +26,12 @@ type payload struct { dbIndex int } -// Listener is a channel receive a replication of all aof payloads +// Listener will be called-back after receiving a aof payload // with a listener we can forward the updates to slave nodes etc. -type Listener chan<- CmdLine +type Listener interface { + // Callback will be called-back after receiving a aof payload + Callback([]CmdLine) +} // Handler receive msgs from channel and write to AOF file type Handler struct { @@ -112,9 +115,7 @@ func (handler *Handler) handleAof() { } handler.pausingAof.RUnlock() for listener := range handler.listeners { - for _, line := range cmdLines { - listener <- line - } + listener.Callback(cmdLines) } } handler.aofFinished <- struct{}{} diff --git a/aof/rdb.go b/aof/rdb.go index 893981b..797b920 100644 --- a/aof/rdb.go +++ b/aof/rdb.go @@ -19,9 +19,8 @@ import ( // todo: forbid concurrent rewrite // Rewrite2RDB rewrite aof data into rdb -// if extraListener is not nil, it will be appended to Handler.listeners, it will receive all updates after rdb -func (handler *Handler) Rewrite2RDB(rdbFilename string, extraListener Listener) error { - ctx, err := handler.startRewrite2RDB(extraListener) +func (handler *Handler) Rewrite2RDB(rdbFilename string) error { + ctx, err := handler.startRewrite2RDB(nil, nil) if err != nil { return err } @@ -40,7 +39,30 @@ func (handler *Handler) Rewrite2RDB(rdbFilename string, extraListener Listener) return nil } -func (handler *Handler) startRewrite2RDB(extraListener Listener) (*RewriteCtx, error) { +// Rewrite2RDBForReplication asynchronously rewrite aof data into rdb and returns a channel to receive following data +// parameter listener would receive following updates of rdb +// parameter hook allows you to do something during aof pausing +func (handler *Handler) Rewrite2RDBForReplication(rdbFilename string, listener Listener, hook func()) error { + ctx, err := handler.startRewrite2RDB(listener, hook) + if err != nil { + return err + } + err = handler.rewrite2RDB(ctx) + if err != nil { + return err + } + err = ctx.tmpFile.Close() + if err != nil { + return err + } + err = os.Rename(ctx.tmpFile.Name(), rdbFilename) + if err != nil { + return err + } + return nil +} + +func (handler *Handler) startRewrite2RDB(newListener Listener, hook func()) (*RewriteCtx, error) { handler.pausingAof.Lock() // pausing aof defer handler.pausingAof.Unlock() @@ -59,8 +81,11 @@ func (handler *Handler) startRewrite2RDB(extraListener Listener) (*RewriteCtx, e logger.Warn("tmp file create failed") return nil, err } - if extraListener != nil { - handler.listeners[extraListener] = struct{}{} + if newListener != nil { + handler.listeners[newListener] = struct{}{} + } + if hook != nil { + hook() } return &RewriteCtx{ tmpFile: file, diff --git a/database/database.go b/database/database.go index ca7a35d..24f7b2f 100644 --- a/database/database.go +++ b/database/database.go @@ -319,7 +319,7 @@ func SaveRDB(db *MultiDB, args [][]byte) redis.Reply { if rdbFilename == "" { rdbFilename = "dump.rdb" } - err := db.aofHandler.Rewrite2RDB(rdbFilename, nil) + err := db.aofHandler.Rewrite2RDB(rdbFilename) if err != nil { return protocol.MakeErrReply(err.Error()) } @@ -341,7 +341,7 @@ func BGSaveRDB(db *MultiDB, args [][]byte) redis.Reply { if rdbFilename == "" { rdbFilename = "dump.rdb" } - err := db.aofHandler.Rewrite2RDB(rdbFilename, nil) + err := db.aofHandler.Rewrite2RDB(rdbFilename) if err != nil { logger.Error(err) } diff --git a/database/replication_master.go b/database/replication_master.go index 29a2ce3..06208fe 100644 --- a/database/replication_master.go +++ b/database/replication_master.go @@ -1,11 +1,11 @@ package database import ( - "context" "errors" "fmt" "github.com/hdt3213/godis/interface/redis" "github.com/hdt3213/godis/lib/logger" + "github.com/hdt3213/godis/lib/sync/atomic" "github.com/hdt3213/godis/lib/utils" "github.com/hdt3213/godis/redis/protocol" "io" @@ -47,23 +47,42 @@ type slaveClient struct { capacity uint8 } -type masterStatus struct { - ctx context.Context - mu sync.RWMutex - replId string - backlog []byte // backlog can be appended or replaced as a whole, cannot be modified(insert/set/delete) +// aofListener 只负责更新 backlog +type replBacklog struct { + buf []byte beginOffset int64 currentOffset int64 - slaveMap map[redis.Connection]*slaveClient - waitSlaves map[*slaveClient]struct{} - onlineSlaves map[*slaveClient]struct{} - bgSaveState uint8 - rdbFilename string } -func (master *masterStatus) appendBacklog(bin []byte) { - master.backlog = append(master.backlog, bin...) - master.currentOffset += int64(len(bin)) +func (backlog *replBacklog) appendBytes(bin []byte) { + backlog.buf = append(backlog.buf, bin...) + backlog.currentOffset += int64(len(bin)) +} + +func (backlog *replBacklog) getSnapshot() ([]byte, int64) { + return backlog.buf[:], backlog.currentOffset +} + +func (backlog *replBacklog) getSnapshotAfter(beginOffset int64) ([]byte, int64) { + beg := beginOffset - backlog.beginOffset + return backlog.buf[beg:], backlog.currentOffset +} + +func (backlog *replBacklog) isValidOffset(offset int64) bool { + return offset >= backlog.beginOffset && offset < backlog.currentOffset +} + +type masterStatus struct { + mu sync.RWMutex + replId string + backlog *replBacklog + slaveMap map[redis.Connection]*slaveClient + waitSlaves map[*slaveClient]struct{} + onlineSlaves map[*slaveClient]struct{} + bgSaveState uint8 + rdbFilename string + aofListener *replAofListener + rewriting atomic.Boolean } func (mdb *MultiDB) bgSaveForReplication() { @@ -90,16 +109,18 @@ func (mdb *MultiDB) saveForReplication() error { mdb.masterStatus.mu.Lock() mdb.masterStatus.bgSaveState = bgSaveRunning mdb.masterStatus.rdbFilename = rdbFilename // todo: can reuse config.Properties.RDBFilename? + aofListener := &replAofListener{ + mdb: mdb, + backlog: mdb.masterStatus.backlog, + } + mdb.masterStatus.aofListener = aofListener mdb.masterStatus.mu.Unlock() - aofListener := make(chan CmdLine, 1024) // give channel enough capacity to store all updates during rewrite to db - err = mdb.aofHandler.Rewrite2RDB(rdbFilename, aofListener) + err = mdb.aofHandler.Rewrite2RDBForReplication(rdbFilename, aofListener, nil) if err != nil { return err } - go func() { - mdb.masterListenAof(aofListener) - }() + aofListener.readyToSend = true // change bgSaveState and get waitSlaves for sending waitSlaves := make(map[*slaveClient]struct{}) @@ -122,11 +143,43 @@ func (mdb *MultiDB) saveForReplication() error { return nil } +func (mdb *MultiDB) rewriteRDB() error { + rdbFile, err := ioutil.TempFile("", "*.rdb") + if err != nil { + return fmt.Errorf("create temp rdb failed: %v", err) + } + rdbFilename := rdbFile.Name() + newBacklog := &replBacklog{} + aofListener := &replAofListener{ + backlog: newBacklog, + mdb: mdb, + } + hook := func() { + // pausing aof first, then lock masterStatus. + // use the same order as replAofListener to avoid dead lock + mdb.masterStatus.mu.Lock() + defer mdb.masterStatus.mu.Unlock() + newBacklog.beginOffset = mdb.masterStatus.backlog.currentOffset + } + err = mdb.aofHandler.Rewrite2RDBForReplication(rdbFilename, aofListener, hook) + if err != nil { // wait rdb result + return err + } + mdb.masterStatus.mu.Lock() + mdb.masterStatus.rdbFilename = rdbFilename + mdb.masterStatus.backlog = newBacklog + mdb.masterStatus.mu.Unlock() + // It is ok to know that new backlog is ready later, so we change readyToSend without sync + // But setting readyToSend=true must after new backlog is really ready (that means master.mu.Unlock) + aofListener.readyToSend = true + return nil +} + // masterFullReSyncWithSlave send replication header, rdb file and all backlogs to slave func (mdb *MultiDB) masterFullReSyncWithSlave(slave *slaveClient) error { // write replication header header := "+FULLRESYNC " + mdb.masterStatus.replId + " " + - strconv.FormatInt(mdb.masterStatus.beginOffset, 10) + protocol.CRLF + strconv.FormatInt(mdb.masterStatus.backlog.beginOffset, 10) + protocol.CRLF _, err := slave.conn.Write([]byte(header)) if err != nil { return fmt.Errorf("write replication header to slave failed: %v", err) @@ -151,8 +204,7 @@ func (mdb *MultiDB) masterFullReSyncWithSlave(slave *slaveClient) error { // send backlog mdb.masterStatus.mu.RLock() - currentOffset := mdb.masterStatus.currentOffset - backlog := mdb.masterStatus.backlog[:currentOffset-mdb.masterStatus.beginOffset] + backlog, currentOffset := mdb.masterStatus.backlog.getSnapshot() mdb.masterStatus.mu.RUnlock() _, err = slave.conn.Write(backlog) if err != nil { @@ -172,12 +224,11 @@ func (mdb *MultiDB) masterTryPartialSyncWithSlave(slave *slaveClient, replId str mdb.masterStatus.mu.RUnlock() return cannotPartialSync } - if slaveOffset < mdb.masterStatus.beginOffset || slaveOffset > mdb.masterStatus.currentOffset { + if !mdb.masterStatus.backlog.isValidOffset(slaveOffset) { mdb.masterStatus.mu.RUnlock() return cannotPartialSync } - currentOffset := mdb.masterStatus.currentOffset - backlog := mdb.masterStatus.backlog[slaveOffset-mdb.masterStatus.beginOffset : currentOffset-mdb.masterStatus.beginOffset] + backlog, currentOffset := mdb.masterStatus.backlog.getSnapshotAfter(slaveOffset) mdb.masterStatus.mu.RUnlock() // send replication header @@ -197,12 +248,13 @@ func (mdb *MultiDB) masterTryPartialSyncWithSlave(slave *slaveClient, replId str return nil } +// masterSendUpdatesToSlave only sends data to online slaves after bgSave is finished +// if bgSave is running, updates will be sent after the saving finished func (mdb *MultiDB) masterSendUpdatesToSlave() error { onlineSlaves := make(map[*slaveClient]struct{}) mdb.masterStatus.mu.RLock() - currentOffset := mdb.masterStatus.currentOffset - beginOffset := mdb.masterStatus.beginOffset - backlog := mdb.masterStatus.backlog[:currentOffset-beginOffset] + beginOffset := mdb.masterStatus.backlog.beginOffset + backlog, currentOffset := mdb.masterStatus.backlog.getSnapshot() for slave := range mdb.masterStatus.onlineSlaves { onlineSlaves[slave] = struct{}{} } @@ -211,7 +263,7 @@ func (mdb *MultiDB) masterSendUpdatesToSlave() error { slaveBeginOffset := slave.offset - beginOffset _, err := slave.conn.Write(backlog[slaveBeginOffset:]) if err != nil { - logger.Errorf("send updates write backlog to slave failed: %v", err) + logger.Errorf("send updates backlog to slave failed: %v", err) mdb.removeSlave(slave) continue } @@ -313,50 +365,64 @@ func (mdb *MultiDB) setSlaveOnline(slave *slaveClient, currentOffset int64) { var pingBytes = protocol.MakeMultiBulkReply(utils.ToCmdLine("ping")).ToBytes() +const maxBacklogSize = 10 * 1024 * 1024 // 10MB + func (mdb *MultiDB) masterCron() { if mdb.role != masterRole { return } mdb.masterStatus.mu.Lock() if mdb.masterStatus.bgSaveState == bgSaveFinish { - mdb.masterStatus.appendBacklog(pingBytes) + mdb.masterStatus.backlog.appendBytes(pingBytes) } + backlogSize := len(mdb.masterStatus.backlog.buf) mdb.masterStatus.mu.Unlock() if err := mdb.masterSendUpdatesToSlave(); err != nil { logger.Errorf("masterSendUpdatesToSlave error: %v", err) } + if backlogSize > maxBacklogSize && !mdb.masterStatus.rewriting.Get() { + go func() { + mdb.masterStatus.rewriting.Set(true) + defer mdb.masterStatus.rewriting.Set(false) + if err := mdb.rewriteRDB(); err != nil { + mdb.masterStatus.rewriting.Set(false) + logger.Errorf("rewrite error: %v", err) + } + }() + } } -func (mdb *MultiDB) masterListenAof(listener chan CmdLine) { - for { - select { - case cmdLine := <-listener: - mdb.masterStatus.mu.Lock() - reply := protocol.MakeMultiBulkReply(cmdLine) - mdb.masterStatus.appendBacklog(reply.ToBytes()) - mdb.masterStatus.mu.Unlock() - if err := mdb.masterSendUpdatesToSlave(); err != nil { - logger.Errorf("masterSendUpdatesToSlave after receive aof error: %v", err) - } - // if bgSave is running, updates will be sent after the save finished - case <-mdb.masterStatus.ctx.Done(): - break +type replAofListener struct { + mdb *MultiDB + backlog *replBacklog // may NOT be mdb.masterStatus.backlog + readyToSend bool +} + +func (listener *replAofListener) Callback(cmdLines []CmdLine) { + listener.mdb.masterStatus.mu.Lock() + for _, cmdLine := range cmdLines { + reply := protocol.MakeMultiBulkReply(cmdLine) + listener.backlog.appendBytes(reply.ToBytes()) + } + listener.mdb.masterStatus.mu.Unlock() + // listener could receive updates generated during rdb saving in progress + // Do not send updates to slave before rdb saving is finished + if listener.readyToSend { + if err := listener.mdb.masterSendUpdatesToSlave(); err != nil { + logger.Errorf("masterSendUpdatesToSlave after receive aof error: %v", err) } } } func (mdb *MultiDB) startAsMaster() { mdb.masterStatus = &masterStatus{ - ctx: context.Background(), - mu: sync.RWMutex{}, - replId: utils.RandHexString(40), - backlog: nil, - beginOffset: 0, - currentOffset: 0, - slaveMap: make(map[redis.Connection]*slaveClient), - waitSlaves: make(map[*slaveClient]struct{}), - onlineSlaves: make(map[*slaveClient]struct{}), - bgSaveState: bgSaveIdle, - rdbFilename: "", + mu: sync.RWMutex{}, + replId: utils.RandHexString(40), + backlog: &replBacklog{}, + slaveMap: make(map[redis.Connection]*slaveClient), + waitSlaves: make(map[*slaveClient]struct{}), + onlineSlaves: make(map[*slaveClient]struct{}), + bgSaveState: bgSaveIdle, + rdbFilename: "", } } diff --git a/database/replication_master_test.go b/database/replication_master_test.go index 5b361d4..3e0c344 100644 --- a/database/replication_master_test.go +++ b/database/replication_master_test.go @@ -192,3 +192,114 @@ func TestReplicationMasterSide(t *testing.T) { resp = slave.Exec(slaveConn, utils.ToCmdLine("get", "c")) asserts.AssertBulkReply(t, resp, "c") } + +func TestReplicationMasterRewriteRDB(t *testing.T) { + tmpDir, err := ioutil.TempDir("", "godis") + if err != nil { + t.Error(err) + return + } + aofFilename := path.Join(tmpDir, "a.aof") + defer func() { + _ = os.Remove(aofFilename) + }() + config.Properties = &config.ServerProperties{ + Databases: 16, + AppendOnly: true, + AppendFilename: aofFilename, + } + master := mockServer() + master.initAof() + master.startAsMaster() + + masterConn := connection.NewFakeConn() + resp := master.Exec(masterConn, utils.ToCmdLine("SET", "a", "a")) + asserts.AssertNotError(t, resp) + resp = master.Exec(masterConn, utils.ToCmdLine("SET", "b", "b")) + asserts.AssertNotError(t, resp) + time.Sleep(time.Millisecond * 100) // wait write aof + + err = master.rewriteRDB() + if err != nil { + t.Error(err) + return + } + resp = master.Exec(masterConn, utils.ToCmdLine("SET", "c", "c")) + asserts.AssertNotError(t, resp) + time.Sleep(time.Millisecond * 100) // wait write aof + + // set slave + slave := mockServer() + replConn := connection.NewFakeConn() + master.Exec(replConn, utils.ToCmdLine("psync", "?", "-1")) + masterChan := parser.ParseStream(replConn) + psyncPayload := <-masterChan + if psyncPayload.Err != nil { + t.Errorf("master bad protocol: %v", psyncPayload.Err) + return + } + psyncHeader, ok := psyncPayload.Data.(*protocol.StatusReply) + if !ok { + t.Error("psync header is not a status reply") + return + } + headers := strings.Split(psyncHeader.Status, " ") + if len(headers) != 3 { + t.Errorf("illegal psync header: %s", psyncHeader.Status) + return + } + + replId := headers[1] + replOffset, err := strconv.ParseInt(headers[2], 10, 64) + if err != nil { + t.Errorf("illegal offset: %s", headers[2]) + return + } + t.Logf("repl id: %s, offset: %d", replId, replOffset) + + rdbPayload := <-masterChan + if rdbPayload.Err != nil { + t.Error("read response failed: " + rdbPayload.Err.Error()) + return + } + rdbReply, ok := rdbPayload.Data.(*protocol.BulkReply) + if !ok { + t.Error("illegal payload header: " + string(rdbPayload.Data.ToBytes())) + return + } + + rdbDec := rdb.NewDecoder(bytes.NewReader(rdbReply.Arg)) + err = importRDB(rdbDec, slave) + if err != nil { + t.Error("import rdb failed: " + err.Error()) + return + } + + slaveConn := connection.NewFakeConn() + resp = slave.Exec(slaveConn, utils.ToCmdLine("get", "a")) + asserts.AssertBulkReply(t, resp, "a") + resp = slave.Exec(slaveConn, utils.ToCmdLine("get", "b")) + asserts.AssertBulkReply(t, resp, "b") + + master.masterCron() + for { + payload := <-masterChan + if payload.Err != nil { + t.Error(payload.Err) + return + } + cmdLine, ok := payload.Data.(*protocol.MultiBulkReply) + if !ok { + t.Error("unexpected payload: " + string(payload.Data.ToBytes())) + return + } + slave.Exec(replConn, cmdLine.Args) + n := len(cmdLine.ToBytes()) + slave.slaveStatus.replOffset += int64(n) + if string(cmdLine.Args[0]) != "ping" { + break + } + } + resp = slave.Exec(slaveConn, utils.ToCmdLine("get", "c")) + asserts.AssertBulkReply(t, resp, "c") +} diff --git a/redis/connection/conn.go b/redis/connection/conn.go index 66fbfbd..5f52383 100644 --- a/redis/connection/conn.go +++ b/redis/connection/conn.go @@ -21,7 +21,7 @@ const ( type Connection struct { conn net.Conn - // waiting until finish sending data, used for graceful shutdown + // wait until finish sending data, used for graceful shutdown sendingData wait.Wait // lock while server sending response diff --git a/redis/connection/fake.go b/redis/connection/fake.go index 4718164..0b28c2f 100644 --- a/redis/connection/fake.go +++ b/redis/connection/fake.go @@ -1,7 +1,8 @@ package connection import ( - "bytes" + "fmt" + "github.com/hdt3213/godis/lib/logger" "io" "sync" ) @@ -9,8 +10,9 @@ import ( // FakeConn implements redis.Connection for test type FakeConn struct { Connection - buf bytes.Buffer - wait chan struct{} + buf []byte + offset int + waitOn chan struct{} closed bool mu sync.Mutex } @@ -25,51 +27,76 @@ func (c *FakeConn) Write(b []byte) (int, error) { if c.closed { return 0, io.EOF } - n, _ := c.buf.Write(b) + c.mu.Lock() + c.buf = append(c.buf, b...) + c.mu.Unlock() c.notify() - return n, nil + return len(b), nil } func (c *FakeConn) notify() { - if c.wait != nil { + if c.waitOn != nil { c.mu.Lock() - if c.wait != nil { - close(c.wait) - c.wait = nil + if c.waitOn != nil { + logger.Debug(fmt.Sprintf("notify %p", c.waitOn)) + close(c.waitOn) + c.waitOn = nil } c.mu.Unlock() } } -func (c *FakeConn) waiting() { +func (c *FakeConn) wait(offset int) { c.mu.Lock() - c.wait = make(chan struct{}) + if c.offset != offset { // new data during waiting lock + return + } + if c.waitOn == nil { + c.waitOn = make(chan struct{}) + } + waitOn := c.waitOn + logger.Debug(fmt.Sprintf("wait on %p", waitOn)) c.mu.Unlock() - <-c.wait + <-waitOn + logger.Debug(fmt.Sprintf("wait on %p finish", waitOn)) } // Read reads data from buffer func (c *FakeConn) Read(p []byte) (int, error) { - n, err := c.buf.Read(p) - if err == io.EOF { + c.mu.Lock() + n := copy(p, c.buf[c.offset:]) + c.offset += n + offset := c.offset + c.mu.Unlock() + if n == 0 { if c.closed { - return 0, io.EOF + return n, io.EOF } - c.waiting() - return c.buf.Read(p) + c.wait(offset) + // after notify + if c.closed { + return n, io.EOF + } + n = copy(p, c.buf[c.offset:]) + c.offset += n + return n, nil } - return n, err + if c.closed { + return n, io.EOF + } + return n, nil } // Clean resets the buffer func (c *FakeConn) Clean() { - c.wait = make(chan struct{}) - c.buf.Reset() + c.waitOn = make(chan struct{}) + c.buf = nil + c.offset = 0 } // Bytes returns written data func (c *FakeConn) Bytes() []byte { - return c.buf.Bytes() + return c.buf } func (c *FakeConn) Close() error { diff --git a/redis/parser/parser.go b/redis/parser/parser.go index a5702af..61d4c03 100644 --- a/redis/parser/parser.go +++ b/redis/parser/parser.go @@ -156,6 +156,9 @@ func parseBulkString(header []byte, reader *bufio.Reader, ch chan<- *Payload) er func parseRDBBulkString(reader *bufio.Reader, ch chan<- *Payload) error { header, err := reader.ReadBytes('\n') header = bytes.TrimSuffix(header, []byte{'\r', '\n'}) + if len(header) == 0 { + return errors.New("empty header") + } strLen, err := strconv.ParseInt(string(header[1:]), 10, 64) if err != nil || strLen <= 0 { return errors.New("illegal bulk header: " + string(header))