improve replication; support partial sync

This commit is contained in:
finley
2022-11-17 23:27:13 +08:00
parent 60a8559739
commit febb83cb5c
4 changed files with 114 additions and 54 deletions

View File

@@ -30,7 +30,7 @@ type MultiDB struct {
// store master node address // store master node address
slaveOf string slaveOf string
role int32 role int32
replication *replicationStatus replication *slaveStatus
} }
// NewStandaloneServer creates a standalone redis server, with multi database and all other funtions // NewStandaloneServer creates a standalone redis server, with multi database and all other funtions

View File

@@ -26,7 +26,7 @@ const (
slaveRole slaveRole
) )
type replicationStatus struct { type slaveStatus struct {
mutex sync.Mutex mutex sync.Mutex
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
@@ -49,8 +49,8 @@ type replicationStatus struct {
var configChangedErr = errors.New("replication config changed") var configChangedErr = errors.New("replication config changed")
func initReplStatus() *replicationStatus { func initReplStatus() *slaveStatus {
repl := &replicationStatus{} repl := &slaveStatus{}
// start cron // start cron
return repl return repl
} }
@@ -87,7 +87,7 @@ func (mdb *MultiDB) execSlaveOf(c redis.Connection, args [][]byte) redis.Reply {
// use buffered channel in case receiver goroutine exited before controller send stop signal // use buffered channel in case receiver goroutine exited before controller send stop signal
atomic.AddInt32(&mdb.replication.configVersion, 1) atomic.AddInt32(&mdb.replication.configVersion, 1)
mdb.replication.mutex.Unlock() mdb.replication.mutex.Unlock()
go mdb.syncWithMaster() go mdb.setupMaster()
return protocol.MakeOkReply() return protocol.MakeOkReply()
} }
@@ -103,7 +103,7 @@ func (mdb *MultiDB) slaveOfNone() {
// stopSlaveWithMutex stops in-progress connectWithMaster/fullSync/receiveAOF // stopSlaveWithMutex stops in-progress connectWithMaster/fullSync/receiveAOF
// invoker should have replication mutex // invoker should have replication mutex
func (repl *replicationStatus) stopSlaveWithMutex() { func (repl *slaveStatus) stopSlaveWithMutex() {
// update configVersion to stop connectWithMaster and fullSync // update configVersion to stop connectWithMaster and fullSync
atomic.AddInt32(&repl.configVersion, 1) atomic.AddInt32(&repl.configVersion, 1)
// send cancel to receiveAOF // send cancel to receiveAOF
@@ -120,14 +120,14 @@ func (repl *replicationStatus) stopSlaveWithMutex() {
repl.masterChan = nil repl.masterChan = nil
} }
func (repl *replicationStatus) close() error { func (repl *slaveStatus) close() error {
repl.mutex.Lock() repl.mutex.Lock()
defer repl.mutex.Unlock() defer repl.mutex.Unlock()
repl.stopSlaveWithMutex() repl.stopSlaveWithMutex()
return nil return nil
} }
func (mdb *MultiDB) syncWithMaster() { func (mdb *MultiDB) setupMaster() {
defer func() { defer func() {
if err := recover(); err != nil { if err := recover(); err != nil {
logger.Error(err) logger.Error(err)
@@ -140,29 +140,38 @@ func (mdb *MultiDB) syncWithMaster() {
mdb.replication.cancel = cancel mdb.replication.cancel = cancel
configVersion = mdb.replication.configVersion configVersion = mdb.replication.configVersion
mdb.replication.mutex.Unlock() mdb.replication.mutex.Unlock()
err := mdb.connectWithMaster(configVersion) isFullReSync, err := mdb.connectWithMaster(configVersion)
if err != nil { if err != nil {
// full sync failed, abort // connect failed, abort master
logger.Error(err)
mdb.slaveOfNone()
return return
} }
if isFullReSync {
err = mdb.loadMasterRDB(configVersion) err = mdb.loadMasterRDB(configVersion)
if err != nil { if err != nil {
// load failed, abort master
logger.Error(err)
mdb.slaveOfNone()
return return
} }
}
err = mdb.receiveAOF(ctx, configVersion) err = mdb.receiveAOF(ctx, configVersion)
if err != nil { if err != nil {
// full sync failed, abort // full sync failed, abort
logger.Error(err)
return return
} }
} }
// connectWithMaster finishes handshake with master // connectWithMaster finishes handshake with master
func (mdb *MultiDB) connectWithMaster(configVersion int32) error { // returns: isFullReSync, error
func (mdb *MultiDB) connectWithMaster(configVersion int32) (bool, error) {
addr := mdb.replication.masterHost + ":" + strconv.Itoa(mdb.replication.masterPort) addr := mdb.replication.masterHost + ":" + strconv.Itoa(mdb.replication.masterPort)
conn, err := net.Dial("tcp", addr) conn, err := net.Dial("tcp", addr)
if err != nil { if err != nil {
mdb.slaveOfNone() // abort mdb.slaveOfNone() // abort
return errors.New("connect master failed " + err.Error()) return false, errors.New("connect master failed " + err.Error())
} }
masterChan := parser.ParseStream(conn) masterChan := parser.ParseStream(conn)
@@ -171,11 +180,11 @@ func (mdb *MultiDB) connectWithMaster(configVersion int32) error {
pingReq := protocol.MakeMultiBulkReply(pingCmdLine) pingReq := protocol.MakeMultiBulkReply(pingCmdLine)
_, err = conn.Write(pingReq.ToBytes()) _, err = conn.Write(pingReq.ToBytes())
if err != nil { if err != nil {
return errors.New("send failed " + err.Error()) return false, errors.New("send failed " + err.Error())
} }
pingResp := <-masterChan pingResp := <-masterChan
if pingResp.Err != nil { if pingResp.Err != nil {
return errors.New("read response failed: " + pingResp.Err.Error()) return false, errors.New("read response failed: " + pingResp.Err.Error())
} }
switch reply := pingResp.Data.(type) { switch reply := pingResp.Data.(type) {
case *protocol.StandardErrReply: case *protocol.StandardErrReply:
@@ -184,7 +193,7 @@ func (mdb *MultiDB) connectWithMaster(configVersion int32) error {
!strings.HasPrefix(reply.Error(), "ERR operation not permitted") { !strings.HasPrefix(reply.Error(), "ERR operation not permitted") {
logger.Error("Error reply to PING from master: " + string(reply.ToBytes())) logger.Error("Error reply to PING from master: " + string(reply.ToBytes()))
mdb.slaveOfNone() // abort mdb.slaveOfNone() // abort
return nil return false, nil
} }
} }
@@ -213,7 +222,7 @@ func (mdb *MultiDB) connectWithMaster(configVersion int32) error {
authCmdLine := utils.ToCmdLine("auth", config.Properties.MasterAuth) authCmdLine := utils.ToCmdLine("auth", config.Properties.MasterAuth)
err = sendCmdToMaster(conn, authCmdLine, masterChan) err = sendCmdToMaster(conn, authCmdLine, masterChan)
if err != nil { if err != nil {
return err return false, err
} }
} }
@@ -227,7 +236,7 @@ func (mdb *MultiDB) connectWithMaster(configVersion int32) error {
portCmdLine := utils.ToCmdLine("REPLCONF", "listening-port", strconv.Itoa(port)) portCmdLine := utils.ToCmdLine("REPLCONF", "listening-port", strconv.Itoa(port))
err = sendCmdToMaster(conn, portCmdLine, masterChan) err = sendCmdToMaster(conn, portCmdLine, masterChan)
if err != nil { if err != nil {
return err return false, err
} }
// announce ip // announce ip
@@ -235,7 +244,7 @@ func (mdb *MultiDB) connectWithMaster(configVersion int32) error {
ipCmdLine := utils.ToCmdLine("REPLCONF", "ip-address", config.Properties.SlaveAnnounceIP) ipCmdLine := utils.ToCmdLine("REPLCONF", "ip-address", config.Properties.SlaveAnnounceIP)
err = sendCmdToMaster(conn, ipCmdLine, masterChan) err = sendCmdToMaster(conn, ipCmdLine, masterChan)
if err != nil { if err != nil {
return err return false, err
} }
} }
@@ -243,7 +252,7 @@ func (mdb *MultiDB) connectWithMaster(configVersion int32) error {
capaCmdLine := utils.ToCmdLine("REPLCONF", "capa", "psync2") capaCmdLine := utils.ToCmdLine("REPLCONF", "capa", "psync2")
err = sendCmdToMaster(conn, capaCmdLine, masterChan) err = sendCmdToMaster(conn, capaCmdLine, masterChan)
if err != nil { if err != nil {
return err return false, err
} }
// update connection // update connection
@@ -251,43 +260,62 @@ func (mdb *MultiDB) connectWithMaster(configVersion int32) error {
defer mdb.replication.mutex.Unlock() defer mdb.replication.mutex.Unlock()
if mdb.replication.configVersion != configVersion { if mdb.replication.configVersion != configVersion {
// replication conf changed during connecting and waiting mutex // replication conf changed during connecting and waiting mutex
return configChangedErr return false, configChangedErr
} }
mdb.replication.masterConn = conn mdb.replication.masterConn = conn
mdb.replication.masterChan = masterChan mdb.replication.masterChan = masterChan
mdb.replication.lastRecvTime = time.Now()
return mdb.psyncHandshake() return mdb.psyncHandshake()
} }
// psyncHandshake send `psync` to master and sync repl-id/offset with master // psyncHandshake send `psync` to master and sync repl-id/offset with master
// invoker should provide with replication.mutex // invoker should provide with replication.mutex
func (mdb *MultiDB) psyncHandshake() error { func (mdb *MultiDB) psyncHandshake() (bool, error) {
psyncCmdLine := utils.ToCmdLine("psync", "?", "-1") replId := "?"
var replOffset int64 = -1
if mdb.replication.replId != "" {
replId = mdb.replication.replId
replOffset = mdb.replication.replOffset
}
psyncCmdLine := utils.ToCmdLine("psync", replId, strconv.FormatInt(replOffset, 10))
psyncReq := protocol.MakeMultiBulkReply(psyncCmdLine) psyncReq := protocol.MakeMultiBulkReply(psyncCmdLine)
_, err := mdb.replication.masterConn.Write(psyncReq.ToBytes()) _, err := mdb.replication.masterConn.Write(psyncReq.ToBytes())
if err != nil { if err != nil {
return errors.New("send failed " + err.Error()) return false, errors.New("send failed " + err.Error())
} }
psyncPayload := <-mdb.replication.masterChan psyncPayload := <-mdb.replication.masterChan
if psyncPayload.Err != nil { if psyncPayload.Err != nil {
return errors.New("read response failed: " + psyncPayload.Err.Error()) return false, errors.New("read response failed: " + psyncPayload.Err.Error())
} }
psyncHeader, ok := psyncPayload.Data.(*protocol.StatusReply) psyncHeader, ok := psyncPayload.Data.(*protocol.StatusReply)
if !ok { if !ok {
return errors.New("illegal payload header: " + string(psyncPayload.Data.ToBytes())) return false, errors.New("illegal payload header not a status reply: " + string(psyncPayload.Data.ToBytes()))
} }
headers := strings.Split(psyncHeader.Status, " ") headers := strings.Split(psyncHeader.Status, " ")
if len(headers) != 3 { if len(headers) != 3 && len(headers) != 2 {
return errors.New("illegal payload header: " + psyncHeader.Status) return false, errors.New("illegal payload header: " + psyncHeader.Status)
} }
logger.Info("receive psync header from master") logger.Info("receive psync header from master")
var isFullReSync bool
if headers[0] == "FULLRESYNC" {
logger.Info("full re-sync with master")
mdb.replication.replId = headers[1] mdb.replication.replId = headers[1]
mdb.replication.replOffset, err = strconv.ParseInt(headers[2], 10, 64) mdb.replication.replOffset, err = strconv.ParseInt(headers[2], 10, 64)
if err != nil { isFullReSync = true
return errors.New("get illegal repl offset: " + headers[2]) } else if headers[0] == "CONTINUE" {
logger.Info("continue partial sync")
mdb.replication.replId = headers[1]
isFullReSync = false
} else {
return false, errors.New("illegal psync resp: " + psyncHeader.Status)
} }
logger.Info("full resync from master: " + mdb.replication.replId)
logger.Info("current offset:", mdb.replication.replOffset) if err != nil {
return nil return false, errors.New("get illegal repl offset: " + headers[2])
}
logger.Info(fmt.Sprintf("repl id: %s, current offset: %d", mdb.replication.replId, mdb.replication.replOffset))
return isFullReSync, nil
} }
// loadMasterRDB downloads rdb after handshake has been done // loadMasterRDB downloads rdb after handshake has been done
@@ -320,8 +348,6 @@ func (mdb *MultiDB) loadMasterRDB(configVersion int32) error {
mdb.loadDB(i, newDB) mdb.loadDB(i, newDB)
} }
// there is no CRLF between RDB and following AOF, reset stream to avoid parser error
mdb.replication.masterChan = parser.ParseStream(mdb.replication.masterConn)
// fixme: update aof file // fixme: update aof file
return nil return nil
} }
@@ -352,8 +378,9 @@ func (mdb *MultiDB) receiveAOF(ctx context.Context, configVersion int32) error {
mdb.Exec(conn, cmdLine.Args) mdb.Exec(conn, cmdLine.Args)
n := len(cmdLine.ToBytes()) // todo: directly get size from socket n := len(cmdLine.ToBytes()) // todo: directly get size from socket
mdb.replication.replOffset += int64(n) mdb.replication.replOffset += int64(n)
logger.Info(fmt.Sprintf("receive %d bytes from master, current offset %d", mdb.replication.lastRecvTime = time.Now()
n, mdb.replication.replOffset)) logger.Info(fmt.Sprintf("receive %d bytes from master, current offset %d, %s",
n, mdb.replication.replOffset, strconv.Quote(string(cmdLine.ToBytes()))))
mdb.replication.mutex.Unlock() mdb.replication.mutex.Unlock()
case <-ctx.Done(): case <-ctx.Done():
return nil return nil
@@ -366,6 +393,7 @@ func (mdb *MultiDB) slaveCron() {
if repl.masterConn == nil { if repl.masterConn == nil {
return return
} }
replTimeout := 60 * time.Second replTimeout := 60 * time.Second
if config.Properties.ReplTimeout != 0 { if config.Properties.ReplTimeout != 0 {
replTimeout = time.Duration(config.Properties.ReplTimeout) * time.Second replTimeout = time.Duration(config.Properties.ReplTimeout) * time.Second
@@ -387,7 +415,7 @@ func (mdb *MultiDB) slaveCron() {
} }
// Send a REPLCONF ACK command to the master to inform it about the current processed offset // Send a REPLCONF ACK command to the master to inform it about the current processed offset
func (repl *replicationStatus) sendAck2Master() error { func (repl *slaveStatus) sendAck2Master() error {
psyncCmdLine := utils.ToCmdLine("REPLCONF", "ACK", psyncCmdLine := utils.ToCmdLine("REPLCONF", "ACK",
strconv.FormatInt(repl.replOffset, 10)) strconv.FormatInt(repl.replOffset, 10))
psyncReq := protocol.MakeMultiBulkReply(psyncCmdLine) psyncReq := protocol.MakeMultiBulkReply(psyncCmdLine)
@@ -401,6 +429,6 @@ func (mdb *MultiDB) reconnectWithMaster() error {
mdb.replication.mutex.Lock() mdb.replication.mutex.Lock()
defer mdb.replication.mutex.Unlock() defer mdb.replication.mutex.Unlock()
mdb.replication.stopSlaveWithMutex() mdb.replication.stopSlaveWithMutex()
go mdb.syncWithMaster() go mdb.setupMaster()
return nil return nil
} }

View File

@@ -79,13 +79,16 @@ func TestReplication(t *testing.T) {
t.Error(err) t.Error(err)
return return
} }
time.Sleep(3 * time.Second)
// test reconnect // test reconnect
config.Properties.ReplTimeout = 1 config.Properties.ReplTimeout = 1
_ = mdb.replication.masterConn.Close() _ = mdb.replication.masterConn.Close()
mdb.replication.lastRecvTime = time.Now().Add(-time.Hour) // mock timeout
mdb.slaveCron()
time.Sleep(3 * time.Second)
ret = masterCli.Send(utils.ToCmdLine("set", "1", "3")) ret = masterCli.Send(utils.ToCmdLine("set", "1", "3"))
asserts.AssertStatusReply(t, ret, "OK") asserts.AssertStatusReply(t, ret, "OK")
mdb.slaveCron()
success = false success = false
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
// wait for sync // wait for sync

View File

@@ -4,13 +4,13 @@ import (
"bufio" "bufio"
"bytes" "bytes"
"errors" "errors"
"fmt"
"github.com/hdt3213/godis/interface/redis" "github.com/hdt3213/godis/interface/redis"
"github.com/hdt3213/godis/lib/logger" "github.com/hdt3213/godis/lib/logger"
"github.com/hdt3213/godis/redis/protocol" "github.com/hdt3213/godis/redis/protocol"
"io" "io"
"runtime/debug" "runtime/debug"
"strconv" "strconv"
"strings"
) )
// Payload stores redis.Reply or error // Payload stores redis.Reply or error
@@ -75,14 +75,24 @@ func parse0(rawReader io.Reader, ch chan<- *Payload) {
} }
length := len(line) length := len(line)
if length <= 2 || line[length-2] != '\r' { if length <= 2 || line[length-2] != '\r' {
protocolError(ch, line) // there are some empty lines within replication traffic, ignore this error
//protocolError(ch, "empty line")
continue continue
} }
line = bytes.TrimSuffix(line, []byte{'\r', '\n'}) line = bytes.TrimSuffix(line, []byte{'\r', '\n'})
switch line[0] { switch line[0] {
case '+': case '+':
content := string(line[1:])
ch <- &Payload{ ch <- &Payload{
Data: protocol.MakeStatusReply(string(line[1:])), Data: protocol.MakeStatusReply(content),
}
if strings.HasPrefix(content, "FULLRESYNC") {
err = parseRDBBulkString(reader, ch)
if err != nil {
ch <- &Payload{Err: err}
close(ch)
return
}
} }
case '-': case '-':
ch <- &Payload{ ch <- &Payload{
@@ -91,7 +101,7 @@ func parse0(rawReader io.Reader, ch chan<- *Payload) {
case ':': case ':':
value, err := strconv.ParseInt(string(line[1:]), 10, 64) value, err := strconv.ParseInt(string(line[1:]), 10, 64)
if err != nil { if err != nil {
protocolError(ch, line) protocolError(ch, "illegal number "+string(line[1:]))
continue continue
} }
ch <- &Payload{ ch <- &Payload{
@@ -123,7 +133,7 @@ func parse0(rawReader io.Reader, ch chan<- *Payload) {
func parseBulkString(header []byte, reader *bufio.Reader, ch chan<- *Payload) error { func parseBulkString(header []byte, reader *bufio.Reader, ch chan<- *Payload) error {
strLen, err := strconv.ParseInt(string(header[1:]), 10, 64) strLen, err := strconv.ParseInt(string(header[1:]), 10, 64)
if err != nil || strLen < -1 { if err != nil || strLen < -1 {
protocolError(ch, header) protocolError(ch, "illegal bulk string header: "+string(header))
return nil return nil
} else if strLen == -1 { } else if strLen == -1 {
ch <- &Payload{ ch <- &Payload{
@@ -142,10 +152,29 @@ func parseBulkString(header []byte, reader *bufio.Reader, ch chan<- *Payload) er
return nil return nil
} }
// there is no CRLF between RDB and following AOF, therefore it needs to be treated differently
func parseRDBBulkString(reader *bufio.Reader, ch chan<- *Payload) error {
header, err := reader.ReadBytes('\n')
header = bytes.TrimSuffix(header, []byte{'\r', '\n'})
strLen, err := strconv.ParseInt(string(header[1:]), 10, 64)
if err != nil || strLen <= 0 {
return errors.New("illegal bulk header: " + string(header))
}
body := make([]byte, strLen)
_, err = io.ReadFull(reader, body)
if err != nil {
return err
}
ch <- &Payload{
Data: protocol.MakeBulkReply(body[:len(body)]),
}
return nil
}
func parseArray(header []byte, reader *bufio.Reader, ch chan<- *Payload) error { func parseArray(header []byte, reader *bufio.Reader, ch chan<- *Payload) error {
nStrs, err := strconv.ParseInt(string(header[1:]), 10, 64) nStrs, err := strconv.ParseInt(string(header[1:]), 10, 64)
if err != nil || nStrs < 0 { if err != nil || nStrs < 0 {
protocolError(ch, header) protocolError(ch, "illegal array header "+string(header[1:]))
return nil return nil
} else if nStrs == 0 { } else if nStrs == 0 {
ch <- &Payload{ ch <- &Payload{
@@ -162,12 +191,12 @@ func parseArray(header []byte, reader *bufio.Reader, ch chan<- *Payload) error {
} }
length := len(line) length := len(line)
if length < 4 || line[length-2] != '\r' || line[0] != '$' { if length < 4 || line[length-2] != '\r' || line[0] != '$' {
protocolError(ch, line) protocolError(ch, "illegal bulk string header "+string(line))
break break
} }
strLen, err := strconv.ParseInt(string(line[1:length-2]), 10, 64) strLen, err := strconv.ParseInt(string(line[1:length-2]), 10, 64)
if err != nil || strLen < -1 { if err != nil || strLen < -1 {
protocolError(ch, header) protocolError(ch, "illegal bulk string length "+string(line))
break break
} else if strLen == -1 { } else if strLen == -1 {
lines = append(lines, []byte{}) lines = append(lines, []byte{})
@@ -186,7 +215,7 @@ func parseArray(header []byte, reader *bufio.Reader, ch chan<- *Payload) error {
return nil return nil
} }
func protocolError(ch chan<- *Payload, msg []byte) { func protocolError(ch chan<- *Payload, msg string) {
err := errors.New(fmt.Sprintf("Protocol error: %s", string(msg))) err := errors.New("protocol error: " + msg)
ch <- &Payload{Err: err} ch <- &Payload{Err: err}
} }