diff --git a/client/client.go b/client/client.go index 41da402..ba6d155 100644 --- a/client/client.go +++ b/client/client.go @@ -34,7 +34,10 @@ const ( ) // ServiceError is an error from server. -type ServiceError error +type ServiceError interface { + Error() string + IsServiceError() bool +} var ClientErrorFunc func(e string) ServiceError @@ -44,6 +47,10 @@ func (s strErr) Error() string { return string(s) } +func (s strErr) IsServiceError() bool { + return true +} + // DefaultOption is a common option configuration for client. var DefaultOption = Option{ Retries: 3, diff --git a/client/connection_nonquic.go b/client/connection_nonquic.go index baa4e00..7c1881a 100644 --- a/client/connection_nonquic.go +++ b/client/connection_nonquic.go @@ -1,3 +1,4 @@ +//go:build !quic // +build !quic package client diff --git a/client/connection_quic.go b/client/connection_quic.go index 6a8229a..602c2bf 100644 --- a/client/connection_quic.go +++ b/client/connection_quic.go @@ -1,3 +1,4 @@ +//go:build quic // +build quic package client @@ -21,9 +22,7 @@ func newDirectQuicConn(c *Client, network, address string) (net.Conn, error) { tlsConf.NextProtos = []string{"rpcx"} } - quicConfig := &quic.Config{ - KeepAlive: c.option.Heartbeat, - } + quicConfig := &quic.Config{} return quick.Dial(address, tlsConf, quicConfig) } diff --git a/go.mod b/go.mod index 330934b..f307032 100644 --- a/go.mod +++ b/go.mod @@ -38,12 +38,13 @@ require ( github.com/xtaci/kcp-go v5.4.20+incompatible go.opentelemetry.io/otel v1.7.0 go.opentelemetry.io/otel/trace v1.7.0 - golang.org/x/net v0.0.0-20220630215102-69896b714898 + golang.org/x/net v0.0.0-20220708220712-1185a9018129 golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f google.golang.org/protobuf v1.28.0 ) require ( + github.com/alphadose/itogami v0.0.0-20220705100819-134f04183c42 // indirect github.com/armon/go-metrics v0.4.0 // indirect github.com/cenk/backoff v2.2.1+incompatible // indirect github.com/cenkalti/backoff v2.2.1+incompatible // indirect @@ -93,7 +94,7 @@ require ( github.com/xtaci/lossyconn v0.0.0-20200209145036-adba10fffc37 // indirect golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect - golang.org/x/sys v0.0.0-20220702020025-31831981b65f // indirect + golang.org/x/sys v0.0.0-20220708085239-5a0f0661e09d // indirect golang.org/x/text v0.3.7 // indirect golang.org/x/tools v0.1.11 // indirect golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect diff --git a/go.sum b/go.sum index 604707a..f384891 100644 --- a/go.sum +++ b/go.sum @@ -17,6 +17,8 @@ github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuy github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= +github.com/alphadose/itogami v0.0.0-20220705100819-134f04183c42 h1:GLC/6Lfd6l1CRfkeUnR973fZBETOPQAtoaYGGl+Ho10= +github.com/alphadose/itogami v0.0.0-20220705100819-134f04183c42/go.mod h1:QDsatlDSUJB4sXxZsJEpawGnTDwSdvX27ZXqtuZY3WA= github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c= github.com/apache/thrift v0.16.0 h1:qEy6UW60iVOlUy+b9ZR0d5WzUWYGOo4HfopoyBaNmoY= github.com/apache/thrift v0.16.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU= @@ -505,6 +507,8 @@ golang.org/x/net v0.0.0-20211216030914-fe4d6282115f/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.0.0-20220630215102-69896b714898 h1:K7wO6V1IrczY9QOQ2WkVpw4JQSwCd52UsxVEirZUfiw= golang.org/x/net v0.0.0-20220630215102-69896b714898/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.0.0-20220708220712-1185a9018129 h1:vucSRfWwTsoXro7P+3Cjlr6flUMtzCwzlvkxEQtHHB0= +golang.org/x/net v0.0.0-20220708220712-1185a9018129/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181203162652-d668ce993890/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -564,6 +568,8 @@ golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220702020025-31831981b65f h1:xdsejrW/0Wf2diT5CPp3XmKUNbr7Xvw8kYilQ+6qjRY= golang.org/x/sys v0.0.0-20220702020025-31831981b65f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220708085239-5a0f0661e09d h1:/m5NbqQelATgoSPVC2Z23sR4kVNokFwDDyWh/3rGY+I= +golang.org/x/sys v0.0.0-20220708085239-5a0f0661e09d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/server/option.go b/server/option.go index 515112f..bbe697f 100644 --- a/server/option.go +++ b/server/option.go @@ -3,6 +3,8 @@ package server import ( "crypto/tls" "time" + + "github.com/alphadose/itogami" ) // OptionFn configures options of server. @@ -37,3 +39,19 @@ func WithWriteTimeout(writeTimeout time.Duration) OptionFn { s.writeTimeout = writeTimeout } } + +// WithPool sets goroutine pool. +func WithPool(n uint64) OptionFn { + return func(s *Server) { + if n > 0 { + s.pool = itogami.NewPool(n) + } + } +} + +// WithAsyncWrite sets AsyncWrite to true. +func WithAsyncWrite() OptionFn { + return func(s *Server) { + s.AsyncWrite = true + } +} diff --git a/server/server.go b/server/server.go index d433781..e32063a 100644 --- a/server/server.go +++ b/server/server.go @@ -20,6 +20,7 @@ import ( "sync/atomic" "time" + "github.com/alphadose/itogami" "github.com/smallnest/rpcx/log" "github.com/smallnest/rpcx/protocol" "github.com/smallnest/rpcx/share" @@ -78,6 +79,7 @@ type Server struct { DisableHTTPGateway bool // should disable http invoke or not. DisableJSONRPC bool // should disable json rpc or not. AsyncWrite bool // set true if your server only serves few clients + pool *itogami.Pool serviceMapMu sync.RWMutex serviceMap map[string]*service @@ -487,98 +489,105 @@ func (s *Server) serveConn(conn net.Conn) { continue } - // handle biz logics - go func() { - defer func() { - if r := recover(); r != nil { - log.Errorf("failed to handle the request: %v", r) - } - }() + if s.pool != nil { + s.pool.Submit(func() { + s.processOneRequest(ctx, req, conn, writeCh) + }) + } else { + go s.processOneRequest(ctx, req, conn, writeCh) + } + } +} - atomic.AddInt32(&s.handlerMsgNum, 1) - defer atomic.AddInt32(&s.handlerMsgNum, -1) +func (s *Server) processOneRequest(ctx *share.Context, req *protocol.Message, conn net.Conn, writeCh chan *[]byte) { + defer func() { + if r := recover(); r != nil { + log.Errorf("failed to handle the request: %v", r) + } + }() - // 心跳请求,直接处理返回 - if req.IsHeartbeat() { - s.Plugins.DoHeartbeatRequest(ctx, req) - req.SetMessageType(protocol.Response) - data := req.EncodeSlicePointer() + atomic.AddInt32(&s.handlerMsgNum, 1) + defer atomic.AddInt32(&s.handlerMsgNum, -1) - if s.writeTimeout != 0 { - conn.SetWriteDeadline(time.Now().Add(s.writeTimeout)) - } - conn.Write(*data) + // 心跳请求,直接处理返回 + if req.IsHeartbeat() { + s.Plugins.DoHeartbeatRequest(ctx, req) + req.SetMessageType(protocol.Response) + data := req.EncodeSlicePointer() - protocol.PutData(data) - protocol.FreeMsg(req) + if s.writeTimeout != 0 { + conn.SetWriteDeadline(time.Now().Add(s.writeTimeout)) + } + conn.Write(*data) - return - } + protocol.PutData(data) + protocol.FreeMsg(req) - cancelFunc := parseServerTimeout(ctx, req) - if cancelFunc != nil { - defer cancelFunc() - } + return + } - resMetadata := make(map[string]string) - if req.Metadata == nil { - req.Metadata = make(map[string]string) - } - ctx = share.WithLocalValue(share.WithLocalValue(ctx, share.ReqMetaDataKey, req.Metadata), - share.ResMetaDataKey, resMetadata) + cancelFunc := parseServerTimeout(ctx, req) + if cancelFunc != nil { + defer cancelFunc() + } - s.Plugins.DoPreHandleRequest(ctx, req) + resMetadata := make(map[string]string) + if req.Metadata == nil { + req.Metadata = make(map[string]string) + } + ctx = share.WithLocalValue(share.WithLocalValue(ctx, share.ReqMetaDataKey, req.Metadata), + share.ResMetaDataKey, resMetadata) - if share.Trace { - log.Debugf("server handle request %+v from conn: %v", req, conn.RemoteAddr().String()) - } + s.Plugins.DoPreHandleRequest(ctx, req) - // use handlers first - if handler, ok := s.router[req.ServicePath+"."+req.ServiceMethod]; ok { - sctx := NewContext(ctx, conn, req, writeCh) - err := handler(sctx) - if err != nil { - log.Errorf("[handler internal error]: servicepath: %s, servicemethod, err: %v", req.ServicePath, req.ServiceMethod, err) - } + if share.Trace { + log.Debugf("server handle request %+v from conn: %v", req, conn.RemoteAddr().String()) + } - protocol.FreeMsg(req) - return - } + // use handlers first + if handler, ok := s.router[req.ServicePath+"."+req.ServiceMethod]; ok { + sctx := NewContext(ctx, conn, req, writeCh) + err := handler(sctx) + if err != nil { + log.Errorf("[handler internal error]: servicepath: %s, servicemethod, err: %v", req.ServicePath, req.ServiceMethod, err) + } - res, err := s.handleRequest(ctx, req) - if err != nil { - if s.HandleServiceError != nil { - s.HandleServiceError(err) - } else { - log.Warnf("rpcx: failed to handle request: %v", err) - } - } + protocol.FreeMsg(req) + return + } - if !req.IsOneway() { - if len(resMetadata) > 0 { // copy meta in context to responses - meta := res.Metadata - if meta == nil { - res.Metadata = resMetadata - } else { - for k, v := range resMetadata { - if meta[k] == "" { - meta[k] = v - } - } + res, err := s.handleRequest(ctx, req) + if err != nil { + if s.HandleServiceError != nil { + s.HandleServiceError(err) + } else { + log.Warnf("rpcx: failed to handle request: %v", err) + } + } + + if !req.IsOneway() { + if len(resMetadata) > 0 { // copy meta in context to responses + meta := res.Metadata + if meta == nil { + res.Metadata = resMetadata + } else { + for k, v := range resMetadata { + if meta[k] == "" { + meta[k] = v } } - - s.sendResponse(ctx, conn, writeCh, err, req, res) } + } - if share.Trace { - log.Debugf("server write response %+v for an request %+v from conn: %v", res, req, conn.RemoteAddr().String()) - } - - protocol.FreeMsg(req) - protocol.FreeMsg(res) - }() + s.sendResponse(ctx, conn, writeCh, err, req, res) } + + if share.Trace { + log.Debugf("server write response %+v for an request %+v from conn: %v", res, req, conn.RemoteAddr().String()) + } + + protocol.FreeMsg(req) + protocol.FreeMsg(res) } func (s *Server) serveAsyncWrite(conn net.Conn, writeCh chan *[]byte) {