diff --git a/_documents/rpcx_tech_support.png b/_documents/rpcx_tech_support.png deleted file mode 100644 index 6d9cd48..0000000 Binary files a/_documents/rpcx_tech_support.png and /dev/null differ diff --git a/client/client.go b/client/client.go index d68ec57..170c404 100644 --- a/client/client.go +++ b/client/client.go @@ -94,6 +94,8 @@ type RPCClient interface { IsClosing() bool IsShutdown() bool + + GetConn() net.Conn } // Client represents a RPC client. @@ -129,6 +131,11 @@ func (c *Client) RemoteAddr() string { return c.Conn.RemoteAddr().String() } +// GetConn returns the underlying conn. +func (c *Client) GetConn() net.Conn { + return c.Conn +} + // Option contains all options for creating clients. type Option struct { // Group is used to select the services in the same group. Services set group info in their meta. diff --git a/client/xclient.go b/client/xclient.go index 88ec136..5c8c49c 100644 --- a/client/xclient.go +++ b/client/xclient.go @@ -53,6 +53,15 @@ type XClient interface { DownloadFile(ctx context.Context, requestFileName string, saveTo io.Writer, meta map[string]string) error Stream(ctx context.Context, meta map[string]string) (net.Conn, error) Close() error + + RegisterCacheClientBuilder(network string, builder CacheClientBuilder) +} + +type CacheClientBuilder interface { + setCachedClient(client RPCClient, k, servicePath, serviceMethod string) + findCachedClient(k, servicePath, serviceMethod string) RPCClient + deleteCachedClient(client RPCClient, k, servicePath, serviceMethod string) + generateClient(k, servicePath, serviceMethod string) (client RPCClient, err error) } // KVPair contains a key and a string. @@ -84,10 +93,11 @@ type xClient struct { servicePath string option Option - mu sync.RWMutex - servers map[string]string - discovery ServiceDiscovery - selector Selector + mu sync.RWMutex + servers map[string]string + discovery ServiceDiscovery + selector Selector + cacheClientBuilders map[string]CacheClientBuilder slGroup singleflight.Group @@ -106,12 +116,13 @@ type xClient struct { // NewXClient creates a XClient that supports service discovery and service governance. func NewXClient(servicePath string, failMode FailMode, selectMode SelectMode, discovery ServiceDiscovery, option Option) XClient { client := &xClient{ - failMode: failMode, - selectMode: selectMode, - discovery: discovery, - servicePath: servicePath, - cachedClient: make(map[string]RPCClient), - option: option, + failMode: failMode, + selectMode: selectMode, + discovery: discovery, + servicePath: servicePath, + cachedClient: make(map[string]RPCClient), + cacheClientBuilders: make(map[string]CacheClientBuilder), + option: option, } pairs := discovery.GetServices() @@ -171,6 +182,12 @@ func NewBidirectionalXClient(servicePath string, failMode FailMode, selectMode S return client } +func (c *xClient) RegisterCacheClientBuilder(network string, builder CacheClientBuilder) { + c.mu.Lock() + c.mu.Unlock() + c.cacheClientBuilders[network] = builder +} + // SetSelector sets customized selector by users. func (c *xClient) SetSelector(s Selector) { c.mu.RLock() @@ -255,7 +272,7 @@ func (c *xClient) getCachedClient(k string, servicePath, serviceMethod string, a var needCallPlugin bool defer func() { if needCallPlugin { - c.Plugins.DoClientConnected((client.(*Client)).Conn) + c.Plugins.DoClientConnected(client.GetConn()) } }() c.mu.Lock() @@ -302,20 +319,52 @@ func (c *xClient) getCachedClient(k string, servicePath, serviceMethod string, a } func (c *xClient) setCachedClient(client RPCClient, k, servicePath, serviceMethod string) { + network, _ := splitNetworkAndAddress(k) + if builder, ok := c.cacheClientBuilders[network]; ok { + builder.setCachedClient(client, k, servicePath, serviceMethod) + return + } + c.cachedClient[k] = client } func (c *xClient) findCachedClient(k, servicePath, serviceMethod string) RPCClient { + network, _ := splitNetworkAndAddress(k) + if builder, ok := c.cacheClientBuilders[network]; ok { + return builder.findCachedClient(k, servicePath, serviceMethod) + } + return c.cachedClient[k] } func (c *xClient) deleteCachedClient(client RPCClient, k, servicePath, serviceMethod string) { + network, _ := splitNetworkAndAddress(k) + if builder, ok := c.cacheClientBuilders[network]; ok && client != nil { + builder.deleteCachedClient(client, k, servicePath, serviceMethod) + client.Close() + return + } + delete(c.cachedClient, k) if client != nil { client.Close() } } +func (c *xClient) removeClient(k, servicePath, serviceMethod string, client RPCClient) { + c.mu.Lock() + cl := c.findCachedClient(k, servicePath, serviceMethod) + if cl == client { + c.deleteCachedClient(client, k, servicePath, serviceMethod) + } + c.mu.Unlock() + + if client != nil { + client.UnregisterServerMessageChan() + client.Close() + } +} + func (c *xClient) generateClient(k, servicePath, serviceMethod string) (client RPCClient, err error) { client = &Client{ option: c.option, @@ -339,51 +388,45 @@ func (c *xClient) generateClient(k, servicePath, serviceMethod string) (client R } func (c *xClient) getCachedClientWithoutLock(k, servicePath, serviceMethod string) (RPCClient, error) { - client := c.cachedClient[k] + client := c.findCachedClient(k, servicePath, serviceMethod) if client != nil { if !client.IsClosing() && !client.IsShutdown() { return client, nil } - delete(c.cachedClient, k) - client.Close() + c.deleteCachedClient(client, k, servicePath, serviceMethod) } - // double check - client = c.cachedClient[k] - if client == nil || client.IsShutdown() { - network, addr := splitNetworkAndAddress(k) - - client = &Client{ - option: c.option, - Plugins: c.Plugins, + var needCallPlugin bool + defer func() { + if needCallPlugin { + c.Plugins.DoClientConnected(client.GetConn()) } - err := client.Connect(network, addr) + }() + + // double check + client = c.findCachedClient(k, servicePath, serviceMethod) + if client == nil || client.IsShutdown() { + generatedClient, err, _ := c.slGroup.Do(k, func() (interface{}, error) { + return c.generateClient(k, servicePath, serviceMethod) + }) + c.slGroup.Forget(k) if err != nil { return nil, err } + client = generatedClient.(RPCClient) + if c.Plugins != nil { + needCallPlugin = true + } + client.RegisterServerMessageChan(c.serverMessageChan) - c.cachedClient[k] = client + c.setCachedClient(client, k, servicePath, serviceMethod) } return client, nil } -func (c *xClient) removeClient(k, servicePath, serviceMethod string, client RPCClient) { - c.mu.Lock() - cl := c.cachedClient[k] - if cl == client { - delete(c.cachedClient, k) - } - c.mu.Unlock() - - if client != nil { - client.UnregisterServerMessageChan() - client.Close() - } -} - func splitNetworkAndAddress(server string) (string, string) { ss := strings.SplitN(server, "@", 2) if len(ss) == 1 {