mirror of
https://github.com/HDT3213/godis.git
synced 2025-10-05 08:46:56 +08:00
270 lines
6.3 KiB
Go
270 lines
6.3 KiB
Go
package parser
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"errors"
|
|
"github.com/hdt3213/godis/interface/redis"
|
|
"github.com/hdt3213/godis/lib/logger"
|
|
"github.com/hdt3213/godis/redis/reply"
|
|
"io"
|
|
"runtime/debug"
|
|
"strconv"
|
|
"strings"
|
|
)
|
|
|
|
// Payload stores redis.Reply or error
|
|
type Payload struct {
|
|
Data redis.Reply
|
|
Err error
|
|
}
|
|
|
|
// ParseStream reads data from io.Reader and send payloads through channel
|
|
func ParseStream(reader io.Reader) <-chan *Payload {
|
|
ch := make(chan *Payload)
|
|
go parse0(reader, ch)
|
|
return ch
|
|
}
|
|
|
|
// ParseOne reads data from []byte and return the first payload
|
|
func ParseOne(data []byte) (redis.Reply, error) {
|
|
ch := make(chan *Payload)
|
|
reader := bytes.NewReader(data)
|
|
go parse0(reader, ch)
|
|
payload := <-ch // parse0 will close the channel
|
|
if payload == nil {
|
|
return nil, errors.New("no reply")
|
|
}
|
|
return payload.Data, payload.Err
|
|
}
|
|
|
|
type readState struct {
|
|
readingMultiLine bool
|
|
expectedArgsCount int
|
|
msgType byte
|
|
args [][]byte
|
|
bulkLen int64
|
|
}
|
|
|
|
func (s *readState) finished() bool {
|
|
return s.expectedArgsCount > 0 && len(s.args) == s.expectedArgsCount
|
|
}
|
|
|
|
func parse0(reader io.Reader, ch chan<- *Payload) {
|
|
defer func() {
|
|
if err := recover(); err != nil {
|
|
logger.Error(string(debug.Stack()))
|
|
}
|
|
}()
|
|
bufReader := bufio.NewReader(reader)
|
|
var state readState
|
|
var err error
|
|
var msg []byte
|
|
for {
|
|
// read line
|
|
var ioErr bool
|
|
msg, ioErr, err = readLine(bufReader, &state)
|
|
if err != nil {
|
|
if ioErr { // encounter io err, stop read
|
|
ch <- &Payload{
|
|
Err: err,
|
|
}
|
|
close(ch)
|
|
return
|
|
}
|
|
// protocol err, reset read state
|
|
ch <- &Payload{
|
|
Err: err,
|
|
}
|
|
state = readState{}
|
|
continue
|
|
}
|
|
|
|
// parse line
|
|
if !state.readingMultiLine {
|
|
// receive new response
|
|
if msg[0] == '*' {
|
|
// multi bulk reply
|
|
err = parseMultiBulkHeader(msg, &state)
|
|
if err != nil {
|
|
ch <- &Payload{
|
|
Err: errors.New("protocol error: " + string(msg)),
|
|
}
|
|
state = readState{} // reset state
|
|
continue
|
|
}
|
|
if state.expectedArgsCount == 0 {
|
|
ch <- &Payload{
|
|
Data: &reply.EmptyMultiBulkReply{},
|
|
}
|
|
state = readState{} // reset state
|
|
continue
|
|
}
|
|
} else if msg[0] == '$' { // bulk reply
|
|
err = parseBulkHeader(msg, &state)
|
|
if err != nil {
|
|
ch <- &Payload{
|
|
Err: errors.New("protocol error: " + string(msg)),
|
|
}
|
|
state = readState{} // reset state
|
|
continue
|
|
}
|
|
if state.bulkLen == -1 { // null bulk reply
|
|
ch <- &Payload{
|
|
Data: &reply.NullBulkReply{},
|
|
}
|
|
state = readState{} // reset state
|
|
continue
|
|
}
|
|
} else {
|
|
// single line reply
|
|
result, err := parseSingleLineReply(msg)
|
|
ch <- &Payload{
|
|
Data: result,
|
|
Err: err,
|
|
}
|
|
state = readState{} // reset state
|
|
continue
|
|
}
|
|
} else {
|
|
// receive following bulk reply
|
|
err = readBody(msg, &state)
|
|
if err != nil {
|
|
ch <- &Payload{
|
|
Err: errors.New("protocol error: " + string(msg)),
|
|
}
|
|
state = readState{} // reset state
|
|
continue
|
|
}
|
|
// if sending finished
|
|
if state.finished() {
|
|
var result redis.Reply
|
|
if state.msgType == '*' {
|
|
result = reply.MakeMultiBulkReply(state.args)
|
|
} else if state.msgType == '$' {
|
|
result = reply.MakeBulkReply(state.args[0])
|
|
}
|
|
ch <- &Payload{
|
|
Data: result,
|
|
Err: err,
|
|
}
|
|
state = readState{}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func readLine(bufReader *bufio.Reader, state *readState) ([]byte, bool, error) {
|
|
var msg []byte
|
|
var err error
|
|
if state.bulkLen == 0 { // read normal line
|
|
msg, err = bufReader.ReadBytes('\n')
|
|
if err != nil {
|
|
return nil, true, err
|
|
}
|
|
if len(msg) == 0 || msg[len(msg)-2] != '\r' {
|
|
return nil, false, errors.New("protocol error: " + string(msg))
|
|
}
|
|
} else { // read bulk line (binary safe)
|
|
msg = make([]byte, state.bulkLen+2)
|
|
_, err = io.ReadFull(bufReader, msg)
|
|
if err != nil {
|
|
return nil, true, err
|
|
}
|
|
if len(msg) == 0 ||
|
|
msg[len(msg)-2] != '\r' ||
|
|
msg[len(msg)-1] != '\n' {
|
|
return nil, false, errors.New("protocol error: " + string(msg))
|
|
}
|
|
state.bulkLen = 0
|
|
}
|
|
return msg, false, nil
|
|
}
|
|
|
|
func parseMultiBulkHeader(msg []byte, state *readState) error {
|
|
var err error
|
|
var expectedLine uint64
|
|
expectedLine, err = strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32)
|
|
if err != nil {
|
|
return errors.New("protocol error: " + string(msg))
|
|
}
|
|
if expectedLine == 0 {
|
|
state.expectedArgsCount = 0
|
|
return nil
|
|
} else if expectedLine > 0 {
|
|
// first line of multi bulk reply
|
|
state.msgType = msg[0]
|
|
state.readingMultiLine = true
|
|
state.expectedArgsCount = int(expectedLine)
|
|
state.args = make([][]byte, 0, expectedLine)
|
|
return nil
|
|
} else {
|
|
return errors.New("protocol error: " + string(msg))
|
|
}
|
|
}
|
|
|
|
func parseBulkHeader(msg []byte, state *readState) error {
|
|
var err error
|
|
state.bulkLen, err = strconv.ParseInt(string(msg[1:len(msg)-2]), 10, 64)
|
|
if err != nil {
|
|
return errors.New("protocol error: " + string(msg))
|
|
}
|
|
if state.bulkLen == -1 { // null bulk
|
|
return nil
|
|
} else if state.bulkLen > 0 {
|
|
state.msgType = msg[0]
|
|
state.readingMultiLine = true
|
|
state.expectedArgsCount = 1
|
|
state.args = make([][]byte, 0, 1)
|
|
return nil
|
|
} else {
|
|
return errors.New("protocol error: " + string(msg))
|
|
}
|
|
}
|
|
|
|
func parseSingleLineReply(msg []byte) (redis.Reply, error) {
|
|
str := strings.TrimSuffix(string(msg), "\r\n")
|
|
var result redis.Reply
|
|
switch msg[0] {
|
|
case '+': // status reply
|
|
result = reply.MakeStatusReply(str[1:])
|
|
case '-': // err reply
|
|
result = reply.MakeErrReply(str[1:])
|
|
case ':': // int reply
|
|
val, err := strconv.ParseInt(str[1:], 10, 64)
|
|
if err != nil {
|
|
return nil, errors.New("protocol error: " + string(msg))
|
|
}
|
|
result = reply.MakeIntReply(val)
|
|
default:
|
|
// parse as text protocol
|
|
strs := strings.Split(str, " ")
|
|
args := make([][]byte, len(strs))
|
|
for i, s := range strs {
|
|
args[i] = []byte(s)
|
|
}
|
|
result = reply.MakeMultiBulkReply(args)
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
// read the non-first lines of multi bulk reply or bulk reply
|
|
func readBody(msg []byte, state *readState) error {
|
|
line := msg[0 : len(msg)-2]
|
|
var err error
|
|
if line[0] == '$' {
|
|
// bulk reply
|
|
state.bulkLen, err = strconv.ParseInt(string(line[1:]), 10, 64)
|
|
if err != nil {
|
|
return errors.New("protocol error: " + string(msg))
|
|
}
|
|
if state.bulkLen <= 0 { // null bulk in multi bulks
|
|
state.args = append(state.args, []byte{})
|
|
state.bulkLen = 0
|
|
}
|
|
} else {
|
|
state.args = append(state.args, line)
|
|
}
|
|
return nil
|
|
}
|