refactor makelisteners

This commit is contained in:
smallnest
2017-10-27 09:28:43 +08:00
parent 9f763e876e
commit a93e1c4be0
11 changed files with 103 additions and 155 deletions

View File

@@ -13,18 +13,20 @@ install the basic features:
`go get -u -v github.com/smallnest/rpcx/...` `go get -u -v github.com/smallnest/rpcx/...`
If you want to use `quic/kcp`, `zookeeper`, `etcd`, `consul` registry, use those tags to `go get``go build` or `go run`. For example, if you want to use all features, you can: If you want to use `reuseport``quic``kcp`, `zookeeper`, `etcd`, `consul` registry, use those tags to `go get``go build` or `go run`. For example, if you want to use all features, you can:
```sh ```sh
go get -u -v -tags "udp zookeeper etcd consul ping" github.com/smallnest/rpcx/... go get -u -v -tags "reuseport quic kcp zookeeper etcd consul ping" github.com/smallnest/rpcx/...
``` ```
**_tags_**: **_tags_**:
- **udp**: support quic and kcp transport - **quic**: support quic transport
- **kcp**: support kcp transport
- **zookeeper**: support zookeeper register - **zookeeper**: support zookeeper register
- **etcd**: support etcd register - **etcd**: support etcd register
- **consul**: support consul register - **consul**: support consul register
- **ping**: support network quality load balancing - **ping**: support network quality load balancing
- **reuseport**: support reuseport
## Features ## Features
rpcx is a RPC framework like [Alibaba Dubbo](http://dubbo.io/) and [Weibo Motan](https://github.com/weibocom/motan). rpcx is a RPC framework like [Alibaba Dubbo](http://dubbo.io/) and [Weibo Motan](https://github.com/weibocom/motan).
@@ -32,7 +34,7 @@ rpcx is a RPC framework like [Alibaba Dubbo](http://dubbo.io/) and [Weibo Motan]
**rpcx 3.0** has been refactored for targets: **rpcx 3.0** has been refactored for targets:
1. **Simple**: easy to learn, easy to develop, easy to intergate and easy to deploy 1. **Simple**: easy to learn, easy to develop, easy to intergate and easy to deploy
2. **Performance**: high perforamnce (>= grpc-go) 2. **Performance**: high perforamnce (>= grpc-go)
3. **Cross-platform**: support _raw slice of bytes_, _JSON_, _Protobuf_ and _MessagePack_. Theoretically it can be use in java, php, python, c/c++, node.js, c# and other platforms theoretically 3. **Cross-platform**: support _raw slice of bytes_, _JSON_, _Protobuf_ and _MessagePack_. Theoretically it can be use in java, php, python, c/c++, node.js, c# and other platforms
4. **Service discovery and service governance.**: support zookeeper, etcd and consul. 4. **Service discovery and service governance.**: support zookeeper, etcd and consul.
It contains below features It contains below features

View File

@@ -272,7 +272,6 @@ func (client *Client) send(ctx context.Context, call *Call) {
call.done() call.done()
return return
} }
if len(data) > 1024 && client.option.CompressType == protocol.Gzip { if len(data) > 1024 && client.option.CompressType == protocol.Gzip {
data, err = util.Zip(data) data, err = util.Zip(data)
if err != nil { if err != nil {
@@ -288,7 +287,7 @@ func (client *Client) send(ctx context.Context, call *Call) {
} }
data := req.Encode() data := req.Encode()
protocol.FreeMsg(req)
_, err := client.Conn.Write(data) _, err := client.Conn.Write(data)
if err != nil { if err != nil {
client.mutex.Lock() client.mutex.Lock()
@@ -301,6 +300,8 @@ func (client *Client) send(ctx context.Context, call *Call) {
} }
} }
protocol.FreeMsg(req)
if req.IsOneway() { if req.IsOneway() {
client.mutex.Lock() client.mutex.Lock()
call = client.pending[seq] call = client.pending[seq]

View File

@@ -426,7 +426,7 @@ func (m *Message) Decode(r io.Reader) error {
// Reset clean data of this message but keep allocated data // Reset clean data of this message but keep allocated data
func (m *Message) Reset() { func (m *Message) Reset() {
for i := 1; i < 12; i++ { for i := 1; i < 12; i++ {
m.Header[0] = 0 m.Header[i] = 0
} }
m.Metadata = nil m.Metadata = nil
m.Payload = m.Payload[:0] m.Payload = m.Payload[:0]

22
server/kcp.go Normal file
View File

@@ -0,0 +1,22 @@
// +build kcp
package server
import (
"errors"
"net"
kcp "github.com/xtaci/kcp-go"
)
func init() {
makeListeners["kcp"] = kcpMakeListener
}
func kcpMakeListener(s *Server, address string) (ln net.Listener, err error) {
if s.Options == nil || s.Options["BlockCrypt"] == nil {
return nil, errors.New("KCP BlockCrypt must be configured in server.Options")
}
return kcp.ListenWithOptions(address, s.Options["BlockCrypt"].(kcp.BlockCrypt), 10, 3)
}

View File

@@ -1,34 +1,41 @@
// +build !windows
// +build !udp
package server package server
import ( import (
"crypto/tls" "crypto/tls"
"fmt"
"net" "net"
reuseport "github.com/kavu/go_reuseport"
) )
var makeListeners = make(map[string]MakeListener)
func init() {
makeListeners["tcp"] = tcpMakeListener
makeListeners["http"] = tcpMakeListener
}
// RegisterMakeListener registers a MakeListener for network.
func RegisterMakeListener(network string, ml MakeListener) {
makeListeners[network] = ml
}
// MakeListener defines a listener generater.
type MakeListener func(s *Server, address string) (ln net.Listener, err error)
// block can be nil if the caller wishes to skip encryption in kcp. // block can be nil if the caller wishes to skip encryption in kcp.
// tlsConfig can be nil iff we are not using network "quic". // tlsConfig can be nil iff we are not using network "quic".
func (s *Server) makeListener(network, address string) (ln net.Listener, err error) { func (s *Server) makeListener(network, address string) (ln net.Listener, err error) {
switch network { ml := makeListeners[network]
case "reuseport": if ml == nil {
if validIP4(address) { return nil, fmt.Errorf("can not make listener for %s", network)
network = "tcp4" }
} else { return ml(s, address)
network = "tcp6" }
}
ln, err = reuseport.NewReusablePortListener(network, address)
default: //tcp, http
if s.TLSConfig == nil {
ln, err = net.Listen(network, address)
} else {
ln, err = tls.Listen(network, address, s.TLSConfig)
}
func tcpMakeListener(s *Server, address string) (ln net.Listener, err error) {
if s.TLSConfig == nil {
ln, err = net.Listen("tcp", address)
} else {
ln, err = tls.Listen("tcp", address, s.TLSConfig)
} }
return ln, err return ln, err

View File

@@ -1,48 +0,0 @@
// +build !windows
// +build udp
package server
import (
"crypto/tls"
"errors"
"net"
reuseport "github.com/kavu/go_reuseport"
quicconn "github.com/marten-seemann/quic-conn"
kcp "github.com/xtaci/kcp-go"
)
// block can be nil if the caller wishes to skip encryption in kcp.
// tlsConfig can be nil iff we are not using network "quic".
func (s *Server) makeListener(network, address string) (ln net.Listener, err error) {
switch network {
case "kcp":
if s.Options == nil || s.Options["BlockCrypt"] == nil {
return nil, errors.New("KCP BlockCrypt must be configured in server.Options")
}
ln, err = kcp.ListenWithOptions(address, s.Options["BlockCrypt"].(kcp.BlockCrypt), 10, 3)
case "reuseport":
if validIP4(address) {
network = "tcp4"
} else {
network = "tcp6"
}
ln, err = reuseport.NewReusablePortListener(network, address)
case "quic":
if s.TLSConfig == nil {
return nil, errors.New("TLSConfig must be configured in server.Options")
}
ln, err = quicconn.Listen("udp", address, s.TLSConfig)
default: //tcp, http
if s.TLSConfig == nil {
ln, err = net.Listen(network, address)
} else {
ln, err = tls.Listen(network, address, s.TLSConfig)
}
}
return ln, err
}

23
server/listener_unix.go Normal file
View File

@@ -0,0 +1,23 @@
// +build !windows
package server
import (
reuseport "github.com/kavu/go_reuseport"
)
func init() {
makeListeners["reuseport"] = reuseportMakeListener
}
func reuseportMakeListener func(s *Server, address string) (ln net.Listener, err error) {
if validIP4(address) {
network = "tcp4"
} else {
network = "tcp6"
}
return reuseport.NewReusablePortListener(network, address)
}

View File

@@ -1,32 +0,0 @@
// +build windows
// +build !udp
package server
import (
"crypto/tls"
"net"
)
// block can be nil if the caller wishes to skip encryption.
// tlsConfig can be nil iff we are not using network "quic".
func (s *Server) makeListener(network, address string) (ln net.Listener, err error) {
switch network {
case "reuseport":
if validIP4(address) {
network = "tcp4"
} else {
network = "tcp6"
}
ln, err = net.Listen(network, address)
default: //tcp
if s.TLSConfig == nil {
ln, err = net.Listen(network, address)
} else {
ln, err = tls.Listen(network, address, s.TLSConfig)
}
}
return ln, err
}

View File

@@ -1,47 +0,0 @@
// +build windows
// +build udp
package server
import (
"crypto/tls"
"errors"
"net"
quicconn "github.com/marten-seemann/quic-conn"
kcp "github.com/xtaci/kcp-go"
)
// block can be nil if the caller wishes to skip encryption.
// tlsConfig can be nil if we are not using network "quic".
func (s *Server) makeListener(network, address string) (ln net.Listener, err error) {
switch network {
case "kcp":
if s.Options == nil || s.Options["BlockCrypt"] == nil {
return nil, errors.New("KCP BlockCrypt must be configured in server.Options")
}
ln, err = kcp.ListenWithOptions(address, s.Options["BlockCrypt"].(kcp.BlockCrypt), 10, 3)
case "reuseport":
if validIP4(address) {
network = "tcp4"
} else {
network = "tcp6"
}
ln, err = net.Listen(network, address)
case "quic":
if s.TLSConfig == nil {
return nil, errors.New("TLSConfig must be configured in server.Options")
}
ln, err = quicconn.Listen("udp", address, s.TLSConfig)
default: //tcp
if s.TLSConfig == nil {
ln, err = net.Listen(network, address)
} else {
ln, err = tls.Listen(network, address, s.TLSConfig)
}
}
return ln, err
}

21
server/quic.go Normal file
View File

@@ -0,0 +1,21 @@
// +build quic
package server
import (
"errors"
"net"
quicconn "github.com/marten-seemann/quic-conn"
)
func init() {
makeListeners["quic"] = quicMakeListener
}
func quicMakeListener(s *Server, address string) (ln net.Listener, err error) {
if s.TLSConfig == nil {
return nil, errors.New("TLSConfig must be configured in server.Options")
}
return quicconn.Listen("udp", address, s.TLSConfig)
}

View File

@@ -329,7 +329,6 @@ func (s *Server) readRequest(ctx context.Context, r io.Reader) (req *protocol.Me
req = protocol.GetPooledMsg() req = protocol.GetPooledMsg()
err = req.Decode(r) err = req.Decode(r)
s.Plugins.DoPostReadRequest(ctx, req, err) s.Plugins.DoPostReadRequest(ctx, req, err)
return req, err return req, err
} }