package parser import ( "bufio" "bytes" "errors" "github.com/hdt3213/godis/interface/redis" "github.com/hdt3213/godis/lib/logger" "github.com/hdt3213/godis/redis/reply" "io" "runtime/debug" "strconv" "strings" ) // Payload stores redis.Reply or error type Payload struct { Data redis.Reply Err error } // ParseStream reads data from io.Reader and send payloads through channel func ParseStream(reader io.Reader) <-chan *Payload { ch := make(chan *Payload) go parse0(reader, ch) return ch } // ParseOne reads data from []byte and return the first payload func ParseOne(data []byte) (redis.Reply, error) { ch := make(chan *Payload) reader := bytes.NewReader(data) go parse0(reader, ch) payload := <-ch // parse0 will close the channel if payload == nil { return nil, errors.New("no reply") } return payload.Data, payload.Err } type readState struct { readingMultiLine bool expectedArgsCount int msgType byte args [][]byte bulkLen int64 } func (s *readState) finished() bool { return s.expectedArgsCount > 0 && len(s.args) == s.expectedArgsCount } func parse0(reader io.Reader, ch chan<- *Payload) { defer func() { if err := recover(); err != nil { logger.Error(string(debug.Stack())) } }() bufReader := bufio.NewReader(reader) var state readState var err error var msg []byte for { // read line var ioErr bool msg, ioErr, err = readLine(bufReader, &state) if err != nil { if ioErr { // encounter io err, stop read ch <- &Payload{ Err: err, } close(ch) return } // protocol err, reset read state ch <- &Payload{ Err: err, } state = readState{} continue } // parse line if !state.readingMultiLine { // receive new response if msg[0] == '*' { // multi bulk reply err = parseMultiBulkHeader(msg, &state) if err != nil { ch <- &Payload{ Err: errors.New("protocol error: " + string(msg)), } state = readState{} // reset state continue } if state.expectedArgsCount == 0 { ch <- &Payload{ Data: &reply.EmptyMultiBulkReply{}, } state = readState{} // reset state continue } } else if msg[0] == '$' { // bulk reply err = parseBulkHeader(msg, &state) if err != nil { ch <- &Payload{ Err: errors.New("protocol error: " + string(msg)), } state = readState{} // reset state continue } if state.bulkLen == -1 { // null bulk reply ch <- &Payload{ Data: &reply.NullBulkReply{}, } state = readState{} // reset state continue } } else { // single line reply result, err := parseSingleLineReply(msg) ch <- &Payload{ Data: result, Err: err, } state = readState{} // reset state continue } } else { // receive following bulk reply err = readBody(msg, &state) if err != nil { ch <- &Payload{ Err: errors.New("protocol error: " + string(msg)), } state = readState{} // reset state continue } // if sending finished if state.finished() { var result redis.Reply if state.msgType == '*' { result = reply.MakeMultiBulkReply(state.args) } else if state.msgType == '$' { result = reply.MakeBulkReply(state.args[0]) } ch <- &Payload{ Data: result, Err: err, } state = readState{} } } } } func readLine(bufReader *bufio.Reader, state *readState) ([]byte, bool, error) { var msg []byte var err error if state.bulkLen == 0 { // read normal line msg, err = bufReader.ReadBytes('\n') if err != nil { return nil, true, err } if len(msg) == 0 || msg[len(msg)-2] != '\r' { return nil, false, errors.New("protocol error: " + string(msg)) } } else { // read bulk line (binary safe) msg = make([]byte, state.bulkLen+2) _, err = io.ReadFull(bufReader, msg) if err != nil { return nil, true, err } if len(msg) == 0 || msg[len(msg)-2] != '\r' || msg[len(msg)-1] != '\n' { return nil, false, errors.New("protocol error: " + string(msg)) } state.bulkLen = 0 } return msg, false, nil } func parseMultiBulkHeader(msg []byte, state *readState) error { var err error var expectedLine uint64 expectedLine, err = strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32) if err != nil { return errors.New("protocol error: " + string(msg)) } if expectedLine == 0 { state.expectedArgsCount = 0 return nil } else if expectedLine > 0 { // first line of multi bulk reply state.msgType = msg[0] state.readingMultiLine = true state.expectedArgsCount = int(expectedLine) state.args = make([][]byte, 0, expectedLine) return nil } else { return errors.New("protocol error: " + string(msg)) } } func parseBulkHeader(msg []byte, state *readState) error { var err error state.bulkLen, err = strconv.ParseInt(string(msg[1:len(msg)-2]), 10, 64) if err != nil { return errors.New("protocol error: " + string(msg)) } if state.bulkLen == -1 { // null bulk return nil } else if state.bulkLen > 0 { state.msgType = msg[0] state.readingMultiLine = true state.expectedArgsCount = 1 state.args = make([][]byte, 0, 1) return nil } else { return errors.New("protocol error: " + string(msg)) } } func parseSingleLineReply(msg []byte) (redis.Reply, error) { str := strings.TrimSuffix(string(msg), "\n") str = strings.TrimSuffix(str, "\r") var result redis.Reply switch msg[0] { case '+': // status reply result = reply.MakeStatusReply(str[1:]) case '-': // err reply result = reply.MakeErrReply(str[1:]) case ':': // int reply val, err := strconv.ParseInt(str[1:], 10, 64) if err != nil { return nil, errors.New("protocol error: " + string(msg)) } result = reply.MakeIntReply(val) default: // parse as text protocol strs := strings.Split(str, " ") args := make([][]byte, len(strs)) for i, s := range strs { args[i] = []byte(s) } result = reply.MakeMultiBulkReply(args) } return result, nil } // read the non-first lines of multi bulk reply or bulk reply func readBody(msg []byte, state *readState) error { line := msg[0 : len(msg)-2] var err error if line[0] == '$' { // bulk reply state.bulkLen, err = strconv.ParseInt(string(line[1:]), 10, 64) if err != nil { return errors.New("protocol error: " + string(msg)) } if state.bulkLen <= 0 { // null bulk in multi bulks state.args = append(state.args, []byte{}) state.bulkLen = 0 } } else { state.args = append(state.args, line) } return nil }