diff --git a/database/database.go b/database/database.go index 0e0a7d5..936661a 100644 --- a/database/database.go +++ b/database/database.go @@ -30,7 +30,7 @@ type MultiDB struct { // store master node address slaveOf string role int32 - replication *replicationStatus + replication *slaveStatus } // NewStandaloneServer creates a standalone redis server, with multi database and all other funtions diff --git a/database/replication.go b/database/replication.go index a0a2a38..d811ba7 100644 --- a/database/replication.go +++ b/database/replication.go @@ -26,7 +26,7 @@ const ( slaveRole ) -type replicationStatus struct { +type slaveStatus struct { mutex sync.Mutex ctx context.Context cancel context.CancelFunc @@ -49,8 +49,8 @@ type replicationStatus struct { var configChangedErr = errors.New("replication config changed") -func initReplStatus() *replicationStatus { - repl := &replicationStatus{} +func initReplStatus() *slaveStatus { + repl := &slaveStatus{} // start cron return repl } @@ -87,7 +87,7 @@ func (mdb *MultiDB) execSlaveOf(c redis.Connection, args [][]byte) redis.Reply { // use buffered channel in case receiver goroutine exited before controller send stop signal atomic.AddInt32(&mdb.replication.configVersion, 1) mdb.replication.mutex.Unlock() - go mdb.syncWithMaster() + go mdb.setupMaster() return protocol.MakeOkReply() } @@ -103,7 +103,7 @@ func (mdb *MultiDB) slaveOfNone() { // stopSlaveWithMutex stops in-progress connectWithMaster/fullSync/receiveAOF // invoker should have replication mutex -func (repl *replicationStatus) stopSlaveWithMutex() { +func (repl *slaveStatus) stopSlaveWithMutex() { // update configVersion to stop connectWithMaster and fullSync atomic.AddInt32(&repl.configVersion, 1) // send cancel to receiveAOF @@ -120,14 +120,14 @@ func (repl *replicationStatus) stopSlaveWithMutex() { repl.masterChan = nil } -func (repl *replicationStatus) close() error { +func (repl *slaveStatus) close() error { repl.mutex.Lock() defer repl.mutex.Unlock() repl.stopSlaveWithMutex() return nil } -func (mdb *MultiDB) syncWithMaster() { +func (mdb *MultiDB) setupMaster() { defer func() { if err := recover(); err != nil { logger.Error(err) @@ -140,29 +140,38 @@ func (mdb *MultiDB) syncWithMaster() { mdb.replication.cancel = cancel configVersion = mdb.replication.configVersion mdb.replication.mutex.Unlock() - err := mdb.connectWithMaster(configVersion) + isFullReSync, err := mdb.connectWithMaster(configVersion) if err != nil { - // full sync failed, abort + // connect failed, abort master + logger.Error(err) + mdb.slaveOfNone() return } - err = mdb.loadMasterRDB(configVersion) - if err != nil { - return + if isFullReSync { + err = mdb.loadMasterRDB(configVersion) + if err != nil { + // load failed, abort master + logger.Error(err) + mdb.slaveOfNone() + return + } } err = mdb.receiveAOF(ctx, configVersion) if err != nil { // full sync failed, abort + logger.Error(err) return } } // connectWithMaster finishes handshake with master -func (mdb *MultiDB) connectWithMaster(configVersion int32) error { +// returns: isFullReSync, error +func (mdb *MultiDB) connectWithMaster(configVersion int32) (bool, error) { addr := mdb.replication.masterHost + ":" + strconv.Itoa(mdb.replication.masterPort) conn, err := net.Dial("tcp", addr) if err != nil { mdb.slaveOfNone() // abort - return errors.New("connect master failed " + err.Error()) + return false, errors.New("connect master failed " + err.Error()) } masterChan := parser.ParseStream(conn) @@ -171,11 +180,11 @@ func (mdb *MultiDB) connectWithMaster(configVersion int32) error { pingReq := protocol.MakeMultiBulkReply(pingCmdLine) _, err = conn.Write(pingReq.ToBytes()) if err != nil { - return errors.New("send failed " + err.Error()) + return false, errors.New("send failed " + err.Error()) } pingResp := <-masterChan if pingResp.Err != nil { - return errors.New("read response failed: " + pingResp.Err.Error()) + return false, errors.New("read response failed: " + pingResp.Err.Error()) } switch reply := pingResp.Data.(type) { case *protocol.StandardErrReply: @@ -184,7 +193,7 @@ func (mdb *MultiDB) connectWithMaster(configVersion int32) error { !strings.HasPrefix(reply.Error(), "ERR operation not permitted") { logger.Error("Error reply to PING from master: " + string(reply.ToBytes())) mdb.slaveOfNone() // abort - return nil + return false, nil } } @@ -213,7 +222,7 @@ func (mdb *MultiDB) connectWithMaster(configVersion int32) error { authCmdLine := utils.ToCmdLine("auth", config.Properties.MasterAuth) err = sendCmdToMaster(conn, authCmdLine, masterChan) if err != nil { - return err + return false, err } } @@ -227,7 +236,7 @@ func (mdb *MultiDB) connectWithMaster(configVersion int32) error { portCmdLine := utils.ToCmdLine("REPLCONF", "listening-port", strconv.Itoa(port)) err = sendCmdToMaster(conn, portCmdLine, masterChan) if err != nil { - return err + return false, err } // announce ip @@ -235,7 +244,7 @@ func (mdb *MultiDB) connectWithMaster(configVersion int32) error { ipCmdLine := utils.ToCmdLine("REPLCONF", "ip-address", config.Properties.SlaveAnnounceIP) err = sendCmdToMaster(conn, ipCmdLine, masterChan) if err != nil { - return err + return false, err } } @@ -243,7 +252,7 @@ func (mdb *MultiDB) connectWithMaster(configVersion int32) error { capaCmdLine := utils.ToCmdLine("REPLCONF", "capa", "psync2") err = sendCmdToMaster(conn, capaCmdLine, masterChan) if err != nil { - return err + return false, err } // update connection @@ -251,43 +260,62 @@ func (mdb *MultiDB) connectWithMaster(configVersion int32) error { defer mdb.replication.mutex.Unlock() if mdb.replication.configVersion != configVersion { // replication conf changed during connecting and waiting mutex - return configChangedErr + return false, configChangedErr } mdb.replication.masterConn = conn mdb.replication.masterChan = masterChan + mdb.replication.lastRecvTime = time.Now() return mdb.psyncHandshake() } // psyncHandshake send `psync` to master and sync repl-id/offset with master // invoker should provide with replication.mutex -func (mdb *MultiDB) psyncHandshake() error { - psyncCmdLine := utils.ToCmdLine("psync", "?", "-1") +func (mdb *MultiDB) psyncHandshake() (bool, error) { + replId := "?" + var replOffset int64 = -1 + if mdb.replication.replId != "" { + replId = mdb.replication.replId + replOffset = mdb.replication.replOffset + } + psyncCmdLine := utils.ToCmdLine("psync", replId, strconv.FormatInt(replOffset, 10)) psyncReq := protocol.MakeMultiBulkReply(psyncCmdLine) _, err := mdb.replication.masterConn.Write(psyncReq.ToBytes()) if err != nil { - return errors.New("send failed " + err.Error()) + return false, errors.New("send failed " + err.Error()) } psyncPayload := <-mdb.replication.masterChan if psyncPayload.Err != nil { - return errors.New("read response failed: " + psyncPayload.Err.Error()) + return false, errors.New("read response failed: " + psyncPayload.Err.Error()) } psyncHeader, ok := psyncPayload.Data.(*protocol.StatusReply) if !ok { - return errors.New("illegal payload header: " + string(psyncPayload.Data.ToBytes())) + return false, errors.New("illegal payload header not a status reply: " + string(psyncPayload.Data.ToBytes())) } headers := strings.Split(psyncHeader.Status, " ") - if len(headers) != 3 { - return errors.New("illegal payload header: " + psyncHeader.Status) + if len(headers) != 3 && len(headers) != 2 { + return false, errors.New("illegal payload header: " + psyncHeader.Status) } + logger.Info("receive psync header from master") - mdb.replication.replId = headers[1] - mdb.replication.replOffset, err = strconv.ParseInt(headers[2], 10, 64) - if err != nil { - return errors.New("get illegal repl offset: " + headers[2]) + var isFullReSync bool + if headers[0] == "FULLRESYNC" { + logger.Info("full re-sync with master") + mdb.replication.replId = headers[1] + mdb.replication.replOffset, err = strconv.ParseInt(headers[2], 10, 64) + isFullReSync = true + } else if headers[0] == "CONTINUE" { + logger.Info("continue partial sync") + mdb.replication.replId = headers[1] + isFullReSync = false + } else { + return false, errors.New("illegal psync resp: " + psyncHeader.Status) } - logger.Info("full resync from master: " + mdb.replication.replId) - logger.Info("current offset:", mdb.replication.replOffset) - return nil + + if err != nil { + return false, errors.New("get illegal repl offset: " + headers[2]) + } + logger.Info(fmt.Sprintf("repl id: %s, current offset: %d", mdb.replication.replId, mdb.replication.replOffset)) + return isFullReSync, nil } // loadMasterRDB downloads rdb after handshake has been done @@ -320,8 +348,6 @@ func (mdb *MultiDB) loadMasterRDB(configVersion int32) error { mdb.loadDB(i, newDB) } - // there is no CRLF between RDB and following AOF, reset stream to avoid parser error - mdb.replication.masterChan = parser.ParseStream(mdb.replication.masterConn) // fixme: update aof file return nil } @@ -352,8 +378,9 @@ func (mdb *MultiDB) receiveAOF(ctx context.Context, configVersion int32) error { mdb.Exec(conn, cmdLine.Args) n := len(cmdLine.ToBytes()) // todo: directly get size from socket mdb.replication.replOffset += int64(n) - logger.Info(fmt.Sprintf("receive %d bytes from master, current offset %d", - n, mdb.replication.replOffset)) + mdb.replication.lastRecvTime = time.Now() + logger.Info(fmt.Sprintf("receive %d bytes from master, current offset %d, %s", + n, mdb.replication.replOffset, strconv.Quote(string(cmdLine.ToBytes())))) mdb.replication.mutex.Unlock() case <-ctx.Done(): return nil @@ -366,6 +393,7 @@ func (mdb *MultiDB) slaveCron() { if repl.masterConn == nil { return } + replTimeout := 60 * time.Second if config.Properties.ReplTimeout != 0 { replTimeout = time.Duration(config.Properties.ReplTimeout) * time.Second @@ -387,7 +415,7 @@ func (mdb *MultiDB) slaveCron() { } // Send a REPLCONF ACK command to the master to inform it about the current processed offset -func (repl *replicationStatus) sendAck2Master() error { +func (repl *slaveStatus) sendAck2Master() error { psyncCmdLine := utils.ToCmdLine("REPLCONF", "ACK", strconv.FormatInt(repl.replOffset, 10)) psyncReq := protocol.MakeMultiBulkReply(psyncCmdLine) @@ -401,6 +429,6 @@ func (mdb *MultiDB) reconnectWithMaster() error { mdb.replication.mutex.Lock() defer mdb.replication.mutex.Unlock() mdb.replication.stopSlaveWithMutex() - go mdb.syncWithMaster() + go mdb.setupMaster() return nil } diff --git a/database/replication_test.go b/database/replication_test.go index 537631b..d37fb05 100644 --- a/database/replication_test.go +++ b/database/replication_test.go @@ -79,13 +79,16 @@ func TestReplication(t *testing.T) { t.Error(err) return } + time.Sleep(3 * time.Second) // test reconnect config.Properties.ReplTimeout = 1 _ = mdb.replication.masterConn.Close() + mdb.replication.lastRecvTime = time.Now().Add(-time.Hour) // mock timeout + mdb.slaveCron() + time.Sleep(3 * time.Second) ret = masterCli.Send(utils.ToCmdLine("set", "1", "3")) asserts.AssertStatusReply(t, ret, "OK") - mdb.slaveCron() success = false for i := 0; i < 10; i++ { // wait for sync diff --git a/redis/parser/parser.go b/redis/parser/parser.go index 424edcb..a5702af 100644 --- a/redis/parser/parser.go +++ b/redis/parser/parser.go @@ -4,13 +4,13 @@ import ( "bufio" "bytes" "errors" - "fmt" "github.com/hdt3213/godis/interface/redis" "github.com/hdt3213/godis/lib/logger" "github.com/hdt3213/godis/redis/protocol" "io" "runtime/debug" "strconv" + "strings" ) // Payload stores redis.Reply or error @@ -75,14 +75,24 @@ func parse0(rawReader io.Reader, ch chan<- *Payload) { } length := len(line) if length <= 2 || line[length-2] != '\r' { - protocolError(ch, line) + // there are some empty lines within replication traffic, ignore this error + //protocolError(ch, "empty line") continue } line = bytes.TrimSuffix(line, []byte{'\r', '\n'}) switch line[0] { case '+': + content := string(line[1:]) ch <- &Payload{ - Data: protocol.MakeStatusReply(string(line[1:])), + Data: protocol.MakeStatusReply(content), + } + if strings.HasPrefix(content, "FULLRESYNC") { + err = parseRDBBulkString(reader, ch) + if err != nil { + ch <- &Payload{Err: err} + close(ch) + return + } } case '-': ch <- &Payload{ @@ -91,7 +101,7 @@ func parse0(rawReader io.Reader, ch chan<- *Payload) { case ':': value, err := strconv.ParseInt(string(line[1:]), 10, 64) if err != nil { - protocolError(ch, line) + protocolError(ch, "illegal number "+string(line[1:])) continue } ch <- &Payload{ @@ -123,7 +133,7 @@ func parse0(rawReader io.Reader, ch chan<- *Payload) { func parseBulkString(header []byte, reader *bufio.Reader, ch chan<- *Payload) error { strLen, err := strconv.ParseInt(string(header[1:]), 10, 64) if err != nil || strLen < -1 { - protocolError(ch, header) + protocolError(ch, "illegal bulk string header: "+string(header)) return nil } else if strLen == -1 { ch <- &Payload{ @@ -142,10 +152,29 @@ func parseBulkString(header []byte, reader *bufio.Reader, ch chan<- *Payload) er return nil } +// there is no CRLF between RDB and following AOF, therefore it needs to be treated differently +func parseRDBBulkString(reader *bufio.Reader, ch chan<- *Payload) error { + header, err := reader.ReadBytes('\n') + header = bytes.TrimSuffix(header, []byte{'\r', '\n'}) + strLen, err := strconv.ParseInt(string(header[1:]), 10, 64) + if err != nil || strLen <= 0 { + return errors.New("illegal bulk header: " + string(header)) + } + body := make([]byte, strLen) + _, err = io.ReadFull(reader, body) + if err != nil { + return err + } + ch <- &Payload{ + Data: protocol.MakeBulkReply(body[:len(body)]), + } + return nil +} + func parseArray(header []byte, reader *bufio.Reader, ch chan<- *Payload) error { nStrs, err := strconv.ParseInt(string(header[1:]), 10, 64) if err != nil || nStrs < 0 { - protocolError(ch, header) + protocolError(ch, "illegal array header "+string(header[1:])) return nil } else if nStrs == 0 { ch <- &Payload{ @@ -162,12 +191,12 @@ func parseArray(header []byte, reader *bufio.Reader, ch chan<- *Payload) error { } length := len(line) if length < 4 || line[length-2] != '\r' || line[0] != '$' { - protocolError(ch, line) + protocolError(ch, "illegal bulk string header "+string(line)) break } strLen, err := strconv.ParseInt(string(line[1:length-2]), 10, 64) if err != nil || strLen < -1 { - protocolError(ch, header) + protocolError(ch, "illegal bulk string length "+string(line)) break } else if strLen == -1 { lines = append(lines, []byte{}) @@ -186,7 +215,7 @@ func parseArray(header []byte, reader *bufio.Reader, ch chan<- *Payload) error { return nil } -func protocolError(ch chan<- *Payload, msg []byte) { - err := errors.New(fmt.Sprintf("Protocol error: %s", string(msg))) +func protocolError(ch chan<- *Payload, msg string) { + err := errors.New("protocol error: " + msg) ch <- &Payload{Err: err} }