#743 refactor to improve the performance

This commit is contained in:
smallnest
2022-07-10 22:44:47 +08:00
parent 0f372c4f24
commit f5fc9d3a91
7 changed files with 121 additions and 80 deletions

View File

@@ -20,6 +20,7 @@ import (
"sync/atomic"
"time"
"github.com/alphadose/itogami"
"github.com/smallnest/rpcx/log"
"github.com/smallnest/rpcx/protocol"
"github.com/smallnest/rpcx/share"
@@ -78,6 +79,7 @@ type Server struct {
DisableHTTPGateway bool // should disable http invoke or not.
DisableJSONRPC bool // should disable json rpc or not.
AsyncWrite bool // set true if your server only serves few clients
pool *itogami.Pool
serviceMapMu sync.RWMutex
serviceMap map[string]*service
@@ -487,98 +489,105 @@ func (s *Server) serveConn(conn net.Conn) {
continue
}
// handle biz logics
go func() {
defer func() {
if r := recover(); r != nil {
log.Errorf("failed to handle the request: %v", r)
}
}()
if s.pool != nil {
s.pool.Submit(func() {
s.processOneRequest(ctx, req, conn, writeCh)
})
} else {
go s.processOneRequest(ctx, req, conn, writeCh)
}
}
}
atomic.AddInt32(&s.handlerMsgNum, 1)
defer atomic.AddInt32(&s.handlerMsgNum, -1)
func (s *Server) processOneRequest(ctx *share.Context, req *protocol.Message, conn net.Conn, writeCh chan *[]byte) {
defer func() {
if r := recover(); r != nil {
log.Errorf("failed to handle the request: %v", r)
}
}()
// 心跳请求,直接处理返回
if req.IsHeartbeat() {
s.Plugins.DoHeartbeatRequest(ctx, req)
req.SetMessageType(protocol.Response)
data := req.EncodeSlicePointer()
atomic.AddInt32(&s.handlerMsgNum, 1)
defer atomic.AddInt32(&s.handlerMsgNum, -1)
if s.writeTimeout != 0 {
conn.SetWriteDeadline(time.Now().Add(s.writeTimeout))
}
conn.Write(*data)
// 心跳请求,直接处理返回
if req.IsHeartbeat() {
s.Plugins.DoHeartbeatRequest(ctx, req)
req.SetMessageType(protocol.Response)
data := req.EncodeSlicePointer()
protocol.PutData(data)
protocol.FreeMsg(req)
if s.writeTimeout != 0 {
conn.SetWriteDeadline(time.Now().Add(s.writeTimeout))
}
conn.Write(*data)
return
}
protocol.PutData(data)
protocol.FreeMsg(req)
cancelFunc := parseServerTimeout(ctx, req)
if cancelFunc != nil {
defer cancelFunc()
}
return
}
resMetadata := make(map[string]string)
if req.Metadata == nil {
req.Metadata = make(map[string]string)
}
ctx = share.WithLocalValue(share.WithLocalValue(ctx, share.ReqMetaDataKey, req.Metadata),
share.ResMetaDataKey, resMetadata)
cancelFunc := parseServerTimeout(ctx, req)
if cancelFunc != nil {
defer cancelFunc()
}
s.Plugins.DoPreHandleRequest(ctx, req)
resMetadata := make(map[string]string)
if req.Metadata == nil {
req.Metadata = make(map[string]string)
}
ctx = share.WithLocalValue(share.WithLocalValue(ctx, share.ReqMetaDataKey, req.Metadata),
share.ResMetaDataKey, resMetadata)
if share.Trace {
log.Debugf("server handle request %+v from conn: %v", req, conn.RemoteAddr().String())
}
s.Plugins.DoPreHandleRequest(ctx, req)
// use handlers first
if handler, ok := s.router[req.ServicePath+"."+req.ServiceMethod]; ok {
sctx := NewContext(ctx, conn, req, writeCh)
err := handler(sctx)
if err != nil {
log.Errorf("[handler internal error]: servicepath: %s, servicemethod, err: %v", req.ServicePath, req.ServiceMethod, err)
}
if share.Trace {
log.Debugf("server handle request %+v from conn: %v", req, conn.RemoteAddr().String())
}
protocol.FreeMsg(req)
return
}
// use handlers first
if handler, ok := s.router[req.ServicePath+"."+req.ServiceMethod]; ok {
sctx := NewContext(ctx, conn, req, writeCh)
err := handler(sctx)
if err != nil {
log.Errorf("[handler internal error]: servicepath: %s, servicemethod, err: %v", req.ServicePath, req.ServiceMethod, err)
}
res, err := s.handleRequest(ctx, req)
if err != nil {
if s.HandleServiceError != nil {
s.HandleServiceError(err)
} else {
log.Warnf("rpcx: failed to handle request: %v", err)
}
}
protocol.FreeMsg(req)
return
}
if !req.IsOneway() {
if len(resMetadata) > 0 { // copy meta in context to responses
meta := res.Metadata
if meta == nil {
res.Metadata = resMetadata
} else {
for k, v := range resMetadata {
if meta[k] == "" {
meta[k] = v
}
}
res, err := s.handleRequest(ctx, req)
if err != nil {
if s.HandleServiceError != nil {
s.HandleServiceError(err)
} else {
log.Warnf("rpcx: failed to handle request: %v", err)
}
}
if !req.IsOneway() {
if len(resMetadata) > 0 { // copy meta in context to responses
meta := res.Metadata
if meta == nil {
res.Metadata = resMetadata
} else {
for k, v := range resMetadata {
if meta[k] == "" {
meta[k] = v
}
}
s.sendResponse(ctx, conn, writeCh, err, req, res)
}
}
if share.Trace {
log.Debugf("server write response %+v for an request %+v from conn: %v", res, req, conn.RemoteAddr().String())
}
protocol.FreeMsg(req)
protocol.FreeMsg(res)
}()
s.sendResponse(ctx, conn, writeCh, err, req, res)
}
if share.Trace {
log.Debugf("server write response %+v for an request %+v from conn: %v", res, req, conn.RemoteAddr().String())
}
protocol.FreeMsg(req)
protocol.FreeMsg(res)
}
func (s *Server) serveAsyncWrite(conn net.Conn, writeCh chan *[]byte) {