Added resp package for encoding and decoding resp messages.

Created encoding functions for the following commands: ping, set, setnx, get, mget, incr, incrby, incrbyfloat.
Created utils packages for shared utility functions.
This commit is contained in:
Kelvin Clement Mwinuka
2023-06-24 05:38:59 +08:00
parent 0b1c76f8b6
commit 40be716513
16 changed files with 1412 additions and 79 deletions

View File

@@ -3,6 +3,7 @@
[x] Config file support
[x] TCP support w/ TLS
[x] HTTP support w/ TLS
[] Ping/Pong
[] String support
[] List support
[] Set support

View File

@@ -112,6 +112,7 @@ func main() {
defer conn.Close()
done := make(chan struct{})
connClosed := make(chan struct{})
connRW := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
stdioRW := bufio.NewReadWriter(bufio.NewReader(os.Stdin), bufio.NewWriter(os.Stdout))
@@ -127,18 +128,42 @@ func main() {
} else {
in := bytes.TrimSpace(in)
// Check for exit command
if bytes.Equal(bytes.ToLower(in), []byte("exit")) {
// Check for quit command
if bytes.Equal(bytes.ToLower(in), []byte("quit")) {
break
}
enc := serialization.Encode(in)
connRW.Write([]byte(fmt.Sprintf("\"%s\"\n", string(enc))))
if err := serialization.Encode(connRW, string(in)); err != nil {
stdioRW.Write([]byte(err.Error()))
stdioRW.Flush()
} else {
connRW.Write([]byte("\n"))
connRW.Flush()
}
}
}
done <- struct{}{}
}()
<-done
go func() {
for {
l, _, err := connRW.ReadLine()
if err != nil && err == io.EOF {
break
}
stdioRW.Write(l)
stdioRW.Write([]byte("\n"))
stdioRW.Flush()
}
connClosed <- struct{}{}
}()
select {
case <-done:
fmt.Println("Exited")
case <-connClosed:
fmt.Println("Connection closed")
}
}

5
go.mod
View File

@@ -2,4 +2,7 @@ module github.com/kelvinmwinuka/memstore
go 1.20
require gopkg.in/yaml.v3 v3.0.1 // indirect
require (
github.com/tidwall/resp v0.1.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

2
go.sum
View File

@@ -1,3 +1,5 @@
github.com/tidwall/resp v0.1.1 h1:Ly20wkhqKTmDUPlyM1S7pWo5kk0tDu8OoC/vFArXmwE=
github.com/tidwall/resp v0.1.1/go.mod h1:3/FrruOBAxPTPtundW0VXgmsQ4ZBA0Aw714lVYgwFa0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -1 +1,32 @@
package serialization
import (
"bytes"
"fmt"
"io"
"github.com/tidwall/resp"
)
func Decode(raw string) {
rd := resp.NewReader(bytes.NewBufferString(raw))
for {
v, _, err := rd.ReadValue()
if err == io.EOF {
break
}
if err != nil {
fmt.Println(err)
}
fmt.Println(v)
if v.Type().String() == "Array" {
for _, elem := range v.Array() {
fmt.Printf("%s: %v\n", elem.Type().String(), elem)
}
}
}
}

View File

@@ -1,83 +1,159 @@
package serialization
import (
"bytes"
"encoding/csv"
"errors"
"fmt"
"log"
"io"
"math"
"strconv"
"strings"
"github.com/kelvinmwinuka/memstore/utils"
"github.com/tidwall/resp"
)
func tokenize(b []byte) ([][]byte, error) {
qOpen := false
transformed := []byte("")
const (
wrong_args_error = "wrong number of arguments for %s command"
wrong_type_error = "wrong data type for %s command"
)
for _, c := range b {
if c != ' ' && c != '"' {
transformed = append(transformed, c)
continue
func tokenize(comm string) ([]string, error) {
r := csv.NewReader(strings.NewReader(comm))
r.Comma = ' '
return r.Read()
}
if c == '"' {
qOpen = !qOpen
if qOpen && !bytes.HasSuffix(transformed, []byte(" ")) {
transformed = append(transformed, ' ')
}
transformed = append(transformed, c)
continue
}
if c == ' ' && qOpen {
transformed = append(transformed, []byte("*-*")...)
continue
}
if c == ' ' && !qOpen {
transformed = append(transformed, c)
continue
func encodePingPong(wr *resp.Writer, tokens []string) error {
switch len(tokens) {
default:
return fmt.Errorf(wrong_args_error, strings.ToUpper(tokens[0]))
case 1:
wr.WriteSimpleString(strings.ToUpper(tokens[0]))
return nil
case 2:
wr.WriteArray([]resp.Value{
resp.StringValue(strings.ToUpper(tokens[0])),
resp.StringValue(tokens[1]),
})
return nil
}
}
if qOpen {
return nil, errors.New("open quote in command")
func encodeSet(wr *resp.Writer, tokens []string) error {
switch len(tokens) {
default:
return fmt.Errorf(wrong_args_error, strings.ToUpper(tokens[0]))
case 3:
arr := []resp.Value{
resp.StringValue(strings.ToUpper(tokens[0])),
resp.StringValue(tokens[1]),
}
tokens := bytes.Split(transformed, []byte(" "))
for i := 0; i < len(tokens); i++ {
tokens[i] = bytes.Trim(tokens[i], "\"")
tokens[i] = bytes.ReplaceAll(tokens[i], []byte("*-*"), []byte(" "))
if n, err := strconv.ParseFloat(tokens[2], 32); err != nil {
arr = append(arr, resp.StringValue(tokens[2]))
} else if math.Mod(n, 1.0) == 0 {
arr = append(arr, resp.IntegerValue(int(n)))
} else {
arr = append(arr, resp.FloatValue(n))
}
return tokens, nil
wr.WriteArray(arr)
return nil
}
}
func Encode(b []byte) []byte {
tokens, err := tokenize(b)
func encodeGet(wr *resp.Writer, tokens []string) error {
switch len(tokens) {
default:
return fmt.Errorf(wrong_args_error, strings.ToUpper(tokens[0]))
case 2:
wr.WriteArray([]resp.Value{
resp.StringValue(strings.ToUpper(tokens[0])),
resp.StringValue(tokens[1]),
})
return nil
}
}
func encodeMGet(wr *resp.Writer, tokens []string) error {
switch len(tokens) {
default:
arr := []resp.Value{resp.StringValue(strings.ToUpper(tokens[0]))}
for _, token := range tokens[1:] {
arr = append(arr, resp.StringValue(token))
}
wr.WriteArray(arr)
return nil
case 1:
return fmt.Errorf(wrong_args_error, strings.ToUpper(tokens[0]))
}
}
func encodeIncr(wr *resp.Writer, tokens []string) error {
switch len(tokens) {
default:
return fmt.Errorf(wrong_args_error, strings.ToUpper(tokens[0]))
case 2:
if utils.Contains[string]([]string{"incrby", "incrbyfloat"}, strings.ToLower(tokens[0])) {
return fmt.Errorf(wrong_args_error, strings.ToUpper(tokens[0]))
}
wr.WriteArray([]resp.Value{
resp.StringValue(strings.ToUpper(tokens[0])),
resp.StringValue(tokens[1]),
})
return nil
case 3:
if strings.ToLower(tokens[0]) == "incr" {
return fmt.Errorf(wrong_args_error, strings.ToUpper(tokens[0]))
}
arr := []resp.Value{
resp.StringValue(strings.ToUpper(tokens[0])),
resp.StringValue(tokens[1]),
}
if n, err := strconv.ParseFloat(tokens[2], 32); err != nil {
return fmt.Errorf(wrong_type_error, strings.ToUpper(tokens[0]))
} else if !utils.IsInteger(n) || strings.ToLower(tokens[0]) == "incrbyfloat" {
arr = append(arr, resp.FloatValue(n))
} else {
arr = append(arr, resp.IntegerValue(int(n)))
}
wr.WriteArray(arr)
return nil
}
}
func Encode(buf io.ReadWriter, comm string) error {
var err error = nil
tokens, err := tokenize(comm)
if err != nil {
log.Fatal(err)
return errors.New("could not parse command")
}
if len(tokens) <= 0 {
return b
wr := resp.NewWriter(buf)
switch string(strings.ToLower(tokens[0])) {
case "ping", "pong":
err = encodePingPong(wr, tokens)
case "set", "setnx":
err = encodeSet(wr, tokens)
case "get":
err = encodeGet(wr, tokens)
case "mget":
err = encodeMGet(wr, tokens)
case "incr", "incrby", "incrbyfloat":
err = encodeIncr(wr, tokens)
default:
err = errors.New("failed to parse command")
}
if len(tokens) == 1 && bytes.Equal(bytes.ToLower(tokens[0]), []byte("ping")) {
return []byte(fmt.Sprintf("+%s\\r\\n", string(bytes.ToUpper(tokens[0]))))
}
if len(tokens) > 1 && bytes.Equal(bytes.ToLower(tokens[0]), []byte("ping")) {
enc := []byte(fmt.Sprintf("*%d\\r\\n$%d\\r\\n%s\\r\\n",
len(tokens), len(tokens[0]), string(bytes.ToUpper(tokens[0]))))
for i := 1; i < len(tokens); i++ {
token := tokens[i]
enc = append(enc, []byte(fmt.Sprintf("$%d\\r\\n%s\\r\\n", len(token), token))...)
}
return enc
}
return b
return err
}

View File

@@ -2,15 +2,18 @@ package main
import (
"bufio"
"bytes"
"crypto/tls"
"encoding/json"
"flag"
"fmt"
"io"
"net"
"net/http"
"os"
"path"
"github.com/kelvinmwinuka/memstore/serialization"
"gopkg.in/yaml.v3"
)
@@ -32,20 +35,34 @@ type Server struct {
func (server *Server) hanndleConnection(conn net.Conn) {
rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
sw := bufio.NewWriter(os.Stdout)
// sw := bufio.NewWriter(os.Stdout)
var line [][]byte
for {
l, _, err := rw.ReadLine()
b, _, err := rw.ReadLine()
if err != nil {
if err != nil && err == io.EOF {
fmt.Println(err)
break
}
sw.Write(l)
sw.Flush()
line = append(line, b)
if bytes.Equal(b, []byte("")) {
// End of RESP message
// sw.Write(bytes.Join(line, []byte("\\r\\n")))
// sw.Flush()
serialization.Decode(string(bytes.Join(line, []byte("\r\n"))))
line = [][]byte{}
}
}
conn.Close()
}
func (server *Server) StartTCP() {
conf := server.config
var listener net.Listener
@@ -84,7 +101,6 @@ func (server *Server) StartTCP() {
fmt.Println("Could not establish connection")
continue
}
// Read loop for connection
go server.hanndleConnection(conn)
}

16
utils/utils.go Normal file
View File

@@ -0,0 +1,16 @@
package utils
import "math"
func Contains[T comparable](arr []T, elem T) bool {
for _, v := range arr {
if v == elem {
return true
}
}
return false
}
func IsInteger(n float64) bool {
return math.Mod(n, 1.0) == 0
}

4
vendor/github.com/tidwall/resp/.gitignore generated vendored Normal file
View File

@@ -0,0 +1,4 @@
.DS_Store
aof.tmp
appendonly.aof
coverage.out

19
vendor/github.com/tidwall/resp/LICENSE generated vendored Normal file
View File

@@ -0,0 +1,19 @@
Copyright (c) 2016 Josh Baker
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

193
vendor/github.com/tidwall/resp/README.md generated vendored Normal file
View File

@@ -0,0 +1,193 @@
**This project has been archived. If you are looking for a high-performance Redis server for Go, please checkout [Redcon](https://github.com/tidwall/redcon). It's much faster than this implementation and can handle pipelining.**
RESP
====
[![GoDoc](https://godoc.org/github.com/tidwall/resp?status.svg)](https://godoc.org/github.com/tidwall/resp)
RESP is a [Go](http://golang.org/) library that provides a reader, writer, and server implementation for the [Redis RESP Protocol](http://redis.io/topics/protocol).
RESP is short for **REdis Serialization Protocol**.
While the protocol was designed specifically for Redis, it can be used for other client-server software projects.
The RESP protocol has the advantages of being human readable and with performance of a binary protocol.
Features
--------
- [Reader](#reader) and [Writer](#writer) types for streaming RESP values from files, networks, or byte streams.
- [Server Implementation](#server) for creating your own RESP server. [Clients](#clients) use the same tools and libraries as Redis.
- [Append-only File](#append-only-file) type for persisting RESP values to disk.
Installation
------------
Install resp using the "go get" command:
go get github.com/tidwall/resp
The Go distribution is Resp's only dependency.
Documentation
-------------
- [API Reference](http://godoc.org/github.com/tidwall/resp)
Server
------
A Redis clone that implements the SET and GET commands.
- You can interact using the Redis CLI (redis-cli). http://redis.io/download
- Or, use the telnet by typing in "telnet localhost 6380" and type in "set key value" and "get key".
- Or, use a client library such as http://github.com/garyburd/redigo
- The "QUIT" command will close the connection.
```go
package main
import (
"errors"
"log"
"sync"
"github.com/tidwall/resp"
)
func main() {
var mu sync.RWMutex
kvs := make(map[string]string)
s := resp.NewServer()
s.HandleFunc("set", func(conn *resp.Conn, args []resp.Value) bool {
if len(args) != 3 {
conn.WriteError(errors.New("ERR wrong number of arguments for 'set' command"))
} else {
mu.Lock()
kvs[args[1].String()] = args[2].String()
mu.Unlock()
conn.WriteSimpleString("OK")
}
return true
})
s.HandleFunc("get", func(conn *resp.Conn, args []resp.Value) bool {
if len(args) != 2 {
conn.WriteError(errors.New("ERR wrong number of arguments for 'get' command"))
} else {
mu.RLock()
s, ok := kvs[args[1].String()]
mu.RUnlock()
if !ok {
conn.WriteNull()
} else {
conn.WriteString(s)
}
}
return true
})
if err := s.ListenAndServe(":6379"); err != nil {
log.Fatal(err)
}
}
```
Reader
------
The resp Reader type allows for an application to read raw RESP values from a file, network, or byte stream.
```go
raw := "*3\r\n$3\r\nset\r\n$6\r\nleader\r\n$7\r\nCharlie\r\n"
raw += "*3\r\n$3\r\nset\r\n$8\r\nfollower\r\n$6\r\nSkyler\r\n"
rd := resp.NewReader(bytes.NewBufferString(raw))
for {
v, _, err := rd.ReadValue()
if err == io.EOF {
break
}
if err != nil {
log.Fatal(err)
}
fmt.Printf("Read %s\n", v.Type())
if v.Type() == Array {
for i, v := range v.Array() {
fmt.Printf(" #%d %s, value: '%s'\n", i, v.Type(), v)
}
}
}
// Output:
// Read Array
// #0 BulkString, value: 'set'
// #1 BulkString, value: 'leader'
// #2 BulkString, value: 'Charlie'
// Read Array
// #0 BulkString, value: 'set'
// #1 BulkString, value: 'follower'
// #2 BulkString, value: 'Skyler'
```
Writer
------
The resp Writer type allows for an application to write raw RESP values to a file, network, or byte stream.
```go
var buf bytes.Buffer
wr := resp.NewWriter(&buf)
wr.WriteArray([]resp.Value{resp.StringValue("set"), resp.StringValue("leader"), resp.StringValue("Charlie")})
wr.WriteArray([]resp.Value{resp.StringValue("set"), resp.StringValue("follower"), resp.StringValue("Skyler")})
fmt.Printf("%s", buf.String())
// Output:
// *3\r\n$3\r\nset\r\n$6\r\nleader\r\n$7\r\nCharlie\r\n
// *3\r\n$3\r\nset\r\n$8\r\nfollower\r\n$6\r\nSkyler\r\n
```
Append-Only File
----------------
An append only file (AOF) allows your application to persist values to disk. It's very easy to use, and includes the same level of durablilty and binary format as [Redis AOF Persistence](http://redis.io/topics/persistence).
Check out the [AOF documentation](https://godoc.org/github.com/tidwall/resp#AOF) for more information
```go
// create and fill an appendonly file
aof, err := resp.OpenAOF("appendonly.aof")
if err != nil {
log.Fatal(err)
}
// append a couple values and close the file
aof.Append(resp.MultiBulkValue("set", "leader", "Charlie"))
aof.Append(resp.MultiBulkValue("set", "follower", "Skyler"))
aof.Close()
// reopen and scan all values
aof, err = resp.OpenAOF("appendonly.aof")
if err != nil {
log.Fatal(err)
}
defer aof.Close()
aof.Scan(func(v Value) {
fmt.Printf("%s\n", v.String())
})
// Output:
// [set leader Charlie]
// [set follower Skyler]
}
```
Clients
-------
There are bunches of [RESP Clients](http://redis.io/clients). Most any client that supports Redis will support this implementation.
Contact
-------
Josh Baker [@tidwall](http://twitter.com/tidwall)
License
-------
Tile38 source code is available under the MIT [License](/LICENSE).

177
vendor/github.com/tidwall/resp/aof.go generated vendored Normal file
View File

@@ -0,0 +1,177 @@
package resp
import (
"errors"
"io"
"os"
"sync"
"time"
)
// SyncPolicy represents a file's fsync policy.
type SyncPolicy int
const (
Never SyncPolicy = iota // The policy 'Never' means fsync is never called, the Operating System will be in charge of your data. This is the fastest and less safe method.
EverySecond SyncPolicy = iota // The policy 'EverySecond' means that fsync will be called every second or so. This is fast enough, and at most you can lose one second of data if there is a disaster.
Always SyncPolicy = iota // The policy 'Always' means fsync is called after every write. This is super duper safe and very incredibly slow.
)
// String returns a string respesentation.
func (policy SyncPolicy) String() string {
switch policy {
default:
return "unknown"
case Never:
return "never"
case EverySecond:
return "every second"
case Always:
return "always"
}
}
var errClosed = errors.New("closed")
// AOF represents an open file descriptor.
type AOF struct {
mu sync.Mutex
f *os.File
closed bool
rd *Reader
policy SyncPolicy
atEnd bool
}
// OpenAOF will open and return an AOF file. If the file does not exist a new one will be created.
func OpenAOF(path string) (*AOF, error) {
var err error
aof := &AOF{}
aof.f, err = os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0600)
if err != nil {
return nil, err
}
aof.policy = EverySecond
go func() {
for {
aof.mu.Lock()
if aof.closed {
aof.mu.Unlock()
return
}
if aof.policy == EverySecond {
aof.f.Sync()
}
aof.mu.Unlock()
time.Sleep(time.Second)
}
}()
return aof, nil
}
// SetSyncPolicy set the sync policy of the file.
// The policy 'EverySecond' means that fsync will be called every second or so. This is fast enough, and at most you can lose one second of data if there is a disaster.
// The policy 'Never' means fsync is never called, the Operating System will be in charge of your data. This is the fastest and less safe method.
// The policy 'Always' means fsync is called after every write. This is super duper safe and very incredibly slow.
// EverySecond is the default.
func (aof *AOF) SetSyncPolicy(policy SyncPolicy) {
aof.mu.Lock()
defer aof.mu.Unlock()
if aof.policy == policy {
return
}
switch policy {
default:
return
case Never, EverySecond, Always:
}
aof.policy = policy
}
// Close will close the file.
func (aof *AOF) Close() error {
aof.mu.Lock()
defer aof.mu.Unlock()
if aof.closed {
return errClosed
}
aof.f.Close()
aof.closed = true
return nil
}
func (aof *AOF) readValues(iterator func(v Value)) error {
aof.atEnd = false
if _, err := aof.f.Seek(0, 0); err != nil {
return err
}
rd := NewReader(aof.f)
for {
v, _, err := rd.ReadValue()
if err != nil {
if err == io.EOF {
break
}
return err
}
if iterator != nil {
iterator(v)
}
}
if _, err := aof.f.Seek(0, 2); err != nil {
return err
}
aof.atEnd = true
return nil
}
// Append writes a value to the end of the file.
func (aof *AOF) Append(v Value) error {
return aof.AppendMulti([]Value{v})
}
// AppendMulti writes multiple values to the end of the file.
// This operation can increase performance over calling multiple Append()s and also has the benefit of transactional writes.
func (aof *AOF) AppendMulti(vs []Value) error {
var bs []byte
for _, v := range vs {
b, err := v.MarshalRESP()
if err != nil {
return err
}
if bs == nil {
bs = b
} else {
bs = append(bs, b...)
}
}
aof.mu.Lock()
defer aof.mu.Unlock()
if aof.closed {
return errClosed
}
if !aof.atEnd {
if err := aof.readValues(nil); err != nil {
return err
}
}
_, err := aof.f.Write(bs)
if err != nil {
return err
}
if aof.policy == Always {
aof.f.Sync()
}
return nil
}
// Scan iterates though all values in the file.
// This operation could take a long time if there lots of values, and the operation cannot be canceled part way through.
func (aof *AOF) Scan(iterator func(v Value)) error {
aof.mu.Lock()
defer aof.mu.Unlock()
if aof.closed {
return errClosed
}
return aof.readValues(iterator)
}

13
vendor/github.com/tidwall/resp/doc.go generated vendored Normal file
View File

@@ -0,0 +1,13 @@
// Copyright 2016 Josh Baker. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
/*
Package resp provides a reader, writer, and server implementation for the RESP protocol. http://redis.io/topics/protocol
RESP is short for "REdis Serialization Protocol".
While the protocol was designed specifically for Redis, it can be used for other client-server software projects.
RESP has the advantages of being human readable and with performance of a binary protocol.
*/
package resp

619
vendor/github.com/tidwall/resp/resp.go generated vendored Normal file
View File

@@ -0,0 +1,619 @@
package resp
import (
"bufio"
"bytes"
"errors"
"fmt"
"io"
"strconv"
)
const bufsz = 4096
// Type represents a Value type
type Type byte
const (
SimpleString Type = '+'
Error Type = '-'
Integer Type = ':'
BulkString Type = '$'
Array Type = '*'
)
// TypeName returns name of the underlying RESP type.
func (t Type) String() string {
switch t {
default:
return "Unknown"
case '+':
return "SimpleString"
case '-':
return "Error"
case ':':
return "Integer"
case '$':
return "BulkString"
case '*':
return "Array"
}
}
// Value represents the data of a valid RESP type.
type Value struct {
typ Type
integer int
str []byte
array []Value
null bool
}
// Integer converts Value to an int. If Value cannot be converted, Zero is returned.
func (v Value) Integer() int {
switch v.typ {
default:
n, _ := strconv.ParseInt(v.String(), 10, 64)
return int(n)
case ':':
return v.integer
}
}
// String converts Value to a string.
func (v Value) String() string {
if v.typ == '$' {
return string(v.str)
}
switch v.typ {
case '+', '-':
return string(v.str)
case ':':
return strconv.FormatInt(int64(v.integer), 10)
case '*':
return fmt.Sprintf("%v", v.array)
}
return ""
}
// Bytes converts the Value to a byte array. An empty string is converted to a non-nil empty byte array. If it's a RESP Null value, nil is returned.
func (v Value) Bytes() []byte {
switch v.typ {
default:
return []byte(v.String())
case '$', '+', '-':
return v.str
}
}
// Float converts Value to a float64. If Value cannot be converted, Zero is returned.
func (v Value) Float() float64 {
switch v.typ {
default:
f, _ := strconv.ParseFloat(v.String(), 64)
return f
case ':':
return float64(v.integer)
}
}
// IsNull indicates whether or not the base value is null.
func (v Value) IsNull() bool {
return v.null
}
// Bool converts Value to an bool. If Value cannot be converted, false is returned.
func (v Value) Bool() bool {
return v.Integer() != 0
}
// Error converts the Value to an error. If Value is not an error, nil is returned.
func (v Value) Error() error {
switch v.typ {
case '-':
return errors.New(string(v.str))
}
return nil
}
// Array converts the Value to a an array. If Value is not an array or when it's is a RESP Null value, nil is returned.
func (v Value) Array() []Value {
if v.typ == '*' && !v.null {
return v.array
}
return nil
}
// Type returns the underlying RESP type. The following types are represent valid RESP values.
//
// '+' SimpleString
// '-' Error
// ':' Integer
// '$' BulkString
// '*' Array
func (v Value) Type() Type {
return v.typ
}
func marshalSimpleRESP(typ Type, b []byte) ([]byte, error) {
bb := make([]byte, 3+len(b))
bb[0] = byte(typ)
copy(bb[1:], b)
bb[1+len(b)+0] = '\r'
bb[1+len(b)+1] = '\n'
return bb, nil
}
func marshalBulkRESP(v Value) ([]byte, error) {
if v.null {
return []byte("$-1\r\n"), nil
}
szb := []byte(strconv.FormatInt(int64(len(v.str)), 10))
bb := make([]byte, 5+len(szb)+len(v.str))
bb[0] = '$'
copy(bb[1:], szb)
bb[1+len(szb)+0] = '\r'
bb[1+len(szb)+1] = '\n'
copy(bb[1+len(szb)+2:], v.str)
bb[1+len(szb)+2+len(v.str)+0] = '\r'
bb[1+len(szb)+2+len(v.str)+1] = '\n'
return bb, nil
}
func marshalArrayRESP(v Value) ([]byte, error) {
if v.null {
return []byte("*-1\r\n"), nil
}
szb := []byte(strconv.FormatInt(int64(len(v.array)), 10))
var buf bytes.Buffer
buf.Grow(3 + len(szb) + 16*len(v.array)) // prime the buffer
buf.WriteByte('*')
buf.Write(szb)
buf.WriteByte('\r')
buf.WriteByte('\n')
for i := 0; i < len(v.array); i++ {
data, err := v.array[i].MarshalRESP()
if err != nil {
return nil, err
}
buf.Write(data)
}
return buf.Bytes(), nil
}
func marshalAnyRESP(v Value) ([]byte, error) {
switch v.typ {
default:
if v.typ == 0 && v.null {
return []byte("$-1\r\n"), nil
}
return nil, errors.New("unknown resp type encountered")
case '-', '+':
return marshalSimpleRESP(v.typ, v.str)
case ':':
return marshalSimpleRESP(v.typ, []byte(strconv.FormatInt(int64(v.integer), 10)))
case '$':
return marshalBulkRESP(v)
case '*':
return marshalArrayRESP(v)
}
}
// Equals compares one value to another value.
func (v Value) Equals(value Value) bool {
data1, err := v.MarshalRESP()
if err != nil {
return false
}
data2, err := value.MarshalRESP()
if err != nil {
return false
}
return string(data1) == string(data2)
}
// MarshalRESP returns the original serialized byte representation of Value.
// For more information on this format please see http://redis.io/topics/protocol.
func (v Value) MarshalRESP() ([]byte, error) {
return marshalAnyRESP(v)
}
var nullValue = Value{null: true}
type errProtocol struct{ msg string }
func (err errProtocol) Error() string {
return "Protocol error: " + err.msg
}
// Reader is a specialized RESP Value type reader.
type Reader struct {
rd *bufio.Reader
}
// NewReader returns a Reader for reading Value types.
func NewReader(rd io.Reader) *Reader {
r := &Reader{rd: bufio.NewReader(rd)}
return r
}
// ReadValue reads the next Value from Reader.
func (rd *Reader) ReadValue() (value Value, n int, err error) {
value, _, n, err = rd.readValue(false, false)
return
}
// ReadMultiBulk reads the next multi bulk Value from Reader.
// A multi bulk value is a RESP array that contains one or more bulk strings.
// For more information on RESP arrays and strings please see http://redis.io/topics/protocol.
func (rd *Reader) ReadMultiBulk() (value Value, telnet bool, n int, err error) {
return rd.readValue(true, false)
}
func (rd *Reader) readValue(multibulk, child bool) (val Value, telnet bool, n int, err error) {
var rn int
var c byte
c, err = rd.rd.ReadByte()
if err != nil {
return nullValue, false, n, err
}
n++
if c == '*' {
val, rn, err = rd.readArrayValue(multibulk)
} else if multibulk && !child {
telnet = true
} else {
switch c {
default:
if multibulk && child {
return nullValue, telnet, n, &errProtocol{"expected '$', got '" + string(c) + "'"}
}
if child {
return nullValue, telnet, n, &errProtocol{"unknown first byte"}
}
telnet = true
case '-', '+':
val, rn, err = rd.readSimpleValue(c)
case ':':
val, rn, err = rd.readIntegerValue()
case '$':
val, rn, err = rd.readBulkValue()
}
}
if telnet {
n--
rd.rd.UnreadByte()
val, rn, err = rd.readTelnetMultiBulk()
if err == nil {
telnet = true
}
}
n += rn
if err == io.EOF {
return nullValue, telnet, n, io.ErrUnexpectedEOF
}
return val, telnet, n, err
}
func (rd *Reader) readTelnetMultiBulk() (v Value, n int, err error) {
values := make([]Value, 0, 8)
var c byte
var bline []byte
var quote, mustspace bool
for {
c, err = rd.rd.ReadByte()
if err != nil {
return nullValue, n, err
}
n += 1
if c == '\n' {
if len(bline) > 0 && bline[len(bline)-1] == '\r' {
bline = bline[:len(bline)-1]
}
break
}
if mustspace && c != ' ' {
return nullValue, n, &errProtocol{"unbalanced quotes in request"}
}
if c == ' ' {
if quote {
bline = append(bline, c)
} else {
values = append(values, Value{typ: '$', str: bline})
bline = nil
}
} else if c == '"' {
if quote {
mustspace = true
} else {
if len(bline) > 0 {
return nullValue, n, &errProtocol{"unbalanced quotes in request"}
}
quote = true
}
} else {
bline = append(bline, c)
}
}
if quote {
return nullValue, n, &errProtocol{"unbalanced quotes in request"}
}
if len(bline) > 0 {
values = append(values, Value{typ: '$', str: bline})
}
return Value{typ: '*', array: values}, n, nil
}
func (rd *Reader) readSimpleValue(typ byte) (val Value, n int, err error) {
var line []byte
line, n, err = rd.readLine()
if err != nil {
return nullValue, n, err
}
return Value{typ: Type(typ), str: line}, n, nil
}
func (rd *Reader) readLine() (line []byte, n int, err error) {
for {
b, err := rd.rd.ReadBytes('\n')
if err != nil {
return nil, 0, err
}
n += len(b)
line = append(line, b...)
if len(line) >= 2 && line[len(line)-2] == '\r' {
break
}
}
return line[:len(line)-2], n, nil
}
func (rd *Reader) readBulkValue() (val Value, n int, err error) {
var rn int
var l int
l, rn, err = rd.readInt()
n += rn
if err != nil {
if _, ok := err.(*errProtocol); ok {
return nullValue, n, &errProtocol{"invalid bulk length"}
}
return nullValue, n, err
}
if l < 0 {
return Value{typ: '$', null: true}, n, nil
}
if l > 512*1024*1024 {
return nullValue, n, &errProtocol{"invalid bulk length"}
}
b := make([]byte, l+2)
rn, err = io.ReadFull(rd.rd, b)
n += rn
if err != nil {
return nullValue, n, err
}
if b[l] != '\r' || b[l+1] != '\n' {
return nullValue, n, &errProtocol{"invalid bulk line ending"}
}
return Value{typ: '$', str: b[:l]}, n, nil
}
func (rd *Reader) readArrayValue(multibulk bool) (val Value, n int, err error) {
var rn int
var l int
l, rn, err = rd.readInt()
n += rn
if err != nil || l > 1024*1024 {
if _, ok := err.(*errProtocol); ok {
if multibulk {
return nullValue, n, &errProtocol{"invalid multibulk length"}
}
return nullValue, n, &errProtocol{"invalid array length"}
}
return nullValue, n, err
}
if l < 0 {
return Value{typ: '*', null: true}, n, nil
}
var aval Value
vals := make([]Value, l)
for i := 0; i < l; i++ {
aval, _, rn, err = rd.readValue(multibulk, true)
n += rn
if err != nil {
return nullValue, n, err
}
vals[i] = aval
}
return Value{typ: '*', array: vals}, n, nil
}
func (rd *Reader) readIntegerValue() (val Value, n int, err error) {
var l int
l, n, err = rd.readInt()
if err != nil {
if _, ok := err.(*errProtocol); ok {
return nullValue, n, &errProtocol{"invalid integer"}
}
return nullValue, n, err
}
return Value{typ: ':', integer: l}, n, nil
}
func (rd *Reader) readInt() (x int, n int, err error) {
line, n, err := rd.readLine()
if err != nil {
return 0, 0, err
}
i64, err := strconv.ParseInt(string(line), 10, 64)
if err != nil {
return 0, n, err
}
return int(i64), n, nil
}
// AnyValue returns a RESP value from an interface. This function infers the types. Arrays are not allowed.
func AnyValue(v interface{}) Value {
switch v := v.(type) {
default:
return StringValue(fmt.Sprintf("%v", v))
case nil:
return NullValue()
case int:
return IntegerValue(int(v))
case uint:
return IntegerValue(int(v))
case int8:
return IntegerValue(int(v))
case uint8:
return IntegerValue(int(v))
case int16:
return IntegerValue(int(v))
case uint16:
return IntegerValue(int(v))
case int32:
return IntegerValue(int(v))
case uint32:
return IntegerValue(int(v))
case int64:
return IntegerValue(int(v))
case uint64:
return IntegerValue(int(v))
case bool:
return BoolValue(v)
case float32:
return FloatValue(float64(v))
case float64:
return FloatValue(float64(v))
case []byte:
return BytesValue(v)
case string:
return StringValue(v)
}
}
// SimpleStringValue returns a RESP simple string. A simple string has no new lines. The carriage return and new line characters are replaced with spaces.
func SimpleStringValue(s string) Value { return Value{typ: '+', str: []byte(formSingleLine(s))} }
// BytesValue returns a RESP bulk string. A bulk string can represent any data.
func BytesValue(b []byte) Value { return Value{typ: '$', str: b} }
// StringValue returns a RESP bulk string. A bulk string can represent any data.
func StringValue(s string) Value { return Value{typ: '$', str: []byte(s)} }
// NullValue returns a RESP null bulk string.
func NullValue() Value { return Value{typ: '$', null: true} }
// ErrorValue returns a RESP error.
func ErrorValue(err error) Value {
if err == nil {
return Value{typ: '-'}
}
return Value{typ: '-', str: []byte(formSingleLine(err.Error()))}
}
// IntegerValue returns a RESP integer.
func IntegerValue(i int) Value { return Value{typ: ':', integer: i} }
// BoolValue returns a RESP integer representation of a bool.
func BoolValue(t bool) Value {
if t {
return Value{typ: ':', integer: 1}
}
return Value{typ: ':', integer: 0}
}
// FloatValue returns a RESP bulk string representation of a float.
func FloatValue(f float64) Value { return StringValue(strconv.FormatFloat(f, 'f', -1, 64)) }
// ArrayValue returns a RESP array.
func ArrayValue(vals []Value) Value { return Value{typ: '*', array: vals} }
func formSingleLine(s string) string {
var clean bool
for i := 0; i < len(s); i++ {
if s[i] < ' ' {
clean = true
break
}
}
if !clean {
return s
}
b := []byte(s)
for i := 0; i < len(b); i++ {
if b[i] < ' ' {
b[i] = ' '
}
}
return string(b)
}
// MultiBulkValue returns a RESP array which contains one or more bulk strings.
// For more information on RESP arrays and strings please see http://redis.io/topics/protocol.
func MultiBulkValue(commandName string, args ...interface{}) Value {
vals := make([]Value, len(args)+1)
vals[0] = StringValue(commandName)
for i, arg := range args {
if rval, ok := arg.(Value); ok && rval.Type() == BulkString {
vals[i+1] = rval
continue
}
switch arg := arg.(type) {
default:
vals[i+1] = StringValue(fmt.Sprintf("%v", arg))
case []byte:
vals[i+1] = StringValue(string(arg))
case string:
vals[i+1] = StringValue(arg)
case nil:
vals[i+1] = NullValue()
}
}
return ArrayValue(vals)
}
// Writer is a specialized RESP Value type writer.
type Writer struct {
wr io.Writer
}
// NewWriter returns a new Writer.
func NewWriter(wr io.Writer) *Writer {
return &Writer{wr}
}
// WriteValue writes a RESP Value.
func (wr *Writer) WriteValue(v Value) error {
b, err := v.MarshalRESP()
if err != nil {
return err
}
_, err = wr.wr.Write(b)
return nil
}
// WriteSimpleString writes a RESP simple string. A simple string has no new lines. The carriage return and new line characters are replaced with spaces.
func (wr *Writer) WriteSimpleString(s string) error { return wr.WriteValue(SimpleStringValue(s)) }
// WriteBytes writes a RESP bulk string. A bulk string can represent any data.
func (wr *Writer) WriteBytes(b []byte) error { return wr.WriteValue(BytesValue(b)) }
// WriteString writes a RESP bulk string. A bulk string can represent any data.
func (wr *Writer) WriteString(s string) error { return wr.WriteValue(StringValue(s)) }
// WriteNull writes a RESP null bulk string.
func (wr *Writer) WriteNull() error { return wr.WriteValue(NullValue()) }
// WriteError writes a RESP error.
func (wr *Writer) WriteError(err error) error { return wr.WriteValue(ErrorValue(err)) }
// WriteInteger writes a RESP integer.
func (wr *Writer) WriteInteger(i int) error { return wr.WriteValue(IntegerValue(i)) }
// WriteArray writes a RESP array.
func (wr *Writer) WriteArray(vals []Value) error { return wr.WriteValue(ArrayValue(vals)) }
// WriteMultiBulk writes a RESP array which contains one or more bulk strings.
// For more information on RESP arrays and strings please see http://redis.io/topics/protocol.
func (wr *Writer) WriteMultiBulk(commandName string, args ...interface{}) error {
return wr.WriteValue(MultiBulkValue(commandName, args...))
}

135
vendor/github.com/tidwall/resp/server.go generated vendored Normal file
View File

@@ -0,0 +1,135 @@
package resp
import (
"errors"
"io"
"net"
"strings"
"sync"
)
// Server represents a RESP server which handles reading RESP Values.
type Server struct {
mu sync.RWMutex
handlers map[string]func(conn *Conn, args []Value) bool
accept func(conn *Conn) bool
}
// Conn represents a RESP network connection.
type Conn struct {
*Reader
*Writer
base net.Conn
RemoteAddr string
}
// NewConn returns a Conn.
func NewConn(conn net.Conn) *Conn {
return &Conn{
Reader: NewReader(conn),
Writer: NewWriter(conn),
base: conn,
RemoteAddr: conn.RemoteAddr().String(),
}
}
// NewServer returns a new Server.
func NewServer() *Server {
return &Server{
handlers: make(map[string]func(conn *Conn, args []Value) bool),
}
}
// HandleFunc registers the handler function for the given command.
// The conn parameter is a Conn type and it can be used to read and write further RESP messages from and to the connection.
// Returning false will close the connection.
func (s *Server) HandleFunc(command string, handler func(conn *Conn, args []Value) bool) {
s.mu.Lock()
defer s.mu.Unlock()
s.handlers[strings.ToUpper(command)] = handler
}
// AcceptFunc registers a function for accepting connections.
// Calling this function is optional and it allows for total control over reading and writing RESP Values from and to the connections.
// Returning false will close the connection.
func (s *Server) AcceptFunc(accept func(conn *Conn) bool) {
s.mu.Lock()
defer s.mu.Unlock()
s.accept = accept
}
// ListenAndServe listens on the TCP network address addr for incoming connections.
func (s *Server) ListenAndServe(addr string) error {
l, err := net.Listen("tcp", addr)
if err != nil {
return err
}
defer l.Close()
for {
conn, err := l.Accept()
if err != nil {
return err
}
go func() {
err = s.handleConn(conn)
if err != nil {
if _, ok := err.(*errProtocol); ok {
io.WriteString(conn, "-ERR "+formSingleLine(err.Error())+"\r\n")
} else {
io.WriteString(conn, "-ERR unknown error\r\n")
}
}
conn.Close()
}()
}
}
func (s *Server) handleConn(nconn net.Conn) error {
conn := NewConn(nconn)
s.mu.RLock()
accept := s.accept
s.mu.RUnlock()
if accept != nil {
if !accept(conn) {
return nil
}
}
for {
v, _, _, err := conn.ReadMultiBulk()
if err != nil {
return err
}
values := v.Array()
if len(values) == 0 {
continue
}
lccommandName := values[0].String()
commandName := strings.ToUpper(lccommandName)
s.mu.RLock()
h := s.handlers[commandName]
s.mu.RUnlock()
switch commandName {
case "QUIT":
if h == nil {
conn.WriteSimpleString("OK")
return nil
}
case "PING":
if h == nil {
if err := conn.WriteSimpleString("PONG"); err != nil {
return err
}
continue
}
}
if h == nil {
if err := conn.WriteError(errors.New("ERR unknown command '" + lccommandName + "'")); err != nil {
return err
}
} else {
if !h(conn, values) {
return nil
}
}
}
}

3
vendor/modules.txt vendored
View File

@@ -1,3 +1,6 @@
# github.com/tidwall/resp v0.1.1
## explicit; go 1.19
github.com/tidwall/resp
# gopkg.in/yaml.v3 v3.0.1
## explicit
gopkg.in/yaml.v3