diff --git a/README.md b/README.md index 38dc288..2c0b4f8 100644 --- a/README.md +++ b/README.md @@ -13,18 +13,20 @@ install the basic features: `go get -u -v github.com/smallnest/rpcx/...` -If you want to use `quic/kcp`, `zookeeper`, `etcd`, `consul` registry, use those tags to `go get` 、 `go build` or `go run`. For example, if you want to use all features, you can: +If you want to use `reuseport`、`quic`、`kcp`, `zookeeper`, `etcd`, `consul` registry, use those tags to `go get` 、 `go build` or `go run`. For example, if you want to use all features, you can: ```sh -go get -u -v -tags "udp zookeeper etcd consul ping" github.com/smallnest/rpcx/... +go get -u -v -tags "reuseport quic kcp zookeeper etcd consul ping" github.com/smallnest/rpcx/... ``` **_tags_**: -- **udp**: support quic and kcp transport +- **quic**: support quic transport +- **kcp**: support kcp transport - **zookeeper**: support zookeeper register - **etcd**: support etcd register - **consul**: support consul register - **ping**: support network quality load balancing +- **reuseport**: support reuseport ## Features rpcx is a RPC framework like [Alibaba Dubbo](http://dubbo.io/) and [Weibo Motan](https://github.com/weibocom/motan). @@ -32,7 +34,7 @@ rpcx is a RPC framework like [Alibaba Dubbo](http://dubbo.io/) and [Weibo Motan] **rpcx 3.0** has been refactored for targets: 1. **Simple**: easy to learn, easy to develop, easy to intergate and easy to deploy 2. **Performance**: high perforamnce (>= grpc-go) -3. **Cross-platform**: support _raw slice of bytes_, _JSON_, _Protobuf_ and _MessagePack_. Theoretically it can be use in java, php, python, c/c++, node.js, c# and other platforms theoretically +3. **Cross-platform**: support _raw slice of bytes_, _JSON_, _Protobuf_ and _MessagePack_. Theoretically it can be use in java, php, python, c/c++, node.js, c# and other platforms 4. **Service discovery and service governance.**: support zookeeper, etcd and consul. It contains below features diff --git a/client/client.go b/client/client.go index d94fcf7..a0e2cc7 100644 --- a/client/client.go +++ b/client/client.go @@ -272,7 +272,6 @@ func (client *Client) send(ctx context.Context, call *Call) { call.done() return } - if len(data) > 1024 && client.option.CompressType == protocol.Gzip { data, err = util.Zip(data) if err != nil { @@ -288,7 +287,7 @@ func (client *Client) send(ctx context.Context, call *Call) { } data := req.Encode() - protocol.FreeMsg(req) + _, err := client.Conn.Write(data) if err != nil { client.mutex.Lock() @@ -301,6 +300,8 @@ func (client *Client) send(ctx context.Context, call *Call) { } } + protocol.FreeMsg(req) + if req.IsOneway() { client.mutex.Lock() call = client.pending[seq] diff --git a/protocol/message.go b/protocol/message.go index 1bd11df..82c9c27 100644 --- a/protocol/message.go +++ b/protocol/message.go @@ -426,7 +426,7 @@ func (m *Message) Decode(r io.Reader) error { // Reset clean data of this message but keep allocated data func (m *Message) Reset() { for i := 1; i < 12; i++ { - m.Header[0] = 0 + m.Header[i] = 0 } m.Metadata = nil m.Payload = m.Payload[:0] diff --git a/server/kcp.go b/server/kcp.go new file mode 100644 index 0000000..0b9ddc2 --- /dev/null +++ b/server/kcp.go @@ -0,0 +1,22 @@ +// +build kcp + +package server + +import ( + "errors" + "net" + + kcp "github.com/xtaci/kcp-go" +) + +func init() { + makeListeners["kcp"] = kcpMakeListener +} + +func kcpMakeListener(s *Server, address string) (ln net.Listener, err error) { + if s.Options == nil || s.Options["BlockCrypt"] == nil { + return nil, errors.New("KCP BlockCrypt must be configured in server.Options") + } + + return kcp.ListenWithOptions(address, s.Options["BlockCrypt"].(kcp.BlockCrypt), 10, 3) +} diff --git a/server/listener.go b/server/listener.go index 5f5dd4d..71e67e3 100644 --- a/server/listener.go +++ b/server/listener.go @@ -1,34 +1,41 @@ -// +build !windows -// +build !udp - package server import ( "crypto/tls" + "fmt" "net" - - reuseport "github.com/kavu/go_reuseport" ) +var makeListeners = make(map[string]MakeListener) + +func init() { + makeListeners["tcp"] = tcpMakeListener + makeListeners["http"] = tcpMakeListener +} + +// RegisterMakeListener registers a MakeListener for network. +func RegisterMakeListener(network string, ml MakeListener) { + makeListeners[network] = ml +} + +// MakeListener defines a listener generater. +type MakeListener func(s *Server, address string) (ln net.Listener, err error) + // block can be nil if the caller wishes to skip encryption in kcp. // tlsConfig can be nil iff we are not using network "quic". func (s *Server) makeListener(network, address string) (ln net.Listener, err error) { - switch network { - case "reuseport": - if validIP4(address) { - network = "tcp4" - } else { - network = "tcp6" - } - - ln, err = reuseport.NewReusablePortListener(network, address) - default: //tcp, http - if s.TLSConfig == nil { - ln, err = net.Listen(network, address) - } else { - ln, err = tls.Listen(network, address, s.TLSConfig) - } + ml := makeListeners[network] + if ml == nil { + return nil, fmt.Errorf("can not make listener for %s", network) + } + return ml(s, address) +} +func tcpMakeListener(s *Server, address string) (ln net.Listener, err error) { + if s.TLSConfig == nil { + ln, err = net.Listen("tcp", address) + } else { + ln, err = tls.Listen("tcp", address, s.TLSConfig) } return ln, err diff --git a/server/listener_all.go b/server/listener_all.go deleted file mode 100644 index 7b884ee..0000000 --- a/server/listener_all.go +++ /dev/null @@ -1,48 +0,0 @@ -// +build !windows -// +build udp - -package server - -import ( - "crypto/tls" - "errors" - "net" - - reuseport "github.com/kavu/go_reuseport" - quicconn "github.com/marten-seemann/quic-conn" - kcp "github.com/xtaci/kcp-go" -) - -// block can be nil if the caller wishes to skip encryption in kcp. -// tlsConfig can be nil iff we are not using network "quic". -func (s *Server) makeListener(network, address string) (ln net.Listener, err error) { - switch network { - case "kcp": - if s.Options == nil || s.Options["BlockCrypt"] == nil { - return nil, errors.New("KCP BlockCrypt must be configured in server.Options") - } - - ln, err = kcp.ListenWithOptions(address, s.Options["BlockCrypt"].(kcp.BlockCrypt), 10, 3) - case "reuseport": - if validIP4(address) { - network = "tcp4" - } else { - network = "tcp6" - } - - ln, err = reuseport.NewReusablePortListener(network, address) - case "quic": - if s.TLSConfig == nil { - return nil, errors.New("TLSConfig must be configured in server.Options") - } - ln, err = quicconn.Listen("udp", address, s.TLSConfig) - default: //tcp, http - if s.TLSConfig == nil { - ln, err = net.Listen(network, address) - } else { - ln, err = tls.Listen(network, address, s.TLSConfig) - } - } - - return ln, err -} diff --git a/server/listener_unix.go b/server/listener_unix.go new file mode 100644 index 0000000..04b0d7b --- /dev/null +++ b/server/listener_unix.go @@ -0,0 +1,23 @@ +// +build !windows + +package server + +import ( + + reuseport "github.com/kavu/go_reuseport" +) + +func init() { + makeListeners["reuseport"] = reuseportMakeListener +} + + +func reuseportMakeListener func(s *Server, address string) (ln net.Listener, err error) { + if validIP4(address) { + network = "tcp4" + } else { + network = "tcp6" + } + + return reuseport.NewReusablePortListener(network, address) +} \ No newline at end of file diff --git a/server/listener_windows.go b/server/listener_windows.go deleted file mode 100644 index b3cf272..0000000 --- a/server/listener_windows.go +++ /dev/null @@ -1,32 +0,0 @@ -// +build windows -// +build !udp - -package server - -import ( - "crypto/tls" - "net" -) - -// block can be nil if the caller wishes to skip encryption. -// tlsConfig can be nil iff we are not using network "quic". -func (s *Server) makeListener(network, address string) (ln net.Listener, err error) { - switch network { - case "reuseport": - if validIP4(address) { - network = "tcp4" - } else { - network = "tcp6" - } - - ln, err = net.Listen(network, address) - default: //tcp - if s.TLSConfig == nil { - ln, err = net.Listen(network, address) - } else { - ln, err = tls.Listen(network, address, s.TLSConfig) - } - } - - return ln, err -} diff --git a/server/listener_windows_all.go b/server/listener_windows_all.go deleted file mode 100644 index bc3928c..0000000 --- a/server/listener_windows_all.go +++ /dev/null @@ -1,47 +0,0 @@ -// +build windows -// +build udp - -package server - -import ( - "crypto/tls" - "errors" - "net" - - quicconn "github.com/marten-seemann/quic-conn" - kcp "github.com/xtaci/kcp-go" -) - -// block can be nil if the caller wishes to skip encryption. -// tlsConfig can be nil if we are not using network "quic". -func (s *Server) makeListener(network, address string) (ln net.Listener, err error) { - switch network { - case "kcp": - if s.Options == nil || s.Options["BlockCrypt"] == nil { - return nil, errors.New("KCP BlockCrypt must be configured in server.Options") - } - - ln, err = kcp.ListenWithOptions(address, s.Options["BlockCrypt"].(kcp.BlockCrypt), 10, 3) - case "reuseport": - if validIP4(address) { - network = "tcp4" - } else { - network = "tcp6" - } - - ln, err = net.Listen(network, address) - case "quic": - if s.TLSConfig == nil { - return nil, errors.New("TLSConfig must be configured in server.Options") - } - ln, err = quicconn.Listen("udp", address, s.TLSConfig) - default: //tcp - if s.TLSConfig == nil { - ln, err = net.Listen(network, address) - } else { - ln, err = tls.Listen(network, address, s.TLSConfig) - } - } - - return ln, err -} diff --git a/server/quic.go b/server/quic.go new file mode 100644 index 0000000..6e78176 --- /dev/null +++ b/server/quic.go @@ -0,0 +1,21 @@ +// +build quic + +package server + +import ( + "errors" + "net" + + quicconn "github.com/marten-seemann/quic-conn" +) + +func init() { + makeListeners["quic"] = quicMakeListener +} + +func quicMakeListener(s *Server, address string) (ln net.Listener, err error) { + if s.TLSConfig == nil { + return nil, errors.New("TLSConfig must be configured in server.Options") + } + return quicconn.Listen("udp", address, s.TLSConfig) +} diff --git a/server/server.go b/server/server.go index 7c6e230..efc78e6 100644 --- a/server/server.go +++ b/server/server.go @@ -329,7 +329,6 @@ func (s *Server) readRequest(ctx context.Context, r io.Reader) (req *protocol.Me req = protocol.GetPooledMsg() err = req.Decode(r) s.Plugins.DoPostReadRequest(ctx, req, err) - return req, err }