rewrite backlog

This commit is contained in:
finley
2022-12-04 02:17:22 +08:00
parent ba7ea942cb
commit a5908f5ea4
8 changed files with 323 additions and 90 deletions

View File

@@ -26,9 +26,12 @@ type payload struct {
dbIndex int dbIndex int
} }
// Listener is a channel receive a replication of all aof payloads // Listener will be called-back after receiving a aof payload
// with a listener we can forward the updates to slave nodes etc. // with a listener we can forward the updates to slave nodes etc.
type Listener chan<- CmdLine type Listener interface {
// Callback will be called-back after receiving a aof payload
Callback([]CmdLine)
}
// Handler receive msgs from channel and write to AOF file // Handler receive msgs from channel and write to AOF file
type Handler struct { type Handler struct {
@@ -112,9 +115,7 @@ func (handler *Handler) handleAof() {
} }
handler.pausingAof.RUnlock() handler.pausingAof.RUnlock()
for listener := range handler.listeners { for listener := range handler.listeners {
for _, line := range cmdLines { listener.Callback(cmdLines)
listener <- line
}
} }
} }
handler.aofFinished <- struct{}{} handler.aofFinished <- struct{}{}

View File

@@ -19,9 +19,8 @@ import (
// todo: forbid concurrent rewrite // todo: forbid concurrent rewrite
// Rewrite2RDB rewrite aof data into rdb // Rewrite2RDB rewrite aof data into rdb
// if extraListener is not nil, it will be appended to Handler.listeners, it will receive all updates after rdb func (handler *Handler) Rewrite2RDB(rdbFilename string) error {
func (handler *Handler) Rewrite2RDB(rdbFilename string, extraListener Listener) error { ctx, err := handler.startRewrite2RDB(nil, nil)
ctx, err := handler.startRewrite2RDB(extraListener)
if err != nil { if err != nil {
return err return err
} }
@@ -40,7 +39,30 @@ func (handler *Handler) Rewrite2RDB(rdbFilename string, extraListener Listener)
return nil return nil
} }
func (handler *Handler) startRewrite2RDB(extraListener Listener) (*RewriteCtx, error) { // Rewrite2RDBForReplication asynchronously rewrite aof data into rdb and returns a channel to receive following data
// parameter listener would receive following updates of rdb
// parameter hook allows you to do something during aof pausing
func (handler *Handler) Rewrite2RDBForReplication(rdbFilename string, listener Listener, hook func()) error {
ctx, err := handler.startRewrite2RDB(listener, hook)
if err != nil {
return err
}
err = handler.rewrite2RDB(ctx)
if err != nil {
return err
}
err = ctx.tmpFile.Close()
if err != nil {
return err
}
err = os.Rename(ctx.tmpFile.Name(), rdbFilename)
if err != nil {
return err
}
return nil
}
func (handler *Handler) startRewrite2RDB(newListener Listener, hook func()) (*RewriteCtx, error) {
handler.pausingAof.Lock() // pausing aof handler.pausingAof.Lock() // pausing aof
defer handler.pausingAof.Unlock() defer handler.pausingAof.Unlock()
@@ -59,8 +81,11 @@ func (handler *Handler) startRewrite2RDB(extraListener Listener) (*RewriteCtx, e
logger.Warn("tmp file create failed") logger.Warn("tmp file create failed")
return nil, err return nil, err
} }
if extraListener != nil { if newListener != nil {
handler.listeners[extraListener] = struct{}{} handler.listeners[newListener] = struct{}{}
}
if hook != nil {
hook()
} }
return &RewriteCtx{ return &RewriteCtx{
tmpFile: file, tmpFile: file,

View File

@@ -319,7 +319,7 @@ func SaveRDB(db *MultiDB, args [][]byte) redis.Reply {
if rdbFilename == "" { if rdbFilename == "" {
rdbFilename = "dump.rdb" rdbFilename = "dump.rdb"
} }
err := db.aofHandler.Rewrite2RDB(rdbFilename, nil) err := db.aofHandler.Rewrite2RDB(rdbFilename)
if err != nil { if err != nil {
return protocol.MakeErrReply(err.Error()) return protocol.MakeErrReply(err.Error())
} }
@@ -341,7 +341,7 @@ func BGSaveRDB(db *MultiDB, args [][]byte) redis.Reply {
if rdbFilename == "" { if rdbFilename == "" {
rdbFilename = "dump.rdb" rdbFilename = "dump.rdb"
} }
err := db.aofHandler.Rewrite2RDB(rdbFilename, nil) err := db.aofHandler.Rewrite2RDB(rdbFilename)
if err != nil { if err != nil {
logger.Error(err) logger.Error(err)
} }

View File

@@ -1,11 +1,11 @@
package database package database
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"github.com/hdt3213/godis/interface/redis" "github.com/hdt3213/godis/interface/redis"
"github.com/hdt3213/godis/lib/logger" "github.com/hdt3213/godis/lib/logger"
"github.com/hdt3213/godis/lib/sync/atomic"
"github.com/hdt3213/godis/lib/utils" "github.com/hdt3213/godis/lib/utils"
"github.com/hdt3213/godis/redis/protocol" "github.com/hdt3213/godis/redis/protocol"
"io" "io"
@@ -47,23 +47,42 @@ type slaveClient struct {
capacity uint8 capacity uint8
} }
type masterStatus struct { // aofListener 只负责更新 backlog
ctx context.Context type replBacklog struct {
mu sync.RWMutex buf []byte
replId string
backlog []byte // backlog can be appended or replaced as a whole, cannot be modified(insert/set/delete)
beginOffset int64 beginOffset int64
currentOffset int64 currentOffset int64
slaveMap map[redis.Connection]*slaveClient
waitSlaves map[*slaveClient]struct{}
onlineSlaves map[*slaveClient]struct{}
bgSaveState uint8
rdbFilename string
} }
func (master *masterStatus) appendBacklog(bin []byte) { func (backlog *replBacklog) appendBytes(bin []byte) {
master.backlog = append(master.backlog, bin...) backlog.buf = append(backlog.buf, bin...)
master.currentOffset += int64(len(bin)) backlog.currentOffset += int64(len(bin))
}
func (backlog *replBacklog) getSnapshot() ([]byte, int64) {
return backlog.buf[:], backlog.currentOffset
}
func (backlog *replBacklog) getSnapshotAfter(beginOffset int64) ([]byte, int64) {
beg := beginOffset - backlog.beginOffset
return backlog.buf[beg:], backlog.currentOffset
}
func (backlog *replBacklog) isValidOffset(offset int64) bool {
return offset >= backlog.beginOffset && offset < backlog.currentOffset
}
type masterStatus struct {
mu sync.RWMutex
replId string
backlog *replBacklog
slaveMap map[redis.Connection]*slaveClient
waitSlaves map[*slaveClient]struct{}
onlineSlaves map[*slaveClient]struct{}
bgSaveState uint8
rdbFilename string
aofListener *replAofListener
rewriting atomic.Boolean
} }
func (mdb *MultiDB) bgSaveForReplication() { func (mdb *MultiDB) bgSaveForReplication() {
@@ -90,16 +109,18 @@ func (mdb *MultiDB) saveForReplication() error {
mdb.masterStatus.mu.Lock() mdb.masterStatus.mu.Lock()
mdb.masterStatus.bgSaveState = bgSaveRunning mdb.masterStatus.bgSaveState = bgSaveRunning
mdb.masterStatus.rdbFilename = rdbFilename // todo: can reuse config.Properties.RDBFilename? mdb.masterStatus.rdbFilename = rdbFilename // todo: can reuse config.Properties.RDBFilename?
aofListener := &replAofListener{
mdb: mdb,
backlog: mdb.masterStatus.backlog,
}
mdb.masterStatus.aofListener = aofListener
mdb.masterStatus.mu.Unlock() mdb.masterStatus.mu.Unlock()
aofListener := make(chan CmdLine, 1024) // give channel enough capacity to store all updates during rewrite to db err = mdb.aofHandler.Rewrite2RDBForReplication(rdbFilename, aofListener, nil)
err = mdb.aofHandler.Rewrite2RDB(rdbFilename, aofListener)
if err != nil { if err != nil {
return err return err
} }
go func() { aofListener.readyToSend = true
mdb.masterListenAof(aofListener)
}()
// change bgSaveState and get waitSlaves for sending // change bgSaveState and get waitSlaves for sending
waitSlaves := make(map[*slaveClient]struct{}) waitSlaves := make(map[*slaveClient]struct{})
@@ -122,11 +143,43 @@ func (mdb *MultiDB) saveForReplication() error {
return nil return nil
} }
func (mdb *MultiDB) rewriteRDB() error {
rdbFile, err := ioutil.TempFile("", "*.rdb")
if err != nil {
return fmt.Errorf("create temp rdb failed: %v", err)
}
rdbFilename := rdbFile.Name()
newBacklog := &replBacklog{}
aofListener := &replAofListener{
backlog: newBacklog,
mdb: mdb,
}
hook := func() {
// pausing aof first, then lock masterStatus.
// use the same order as replAofListener to avoid dead lock
mdb.masterStatus.mu.Lock()
defer mdb.masterStatus.mu.Unlock()
newBacklog.beginOffset = mdb.masterStatus.backlog.currentOffset
}
err = mdb.aofHandler.Rewrite2RDBForReplication(rdbFilename, aofListener, hook)
if err != nil { // wait rdb result
return err
}
mdb.masterStatus.mu.Lock()
mdb.masterStatus.rdbFilename = rdbFilename
mdb.masterStatus.backlog = newBacklog
mdb.masterStatus.mu.Unlock()
// It is ok to know that new backlog is ready later, so we change readyToSend without sync
// But setting readyToSend=true must after new backlog is really ready (that means master.mu.Unlock)
aofListener.readyToSend = true
return nil
}
// masterFullReSyncWithSlave send replication header, rdb file and all backlogs to slave // masterFullReSyncWithSlave send replication header, rdb file and all backlogs to slave
func (mdb *MultiDB) masterFullReSyncWithSlave(slave *slaveClient) error { func (mdb *MultiDB) masterFullReSyncWithSlave(slave *slaveClient) error {
// write replication header // write replication header
header := "+FULLRESYNC " + mdb.masterStatus.replId + " " + header := "+FULLRESYNC " + mdb.masterStatus.replId + " " +
strconv.FormatInt(mdb.masterStatus.beginOffset, 10) + protocol.CRLF strconv.FormatInt(mdb.masterStatus.backlog.beginOffset, 10) + protocol.CRLF
_, err := slave.conn.Write([]byte(header)) _, err := slave.conn.Write([]byte(header))
if err != nil { if err != nil {
return fmt.Errorf("write replication header to slave failed: %v", err) return fmt.Errorf("write replication header to slave failed: %v", err)
@@ -151,8 +204,7 @@ func (mdb *MultiDB) masterFullReSyncWithSlave(slave *slaveClient) error {
// send backlog // send backlog
mdb.masterStatus.mu.RLock() mdb.masterStatus.mu.RLock()
currentOffset := mdb.masterStatus.currentOffset backlog, currentOffset := mdb.masterStatus.backlog.getSnapshot()
backlog := mdb.masterStatus.backlog[:currentOffset-mdb.masterStatus.beginOffset]
mdb.masterStatus.mu.RUnlock() mdb.masterStatus.mu.RUnlock()
_, err = slave.conn.Write(backlog) _, err = slave.conn.Write(backlog)
if err != nil { if err != nil {
@@ -172,12 +224,11 @@ func (mdb *MultiDB) masterTryPartialSyncWithSlave(slave *slaveClient, replId str
mdb.masterStatus.mu.RUnlock() mdb.masterStatus.mu.RUnlock()
return cannotPartialSync return cannotPartialSync
} }
if slaveOffset < mdb.masterStatus.beginOffset || slaveOffset > mdb.masterStatus.currentOffset { if !mdb.masterStatus.backlog.isValidOffset(slaveOffset) {
mdb.masterStatus.mu.RUnlock() mdb.masterStatus.mu.RUnlock()
return cannotPartialSync return cannotPartialSync
} }
currentOffset := mdb.masterStatus.currentOffset backlog, currentOffset := mdb.masterStatus.backlog.getSnapshotAfter(slaveOffset)
backlog := mdb.masterStatus.backlog[slaveOffset-mdb.masterStatus.beginOffset : currentOffset-mdb.masterStatus.beginOffset]
mdb.masterStatus.mu.RUnlock() mdb.masterStatus.mu.RUnlock()
// send replication header // send replication header
@@ -197,12 +248,13 @@ func (mdb *MultiDB) masterTryPartialSyncWithSlave(slave *slaveClient, replId str
return nil return nil
} }
// masterSendUpdatesToSlave only sends data to online slaves after bgSave is finished
// if bgSave is running, updates will be sent after the saving finished
func (mdb *MultiDB) masterSendUpdatesToSlave() error { func (mdb *MultiDB) masterSendUpdatesToSlave() error {
onlineSlaves := make(map[*slaveClient]struct{}) onlineSlaves := make(map[*slaveClient]struct{})
mdb.masterStatus.mu.RLock() mdb.masterStatus.mu.RLock()
currentOffset := mdb.masterStatus.currentOffset beginOffset := mdb.masterStatus.backlog.beginOffset
beginOffset := mdb.masterStatus.beginOffset backlog, currentOffset := mdb.masterStatus.backlog.getSnapshot()
backlog := mdb.masterStatus.backlog[:currentOffset-beginOffset]
for slave := range mdb.masterStatus.onlineSlaves { for slave := range mdb.masterStatus.onlineSlaves {
onlineSlaves[slave] = struct{}{} onlineSlaves[slave] = struct{}{}
} }
@@ -211,7 +263,7 @@ func (mdb *MultiDB) masterSendUpdatesToSlave() error {
slaveBeginOffset := slave.offset - beginOffset slaveBeginOffset := slave.offset - beginOffset
_, err := slave.conn.Write(backlog[slaveBeginOffset:]) _, err := slave.conn.Write(backlog[slaveBeginOffset:])
if err != nil { if err != nil {
logger.Errorf("send updates write backlog to slave failed: %v", err) logger.Errorf("send updates backlog to slave failed: %v", err)
mdb.removeSlave(slave) mdb.removeSlave(slave)
continue continue
} }
@@ -313,50 +365,64 @@ func (mdb *MultiDB) setSlaveOnline(slave *slaveClient, currentOffset int64) {
var pingBytes = protocol.MakeMultiBulkReply(utils.ToCmdLine("ping")).ToBytes() var pingBytes = protocol.MakeMultiBulkReply(utils.ToCmdLine("ping")).ToBytes()
const maxBacklogSize = 10 * 1024 * 1024 // 10MB
func (mdb *MultiDB) masterCron() { func (mdb *MultiDB) masterCron() {
if mdb.role != masterRole { if mdb.role != masterRole {
return return
} }
mdb.masterStatus.mu.Lock() mdb.masterStatus.mu.Lock()
if mdb.masterStatus.bgSaveState == bgSaveFinish { if mdb.masterStatus.bgSaveState == bgSaveFinish {
mdb.masterStatus.appendBacklog(pingBytes) mdb.masterStatus.backlog.appendBytes(pingBytes)
} }
backlogSize := len(mdb.masterStatus.backlog.buf)
mdb.masterStatus.mu.Unlock() mdb.masterStatus.mu.Unlock()
if err := mdb.masterSendUpdatesToSlave(); err != nil { if err := mdb.masterSendUpdatesToSlave(); err != nil {
logger.Errorf("masterSendUpdatesToSlave error: %v", err) logger.Errorf("masterSendUpdatesToSlave error: %v", err)
} }
if backlogSize > maxBacklogSize && !mdb.masterStatus.rewriting.Get() {
go func() {
mdb.masterStatus.rewriting.Set(true)
defer mdb.masterStatus.rewriting.Set(false)
if err := mdb.rewriteRDB(); err != nil {
mdb.masterStatus.rewriting.Set(false)
logger.Errorf("rewrite error: %v", err)
}
}()
}
} }
func (mdb *MultiDB) masterListenAof(listener chan CmdLine) { type replAofListener struct {
for { mdb *MultiDB
select { backlog *replBacklog // may NOT be mdb.masterStatus.backlog
case cmdLine := <-listener: readyToSend bool
mdb.masterStatus.mu.Lock() }
reply := protocol.MakeMultiBulkReply(cmdLine)
mdb.masterStatus.appendBacklog(reply.ToBytes()) func (listener *replAofListener) Callback(cmdLines []CmdLine) {
mdb.masterStatus.mu.Unlock() listener.mdb.masterStatus.mu.Lock()
if err := mdb.masterSendUpdatesToSlave(); err != nil { for _, cmdLine := range cmdLines {
logger.Errorf("masterSendUpdatesToSlave after receive aof error: %v", err) reply := protocol.MakeMultiBulkReply(cmdLine)
} listener.backlog.appendBytes(reply.ToBytes())
// if bgSave is running, updates will be sent after the save finished }
case <-mdb.masterStatus.ctx.Done(): listener.mdb.masterStatus.mu.Unlock()
break // listener could receive updates generated during rdb saving in progress
// Do not send updates to slave before rdb saving is finished
if listener.readyToSend {
if err := listener.mdb.masterSendUpdatesToSlave(); err != nil {
logger.Errorf("masterSendUpdatesToSlave after receive aof error: %v", err)
} }
} }
} }
func (mdb *MultiDB) startAsMaster() { func (mdb *MultiDB) startAsMaster() {
mdb.masterStatus = &masterStatus{ mdb.masterStatus = &masterStatus{
ctx: context.Background(), mu: sync.RWMutex{},
mu: sync.RWMutex{}, replId: utils.RandHexString(40),
replId: utils.RandHexString(40), backlog: &replBacklog{},
backlog: nil, slaveMap: make(map[redis.Connection]*slaveClient),
beginOffset: 0, waitSlaves: make(map[*slaveClient]struct{}),
currentOffset: 0, onlineSlaves: make(map[*slaveClient]struct{}),
slaveMap: make(map[redis.Connection]*slaveClient), bgSaveState: bgSaveIdle,
waitSlaves: make(map[*slaveClient]struct{}), rdbFilename: "",
onlineSlaves: make(map[*slaveClient]struct{}),
bgSaveState: bgSaveIdle,
rdbFilename: "",
} }
} }

View File

@@ -192,3 +192,114 @@ func TestReplicationMasterSide(t *testing.T) {
resp = slave.Exec(slaveConn, utils.ToCmdLine("get", "c")) resp = slave.Exec(slaveConn, utils.ToCmdLine("get", "c"))
asserts.AssertBulkReply(t, resp, "c") asserts.AssertBulkReply(t, resp, "c")
} }
func TestReplicationMasterRewriteRDB(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "godis")
if err != nil {
t.Error(err)
return
}
aofFilename := path.Join(tmpDir, "a.aof")
defer func() {
_ = os.Remove(aofFilename)
}()
config.Properties = &config.ServerProperties{
Databases: 16,
AppendOnly: true,
AppendFilename: aofFilename,
}
master := mockServer()
master.initAof()
master.startAsMaster()
masterConn := connection.NewFakeConn()
resp := master.Exec(masterConn, utils.ToCmdLine("SET", "a", "a"))
asserts.AssertNotError(t, resp)
resp = master.Exec(masterConn, utils.ToCmdLine("SET", "b", "b"))
asserts.AssertNotError(t, resp)
time.Sleep(time.Millisecond * 100) // wait write aof
err = master.rewriteRDB()
if err != nil {
t.Error(err)
return
}
resp = master.Exec(masterConn, utils.ToCmdLine("SET", "c", "c"))
asserts.AssertNotError(t, resp)
time.Sleep(time.Millisecond * 100) // wait write aof
// set slave
slave := mockServer()
replConn := connection.NewFakeConn()
master.Exec(replConn, utils.ToCmdLine("psync", "?", "-1"))
masterChan := parser.ParseStream(replConn)
psyncPayload := <-masterChan
if psyncPayload.Err != nil {
t.Errorf("master bad protocol: %v", psyncPayload.Err)
return
}
psyncHeader, ok := psyncPayload.Data.(*protocol.StatusReply)
if !ok {
t.Error("psync header is not a status reply")
return
}
headers := strings.Split(psyncHeader.Status, " ")
if len(headers) != 3 {
t.Errorf("illegal psync header: %s", psyncHeader.Status)
return
}
replId := headers[1]
replOffset, err := strconv.ParseInt(headers[2], 10, 64)
if err != nil {
t.Errorf("illegal offset: %s", headers[2])
return
}
t.Logf("repl id: %s, offset: %d", replId, replOffset)
rdbPayload := <-masterChan
if rdbPayload.Err != nil {
t.Error("read response failed: " + rdbPayload.Err.Error())
return
}
rdbReply, ok := rdbPayload.Data.(*protocol.BulkReply)
if !ok {
t.Error("illegal payload header: " + string(rdbPayload.Data.ToBytes()))
return
}
rdbDec := rdb.NewDecoder(bytes.NewReader(rdbReply.Arg))
err = importRDB(rdbDec, slave)
if err != nil {
t.Error("import rdb failed: " + err.Error())
return
}
slaveConn := connection.NewFakeConn()
resp = slave.Exec(slaveConn, utils.ToCmdLine("get", "a"))
asserts.AssertBulkReply(t, resp, "a")
resp = slave.Exec(slaveConn, utils.ToCmdLine("get", "b"))
asserts.AssertBulkReply(t, resp, "b")
master.masterCron()
for {
payload := <-masterChan
if payload.Err != nil {
t.Error(payload.Err)
return
}
cmdLine, ok := payload.Data.(*protocol.MultiBulkReply)
if !ok {
t.Error("unexpected payload: " + string(payload.Data.ToBytes()))
return
}
slave.Exec(replConn, cmdLine.Args)
n := len(cmdLine.ToBytes())
slave.slaveStatus.replOffset += int64(n)
if string(cmdLine.Args[0]) != "ping" {
break
}
}
resp = slave.Exec(slaveConn, utils.ToCmdLine("get", "c"))
asserts.AssertBulkReply(t, resp, "c")
}

View File

@@ -21,7 +21,7 @@ const (
type Connection struct { type Connection struct {
conn net.Conn conn net.Conn
// waiting until finish sending data, used for graceful shutdown // wait until finish sending data, used for graceful shutdown
sendingData wait.Wait sendingData wait.Wait
// lock while server sending response // lock while server sending response

View File

@@ -1,7 +1,8 @@
package connection package connection
import ( import (
"bytes" "fmt"
"github.com/hdt3213/godis/lib/logger"
"io" "io"
"sync" "sync"
) )
@@ -9,8 +10,9 @@ import (
// FakeConn implements redis.Connection for test // FakeConn implements redis.Connection for test
type FakeConn struct { type FakeConn struct {
Connection Connection
buf bytes.Buffer buf []byte
wait chan struct{} offset int
waitOn chan struct{}
closed bool closed bool
mu sync.Mutex mu sync.Mutex
} }
@@ -25,51 +27,76 @@ func (c *FakeConn) Write(b []byte) (int, error) {
if c.closed { if c.closed {
return 0, io.EOF return 0, io.EOF
} }
n, _ := c.buf.Write(b) c.mu.Lock()
c.buf = append(c.buf, b...)
c.mu.Unlock()
c.notify() c.notify()
return n, nil return len(b), nil
} }
func (c *FakeConn) notify() { func (c *FakeConn) notify() {
if c.wait != nil { if c.waitOn != nil {
c.mu.Lock() c.mu.Lock()
if c.wait != nil { if c.waitOn != nil {
close(c.wait) logger.Debug(fmt.Sprintf("notify %p", c.waitOn))
c.wait = nil close(c.waitOn)
c.waitOn = nil
} }
c.mu.Unlock() c.mu.Unlock()
} }
} }
func (c *FakeConn) waiting() { func (c *FakeConn) wait(offset int) {
c.mu.Lock() c.mu.Lock()
c.wait = make(chan struct{}) if c.offset != offset { // new data during waiting lock
return
}
if c.waitOn == nil {
c.waitOn = make(chan struct{})
}
waitOn := c.waitOn
logger.Debug(fmt.Sprintf("wait on %p", waitOn))
c.mu.Unlock() c.mu.Unlock()
<-c.wait <-waitOn
logger.Debug(fmt.Sprintf("wait on %p finish", waitOn))
} }
// Read reads data from buffer // Read reads data from buffer
func (c *FakeConn) Read(p []byte) (int, error) { func (c *FakeConn) Read(p []byte) (int, error) {
n, err := c.buf.Read(p) c.mu.Lock()
if err == io.EOF { n := copy(p, c.buf[c.offset:])
c.offset += n
offset := c.offset
c.mu.Unlock()
if n == 0 {
if c.closed { if c.closed {
return 0, io.EOF return n, io.EOF
} }
c.waiting() c.wait(offset)
return c.buf.Read(p) // after notify
if c.closed {
return n, io.EOF
}
n = copy(p, c.buf[c.offset:])
c.offset += n
return n, nil
} }
return n, err if c.closed {
return n, io.EOF
}
return n, nil
} }
// Clean resets the buffer // Clean resets the buffer
func (c *FakeConn) Clean() { func (c *FakeConn) Clean() {
c.wait = make(chan struct{}) c.waitOn = make(chan struct{})
c.buf.Reset() c.buf = nil
c.offset = 0
} }
// Bytes returns written data // Bytes returns written data
func (c *FakeConn) Bytes() []byte { func (c *FakeConn) Bytes() []byte {
return c.buf.Bytes() return c.buf
} }
func (c *FakeConn) Close() error { func (c *FakeConn) Close() error {

View File

@@ -156,6 +156,9 @@ func parseBulkString(header []byte, reader *bufio.Reader, ch chan<- *Payload) er
func parseRDBBulkString(reader *bufio.Reader, ch chan<- *Payload) error { func parseRDBBulkString(reader *bufio.Reader, ch chan<- *Payload) error {
header, err := reader.ReadBytes('\n') header, err := reader.ReadBytes('\n')
header = bytes.TrimSuffix(header, []byte{'\r', '\n'}) header = bytes.TrimSuffix(header, []byte{'\r', '\n'})
if len(header) == 0 {
return errors.New("empty header")
}
strLen, err := strconv.ParseInt(string(header[1:]), 10, 64) strLen, err := strconv.ParseInt(string(header[1:]), 10, 64)
if err != nil || strLen <= 0 { if err != nil || strLen <= 0 {
return errors.New("illegal bulk header: " + string(header)) return errors.New("illegal bulk header: " + string(header))