refactor to support other rpc clients

This commit is contained in:
smallnest
2021-04-09 21:01:11 +08:00
parent 7cece5b70e
commit fa5e5976dc
3 changed files with 88 additions and 38 deletions

Binary file not shown.

Before

Width:  |  Height:  |  Size: 21 KiB

View File

@@ -94,6 +94,8 @@ type RPCClient interface {
IsClosing() bool
IsShutdown() bool
GetConn() net.Conn
}
// Client represents a RPC client.
@@ -129,6 +131,11 @@ func (c *Client) RemoteAddr() string {
return c.Conn.RemoteAddr().String()
}
// GetConn returns the underlying conn.
func (c *Client) GetConn() net.Conn {
return c.Conn
}
// Option contains all options for creating clients.
type Option struct {
// Group is used to select the services in the same group. Services set group info in their meta.

View File

@@ -53,6 +53,15 @@ type XClient interface {
DownloadFile(ctx context.Context, requestFileName string, saveTo io.Writer, meta map[string]string) error
Stream(ctx context.Context, meta map[string]string) (net.Conn, error)
Close() error
RegisterCacheClientBuilder(network string, builder CacheClientBuilder)
}
type CacheClientBuilder interface {
setCachedClient(client RPCClient, k, servicePath, serviceMethod string)
findCachedClient(k, servicePath, serviceMethod string) RPCClient
deleteCachedClient(client RPCClient, k, servicePath, serviceMethod string)
generateClient(k, servicePath, serviceMethod string) (client RPCClient, err error)
}
// KVPair contains a key and a string.
@@ -84,10 +93,11 @@ type xClient struct {
servicePath string
option Option
mu sync.RWMutex
servers map[string]string
discovery ServiceDiscovery
selector Selector
mu sync.RWMutex
servers map[string]string
discovery ServiceDiscovery
selector Selector
cacheClientBuilders map[string]CacheClientBuilder
slGroup singleflight.Group
@@ -106,12 +116,13 @@ type xClient struct {
// NewXClient creates a XClient that supports service discovery and service governance.
func NewXClient(servicePath string, failMode FailMode, selectMode SelectMode, discovery ServiceDiscovery, option Option) XClient {
client := &xClient{
failMode: failMode,
selectMode: selectMode,
discovery: discovery,
servicePath: servicePath,
cachedClient: make(map[string]RPCClient),
option: option,
failMode: failMode,
selectMode: selectMode,
discovery: discovery,
servicePath: servicePath,
cachedClient: make(map[string]RPCClient),
cacheClientBuilders: make(map[string]CacheClientBuilder),
option: option,
}
pairs := discovery.GetServices()
@@ -171,6 +182,12 @@ func NewBidirectionalXClient(servicePath string, failMode FailMode, selectMode S
return client
}
func (c *xClient) RegisterCacheClientBuilder(network string, builder CacheClientBuilder) {
c.mu.Lock()
c.mu.Unlock()
c.cacheClientBuilders[network] = builder
}
// SetSelector sets customized selector by users.
func (c *xClient) SetSelector(s Selector) {
c.mu.RLock()
@@ -255,7 +272,7 @@ func (c *xClient) getCachedClient(k string, servicePath, serviceMethod string, a
var needCallPlugin bool
defer func() {
if needCallPlugin {
c.Plugins.DoClientConnected((client.(*Client)).Conn)
c.Plugins.DoClientConnected(client.GetConn())
}
}()
c.mu.Lock()
@@ -302,20 +319,52 @@ func (c *xClient) getCachedClient(k string, servicePath, serviceMethod string, a
}
func (c *xClient) setCachedClient(client RPCClient, k, servicePath, serviceMethod string) {
network, _ := splitNetworkAndAddress(k)
if builder, ok := c.cacheClientBuilders[network]; ok {
builder.setCachedClient(client, k, servicePath, serviceMethod)
return
}
c.cachedClient[k] = client
}
func (c *xClient) findCachedClient(k, servicePath, serviceMethod string) RPCClient {
network, _ := splitNetworkAndAddress(k)
if builder, ok := c.cacheClientBuilders[network]; ok {
return builder.findCachedClient(k, servicePath, serviceMethod)
}
return c.cachedClient[k]
}
func (c *xClient) deleteCachedClient(client RPCClient, k, servicePath, serviceMethod string) {
network, _ := splitNetworkAndAddress(k)
if builder, ok := c.cacheClientBuilders[network]; ok && client != nil {
builder.deleteCachedClient(client, k, servicePath, serviceMethod)
client.Close()
return
}
delete(c.cachedClient, k)
if client != nil {
client.Close()
}
}
func (c *xClient) removeClient(k, servicePath, serviceMethod string, client RPCClient) {
c.mu.Lock()
cl := c.findCachedClient(k, servicePath, serviceMethod)
if cl == client {
c.deleteCachedClient(client, k, servicePath, serviceMethod)
}
c.mu.Unlock()
if client != nil {
client.UnregisterServerMessageChan()
client.Close()
}
}
func (c *xClient) generateClient(k, servicePath, serviceMethod string) (client RPCClient, err error) {
client = &Client{
option: c.option,
@@ -339,51 +388,45 @@ func (c *xClient) generateClient(k, servicePath, serviceMethod string) (client R
}
func (c *xClient) getCachedClientWithoutLock(k, servicePath, serviceMethod string) (RPCClient, error) {
client := c.cachedClient[k]
client := c.findCachedClient(k, servicePath, serviceMethod)
if client != nil {
if !client.IsClosing() && !client.IsShutdown() {
return client, nil
}
delete(c.cachedClient, k)
client.Close()
c.deleteCachedClient(client, k, servicePath, serviceMethod)
}
// double check
client = c.cachedClient[k]
if client == nil || client.IsShutdown() {
network, addr := splitNetworkAndAddress(k)
client = &Client{
option: c.option,
Plugins: c.Plugins,
var needCallPlugin bool
defer func() {
if needCallPlugin {
c.Plugins.DoClientConnected(client.GetConn())
}
err := client.Connect(network, addr)
}()
// double check
client = c.findCachedClient(k, servicePath, serviceMethod)
if client == nil || client.IsShutdown() {
generatedClient, err, _ := c.slGroup.Do(k, func() (interface{}, error) {
return c.generateClient(k, servicePath, serviceMethod)
})
c.slGroup.Forget(k)
if err != nil {
return nil, err
}
client = generatedClient.(RPCClient)
if c.Plugins != nil {
needCallPlugin = true
}
client.RegisterServerMessageChan(c.serverMessageChan)
c.cachedClient[k] = client
c.setCachedClient(client, k, servicePath, serviceMethod)
}
return client, nil
}
func (c *xClient) removeClient(k, servicePath, serviceMethod string, client RPCClient) {
c.mu.Lock()
cl := c.cachedClient[k]
if cl == client {
delete(c.cachedClient, k)
}
c.mu.Unlock()
if client != nil {
client.UnregisterServerMessageChan()
client.Close()
}
}
func splitNetworkAndAddress(server string) (string, string) {
ss := strings.SplitN(server, "@", 2)
if len(ss) == 1 {