From 32a9667eb64248da3f265d2969daa646d1714007 Mon Sep 17 00:00:00 2001 From: finley Date: Tue, 8 Nov 2022 23:30:43 +0800 Subject: [PATCH] refactor syncWithMaster --- database/replication.go | 130 +++++++++++++++++++++------------------- 1 file changed, 67 insertions(+), 63 deletions(-) diff --git a/database/replication.go b/database/replication.go index e047fac..a0a2a38 100644 --- a/database/replication.go +++ b/database/replication.go @@ -27,10 +27,14 @@ const ( ) type replicationStatus struct { - mutex sync.Mutex - ctx context.Context - cancel context.CancelFunc - modCount int32 // execSlaveOf() or reconnect will cause modCount++ + mutex sync.Mutex + ctx context.Context + cancel context.CancelFunc + + // configVersion stands for the version of replication config. Any change of master host/port will cause configVersion increment + // If configVersion change has been found during replication current replication procedure will stop. + // It is designed to abort a running replication procedure + configVersion int32 masterHost string masterPort int @@ -43,6 +47,8 @@ type replicationStatus struct { running sync.WaitGroup } +var configChangedErr = errors.New("replication config changed") + func initReplStatus() *replicationStatus { repl := &replicationStatus{} // start cron @@ -79,7 +85,7 @@ func (mdb *MultiDB) execSlaveOf(c redis.Connection, args [][]byte) redis.Reply { mdb.replication.masterHost = host mdb.replication.masterPort = port // use buffered channel in case receiver goroutine exited before controller send stop signal - atomic.AddInt32(&mdb.replication.modCount, 1) + atomic.AddInt32(&mdb.replication.configVersion, 1) mdb.replication.mutex.Unlock() go mdb.syncWithMaster() return protocol.MakeOkReply() @@ -98,8 +104,8 @@ func (mdb *MultiDB) slaveOfNone() { // stopSlaveWithMutex stops in-progress connectWithMaster/fullSync/receiveAOF // invoker should have replication mutex func (repl *replicationStatus) stopSlaveWithMutex() { - // update modCount to stop connectWithMaster and fullSync - atomic.AddInt32(&repl.modCount, 1) + // update configVersion to stop connectWithMaster and fullSync + atomic.AddInt32(&repl.configVersion, 1) // send cancel to receiveAOF if repl.cancel != nil { repl.cancel() @@ -127,30 +133,31 @@ func (mdb *MultiDB) syncWithMaster() { logger.Error(err) } }() - mdb.replication.mutex.Lock() + var configVersion int32 ctx, cancel := context.WithCancel(context.Background()) + mdb.replication.mutex.Lock() mdb.replication.ctx = ctx mdb.replication.cancel = cancel + configVersion = mdb.replication.configVersion mdb.replication.mutex.Unlock() - err := mdb.connectWithMaster() + err := mdb.connectWithMaster(configVersion) if err != nil { // full sync failed, abort return } - err = mdb.doPsync() + err = mdb.loadMasterRDB(configVersion) if err != nil { - // full sync failed, abort return } - err = mdb.receiveAOF() + err = mdb.receiveAOF(ctx, configVersion) if err != nil { // full sync failed, abort return } } -func (mdb *MultiDB) connectWithMaster() error { - modCount := atomic.LoadInt32(&mdb.replication.modCount) +// connectWithMaster finishes handshake with master +func (mdb *MultiDB) connectWithMaster(configVersion int32) error { addr := mdb.replication.masterHost + ":" + strconv.Itoa(mdb.replication.masterPort) conn, err := net.Dial("tcp", addr) if err != nil { @@ -241,61 +248,38 @@ func (mdb *MultiDB) connectWithMaster() error { // update connection mdb.replication.mutex.Lock() - if mdb.replication.modCount != modCount { + defer mdb.replication.mutex.Unlock() + if mdb.replication.configVersion != configVersion { // replication conf changed during connecting and waiting mutex - return nil + return configChangedErr } mdb.replication.masterConn = conn mdb.replication.masterChan = masterChan - mdb.replication.mutex.Unlock() - return nil + return mdb.psyncHandshake() } -// doPsync send psync to master and sync repl-id then load rdb -func (mdb *MultiDB) doPsync() error { - modCount := atomic.LoadInt32(&mdb.replication.modCount) +// psyncHandshake send `psync` to master and sync repl-id/offset with master +// invoker should provide with replication.mutex +func (mdb *MultiDB) psyncHandshake() error { psyncCmdLine := utils.ToCmdLine("psync", "?", "-1") psyncReq := protocol.MakeMultiBulkReply(psyncCmdLine) _, err := mdb.replication.masterConn.Write(psyncReq.ToBytes()) if err != nil { return errors.New("send failed " + err.Error()) } - ch := mdb.replication.masterChan - psyncPayload1 := <-ch - if psyncPayload1.Err != nil { - return errors.New("read response failed: " + psyncPayload1.Err.Error()) + psyncPayload := <-mdb.replication.masterChan + if psyncPayload.Err != nil { + return errors.New("read response failed: " + psyncPayload.Err.Error()) } - psyncHeader, ok := psyncPayload1.Data.(*protocol.StatusReply) + psyncHeader, ok := psyncPayload.Data.(*protocol.StatusReply) if !ok { - return errors.New("illegal payload header: " + string(psyncPayload1.Data.ToBytes())) + return errors.New("illegal payload header: " + string(psyncPayload.Data.ToBytes())) } headers := strings.Split(psyncHeader.Status, " ") if len(headers) != 3 { return errors.New("illegal payload header: " + psyncHeader.Status) } logger.Info("receive psync header from master") - psyncPayload2 := <-ch - if psyncPayload2.Err != nil { - return errors.New("read response failed: " + psyncPayload2.Err.Error()) - } - psyncData, ok := psyncPayload2.Data.(*protocol.BulkReply) - if !ok { - return errors.New("illegal payload header: " + string(psyncPayload2.Data.ToBytes())) - } - logger.Info(fmt.Sprintf("receive %d bytes of rdb from master", len(psyncData.Arg))) - rdbDec := rdb.NewDecoder(bytes.NewReader(psyncData.Arg)) - rdbHolder := MakeBasicMultiDB() - err = importRDB(rdbDec, rdbHolder) - if err != nil { - return errors.New("dump rdb failed: " + err.Error()) - } - - mdb.replication.mutex.Lock() - defer mdb.replication.mutex.Unlock() - if mdb.replication.modCount != modCount { - // replication conf changed during connecting and waiting mutex - return nil - } mdb.replication.replId = headers[1] mdb.replication.replOffset, err = strconv.ParseInt(headers[2], 10, 64) if err != nil { @@ -303,27 +287,48 @@ func (mdb *MultiDB) doPsync() error { } logger.Info("full resync from master: " + mdb.replication.replId) logger.Info("current offset:", mdb.replication.replOffset) + return nil +} + +// loadMasterRDB downloads rdb after handshake has been done +func (mdb *MultiDB) loadMasterRDB(configVersion int32) error { + rdbPayload := <-mdb.replication.masterChan + if rdbPayload.Err != nil { + return errors.New("read response failed: " + rdbPayload.Err.Error()) + } + rdbReply, ok := rdbPayload.Data.(*protocol.BulkReply) + if !ok { + return errors.New("illegal payload header: " + string(rdbPayload.Data.ToBytes())) + } + + logger.Info(fmt.Sprintf("receive %d bytes of rdb from master", len(rdbReply.Arg))) + rdbDec := rdb.NewDecoder(bytes.NewReader(rdbReply.Arg)) + rdbHolder := MakeBasicMultiDB() + err := importRDB(rdbDec, rdbHolder) + if err != nil { + return errors.New("dump rdb failed: " + err.Error()) + } + + mdb.replication.mutex.Lock() + defer mdb.replication.mutex.Unlock() + if mdb.replication.configVersion != configVersion { + // replication conf changed during connecting and waiting mutex + return configChangedErr + } for i, h := range rdbHolder.dbSet { newDB := h.Load().(*DB) mdb.loadDB(i, newDB) } + // there is no CRLF between RDB and following AOF, reset stream to avoid parser error mdb.replication.masterChan = parser.ParseStream(mdb.replication.masterConn) // fixme: update aof file return nil } -func (mdb *MultiDB) receiveAOF() error { +func (mdb *MultiDB) receiveAOF(ctx context.Context, configVersion int32) error { conn := connection.NewConn(mdb.replication.masterConn) conn.SetRole(connection.ReplicationRecvCli) - mdb.replication.mutex.Lock() - modCount := mdb.replication.modCount - done := mdb.replication.ctx.Done() - mdb.replication.mutex.Unlock() - if done == nil { - // stopSlaveWithMutex() is called during waiting mutex - return nil - } mdb.replication.running.Add(1) defer mdb.replication.running.Done() for { @@ -340,18 +345,17 @@ func (mdb *MultiDB) receiveAOF() error { return errors.New("unexpected payload: " + string(payload.Data.ToBytes())) } mdb.replication.mutex.Lock() - if mdb.replication.modCount != modCount { + if mdb.replication.configVersion != configVersion { // replication conf changed during connecting and waiting mutex - return nil + return configChangedErr } mdb.Exec(conn, cmdLine.Args) - // todo: directly get size from socket - n := len(cmdLine.ToBytes()) + n := len(cmdLine.ToBytes()) // todo: directly get size from socket mdb.replication.replOffset += int64(n) logger.Info(fmt.Sprintf("receive %d bytes from master, current offset %d", n, mdb.replication.replOffset)) mdb.replication.mutex.Unlock() - case <-done: + case <-ctx.Done(): return nil } }