mirror of
https://github.com/HDT3213/godis.git
synced 2025-10-04 16:32:41 +08:00
refactor code structure for gnet
This commit is contained in:
74
redis/server/gnet/server.go
Normal file
74
redis/server/gnet/server.go
Normal file
@@ -0,0 +1,74 @@
|
||||
package gnet
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/hdt3213/godis/interface/database"
|
||||
"github.com/hdt3213/godis/interface/redis"
|
||||
"github.com/hdt3213/godis/lib/logger"
|
||||
"github.com/hdt3213/godis/redis/connection"
|
||||
"github.com/hdt3213/godis/redis/parser"
|
||||
"github.com/panjf2000/gnet/v2"
|
||||
)
|
||||
|
||||
type GnetServer struct {
|
||||
gnet.BuiltinEventEngine
|
||||
eng gnet.Engine
|
||||
connected int32
|
||||
db database.DB
|
||||
}
|
||||
|
||||
func NewGnetServer(db database.DB) *GnetServer {
|
||||
return &GnetServer{
|
||||
db: db,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *GnetServer) Run(listenAddr string) error {
|
||||
return gnet.Run(s, "tcp://"+listenAddr, gnet.WithMulticore(true))
|
||||
}
|
||||
|
||||
func (s *GnetServer) OnBoot(eng gnet.Engine) (action gnet.Action) {
|
||||
s.eng = eng
|
||||
return
|
||||
}
|
||||
|
||||
func (s *GnetServer) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) {
|
||||
client := connection.NewConn(c)
|
||||
c.SetContext(client)
|
||||
atomic.AddInt32(&s.connected, 1)
|
||||
return
|
||||
}
|
||||
|
||||
func (s *GnetServer) OnClose(c gnet.Conn, err error) (action gnet.Action) {
|
||||
if err != nil {
|
||||
logger.Infof("error occurred on connection=%s, %v\n", c.RemoteAddr().String(), err)
|
||||
}
|
||||
atomic.AddInt32(&s.connected, -1)
|
||||
conn := c.Context().(redis.Connection)
|
||||
s.db.AfterClientClose(conn)
|
||||
return
|
||||
}
|
||||
|
||||
func (s *GnetServer) OnTraffic(c gnet.Conn) (action gnet.Action) {
|
||||
conn := c.Context().(redis.Connection)
|
||||
cmdLine, err := parser.ParseV2(c)
|
||||
if err != nil {
|
||||
logger.Infof("parse command line failed: %v", err)
|
||||
return gnet.Close
|
||||
}
|
||||
if len(cmdLine) == 0 {
|
||||
return gnet.None
|
||||
}
|
||||
result := s.db.Exec(conn, cmdLine)
|
||||
buffer := result.ToBytes()
|
||||
if len(buffer) > 0 {
|
||||
c.Write(buffer)
|
||||
}
|
||||
return gnet.None
|
||||
}
|
||||
|
||||
func (s *GnetServer) Close() {
|
||||
s.eng.Stop(context.Background())
|
||||
}
|
45
redis/server/gnet/server_test.go
Normal file
45
redis/server/gnet/server_test.go
Normal file
@@ -0,0 +1,45 @@
|
||||
package gnet
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"net"
|
||||
"testing"
|
||||
|
||||
"github.com/hdt3213/godis/database"
|
||||
)
|
||||
|
||||
func TestListenAndServe(t *testing.T) {
|
||||
var err error
|
||||
listener, err := net.Listen("tcp", ":0")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
addr := listener.Addr().String()
|
||||
db := database.NewStandaloneServer()
|
||||
server := NewGnetServer(db)
|
||||
go server.Run(addr)
|
||||
|
||||
conn, err := net.Dial("tcp", addr)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
_, err = conn.Write([]byte("PING\r\n"))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
bufReader := bufio.NewReader(conn)
|
||||
line, _, err := bufReader.ReadLine()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
if string(line) != "+PONG" {
|
||||
t.Error("get wrong response")
|
||||
return
|
||||
}
|
||||
conn.Close()
|
||||
server.Close()
|
||||
}
|
@@ -1,4 +1,4 @@
|
||||
package server
|
||||
package std
|
||||
|
||||
import (
|
||||
"github.com/hdt3213/godis/lib/utils"
|
@@ -1,4 +1,4 @@
|
||||
package server
|
||||
package std
|
||||
|
||||
/*
|
||||
* A tcp.Handler implements redis protocol
|
||||
@@ -13,14 +13,14 @@ import (
|
||||
|
||||
"github.com/hdt3213/godis/cluster"
|
||||
"github.com/hdt3213/godis/config"
|
||||
database2 "github.com/hdt3213/godis/database"
|
||||
"github.com/hdt3213/godis/interface/database"
|
||||
"github.com/hdt3213/godis/interface/redis"
|
||||
"github.com/hdt3213/godis/database"
|
||||
idatabase "github.com/hdt3213/godis/interface/database"
|
||||
"github.com/hdt3213/godis/lib/logger"
|
||||
"github.com/hdt3213/godis/lib/sync/atomic"
|
||||
"github.com/hdt3213/godis/redis/connection"
|
||||
"github.com/hdt3213/godis/redis/parser"
|
||||
"github.com/hdt3213/godis/redis/protocol"
|
||||
"github.com/hdt3213/godis/tcp"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -30,23 +30,29 @@ var (
|
||||
// Handler implements tcp.Handler and serves as a redis server
|
||||
type Handler struct {
|
||||
activeConn sync.Map // *client -> placeholder
|
||||
db database.DB
|
||||
db idatabase.DB
|
||||
closing atomic.Boolean // refusing new client and new request
|
||||
}
|
||||
|
||||
// MakeHandler creates a Handler instance
|
||||
func MakeHandler() *Handler {
|
||||
var db database.DB
|
||||
var db idatabase.DB
|
||||
if config.Properties.ClusterEnable {
|
||||
db = cluster.MakeCluster()
|
||||
} else {
|
||||
db = database2.NewStandaloneServer()
|
||||
db = database.NewStandaloneServer()
|
||||
}
|
||||
return &Handler{
|
||||
db: db,
|
||||
}
|
||||
}
|
||||
|
||||
func Serve(addr string, handler *Handler) error {
|
||||
return tcp.ListenAndServeWithSignal(&tcp.Config{
|
||||
Address: addr,
|
||||
}, handler)
|
||||
}
|
||||
|
||||
func (h *Handler) closeClient(client *connection.Connection) {
|
||||
_ = client.Close()
|
||||
h.db.AfterClientClose(client)
|
@@ -1,4 +1,4 @@
|
||||
package server
|
||||
package std
|
||||
|
||||
import (
|
||||
"bufio"
|
Reference in New Issue
Block a user