mirror of
				https://github.com/HDT3213/godis.git
				synced 2025-10-31 03:56:22 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			276 lines
		
	
	
		
			6.5 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			276 lines
		
	
	
		
			6.5 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package parser
 | |
| 
 | |
| import (
 | |
| 	"bufio"
 | |
| 	"bytes"
 | |
| 	"errors"
 | |
| 	"github.com/hdt3213/godis/interface/redis"
 | |
| 	"github.com/hdt3213/godis/lib/logger"
 | |
| 	"github.com/hdt3213/godis/redis/reply"
 | |
| 	"io"
 | |
| 	"runtime/debug"
 | |
| 	"strconv"
 | |
| 	"strings"
 | |
| )
 | |
| 
 | |
| // Payload stores redis.Reply or error
 | |
| type Payload struct {
 | |
| 	Data redis.Reply
 | |
| 	Err  error
 | |
| }
 | |
| 
 | |
| // ParseStream reads data from io.Reader and send payloads through channel
 | |
| func ParseStream(reader io.Reader) <-chan *Payload {
 | |
| 	ch := make(chan *Payload)
 | |
| 	go parse0(reader, ch)
 | |
| 	return ch
 | |
| }
 | |
| 
 | |
| // ParseOne reads data from []byte and return the first payload
 | |
| func ParseOne(data []byte) (redis.Reply, error) {
 | |
| 	ch := make(chan *Payload)
 | |
| 	reader := bytes.NewReader(data)
 | |
| 	go parse0(reader, ch)
 | |
| 	payload := <-ch // parse0 will close the channel
 | |
| 	if payload == nil {
 | |
| 		return nil, errors.New("no reply")
 | |
| 	}
 | |
| 	return payload.Data, payload.Err
 | |
| }
 | |
| 
 | |
// readState carries the parser's progress on the reply currently being read.
// Its zero value means "no reply in progress".
type readState struct {
	// msgType is the first byte of the header that started the reply.
	msgType byte
	// downloading is set once a header has been accepted and body lines
	// are still expected.
	downloading bool

	// expectedArgsCount is the number of arguments the reply announces.
	expectedArgsCount int
	// receivedCount is the number of arguments stored in args so far.
	receivedCount int
	// args holds the arguments collected for the reply.
	args [][]byte

	// fixedLen is the byte length of the bulk body to read next;
	// 0 means the next read is an ordinary line.
	fixedLen int64
}
 | |
| 
 | |
| func (s *readState) finished() bool {
 | |
| 	return s.expectedArgsCount > 0 && s.receivedCount == s.expectedArgsCount
 | |
| }
 | |
| 
 | |
| func parse0(reader io.Reader, ch chan<- *Payload) {
 | |
| 	defer func() {
 | |
| 		if err := recover(); err != nil {
 | |
| 			logger.Error(string(debug.Stack()))
 | |
| 		}
 | |
| 	}()
 | |
| 	bufReader := bufio.NewReader(reader)
 | |
| 	var state readState
 | |
| 	var err error
 | |
| 	var msg []byte
 | |
| 	for {
 | |
| 		// read line
 | |
| 		var ioErr bool
 | |
| 		msg, ioErr, err = readLine(bufReader, &state)
 | |
| 		if err != nil {
 | |
| 			if ioErr { // encounter io err, stop read
 | |
| 				ch <- &Payload{
 | |
| 					Err: err,
 | |
| 				}
 | |
| 				close(ch)
 | |
| 				return
 | |
| 			}
 | |
| 			// protocol err, reset read state
 | |
| 			ch <- &Payload{
 | |
| 				Err: err,
 | |
| 			}
 | |
| 			state = readState{}
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		// parse line
 | |
| 		if !state.downloading {
 | |
| 			// receive new response
 | |
| 			if msg[0] == '*' {
 | |
| 				// multi bulk reply
 | |
| 				err = parseMultiBulkHeader(msg, &state)
 | |
| 				if err != nil {
 | |
| 					ch <- &Payload{
 | |
| 						Err: errors.New("protocol error: " + string(msg)),
 | |
| 					}
 | |
| 					state = readState{} // reset state
 | |
| 					continue
 | |
| 				}
 | |
| 				if state.expectedArgsCount == 0 {
 | |
| 					ch <- &Payload{
 | |
| 						Data: &reply.EmptyMultiBulkReply{},
 | |
| 					}
 | |
| 					state = readState{} // reset state
 | |
| 					continue
 | |
| 				}
 | |
| 			} else if msg[0] == '$' { // bulk reply
 | |
| 				err = parseBulkHeader(msg, &state)
 | |
| 				if err != nil {
 | |
| 					ch <- &Payload{
 | |
| 						Err: errors.New("protocol error: " + string(msg)),
 | |
| 					}
 | |
| 					state = readState{} // reset state
 | |
| 					continue
 | |
| 				}
 | |
| 				if state.fixedLen == -1 { // null bulk reply
 | |
| 					ch <- &Payload{
 | |
| 						Data: &reply.NullBulkReply{},
 | |
| 					}
 | |
| 					state = readState{} // reset state
 | |
| 					continue
 | |
| 				}
 | |
| 			} else {
 | |
| 				// single line reply
 | |
| 				result, err := parseSingleLineReply(msg)
 | |
| 				ch <- &Payload{
 | |
| 					Data: result,
 | |
| 					Err:  err,
 | |
| 				}
 | |
| 				state = readState{} // reset state
 | |
| 				continue
 | |
| 			}
 | |
| 		} else {
 | |
| 			// receive following bulk reply
 | |
| 			err = readBulkBody(msg, &state)
 | |
| 			if err != nil {
 | |
| 				ch <- &Payload{
 | |
| 					Err: errors.New("protocol error: " + string(msg)),
 | |
| 				}
 | |
| 				state = readState{} // reset state
 | |
| 				continue
 | |
| 			}
 | |
| 			// if sending finished
 | |
| 			if state.finished() {
 | |
| 				var result redis.Reply
 | |
| 				if state.msgType == '*' {
 | |
| 					result = reply.MakeMultiBulkReply(state.args)
 | |
| 				} else if state.msgType == '$' {
 | |
| 					result = reply.MakeBulkReply(state.args[0])
 | |
| 				}
 | |
| 				ch <- &Payload{
 | |
| 					Data: result,
 | |
| 					Err:  err,
 | |
| 				}
 | |
| 				state = readState{}
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func readLine(bufReader *bufio.Reader, state *readState) ([]byte, bool, error) {
 | |
| 	var msg []byte
 | |
| 	var err error
 | |
| 	if state.fixedLen == 0 { // read normal line
 | |
| 		msg, err = bufReader.ReadBytes('\n')
 | |
| 		if err != nil {
 | |
| 			return nil, true, err
 | |
| 		}
 | |
| 		if len(msg) == 0 || msg[len(msg)-2] != '\r' {
 | |
| 			return nil, false, errors.New("protocol error: " + string(msg))
 | |
| 		}
 | |
| 	} else { // read bulk line (binary safe)
 | |
| 		msg = make([]byte, state.fixedLen+2)
 | |
| 		_, err = io.ReadFull(bufReader, msg)
 | |
| 		if err != nil {
 | |
| 			return nil, true, err
 | |
| 		}
 | |
| 		if len(msg) == 0 ||
 | |
| 			msg[len(msg)-2] != '\r' ||
 | |
| 			msg[len(msg)-1] != '\n' {
 | |
| 			return nil, false, errors.New("protocol error: " + string(msg))
 | |
| 		}
 | |
| 		state.fixedLen = 0
 | |
| 	}
 | |
| 	return msg, false, nil
 | |
| }
 | |
| 
 | |
| func parseMultiBulkHeader(msg []byte, state *readState) error {
 | |
| 	var err error
 | |
| 	var expectedLine uint64
 | |
| 	expectedLine, err = strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32)
 | |
| 	if err != nil {
 | |
| 		return errors.New("protocol error: " + string(msg))
 | |
| 	}
 | |
| 	if expectedLine == 0 {
 | |
| 		state.expectedArgsCount = 0
 | |
| 		return nil
 | |
| 	} else if expectedLine > 0 {
 | |
| 		// first line of multi bulk reply
 | |
| 		state.msgType = msg[0]
 | |
| 		state.downloading = true
 | |
| 		state.expectedArgsCount = int(expectedLine)
 | |
| 		state.receivedCount = 0
 | |
| 		state.args = make([][]byte, expectedLine)
 | |
| 		return nil
 | |
| 	} else {
 | |
| 		return errors.New("protocol error: " + string(msg))
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func parseBulkHeader(msg []byte, state *readState) error {
 | |
| 	var err error
 | |
| 	state.fixedLen, err = strconv.ParseInt(string(msg[1:len(msg)-2]), 10, 64)
 | |
| 	if err != nil {
 | |
| 		return errors.New("protocol error: " + string(msg))
 | |
| 	}
 | |
| 	if state.fixedLen == -1 { // null bulk
 | |
| 		return nil
 | |
| 	} else if state.fixedLen > 0 {
 | |
| 		state.msgType = msg[0]
 | |
| 		state.downloading = true
 | |
| 		state.expectedArgsCount = 1
 | |
| 		state.receivedCount = 0
 | |
| 		state.args = make([][]byte, 1)
 | |
| 		return nil
 | |
| 	} else {
 | |
| 		return errors.New("protocol error: " + string(msg))
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func parseSingleLineReply(msg []byte) (redis.Reply, error) {
 | |
| 	str := strings.TrimSuffix(string(msg), "\n")
 | |
| 	str = strings.TrimSuffix(str, "\r")
 | |
| 	var result redis.Reply
 | |
| 	switch msg[0] {
 | |
| 	case '+': // status reply
 | |
| 		result = reply.MakeStatusReply(str[1:])
 | |
| 	case '-': // err reply
 | |
| 		result = reply.MakeErrReply(str[1:])
 | |
| 	case ':': // int reply
 | |
| 		val, err := strconv.ParseInt(str[1:], 10, 64)
 | |
| 		if err != nil {
 | |
| 			return nil, errors.New("protocol error: " + string(msg))
 | |
| 		}
 | |
| 		result = reply.MakeIntReply(val)
 | |
| 	default:
 | |
| 		// parse as text protocol
 | |
| 		strs := strings.Split(str, " ")
 | |
| 		args := make([][]byte, len(strs))
 | |
| 		for i, s := range strs {
 | |
| 			args[i] = []byte(s)
 | |
| 		}
 | |
| 		result = reply.MakeMultiBulkReply(args)
 | |
| 	}
 | |
| 	return result, nil
 | |
| }
 | |
| 
 | |
| // read the non-first lines of multi bulk reply or bulk reply
 | |
| func readBulkBody(msg []byte, state *readState) error {
 | |
| 	line := msg[0 : len(msg)-2]
 | |
| 	var err error
 | |
| 	if line[0] == '$' {
 | |
| 		// bulk reply
 | |
| 		state.fixedLen, err = strconv.ParseInt(string(line[1:]), 10, 64)
 | |
| 		if err != nil {
 | |
| 			return errors.New("protocol error: " + string(msg))
 | |
| 		}
 | |
| 		if state.fixedLen <= 0 { // null bulk in multi bulks
 | |
| 			state.args[state.receivedCount] = []byte{}
 | |
| 			state.receivedCount++
 | |
| 			state.fixedLen = 0
 | |
| 		}
 | |
| 	} else {
 | |
| 		state.args[state.receivedCount] = line
 | |
| 		state.receivedCount++
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | 
