refactor syncWithMaster

This commit is contained in:
finley
2022-11-08 23:30:43 +08:00
parent 72c263969c
commit 32a9667eb6

View File

@@ -27,10 +27,14 @@ const (
) )
type replicationStatus struct { type replicationStatus struct {
mutex sync.Mutex mutex sync.Mutex
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
modCount int32 // execSlaveOf() or reconnect will cause modCount++
// configVersion stands for the version of replication config. Any change of master host/port will cause configVersion increment
// If configVersion change has been found during replication current replication procedure will stop.
// It is designed to abort a running replication procedure
configVersion int32
masterHost string masterHost string
masterPort int masterPort int
@@ -43,6 +47,8 @@ type replicationStatus struct {
running sync.WaitGroup running sync.WaitGroup
} }
var configChangedErr = errors.New("replication config changed")
func initReplStatus() *replicationStatus { func initReplStatus() *replicationStatus {
repl := &replicationStatus{} repl := &replicationStatus{}
// start cron // start cron
@@ -79,7 +85,7 @@ func (mdb *MultiDB) execSlaveOf(c redis.Connection, args [][]byte) redis.Reply {
mdb.replication.masterHost = host mdb.replication.masterHost = host
mdb.replication.masterPort = port mdb.replication.masterPort = port
// use buffered channel in case receiver goroutine exited before controller send stop signal // use buffered channel in case receiver goroutine exited before controller send stop signal
atomic.AddInt32(&mdb.replication.modCount, 1) atomic.AddInt32(&mdb.replication.configVersion, 1)
mdb.replication.mutex.Unlock() mdb.replication.mutex.Unlock()
go mdb.syncWithMaster() go mdb.syncWithMaster()
return protocol.MakeOkReply() return protocol.MakeOkReply()
@@ -98,8 +104,8 @@ func (mdb *MultiDB) slaveOfNone() {
// stopSlaveWithMutex stops in-progress connectWithMaster/fullSync/receiveAOF // stopSlaveWithMutex stops in-progress connectWithMaster/fullSync/receiveAOF
// invoker should have replication mutex // invoker should have replication mutex
func (repl *replicationStatus) stopSlaveWithMutex() { func (repl *replicationStatus) stopSlaveWithMutex() {
// update modCount to stop connectWithMaster and fullSync // update configVersion to stop connectWithMaster and fullSync
atomic.AddInt32(&repl.modCount, 1) atomic.AddInt32(&repl.configVersion, 1)
// send cancel to receiveAOF // send cancel to receiveAOF
if repl.cancel != nil { if repl.cancel != nil {
repl.cancel() repl.cancel()
@@ -127,30 +133,31 @@ func (mdb *MultiDB) syncWithMaster() {
logger.Error(err) logger.Error(err)
} }
}() }()
mdb.replication.mutex.Lock() var configVersion int32
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
mdb.replication.mutex.Lock()
mdb.replication.ctx = ctx mdb.replication.ctx = ctx
mdb.replication.cancel = cancel mdb.replication.cancel = cancel
configVersion = mdb.replication.configVersion
mdb.replication.mutex.Unlock() mdb.replication.mutex.Unlock()
err := mdb.connectWithMaster() err := mdb.connectWithMaster(configVersion)
if err != nil { if err != nil {
// full sync failed, abort // full sync failed, abort
return return
} }
err = mdb.doPsync() err = mdb.loadMasterRDB(configVersion)
if err != nil { if err != nil {
// full sync failed, abort
return return
} }
err = mdb.receiveAOF() err = mdb.receiveAOF(ctx, configVersion)
if err != nil { if err != nil {
// full sync failed, abort // full sync failed, abort
return return
} }
} }
func (mdb *MultiDB) connectWithMaster() error { // connectWithMaster finishes handshake with master
modCount := atomic.LoadInt32(&mdb.replication.modCount) func (mdb *MultiDB) connectWithMaster(configVersion int32) error {
addr := mdb.replication.masterHost + ":" + strconv.Itoa(mdb.replication.masterPort) addr := mdb.replication.masterHost + ":" + strconv.Itoa(mdb.replication.masterPort)
conn, err := net.Dial("tcp", addr) conn, err := net.Dial("tcp", addr)
if err != nil { if err != nil {
@@ -241,61 +248,38 @@ func (mdb *MultiDB) connectWithMaster() error {
// update connection // update connection
mdb.replication.mutex.Lock() mdb.replication.mutex.Lock()
if mdb.replication.modCount != modCount { defer mdb.replication.mutex.Unlock()
if mdb.replication.configVersion != configVersion {
// replication conf changed during connecting and waiting mutex // replication conf changed during connecting and waiting mutex
return nil return configChangedErr
} }
mdb.replication.masterConn = conn mdb.replication.masterConn = conn
mdb.replication.masterChan = masterChan mdb.replication.masterChan = masterChan
mdb.replication.mutex.Unlock() return mdb.psyncHandshake()
return nil
} }
// doPsync send psync to master and sync repl-id then load rdb // psyncHandshake send `psync` to master and sync repl-id/offset with master
func (mdb *MultiDB) doPsync() error { // invoker should provide with replication.mutex
modCount := atomic.LoadInt32(&mdb.replication.modCount) func (mdb *MultiDB) psyncHandshake() error {
psyncCmdLine := utils.ToCmdLine("psync", "?", "-1") psyncCmdLine := utils.ToCmdLine("psync", "?", "-1")
psyncReq := protocol.MakeMultiBulkReply(psyncCmdLine) psyncReq := protocol.MakeMultiBulkReply(psyncCmdLine)
_, err := mdb.replication.masterConn.Write(psyncReq.ToBytes()) _, err := mdb.replication.masterConn.Write(psyncReq.ToBytes())
if err != nil { if err != nil {
return errors.New("send failed " + err.Error()) return errors.New("send failed " + err.Error())
} }
ch := mdb.replication.masterChan psyncPayload := <-mdb.replication.masterChan
psyncPayload1 := <-ch if psyncPayload.Err != nil {
if psyncPayload1.Err != nil { return errors.New("read response failed: " + psyncPayload.Err.Error())
return errors.New("read response failed: " + psyncPayload1.Err.Error())
} }
psyncHeader, ok := psyncPayload1.Data.(*protocol.StatusReply) psyncHeader, ok := psyncPayload.Data.(*protocol.StatusReply)
if !ok { if !ok {
return errors.New("illegal payload header: " + string(psyncPayload1.Data.ToBytes())) return errors.New("illegal payload header: " + string(psyncPayload.Data.ToBytes()))
} }
headers := strings.Split(psyncHeader.Status, " ") headers := strings.Split(psyncHeader.Status, " ")
if len(headers) != 3 { if len(headers) != 3 {
return errors.New("illegal payload header: " + psyncHeader.Status) return errors.New("illegal payload header: " + psyncHeader.Status)
} }
logger.Info("receive psync header from master") logger.Info("receive psync header from master")
psyncPayload2 := <-ch
if psyncPayload2.Err != nil {
return errors.New("read response failed: " + psyncPayload2.Err.Error())
}
psyncData, ok := psyncPayload2.Data.(*protocol.BulkReply)
if !ok {
return errors.New("illegal payload header: " + string(psyncPayload2.Data.ToBytes()))
}
logger.Info(fmt.Sprintf("receive %d bytes of rdb from master", len(psyncData.Arg)))
rdbDec := rdb.NewDecoder(bytes.NewReader(psyncData.Arg))
rdbHolder := MakeBasicMultiDB()
err = importRDB(rdbDec, rdbHolder)
if err != nil {
return errors.New("dump rdb failed: " + err.Error())
}
mdb.replication.mutex.Lock()
defer mdb.replication.mutex.Unlock()
if mdb.replication.modCount != modCount {
// replication conf changed during connecting and waiting mutex
return nil
}
mdb.replication.replId = headers[1] mdb.replication.replId = headers[1]
mdb.replication.replOffset, err = strconv.ParseInt(headers[2], 10, 64) mdb.replication.replOffset, err = strconv.ParseInt(headers[2], 10, 64)
if err != nil { if err != nil {
@@ -303,27 +287,48 @@ func (mdb *MultiDB) doPsync() error {
} }
logger.Info("full resync from master: " + mdb.replication.replId) logger.Info("full resync from master: " + mdb.replication.replId)
logger.Info("current offset:", mdb.replication.replOffset) logger.Info("current offset:", mdb.replication.replOffset)
return nil
}
// loadMasterRDB downloads rdb after handshake has been done
func (mdb *MultiDB) loadMasterRDB(configVersion int32) error {
rdbPayload := <-mdb.replication.masterChan
if rdbPayload.Err != nil {
return errors.New("read response failed: " + rdbPayload.Err.Error())
}
rdbReply, ok := rdbPayload.Data.(*protocol.BulkReply)
if !ok {
return errors.New("illegal payload header: " + string(rdbPayload.Data.ToBytes()))
}
logger.Info(fmt.Sprintf("receive %d bytes of rdb from master", len(rdbReply.Arg)))
rdbDec := rdb.NewDecoder(bytes.NewReader(rdbReply.Arg))
rdbHolder := MakeBasicMultiDB()
err := importRDB(rdbDec, rdbHolder)
if err != nil {
return errors.New("dump rdb failed: " + err.Error())
}
mdb.replication.mutex.Lock()
defer mdb.replication.mutex.Unlock()
if mdb.replication.configVersion != configVersion {
// replication conf changed during connecting and waiting mutex
return configChangedErr
}
for i, h := range rdbHolder.dbSet { for i, h := range rdbHolder.dbSet {
newDB := h.Load().(*DB) newDB := h.Load().(*DB)
mdb.loadDB(i, newDB) mdb.loadDB(i, newDB)
} }
// there is no CRLF between RDB and following AOF, reset stream to avoid parser error // there is no CRLF between RDB and following AOF, reset stream to avoid parser error
mdb.replication.masterChan = parser.ParseStream(mdb.replication.masterConn) mdb.replication.masterChan = parser.ParseStream(mdb.replication.masterConn)
// fixme: update aof file // fixme: update aof file
return nil return nil
} }
func (mdb *MultiDB) receiveAOF() error { func (mdb *MultiDB) receiveAOF(ctx context.Context, configVersion int32) error {
conn := connection.NewConn(mdb.replication.masterConn) conn := connection.NewConn(mdb.replication.masterConn)
conn.SetRole(connection.ReplicationRecvCli) conn.SetRole(connection.ReplicationRecvCli)
mdb.replication.mutex.Lock()
modCount := mdb.replication.modCount
done := mdb.replication.ctx.Done()
mdb.replication.mutex.Unlock()
if done == nil {
// stopSlaveWithMutex() is called during waiting mutex
return nil
}
mdb.replication.running.Add(1) mdb.replication.running.Add(1)
defer mdb.replication.running.Done() defer mdb.replication.running.Done()
for { for {
@@ -340,18 +345,17 @@ func (mdb *MultiDB) receiveAOF() error {
return errors.New("unexpected payload: " + string(payload.Data.ToBytes())) return errors.New("unexpected payload: " + string(payload.Data.ToBytes()))
} }
mdb.replication.mutex.Lock() mdb.replication.mutex.Lock()
if mdb.replication.modCount != modCount { if mdb.replication.configVersion != configVersion {
// replication conf changed during connecting and waiting mutex // replication conf changed during connecting and waiting mutex
return nil return configChangedErr
} }
mdb.Exec(conn, cmdLine.Args) mdb.Exec(conn, cmdLine.Args)
// todo: directly get size from socket n := len(cmdLine.ToBytes()) // todo: directly get size from socket
n := len(cmdLine.ToBytes())
mdb.replication.replOffset += int64(n) mdb.replication.replOffset += int64(n)
logger.Info(fmt.Sprintf("receive %d bytes from master, current offset %d", logger.Info(fmt.Sprintf("receive %d bytes from master, current offset %d",
n, mdb.replication.replOffset)) n, mdb.replication.replOffset))
mdb.replication.mutex.Unlock() mdb.replication.mutex.Unlock()
case <-done: case <-ctx.Done():
return nil return nil
} }
} }