add ConnCreated plugin

This commit is contained in:
smallnest
2019-05-16 17:19:33 +08:00
parent 463d14363d
commit f235181ad9
4 changed files with 63 additions and 34 deletions

View File

@@ -7,7 +7,9 @@
- support opentracing and opencensus
- upload/download files by streaming
- add Pool for XClient and OneClient
- remove rudp support
- add ConnCreated plugin. Yu can use it to set KCP UDPSession
- update client plugins. All plugin returns error instead of bool
## 4.0

View File

@@ -48,6 +48,13 @@ func (c *Client) Connect(network, address string) error {
conn.SetWriteDeadline(time.Now().Add(c.option.WriteTimeout))
}
if c.Plugins != nil {
conn, err = c.Plugins.DoConnCreated(conn)
if err != nil {
return err
}
}
c.Conn = conn
c.r = bufio.NewReaderSize(conn, ReaderBuffsize)
//c.w = bufio.NewWriterSize(conn, WriterBuffsize)

View File

@@ -73,60 +73,74 @@ func (p *pluginContainer) DoPostCall(ctx context.Context, servicePath, serviceMe
return nil
}
// DoConnCreated is called in case of client connection created.
func (p *pluginContainer) DoConnCreated(conn net.Conn) (net.Conn, error) {
var err error
for i := range p.plugins {
if plugin, ok := p.plugins[i].(ConnCreatedPlugin); ok {
conn, err = plugin.ConnCreated(conn)
if err != nil {
return conn, err
}
}
}
return conn, nil
}
// DoClientConnected is called in case of connected.
func (p *pluginContainer) DoClientConnected(conn net.Conn) (net.Conn, bool) {
var handleOk bool
func (p *pluginContainer) DoClientConnected(conn net.Conn) (net.Conn, error) {
var err error
for i := range p.plugins {
if plugin, ok := p.plugins[i].(ClientConnectedPlugin); ok {
conn, handleOk = plugin.ClientConnected(conn)
if !handleOk {
return conn, false
conn, err = plugin.ClientConnected(conn)
if err != nil {
return conn, err
}
}
}
return conn, true
return conn, nil
}
// DoClientConnected is called in case of connected.
func (p *pluginContainer) DoClientConnectionClose(conn net.Conn) bool {
var handleOk bool
func (p *pluginContainer) DoClientConnectionClose(conn net.Conn) error {
var err error
for i := range p.plugins {
if plugin, ok := p.plugins[i].(ClientConnectionClosePlugin); ok {
handleOk = plugin.ClientConnectionClose(conn)
if !handleOk {
return false
err = plugin.ClientConnectionClose(conn)
if err != nil {
return err
}
}
}
return true
return err
}
// DoClientBeforeEncode is called when requests are encoded and sent.
func (p *pluginContainer) DoClientBeforeEncode(req *protocol.Message) bool {
var handleOk bool
func (p *pluginContainer) DoClientBeforeEncode(req *protocol.Message) error {
var err error
for i := range p.plugins {
if plugin, ok := p.plugins[i].(ClientBeforeEncodePlugin); ok {
handleOk = plugin.ClientBeforeEncode(req)
if !handleOk {
return false
err = plugin.ClientBeforeEncode(req)
if err != nil {
return err
}
}
}
return true
return nil
}
// DoClientBeforeEncode is called when requests are encoded and sent.
func (p *pluginContainer) DoClientAfterDecode(req *protocol.Message) bool {
var handleOk bool
func (p *pluginContainer) DoClientAfterDecode(req *protocol.Message) error {
var err error
for i := range p.plugins {
if plugin, ok := p.plugins[i].(ClientAfterDecodePlugin); ok {
handleOk = plugin.ClientAfterDecode(req)
if !handleOk {
return false
err = plugin.ClientAfterDecode(req)
if err != nil {
return err
}
}
}
return true
return nil
}
type (
@@ -140,24 +154,29 @@ type (
DoPostCall(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}, err error) error
}
// ConnCreatedPlugin is invoked when the client connection has created.
ConnCreatedPlugin interface {
ConnCreated(net.Conn) (net.Conn, error)
}
// ClientConnectedPlugin is invoked when the client has connected the server.
ClientConnectedPlugin interface {
ClientConnected(net.Conn) (net.Conn, bool)
ClientConnected(net.Conn) (net.Conn, error)
}
// ClientConnectionClosePlugin is invoked when the connection is closing.
ClientConnectionClosePlugin interface {
ClientConnectionClose(net.Conn) bool
ClientConnectionClose(net.Conn) error
}
// ClientBeforeEncodePlugin is invoked when the message is encoded and sent.
ClientBeforeEncodePlugin interface {
ClientBeforeEncode(*protocol.Message) bool
ClientBeforeEncode(*protocol.Message) error
}
// ClientAfterDecodePlugin is invoked when the message is decoded.
ClientAfterDecodePlugin interface {
ClientAfterDecode(*protocol.Message) bool
ClientAfterDecode(*protocol.Message) error
}
//PluginContainer represents a plugin container that defines all methods to manage plugins.
@@ -167,13 +186,14 @@ type (
Remove(plugin Plugin)
All() []Plugin
DoClientConnected(net.Conn) (net.Conn, bool)
DoClientConnectionClose(net.Conn) bool
DoConnCreated(net.Conn) (net.Conn, error)
DoClientConnected(net.Conn) (net.Conn, error)
DoClientConnectionClose(net.Conn) error
DoPreCall(ctx context.Context, servicePath, serviceMethod string, args interface{}) error
DoPostCall(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}, err error) error
DoClientBeforeEncode(*protocol.Message) bool
DoClientAfterDecode(*protocol.Message) bool
DoClientBeforeEncode(*protocol.Message) error
DoClientAfterDecode(*protocol.Message) error
}
)

View File

@@ -21,7 +21,7 @@ func kcpMakeListener(s *Server, address string) (ln net.Listener, err error) {
return kcp.ListenWithOptions(address, s.options["BlockCrypt"].(kcp.BlockCrypt), 10, 3)
}
// WithWriteTimeout sets writeTimeout.
// WithBlockCrypt sets kcp.BlockCrypt.
func WithBlockCrypt(bc kcp.BlockCrypt) OptionFn {
return func(s *Server) {
s.options["BlockCrypt"] = bc