Files
redis-go/database/replication_slave.go
2023-04-29 20:49:42 +08:00

462 lines
13 KiB
Go

package database
import (
"bytes"
"context"
"errors"
"fmt"
"io/ioutil"
"net"
"os"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/hdt3213/godis/aof"
"github.com/hdt3213/godis/config"
"github.com/hdt3213/godis/interface/redis"
"github.com/hdt3213/godis/lib/logger"
rdb "github.com/hdt3213/rdb/parser"
"github.com/hdt3213/godis/lib/utils"
"github.com/hdt3213/godis/redis/connection"
"github.com/hdt3213/godis/redis/parser"
"github.com/hdt3213/godis/redis/protocol"
)
const (
masterRole = iota
slaveRole
)
type slaveStatus struct {
mutex sync.Mutex
ctx context.Context
cancel context.CancelFunc
// configVersion stands for the version of slaveStatus config. Any change of master host/port will cause configVersion increment
// If configVersion change has been found during slaveStatus current slaveStatus procedure will stop.
// It is designed to abort a running slaveStatus procedure
configVersion int32
masterHost string
masterPort int
masterConn net.Conn
masterChan <-chan *parser.Payload
replId string
replOffset int64
lastRecvTime time.Time
running sync.WaitGroup
}
var configChangedErr = errors.New("slaveStatus config changed")
func initReplSlaveStatus() *slaveStatus {
return &slaveStatus{}
}
func (server *Server) execSlaveOf(c redis.Connection, args [][]byte) redis.Reply {
if strings.ToLower(string(args[0])) == "no" &&
strings.ToLower(string(args[1])) == "one" {
server.slaveOfNone()
return protocol.MakeOkReply()
}
host := string(args[0])
port, err := strconv.Atoi(string(args[1]))
if err != nil {
return protocol.MakeErrReply("ERR value is not an integer or out of range")
}
server.slaveStatus.mutex.Lock()
atomic.StoreInt32(&server.role, slaveRole)
server.slaveStatus.masterHost = host
server.slaveStatus.masterPort = port
atomic.AddInt32(&server.slaveStatus.configVersion, 1)
server.slaveStatus.mutex.Unlock()
go server.setupMaster()
return protocol.MakeOkReply()
}
func (server *Server) slaveOfNone() {
server.slaveStatus.mutex.Lock()
defer server.slaveStatus.mutex.Unlock()
server.slaveStatus.masterHost = ""
server.slaveStatus.masterPort = 0
server.slaveStatus.replId = ""
server.slaveStatus.replOffset = -1
server.slaveStatus.stopSlaveWithMutex()
server.role = masterRole
}
// stopSlaveWithMutex stops in-progress connectWithMaster/fullSync/receiveAOF
// invoker should have slaveStatus mutex
func (repl *slaveStatus) stopSlaveWithMutex() {
// update configVersion to stop connectWithMaster and fullSync
atomic.AddInt32(&repl.configVersion, 1)
// send cancel to receiveAOF
if repl.cancel != nil {
repl.cancel()
repl.running.Wait()
}
repl.ctx = context.Background()
repl.cancel = nil
if repl.masterConn != nil {
_ = repl.masterConn.Close() // parser.ParseStream will close masterChan
}
repl.masterConn = nil
repl.masterChan = nil
}
func (repl *slaveStatus) close() error {
repl.mutex.Lock()
defer repl.mutex.Unlock()
repl.stopSlaveWithMutex()
return nil
}
// setupMaster connects to master and starts full sync
func (server *Server) setupMaster() {
defer func() {
if err := recover(); err != nil {
logger.Error(err)
}
}()
var configVersion int32
ctx, cancel := context.WithCancel(context.Background())
server.slaveStatus.mutex.Lock()
server.slaveStatus.ctx = ctx
server.slaveStatus.cancel = cancel
configVersion = server.slaveStatus.configVersion
server.slaveStatus.mutex.Unlock()
isFullReSync, err := server.connectWithMaster(configVersion)
if err != nil {
// connect failed, abort master
logger.Error(err)
server.slaveOfNone()
return
}
if isFullReSync {
err = server.loadMasterRDB(configVersion)
if err != nil {
// load failed, abort master
logger.Error(err)
server.slaveOfNone()
return
}
}
err = server.receiveAOF(ctx, configVersion)
if err != nil {
// full sync failed, abort
logger.Error(err)
return
}
}
// connectWithMaster finishes handshake with master
// returns: isFullReSync, error
func (server *Server) connectWithMaster(configVersion int32) (bool, error) {
addr := server.slaveStatus.masterHost + ":" + strconv.Itoa(server.slaveStatus.masterPort)
conn, err := net.Dial("tcp", addr)
if err != nil {
server.slaveOfNone() // abort
return false, errors.New("connect master failed " + err.Error())
}
masterChan := parser.ParseStream(conn)
// ping
pingCmdLine := utils.ToCmdLine("ping")
pingReq := protocol.MakeMultiBulkReply(pingCmdLine)
_, err = conn.Write(pingReq.ToBytes())
if err != nil {
return false, errors.New("send failed " + err.Error())
}
pingResp := <-masterChan
if pingResp.Err != nil {
return false, errors.New("read response failed: " + pingResp.Err.Error())
}
switch reply := pingResp.Data.(type) {
case *protocol.StandardErrReply:
if !strings.HasPrefix(reply.Error(), "NOAUTH") &&
!strings.HasPrefix(reply.Error(), "NOPERM") &&
!strings.HasPrefix(reply.Error(), "ERR operation not permitted") {
logger.Error("Error reply to PING from master: " + string(reply.ToBytes()))
server.slaveOfNone() // abort
return false, nil
}
}
// just to reduce duplication of code
sendCmdToMaster := func(conn net.Conn, cmdLine CmdLine, masterChan <-chan *parser.Payload) error {
req := protocol.MakeMultiBulkReply(cmdLine)
_, err := conn.Write(req.ToBytes())
if err != nil {
server.slaveOfNone() // abort
return errors.New("send failed " + err.Error())
}
resp := <-masterChan
if resp.Err != nil {
server.slaveOfNone() // abort
return errors.New("read response failed: " + resp.Err.Error())
}
if !protocol.IsOKReply(resp.Data) {
server.slaveOfNone() // abort
return errors.New("unexpected auth response: " + string(resp.Data.ToBytes()))
}
return nil
}
// auth
if config.Properties.MasterAuth != "" {
authCmdLine := utils.ToCmdLine("auth", config.Properties.MasterAuth)
err = sendCmdToMaster(conn, authCmdLine, masterChan)
if err != nil {
return false, err
}
}
// announce port
var port int
if config.Properties.SlaveAnnouncePort != 0 {
port = config.Properties.SlaveAnnouncePort
} else {
port = config.Properties.Port
}
portCmdLine := utils.ToCmdLine("REPLCONF", "listening-port", strconv.Itoa(port))
err = sendCmdToMaster(conn, portCmdLine, masterChan)
if err != nil {
return false, err
}
// announce ip
if config.Properties.SlaveAnnounceIP != "" {
ipCmdLine := utils.ToCmdLine("REPLCONF", "ip-address", config.Properties.SlaveAnnounceIP)
err = sendCmdToMaster(conn, ipCmdLine, masterChan)
if err != nil {
return false, err
}
}
// announce capacity
capaCmdLine := utils.ToCmdLine("REPLCONF", "capa", "psync2")
err = sendCmdToMaster(conn, capaCmdLine, masterChan)
if err != nil {
return false, err
}
// update connection
server.slaveStatus.mutex.Lock()
defer server.slaveStatus.mutex.Unlock()
if server.slaveStatus.configVersion != configVersion {
// slaveStatus conf changed during connecting and waiting mutex
return false, configChangedErr
}
server.slaveStatus.masterConn = conn
server.slaveStatus.masterChan = masterChan
server.slaveStatus.lastRecvTime = time.Now()
return server.psyncHandshake()
}
// psyncHandshake send `psync` to master and sync repl-id/offset with master
// invoker should provide with slaveStatus.mutex
func (server *Server) psyncHandshake() (bool, error) {
replId := "?"
var replOffset int64 = -1
if server.slaveStatus.replId != "" {
replId = server.slaveStatus.replId
replOffset = server.slaveStatus.replOffset
}
psyncCmdLine := utils.ToCmdLine("psync", replId, strconv.FormatInt(replOffset, 10))
psyncReq := protocol.MakeMultiBulkReply(psyncCmdLine)
_, err := server.slaveStatus.masterConn.Write(psyncReq.ToBytes())
if err != nil {
return false, errors.New("send failed " + err.Error())
}
psyncPayload := <-server.slaveStatus.masterChan
if psyncPayload.Err != nil {
return false, errors.New("read response failed: " + psyncPayload.Err.Error())
}
psyncHeader, ok := psyncPayload.Data.(*protocol.StatusReply)
if !ok {
return false, errors.New("illegal payload header not a status reply: " + string(psyncPayload.Data.ToBytes()))
}
headers := strings.Split(psyncHeader.Status, " ")
if len(headers) != 3 && len(headers) != 2 {
return false, errors.New("illegal payload header: " + psyncHeader.Status)
}
logger.Info("receive psync header from master")
var isFullReSync bool
if headers[0] == "FULLRESYNC" {
logger.Info("full re-sync with master")
server.slaveStatus.replId = headers[1]
server.slaveStatus.replOffset, err = strconv.ParseInt(headers[2], 10, 64)
isFullReSync = true
} else if headers[0] == "CONTINUE" {
logger.Info("continue partial sync")
server.slaveStatus.replId = headers[1]
isFullReSync = false
} else {
return false, errors.New("illegal psync resp: " + psyncHeader.Status)
}
if err != nil {
return false, errors.New("get illegal repl offset: " + headers[2])
}
logger.Info(fmt.Sprintf("repl id: %s, current offset: %d", server.slaveStatus.replId, server.slaveStatus.replOffset))
return isFullReSync, nil
}
func makeRdbLoader(upgradeAof bool) (*Server, string, error) {
rdbLoader := MakeAuxiliaryServer()
if !upgradeAof {
return rdbLoader, "", nil
}
// make aof handler to generate new aof file during loading rdb
newAofFile, err := ioutil.TempFile("", "*.aof")
if err != nil {
return nil, "", fmt.Errorf("create temp rdb failed: %v", err)
}
newAofFilename := newAofFile.Name()
aofHandler, err := NewPersister(rdbLoader, newAofFilename, false, aof.FsyncNo)
if err != nil {
return nil, "", err
}
rdbLoader.bindPersister(aofHandler)
return rdbLoader, newAofFilename, nil
}
// loadMasterRDB downloads rdb after handshake has been done
func (server *Server) loadMasterRDB(configVersion int32) error {
rdbPayload := <-server.slaveStatus.masterChan
if rdbPayload.Err != nil {
return errors.New("read response failed: " + rdbPayload.Err.Error())
}
rdbReply, ok := rdbPayload.Data.(*protocol.BulkReply)
if !ok {
return errors.New("illegal payload header: " + string(rdbPayload.Data.ToBytes()))
}
logger.Info(fmt.Sprintf("receive %d bytes of rdb from master", len(rdbReply.Arg)))
rdbDec := rdb.NewDecoder(bytes.NewReader(rdbReply.Arg))
rdbLoader, newAofFilename, err := makeRdbLoader(config.Properties.AppendOnly)
if err != nil {
return err
}
err = rdbLoader.LoadRDB(rdbDec)
if err != nil {
return errors.New("dump rdb failed: " + err.Error())
}
server.slaveStatus.mutex.Lock()
defer server.slaveStatus.mutex.Unlock()
if server.slaveStatus.configVersion != configVersion {
// slaveStatus conf changed during connecting and waiting mutex
return configChangedErr
}
for i, h := range rdbLoader.dbSet {
newDB := h.Load().(*DB)
server.loadDB(i, newDB)
}
if config.Properties.AppendOnly {
// use new aof file
server.persister.Close()
err = os.Rename(newAofFilename, config.Properties.AppendFilename)
if err != nil {
return err
}
persister, err := NewPersister(server, config.Properties.AppendFilename, false, config.Properties.AppendFsync)
if err != nil {
return err
}
server.bindPersister(persister)
}
return nil
}
func (server *Server) receiveAOF(ctx context.Context, configVersion int32) error {
conn := connection.NewConn(server.slaveStatus.masterConn)
conn.SetMaster()
server.slaveStatus.running.Add(1)
defer server.slaveStatus.running.Done()
for {
select {
case payload, open := <-server.slaveStatus.masterChan:
if !open {
return errors.New("master channel unexpected close")
}
if payload.Err != nil {
return payload.Err
}
cmdLine, ok := payload.Data.(*protocol.MultiBulkReply)
if !ok {
return errors.New("unexpected payload: " + string(payload.Data.ToBytes()))
}
server.slaveStatus.mutex.Lock()
if server.slaveStatus.configVersion != configVersion {
// slaveStatus conf changed during connecting and waiting mutex
return configChangedErr
}
server.Exec(conn, cmdLine.Args)
n := len(cmdLine.ToBytes()) // todo: directly get size from socket
server.slaveStatus.replOffset += int64(n)
server.slaveStatus.lastRecvTime = time.Now()
logger.Info(fmt.Sprintf("receive %d bytes from master, current offset %d, %s",
n, server.slaveStatus.replOffset, strconv.Quote(string(cmdLine.ToBytes()))))
server.slaveStatus.mutex.Unlock()
case <-ctx.Done():
_ = conn.Close()
return nil
}
}
}
func (server *Server) slaveCron() {
repl := server.slaveStatus
if repl.masterConn == nil {
return
}
// check master timeout
replTimeout := 60 * time.Second
if config.Properties.ReplTimeout != 0 {
replTimeout = time.Duration(config.Properties.ReplTimeout) * time.Second
}
minLastRecvTime := time.Now().Add(-replTimeout)
if repl.lastRecvTime.Before(minLastRecvTime) {
// reconnect with master
err := server.reconnectWithMaster()
if err != nil {
logger.Error("send failed " + err.Error())
}
return
}
// send ack to master
err := repl.sendAck2Master()
if err != nil {
logger.Error("send failed " + err.Error())
}
}
// Send a REPLCONF ACK command to the master to inform it about the current processed offset
func (repl *slaveStatus) sendAck2Master() error {
psyncCmdLine := utils.ToCmdLine("REPLCONF", "ACK",
strconv.FormatInt(repl.replOffset, 10))
psyncReq := protocol.MakeMultiBulkReply(psyncCmdLine)
_, err := repl.masterConn.Write(psyncReq.ToBytes())
// logger.Info("send ack to master")
return err
}
func (server *Server) reconnectWithMaster() error {
logger.Info("reconnecting with master")
server.slaveStatus.mutex.Lock()
defer server.slaveStatus.mutex.Unlock()
server.slaveStatus.stopSlaveWithMutex()
go server.setupMaster()
return nil
}