diff --git a/gnet/parser.go b/gnet/parser.go new file mode 100644 index 0000000..54b4a44 --- /dev/null +++ b/gnet/parser.go @@ -0,0 +1,122 @@ +package gnet + +import ( + "errors" + "io" + "strconv" + "strings" +) + +func Parse(r io.Reader) ([][]byte, error) { + // 读取起始字符 '*' + buf := make([]byte, 1) + _, err := io.ReadFull(r, buf) + if err != nil { + return nil, err + } + if buf[0] != '*' { + // try text protocol + buf2, err := readLine(r) + if err != nil { + return nil, err + } + buf = append(buf, buf2...) + line := strings.Split(string(buf), " ") + result := make([][]byte, len(line)) + for i, s := range line { + result[i] = []byte(s) + } + return result, nil + } + + // 读取参数数量 + count, err := readInteger(r) + if err != nil { + return nil, err + } + if count < 0 { + return nil, nil + } + + // 读取每个参数 + result := make([][]byte, count) + for i := 0; i < count; i++ { + // 读取类型前缀 + _, err := io.ReadFull(r, buf) + if err != nil { + return nil, err + } + + switch buf[0] { + case '$': // Bulk String + strLen, err := readInteger(r) + if err != nil { + return nil, err + } + if strLen < 0 { + result[i] = nil // Null Bulk String + continue + } + + data := make([]byte, strLen+2) + _, err = io.ReadFull(r, data) + if err != nil { + return nil, err + } + result[i] = data[:strLen] + + // case '+', ':': // Simple String or Integer + // simpleStr, err := readLine(r) + // if err != nil { + // return nil, err + // } + // result[i] = []byte(simpleStr) + + // case '-': // Error + // errStr, err := readLine(r) + // if err != nil { + // return nil, err + // } + // result[i] = []byte(errStr) + + default: + return nil, errors.New("unsupported RESP type") + } + } + + return result, nil +} + +func readInteger(r io.Reader) (int, error) { + line, err := readLine(r) + if err != nil { + return 0, err + } + return strconv.Atoi(string(line)) +} + +func readLine(r io.Reader) ([]byte, error) { + var line []byte + buf := make([]byte, 1) + + for { + _, err := io.ReadFull(r, buf) + if err != nil { + return nil, err + } + + switch buf[0] { + case '\r': + _, err := io.ReadFull(r, buf) + if err != nil { + return nil, errors.New("unexpected EOF after \\r") + } + if buf[0] != '\n' { + return nil, errors.New("expected \\n after \\r") + } + return line, nil + default: + line = append(line, buf[0]) + } + } +} diff --git a/gnet/parser_test.go b/gnet/parser_test.go new file mode 100644 index 0000000..366e6bc --- /dev/null +++ b/gnet/parser_test.go @@ -0,0 +1,41 @@ +package gnet + +import ( + "bytes" + "fmt" + "testing" +) + +func BenchmarkParseSETCommand(b *testing.B) { + valueSizes := []int{10, 100, 1000, 10000} + + for _, size := range valueSizes { + value := bytes.Repeat([]byte("a"), size) + cmd := []byte(fmt.Sprintf("*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$%d\r\n%s\r\n", len(value), value)) + + b.Run("value_size_"+formatSize(size), func(subB *testing.B) { + subB.ResetTimer() + + for i := 0; i < subB.N; i++ { + reader := bytes.NewReader(cmd) + _, err := Parse(reader) + if err != nil { + subB.Fatalf("解析失败: %v", err) + } + } + }) + } +} + +func formatSize(size int) string { + units := []string{"B", "KB", "MB"} + unitIndex := 0 + floatSize := float64(size) + + for floatSize >= 1024 && unitIndex < len(units)-1 { + floatSize /= 1024 + unitIndex++ + } + + return fmt.Sprintf("%.0f%s", floatSize, units[unitIndex]) +} diff --git a/gnet/server.go b/gnet/server.go new file mode 100644 index 0000000..b048325 --- /dev/null +++ b/gnet/server.go @@ -0,0 +1,64 @@ +package gnet + +import ( + "sync/atomic" + + "github.com/hdt3213/godis/interface/database" + "github.com/hdt3213/godis/interface/redis" + "github.com/hdt3213/godis/lib/logger" + "github.com/hdt3213/godis/redis/connection" + "github.com/panjf2000/gnet/v2" +) + +type GnetServer struct { + gnet.BuiltinEventEngine + eng gnet.Engine + connected int32 + db database.DB +} + +func NewGnetServer(db database.DB) *GnetServer { + return &GnetServer{ + db: db, + } +} + +func (s *GnetServer) OnBoot(eng gnet.Engine) (action gnet.Action) { + s.eng = eng + return +} + +func (s *GnetServer) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) { + client := connection.NewConn(c) + c.SetContext(client) + atomic.AddInt32(&s.connected, 1) + return +} + +func (s *GnetServer) OnClose(c gnet.Conn, err error) (action gnet.Action) { + if err != nil { + logger.Infof("error occurred on connection=%s, %v\n", c.RemoteAddr().String(), err) + } + atomic.AddInt32(&s.connected, -1) + conn := c.Context().(redis.Connection) + s.db.AfterClientClose(conn) + return +} + +func (s *GnetServer) OnTraffic(c gnet.Conn) (action gnet.Action) { + conn := c.Context().(redis.Connection) + cmdLine, err := Parse(c) + if err != nil { + logger.Infof("parse command line failed: %v", err) + return gnet.Close + } + if len(cmdLine) == 0 { + return gnet.None + } + result := s.db.Exec(conn, cmdLine) + buffer := result.ToBytes() + if len(buffer) > 0 { + c.Write(buffer) + } + return gnet.None +} diff --git a/go.mod b/go.mod index a92b251..c5422c6 100644 --- a/go.mod +++ b/go.mod @@ -25,7 +25,14 @@ require ( github.com/klauspost/cpuid/v2 v2.2.9 // indirect github.com/mattn/go-colorable v0.1.12 // indirect github.com/mattn/go-isatty v0.0.14 // indirect + github.com/panjf2000/ants/v2 v2.11.0 // indirect + github.com/panjf2000/gnet/v2 v2.7.2 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect + github.com/valyala/bytebufferpool v1.0.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + go.uber.org/zap v1.27.0 // indirect golang.org/x/arch v0.13.0 // indirect - golang.org/x/sys v0.29.0 // indirect + golang.org/x/sync v0.11.0 // indirect + golang.org/x/sys v0.30.0 // indirect + gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect ) diff --git a/go.sum b/go.sum index ee4c849..779a787 100644 --- a/go.sum +++ b/go.sum @@ -99,6 +99,10 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/panjf2000/ants/v2 v2.11.0 h1:sHrqEwTBQTQ2w6PMvbMfvBtVUuhsaYPzUmAYDLYmJPg= +github.com/panjf2000/ants/v2 v2.11.0/go.mod h1:V9HhTupTWxcaRmIglJvGwvzqXUTnIZW9uO6q4hAfApw= +github.com/panjf2000/gnet/v2 v2.7.2 h1:c+QhXBKi/Qfdi4fh8ju6xiShGQHS1lHSEk6euFzJaIk= +github.com/panjf2000/gnet/v2 v2.7.2/go.mod h1:PIMw/8ILZsN/4K11bqDtSE1rEVPoFtjFlc0Q4edkncA= github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -134,9 +138,16 @@ github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1F github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= golang.org/x/arch v0.9.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys= golang.org/x/arch v0.13.0 h1:KCkqVVV1kGg0X87TFysjCJ8MxtZEIU4Ja/yXGeoECdA= @@ -150,6 +161,8 @@ golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w= +golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -166,6 +179,8 @@ golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= +golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/tools v0.14.0 h1:jvNa2pY0M4r62jkRQ6RwEZZyPcymeL9XZMLBbV7U2nc= golang.org/x/tools v0.14.0/go.mod h1:uYBEerGOWcJyEORxN+Ek8+TT266gXkNlHdJBwexUsBg= @@ -173,6 +188,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= +gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/godis b/godis new file mode 100755 index 0000000..5062e52 Binary files /dev/null and b/godis differ diff --git a/interface/database/db.go b/interface/database/db.go index 9fc2d89..8054bd4 100644 --- a/interface/database/db.go +++ b/interface/database/db.go @@ -15,7 +15,6 @@ type DB interface { Exec(client redis.Connection, cmdLine [][]byte) redis.Reply AfterClientClose(c redis.Connection) Close() - LoadRDB(dec *core.Decoder) error } // KeyEventCallback will be called back on key event, such as key inserted or deleted @@ -25,6 +24,7 @@ type KeyEventCallback func(dbIndex int, key string, entity *DataEntity) // DBEngine is the embedding storage engine exposing more methods for complex application type DBEngine interface { DB + LoadRDB(dec *core.Decoder) error ExecWithLock(conn redis.Connection, cmdLine [][]byte) redis.Reply ExecMulti(conn redis.Connection, watching map[string]uint32, cmdLines []CmdLine) redis.Reply GetUndoLogs(dbIndex int, cmdLine [][]byte) []CmdLine diff --git a/interface/tcp/handler.go b/interface/tcp/handler.go index 8c649bc..cdb9524 100644 --- a/interface/tcp/handler.go +++ b/interface/tcp/handler.go @@ -3,6 +3,7 @@ package tcp import ( "context" "net" + ) // HandleFunc represents application handler function diff --git a/main.go b/main.go index c0b6b5a..6803561 100644 --- a/main.go +++ b/main.go @@ -4,11 +4,17 @@ import ( "fmt" "os" + "github.com/hdt3213/godis/cluster" "github.com/hdt3213/godis/config" + database2 "github.com/hdt3213/godis/database" + "github.com/hdt3213/godis/gnet" + "github.com/hdt3213/godis/interface/database" "github.com/hdt3213/godis/lib/logger" "github.com/hdt3213/godis/lib/utils" - RedisServer "github.com/hdt3213/godis/redis/server" - "github.com/hdt3213/godis/tcp" + + // RedisServer "github.com/hdt3213/godis/redis/server" + // "github.com/hdt3213/godis/tcp" + gnetv2 "github.com/panjf2000/gnet/v2" ) var banner = ` @@ -51,10 +57,23 @@ func main() { } else { config.SetupConfig(configFilename) } - err := tcp.ListenAndServeWithSignal(&tcp.Config{ - Address: fmt.Sprintf("%s:%d", config.Properties.Bind, config.Properties.Port), - }, RedisServer.MakeHandler()) + listenAddr := fmt.Sprintf("%s:%d", config.Properties.Bind, config.Properties.Port) + // err := tcp.ListenAndServeWithSignal(&tcp.Config{ + // Address: fmt.Sprintf("%s:%d", config.Properties.Bind, config.Properties.Port), + // }, RedisServer.MakeHandler()) + // if err != nil { + // logger.Error(err) + // } + + var db database.DB + if config.Properties.ClusterEnable { + db = cluster.MakeCluster() + } else { + db = database2.NewStandaloneServer() + } + server := gnet.NewGnetServer(db) + err := gnetv2.Run(server, "tcp://" + listenAddr, gnetv2.WithMulticore(true)) if err != nil { - logger.Error(err) + logger.Errorf("start server failed: %v", err) } } diff --git a/redis.conf b/redis.conf index deaf734..8e886c8 100644 --- a/redis.conf +++ b/redis.conf @@ -2,7 +2,7 @@ bind 0.0.0.0 port 6399 maxclients 128 -appendonly yes +appendonly no appendfilename appendonly.aof appendfsync everysec aof-use-rdb-preamble yes diff --git a/redis/connection/conn.go b/redis/connection/conn.go index 9a6ac20..7bf27dc 100644 --- a/redis/connection/conn.go +++ b/redis/connection/conn.go @@ -1,11 +1,12 @@ package connection import ( - "github.com/hdt3213/godis/lib/logger" - "github.com/hdt3213/godis/lib/sync/wait" "net" "sync" "time" + + "github.com/hdt3213/godis/lib/logger" + "github.com/hdt3213/godis/lib/sync/wait" ) const ( @@ -83,6 +84,7 @@ func NewConn(conn net.Conn) *Connection { return c } + // Write sends response to client over tcp connection func (c *Connection) Write(b []byte) (int, error) { if len(b) == 0 { diff --git a/redis/server/server.go b/redis/server/server.go index 29dd10b..abee86d 100644 --- a/redis/server/server.go +++ b/redis/server/server.go @@ -15,6 +15,7 @@ import ( "github.com/hdt3213/godis/config" database2 "github.com/hdt3213/godis/database" "github.com/hdt3213/godis/interface/database" + "github.com/hdt3213/godis/interface/redis" "github.com/hdt3213/godis/lib/logger" "github.com/hdt3213/godis/lib/sync/atomic" "github.com/hdt3213/godis/redis/connection" @@ -52,6 +53,7 @@ func (h *Handler) closeClient(client *connection.Connection) { h.activeConn.Delete(client) } + // Handle receives and executes redis commands func (h *Handler) Handle(ctx context.Context, conn net.Conn) { if h.closing.Get() {