Files
rpcx/client/xclient.go
2017-12-19 13:39:00 +08:00

570 lines
13 KiB
Go

package client
import (
"context"
"errors"
"reflect"
"strings"
"sync"
"time"
ex "github.com/smallnest/rpcx/errors"
"github.com/smallnest/rpcx/protocol"
"github.com/smallnest/rpcx/share"
)
var (
// ErrXClientShutdown is returned by the calling methods (Go, Call, SendRaw,
// Broadcast, Fork) once the xClient has been marked as shut down.
ErrXClientShutdown = errors.New("xClient is shut down")
// ErrXClientNoServer is returned when the selector yields an empty server key,
// or when Broadcast/Fork find no servers to contact.
ErrXClientNoServer = errors.New("can not found any server")
// ErrServerUnavailable is returned by wrapCall when no client is available
// for the selected server.
ErrServerUnavailable = errors.New("selected server is unavilable")
)
// XClient is the client-side interface that adds service discovery and service
// governance on top of plain RPC clients.
// One XClient serves exactly one service; create one XClient per service.
type XClient interface {
// SetPlugins replaces the plugin container used for call hooks.
SetPlugins(plugins PluginContainer)
// SetSelector installs a user-provided selector.
SetSelector(s Selector)
// ConfigGeoSelector switches to a geo-based selector using the client's location.
ConfigGeoSelector(latitude, longitude float64)
// Auth sets the authentication token attached to request metadata.
Auth(auth string)
// Go invokes serviceMethod asynchronously on one selected server.
Go(ctx context.Context, serviceMethod string, args interface{}, reply interface{}, done chan *Call) (*Call, error)
// Call invokes serviceMethod synchronously on one selected server.
Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
// Broadcast invokes serviceMethod on every known server.
Broadcast(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
// Fork invokes serviceMethod on every known server and succeeds on the first success.
Fork(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
// SendRaw sends a pre-built protocol message and returns the response
// metadata and payload.
SendRaw(ctx context.Context, r *protocol.Message) (map[string]string, []byte, error)
// Close shuts the client down and closes its cached connections.
Close() error
}
// KVPair is one entry reported by a ServiceDiscovery.
// Within this file Key is used as the server identifier: it becomes the key of
// the servers map and is later parsed as "network@address" when connecting.
// Value is stored alongside it and is not interpreted here
// (presumably server metadata — confirm against the discovery implementations).
type KVPair struct {
Key string
Value string
}
// ServiceDiscovery abstracts a service registry such as zookeeper, etcd or consul.
type ServiceDiscovery interface {
// GetServices returns the currently known servers; NewXClient uses it to
// build the initial server map.
GetServices() []*KVPair
// WatchService returns a channel of server lists. Each received list replaces
// the xClient's server map as a whole. NewXClient tolerates a nil channel.
WatchService() chan []*KVPair
// RemoveWatcher unregisters a channel previously returned by WatchService;
// xClient.Close calls it before closing the channel.
RemoveWatcher(ch chan []*KVPair)
// Clone is not used in this file; presumably it returns a discovery bound to
// another service path — confirm against the implementations.
Clone(servicePath string) ServiceDiscovery
// Close is not called in this file; presumably it releases the discovery's
// resources — confirm against the implementations.
Close()
}
// xClient is the default XClient implementation. It keeps one RPCClient per
// server key and picks a server for each call through its selector.
type xClient struct {
// failMode decides how Call and SendRaw react to a failed attempt.
failMode FailMode
// selectMode is the strategy passed to newSelector when the client is built.
selectMode SelectMode
// cachedClient maps a server key to its connected client; guarded by mu.
cachedClient map[string]RPCClient
// servicePath is the service name sent with every call.
servicePath string
// serviceMethod is not read by the code visible in this file.
serviceMethod string
// option is copied into every Client created by getCachedClient and
// supplies the retry count.
option Option
// mu guards servers and cachedClient.
mu sync.RWMutex
// servers is the latest server map received from discovery.
servers map[string]string
discovery ServiceDiscovery
// selector may be nil until SetSelector/ConfigGeoSelector is called when
// selectMode is Closest or SelectByUser.
selector Selector
// isShutdown is set by Close and checked at the start of every calling method.
isShutdown bool
// auth is the authentication token, for example "Bearer mF_9.B5f-4.1JqM".
// When non-empty it is written into the request metadata under share.AuthKey.
auth string
// Plugins supplies the pre/post call hooks and is handed to created Clients.
Plugins PluginContainer
// ch is the discovery watch channel consumed by watch and closed by Close.
ch chan []*KVPair
}
// NewXClient builds an XClient for servicePath with service discovery and
// service governance support.
//
// The initial server list is read from discovery. A built-in selector is
// created unless selectMode is Closest or SelectByUser, in which case the
// selector is left unset here. If discovery provides a watch channel, a
// goroutine is started to apply server list updates.
func NewXClient(servicePath string, failMode FailMode, selectMode SelectMode, discovery ServiceDiscovery, option Option) XClient {
	// Snapshot the servers currently known to discovery.
	initial := make(map[string]string)
	for _, pair := range discovery.GetServices() {
		initial[pair.Key] = pair.Value
	}

	xc := &xClient{
		failMode:     failMode,
		selectMode:   selectMode,
		discovery:    discovery,
		servicePath:  servicePath,
		cachedClient: make(map[string]RPCClient),
		option:       option,
		servers:      initial,
		Plugins:      &pluginContainer{},
	}

	switch selectMode {
	case Closest, SelectByUser:
		// These modes get their selector from a later call.
	default:
		xc.selector = newSelector(selectMode, initial)
	}

	// Follow server list changes when discovery supports watching.
	if updates := discovery.WatchService(); updates != nil {
		xc.ch = updates
		go xc.watch(updates)
	}

	return xc
}
// SetSelector installs a user-provided selector.
//
// The selector is seeded with the current server map and installed while
// holding c.mu, the same lock watch holds when it replaces the server map and
// refreshes the selector. Doing both steps under one lock ensures a server
// list update cannot slip in between seeding and installation and be lost.
func (c *xClient) SetSelector(s Selector) {
	c.mu.Lock()
	s.UpdateServer(c.servers)
	c.selector = s
	c.mu.Unlock()
}
// SetPlugins replaces the client's plugin container.
// The container provides the pre/post call hooks run by wrapCall and is passed
// to every Client created afterwards by getCachedClient.
func (c *xClient) SetPlugins(plugins PluginContainer) {
c.Plugins = plugins
}
// ConfigGeoSelector sets the client's latitude and longitude, switches the
// select mode to Closest and installs a geo selector built from the current
// server map.
//
// The swap is done under c.mu because watch replaces c.servers and refreshes
// c.selector under that lock; without it the new selector could be built from
// a server map that is being replaced concurrently.
func (c *xClient) ConfigGeoSelector(latitude, longitude float64) {
	c.mu.Lock()
	c.selector = newGeoSelector(c.servers, latitude, longitude)
	c.selectMode = Closest
	c.mu.Unlock()
}
// Auth sets the token used for authentication.
// When the token is non-empty, the calling methods store it under
// share.AuthKey in the request metadata map found in the context, and return
// an error if the context carries no such metadata.
func (c *xClient) Auth(auth string) {
c.auth = auth
}
// watch consumes server lists from ch until the channel is closed. Every
// received list replaces the client's server map and, when a selector is
// installed, is forwarded to it.
func (c *xClient) watch(ch chan []*KVPair) {
	for {
		pairs, open := <-ch
		if !open {
			return
		}

		latest := make(map[string]string)
		for i := range pairs {
			latest[pairs[i].Key] = pairs[i].Value
		}

		// Swap the map and refresh the selector under the same lock.
		c.mu.Lock()
		c.servers = latest
		if sel := c.selector; sel != nil {
			sel.UpdateServer(latest)
		}
		c.mu.Unlock()
	}
}
// selectClient asks the selector for a server key and returns that key along
// with the client cached (or freshly connected) for it. It returns
// ErrXClientNoServer when the selector yields an empty key.
func (c *xClient) selectClient(ctx context.Context, servicePath, serviceMethod string, args interface{}) (string, RPCClient, error) {
	key := c.selector.Select(ctx, servicePath, serviceMethod, args)
	if key != "" {
		rpcClient, err := c.getCachedClient(key)
		return key, rpcClient, err
	}
	return "", nil, ErrXClientNoServer
}
// getCachedClient returns a usable client for server key k, connecting a new
// one when none is cached or when the cached one is closing or shut down.
//
// k has the form "network@address" (see splitNetworkAndAddress); the network
// "inprocess" maps to the shared InprocessClient instead of a new connection.
// On connection failure nothing is cached and the error is returned.
func (c *xClient) getCachedClient(k string) (RPCClient, error) {
	// Fast path: a healthy cached client only needs the read lock.
	c.mu.RLock()
	client := c.cachedClient[k]
	c.mu.RUnlock()
	if client != nil && !client.IsClosing() && !client.IsShutdown() {
		return client, nil
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	// Re-check under the write lock: another goroutine may have replaced the
	// entry while no lock was held.
	client = c.cachedClient[k]
	if client != nil {
		if !client.IsClosing() && !client.IsShutdown() {
			return client, nil
		}
		// The cached client is no longer usable. Evict it so a fresh
		// connection is made instead of handing back a dead client.
		delete(c.cachedClient, k)
		// Best effort: the client is already closing or shut down, so a
		// Close error carries no useful information.
		_ = client.Close()
	}

	network, addr := splitNetworkAndAddress(k)
	if network == "inprocess" {
		client = InprocessClient
	} else {
		client = &Client{
			option:  c.option,
			Plugins: c.Plugins,
		}
		if err := client.Connect(network, addr); err != nil {
			return nil, err
		}
	}
	c.cachedClient[k] = client
	return client, nil
}
// removeClient drops the cache entry for k if it still refers to client, then
// closes client. A different client cached under k in the meantime is left
// untouched. A nil client is only checked against the cache and not closed.
func (c *xClient) removeClient(k string, client RPCClient) {
	c.mu.Lock()
	if cached := c.cachedClient[k]; cached == client {
		delete(c.cachedClient, k)
	}
	c.mu.Unlock()

	if client == nil {
		return
	}
	client.Close()
}
// splitNetworkAndAddress splits a server key of the form "network@address"
// at the first '@'. A key without '@' is treated as a plain address on the
// "tcp" network.
func splitNetworkAndAddress(server string) (string, string) {
	at := strings.Index(server, "@")
	if at < 0 {
		return "tcp", server
	}
	return server[:at], server[at+1:]
}
// Go starts an asynchronous invocation of serviceMethod on one selected server
// and returns the Call that represents it. Completion is signalled on done by
// delivering that same Call; done is passed through unchanged to the
// underlying client, whose contract is that a nil done makes it allocate a
// channel and a non-nil done must be buffered.
//
// FailMode is not applied: a selection or connection error is returned
// directly and nothing is retried.
func (c *xClient) Go(ctx context.Context, serviceMethod string, args interface{}, reply interface{}, done chan *Call) (*Call, error) {
	if c.isShutdown {
		return nil, ErrXClientShutdown
	}

	// Attach the auth token to the request metadata carried by ctx.
	if token := c.auth; token != "" {
		raw := ctx.Value(share.ReqMetaDataKey)
		if raw == nil {
			return nil, errors.New("must set ReqMetaDataKey in context")
		}
		raw.(map[string]string)[share.AuthKey] = token
	}

	_, rpcClient, selErr := c.selectClient(ctx, c.servicePath, serviceMethod, args)
	if selErr != nil {
		return nil, selErr
	}

	call := rpcClient.Go(ctx, c.servicePath, serviceMethod, args, reply, done)
	return call, nil
}
// Call invokes the named function, waits for it to complete, and returns its
// error status. Failures are handled according to the client's FailMode:
//
//   - Failtry:  retry on the same server, reconnecting between attempts.
//   - Failover: retry, selecting a server again for each attempt.
//   - otherwise (Failfast): a single attempt.
//
// Failtry and Failover make option.Retries attempts, but always at least one,
// so a non-positive Retries value can no longer make Call return without
// contacting any server. A ServiceError is returned immediately and never
// retried. After any other failure the client used is removed from the cache
// and closed.
func (c *xClient) Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error {
	if c.isShutdown {
		return ErrXClientShutdown
	}

	// Attach the auth token to the request metadata carried by ctx.
	if c.auth != "" {
		metadata := ctx.Value(share.ReqMetaDataKey)
		if metadata == nil {
			return errors.New("must set ReqMetaDataKey in context")
		}
		m := metadata.(map[string]string)
		m[share.AuthKey] = c.auth
	}

	k, client, err := c.selectClient(ctx, c.servicePath, serviceMethod, args)
	if err != nil {
		if c.failMode == Failfast {
			return err
		}
		if _, ok := err.(ServiceError); ok {
			return err
		}
		// Other modes continue with a nil client: wrapCall reports
		// ErrServerUnavailable and the retry logic gets a chance to recover.
	}

	switch c.failMode {
	case Failtry, Failover:
		attempts := c.option.Retries
		if attempts < 1 {
			attempts = 1
		}
		for ; attempts > 0; attempts-- {
			err = c.wrapCall(ctx, client, serviceMethod, args, reply)
			if err == nil {
				return nil
			}
			if _, ok := err.(ServiceError); ok {
				return err
			}

			c.removeClient(k, client)
			// Errors obtaining the next client are deliberately ignored: a nil
			// client makes the next wrapCall return ErrServerUnavailable.
			if c.failMode == Failtry {
				client, _ = c.getCachedClient(k)
			} else {
				k, client, _ = c.selectClient(ctx, c.servicePath, serviceMethod, args)
			}
		}
		return err

	default: // Failfast
		err = c.wrapCall(ctx, client, serviceMethod, args, reply)
		if err != nil {
			if _, ok := err.(ServiceError); !ok {
				c.removeClient(k, client)
			}
		}
		return err
	}
}
// SendRaw sends the pre-built message r to one selected server and returns the
// response metadata and payload. The server is selected using r's own service
// path, service method and payload.
//
// Failures are handled according to the client's FailMode, like Call: Failtry
// retries on the same server, Failover selects a server again for each
// attempt, and any other mode makes a single attempt. Failtry and Failover
// make option.Retries attempts, but always at least one. A ServiceError is
// returned immediately and never retried; after any other failure the client
// used is removed from the cache and closed.
//
// The error of the last attempt is always returned. A missing client is
// reported as ErrServerUnavailable.
func (c *xClient) SendRaw(ctx context.Context, r *protocol.Message) (map[string]string, []byte, error) {
	if c.isShutdown {
		return nil, nil, ErrXClientShutdown
	}

	// Attach the auth token to the request metadata carried by ctx.
	if c.auth != "" {
		metadata := ctx.Value(share.ReqMetaDataKey)
		if metadata == nil {
			return nil, nil, errors.New("must set ReqMetaDataKey in context")
		}
		m := metadata.(map[string]string)
		m[share.AuthKey] = c.auth
	}

	k, client, err := c.selectClient(ctx, r.ServicePath, r.ServiceMethod, r.Payload)
	if err != nil {
		if c.failMode == Failfast {
			return nil, nil, err
		}
		if _, ok := err.(ServiceError); ok {
			return nil, nil, err
		}
		// Other modes continue with a nil client; send reports it as
		// ErrServerUnavailable so the retry logic gets a chance to recover.
	}

	// send performs one attempt and guards against a missing client, which
	// would otherwise be a nil dereference.
	send := func(cl RPCClient) (map[string]string, []byte, error) {
		if cl == nil {
			return nil, nil, ErrServerUnavailable
		}
		return cl.SendRaw(ctx, r)
	}

	switch c.failMode {
	case Failtry, Failover:
		attempts := c.option.Retries
		if attempts < 1 {
			attempts = 1
		}
		for ; attempts > 0; attempts-- {
			var (
				meta    map[string]string
				payload []byte
			)
			// Assign to the outer err so the final return reports the last
			// attempt's failure rather than the selection result.
			meta, payload, err = send(client)
			if err == nil {
				return meta, payload, nil
			}
			if _, ok := err.(ServiceError); ok {
				return nil, nil, err
			}

			c.removeClient(k, client)
			// Errors obtaining the next client are deliberately ignored: a nil
			// client makes the next attempt fail with ErrServerUnavailable.
			if c.failMode == Failtry {
				client, _ = c.getCachedClient(k)
			} else {
				k, client, _ = c.selectClient(ctx, r.ServicePath, r.ServiceMethod, r.Payload)
			}
		}
		return nil, nil, err

	default: // Failfast
		meta, payload, err := send(client)
		if err != nil {
			if _, ok := err.(ServiceError); !ok {
				c.removeClient(k, client)
			}
		}
		return meta, payload, err
	}
}
// wrapCall performs one synchronous call on client, surrounded by the plugin
// container's pre-call and post-call hooks. The post-call hook receives the
// call's error. It returns ErrServerUnavailable, without running any hook,
// when client is nil.
func (c *xClient) wrapCall(ctx context.Context, client RPCClient, serviceMethod string, args interface{}, reply interface{}) error {
if client == nil {
return ErrServerUnavailable
}
c.Plugins.DoPreCall(ctx, c.servicePath, serviceMethod, args)
err := client.Call(ctx, c.servicePath, serviceMethod, args, reply)
c.Plugins.DoPostCall(ctx, c.servicePath, serviceMethod, args, reply, err)
return err
}
// Broadcast sends the request to all known servers and succeeds only when
// every server returns OK. FailMode and SelectMode are meaningless for this
// method.
//
// It returns the first error reported by any server, ErrXClientNoServer when
// no server is known, and a timeout error when not all servers have answered
// within one minute. Set a deadline on ctx to avoid waiting that long.
//
// NOTE(review): the same reply value is handed to every concurrent call, so
// the servers' responses are decoded into it concurrently — confirm callers
// do not rely on its content.
func (c *xClient) Broadcast(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error {
	if c.isShutdown {
		return ErrXClientShutdown
	}

	// Attach the auth token to the request metadata carried by ctx.
	if c.auth != "" {
		metadata := ctx.Value(share.ReqMetaDataKey)
		if metadata == nil {
			return errors.New("must set ReqMetaDataKey in context")
		}
		m := metadata.(map[string]string)
		m[share.AuthKey] = c.auth
	}

	// Snapshot the server keys and release the lock before connecting:
	// getCachedClient acquires c.mu itself, so calling it while holding the
	// read lock would deadlock as soon as a client had to be created.
	c.mu.RLock()
	keys := make([]string, 0, len(c.servers))
	for k := range c.servers {
		keys = append(keys, k)
	}
	c.mu.RUnlock()

	clients := make([]RPCClient, 0, len(keys))
	for _, k := range keys {
		client, err := c.getCachedClient(k)
		if err != nil {
			return err
		}
		clients = append(clients, client)
	}
	if len(clients) == 0 {
		return ErrXClientNoServer
	}

	// Each goroutine reports its own result. The channel is buffered for all
	// of them, so none blocks after an early return.
	done := make(chan error, len(clients))
	for _, client := range clients {
		client := client
		go func() {
			done <- c.wrapCall(ctx, client, serviceMethod, args, reply)
		}()
	}

	timer := time.NewTimer(time.Minute)
	defer timer.Stop()

	for pending := len(clients); pending > 0; pending-- {
		select {
		case err := <-done:
			if err != nil {
				// One failure fails the whole broadcast.
				return err
			}
		case <-timer.C:
			return errors.New("broadcast timed out before all servers replied")
		}
	}
	return nil
}
// Fork sends the request to all known servers and succeeds as soon as one
// server returns OK. FailMode and SelectMode are meaningless for this method.
//
// reply must be a non-nil pointer. Every server decodes into its own copy of
// the reply type, and the first successful copy is stored into reply before
// success is reported, so reply is filled by the time Fork returns nil.
//
// It returns the last error received when every server fails,
// ErrXClientNoServer when no server is known, and a timeout error when no
// server has succeeded within one minute.
func (c *xClient) Fork(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error {
	if c.isShutdown {
		return ErrXClientShutdown
	}

	// Attach the auth token to the request metadata carried by ctx.
	if c.auth != "" {
		metadata := ctx.Value(share.ReqMetaDataKey)
		if metadata == nil {
			return errors.New("must set ReqMetaDataKey in context")
		}
		m := metadata.(map[string]string)
		m[share.AuthKey] = c.auth
	}

	// Validate reply up front; the per-server copies are made via reflection
	// and would otherwise panic inside a goroutine.
	replyValue := reflect.ValueOf(reply)
	if replyValue.Kind() != reflect.Ptr || replyValue.IsNil() {
		return errors.New("reply must be a non-nil pointer")
	}
	replyType := replyValue.Elem().Type()

	// Snapshot the server keys and release the lock before connecting:
	// getCachedClient acquires c.mu itself, so calling it while holding the
	// read lock would deadlock as soon as a client had to be created.
	c.mu.RLock()
	keys := make([]string, 0, len(c.servers))
	for k := range c.servers {
		keys = append(keys, k)
	}
	c.mu.RUnlock()

	clients := make([]RPCClient, 0, len(keys))
	for _, k := range keys {
		client, err := c.getCachedClient(k)
		if err != nil {
			return err
		}
		clients = append(clients, client)
	}
	if len(clients) == 0 {
		return ErrXClientNoServer
	}

	// replyOnce lets exactly one result be written into reply.
	var replyOnce sync.Once
	// Buffered for every goroutine so none blocks after an early return.
	done := make(chan error, len(clients))
	for _, client := range clients {
		client := client
		go func() {
			cloned := reflect.New(replyType)
			err := c.wrapCall(ctx, client, serviceMethod, args, cloned.Interface())
			if err == nil {
				// Publish the result before signalling success so the caller
				// never observes an unfilled reply.
				replyOnce.Do(func() {
					replyValue.Elem().Set(cloned.Elem())
				})
			}
			done <- err
		}()
	}

	timer := time.NewTimer(time.Minute)
	defer timer.Stop()

	var lastErr error
	for pending := len(clients); pending > 0; pending-- {
		select {
		case err := <-done:
			if err == nil {
				return nil
			}
			lastErr = err
		case <-timer.C:
			// Use up the once so a late success cannot start writing into
			// reply after Fork has returned.
			replyOnce.Do(func() {})
			return errors.New("fork timed out before any server succeeded")
		}
	}
	return lastErr
}
// Close marks the client as shut down, closes and forgets every cached
// connection, and detaches from service discovery.
//
// Unregistering the watch channel and closing it happens in a background
// goroutine; a panic raised there (for example when closing a channel that is
// nil or already closed) is swallowed. Errors from closing the connections are
// collected and returned together as one multi-error.
func (c *xClient) Close() error {
	c.isShutdown = true

	var closeErrs []error
	c.mu.Lock()
	for key, cached := range c.cachedClient {
		if err := cached.Close(); err != nil {
			closeErrs = append(closeErrs, err)
		}
		delete(c.cachedClient, key)
	}
	c.mu.Unlock()

	go func() {
		// Best effort: discard any panic from the teardown below.
		defer func() {
			_ = recover()
		}()

		c.discovery.RemoveWatcher(c.ch)
		close(c.ch)
	}()

	if len(closeErrs) == 0 {
		return nil
	}
	return ex.NewMultiError(closeErrs)
}