grpc重写插件完成

This commit is contained in:
Liujian
2023-02-15 16:54:09 +08:00
parent c8655ba04d
commit bce77d0b8d
6 changed files with 93 additions and 61 deletions

View File

@@ -36,6 +36,8 @@ builds:
- -X "github.com/eolinker/apinto/utils/version.buildUser=goreleaser" - -X "github.com/eolinker/apinto/utils/version.buildUser=goreleaser"
- -X "github.com/eolinker/apinto/utils/version.goVersion={{.Env.GOVERSION}}" - -X "github.com/eolinker/apinto/utils/version.goVersion={{.Env.GOVERSION}}"
- -X "github.com/eolinker/apinto/utils/version.eoscVersion={{.Env.EoscVersion}}" - -X "github.com/eolinker/apinto/utils/version.eoscVersion={{.Env.EoscVersion}}"
env:
- CGO_ENABLED=0
archives: archives:
- id: default - id: default
format: tar.gz format: tar.gz

View File

@@ -21,10 +21,11 @@ func Create(id, name string, conf *Config, workers map[eosc.RequireId]eosc.IWork
pw := &ProxyRewrite{ pw := &ProxyRewrite{
WorkerBase: drivers.Worker(id, name), WorkerBase: drivers.Worker(id, name),
host: strings.TrimSpace(conf.Authority),
headers: conf.Headers, headers: conf.Headers,
tls: conf.Tls, tls: conf.Tls,
skipCertificate: conf.SkipCertificate, skipCertificate: conf.SkipCertificate,
service: conf.Service,
method: conf.Method,
} }
return pw, nil return pw, nil

View File

@@ -1,8 +1,6 @@
package grpc_proxy_rewrite package grpc_proxy_rewrite
import ( import (
"strings"
grpc_context "github.com/eolinker/eosc/eocontext/grpc-context" grpc_context "github.com/eolinker/eosc/eocontext/grpc-context"
"github.com/eolinker/apinto/drivers" "github.com/eolinker/apinto/drivers"
@@ -21,8 +19,8 @@ var (
type ProxyRewrite struct { type ProxyRewrite struct {
drivers.WorkerBase drivers.WorkerBase
service string
host string method string
headers map[string]string headers map[string]string
tls bool tls bool
skipCertificate bool skipCertificate bool
@@ -33,8 +31,12 @@ func (p *ProxyRewrite) DoFilter(ctx eocontext.EoContext, next eocontext.IChain)
} }
func (p *ProxyRewrite) DoGrpcFilter(ctx grpc_context.IGrpcContext, next eocontext.IChain) (err error) { func (p *ProxyRewrite) DoGrpcFilter(ctx grpc_context.IGrpcContext, next eocontext.IChain) (err error) {
if p.host != "" {
ctx.Proxy().SetHost(p.host) if p.service != "" {
ctx.Proxy().SetService(p.service)
}
if p.method != "" {
ctx.Proxy().SetMethod(p.method)
} }
ctx.EnableTls(p.tls) ctx.EnableTls(p.tls)
ctx.InsecureCertificateVerify(p.skipCertificate) ctx.InsecureCertificateVerify(p.skipCertificate)
@@ -59,7 +61,8 @@ func (p *ProxyRewrite) Reset(v interface{}, workers map[eosc.RequireId]eosc.IWor
p.skipCertificate = conf.SkipCertificate p.skipCertificate = conf.SkipCertificate
p.headers = conf.Headers p.headers = conf.Headers
p.tls = conf.Tls p.tls = conf.Tls
p.host = strings.TrimSpace(conf.Authority) p.service = conf.Service
p.method = conf.Method
return nil return nil
} }

View File

@@ -1,6 +1,7 @@
package grpc_context package grpc_context
import ( import (
"fmt"
"sync" "sync"
"time" "time"
) )
@@ -8,25 +9,18 @@ import (
var _ IClient = (*Client)(nil) var _ IClient = (*Client)(nil)
var ( var (
clientPool IClient = NewClient() clientPool IClient = NewClient()
defaultClientOption = ClientOption{
ClientPoolConnSize: defaultClientPoolConnsSizeCap,
DialTimeOut: defaultDialTimeout,
KeepAlive: defaultKeepAlive,
KeepAliveTimeout: defaultKeepAliveTimeout,
}
) )
type IClient interface { type IClient interface {
Get(target string, isTls bool) (IClientPool, bool) Get(target string, isTls bool, host ...string) IClientPool
Set(target string, isTls bool, pool IClientPool)
Del(target string, isTls bool)
Close() Close()
} }
type Client struct { type Client struct {
clients map[string]IClientPool clients map[string]IClientPool
tlsClients map[string]IClientPool tlsClients map[string]IClientPool
stop bool
locker sync.RWMutex locker sync.RWMutex
} }
@@ -44,6 +38,10 @@ func (c *Client) clean() {
sleep := time.Second * 10 sleep := time.Second * 10
for { for {
c.locker.Lock() c.locker.Lock()
if c.stop {
c.locker.Unlock()
return
}
for key, client := range c.clients { for key, client := range c.clients {
if client.ConnCount() < 1 { if client.ConnCount() < 1 {
delete(c.clients, key) delete(c.clients, key)
@@ -59,33 +57,41 @@ func (c *Client) clean() {
} }
} }
func (c *Client) Get(target string, isTls bool) (IClientPool, bool) { func (c *Client) Get(target string, isTls bool, host ...string) IClientPool {
key := target
authority := ""
if len(host) > 0 && host[0] != "" {
key = fmt.Sprintf("%s|%s", target, host[0])
authority = host[0]
}
c.locker.RLock() c.locker.RLock()
defer c.locker.RUnlock()
clients := c.clients clients := c.clients
if isTls { if isTls {
clients = c.tlsClients clients = c.tlsClients
} }
client, ok := clients[target] client, ok := clients[key]
c.locker.RUnlock()
return client, ok if ok {
} return client
func (c *Client) Set(target string, isTls bool, pool IClientPool) {
c.locker.Lock()
defer c.locker.Unlock()
c.del(target, isTls)
clients := c.clients
if isTls {
clients = c.tlsClients
} }
clients[target] = pool
}
func (c *Client) Del(target string, isTls bool) {
c.locker.Lock() c.locker.Lock()
defer c.locker.Unlock() defer c.locker.Unlock()
c.del(target, isTls) client, ok = clients[key]
if ok {
return client
}
p := NewClientPoolWithOption(target, &ClientOption{
ClientPoolConnSize: defaultClientPoolConnsSizeCap,
DialTimeOut: defaultDialTimeout,
KeepAlive: defaultKeepAlive,
KeepAliveTimeout: defaultKeepAliveTimeout,
IsTls: isTls,
Authority: authority,
})
clients[key] = p
return p
} }
func (c *Client) del(target string, isTls bool) { func (c *Client) del(target string, isTls bool) {
@@ -111,6 +117,7 @@ func (c *Client) Close() {
for _, client := range tlsClients { for _, client := range tlsClients {
client.Close() client.Close()
} }
c.stop = true
c.clients = nil c.clients = nil
c.tlsClients = nil c.tlsClients = nil
} }

View File

@@ -41,8 +41,8 @@ type Context struct {
srv interface{} srv interface{}
acceptTime time.Time acceptTime time.Time
requestId string requestId string
request *Request request grpc_context.IRequest
proxy *Request proxy grpc_context.IRequest
response grpc_context.IResponse response grpc_context.IResponse
completeHandler eocontext.CompleteHandler completeHandler eocontext.CompleteHandler
finishHandler eocontext.FinishHandler finishHandler eocontext.FinishHandler
@@ -199,19 +199,18 @@ func (c *Context) SetResponse(response grpc_context.IResponse) {
} }
func (c *Context) Invoke(address string, timeout time.Duration) error { func (c *Context) Invoke(address string, timeout time.Duration) error {
clientConn, err := c.dial(address) passHost, targetHost := c.GetUpstreamHostHandler().PassHost()
switch passHost {
case eocontext.NodeHost:
c.proxy.SetHost(address)
case eocontext.ReWriteHost:
c.proxy.SetHost(targetHost)
}
clientConn, err := clientPool.Get(address, c.tls, c.proxy.Host()).Get()
if err != nil { if err != nil {
return err return err
} }
passHost, targetHost := c.GetUpstreamHostHandler().PassHost()
switch passHost {
case eocontext.PassHost:
case eocontext.NodeHost:
c.proxy.Headers().Set(":authority", address)
case eocontext.ReWriteHost:
c.proxy.Headers().Set(":authority", targetHost)
}
c.proxy.Headers().Set("grpc-timeout", fmt.Sprintf("%dn", timeout)) c.proxy.Headers().Set("grpc-timeout", fmt.Sprintf("%dn", timeout))
clientCtx, _ := context.WithCancel(metadata.NewOutgoingContext(c.Context(), c.proxy.Headers().Copy())) clientCtx, _ := context.WithCancel(metadata.NewOutgoingContext(c.Context(), c.proxy.Headers().Copy()))
clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, clientConn, c.proxy.FullMethodName()) clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, clientConn, c.proxy.FullMethodName())
@@ -255,11 +254,31 @@ func (c *Context) reset() {
pool.Put(c) pool.Put(c)
} }
func (c *Context) dial(address string) (*grpc.ClientConn, error) { //func (c *Context) dial(address string, timeout time.Duration) (*grpc.ClientConn, error) {
p, has := clientPool.Get(address, c.tls) // return clientPool.Get(address, c.tls, c.proxy.Host()).Get()
if !has { //
p = NewClientPoolWithOption(address, &defaultClientOption) // //opts := make([]grpc.DialOption, 0, 5)
defer clientPool.Set(address, c.tls, p) // //if c.tls {
} // // opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{InsecureSkipVerify: c.insecureCertificateVerify})))
return p.Get() // //} else {
} // // opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
// //}
// //opts = append(opts,
// // //grpc.WithBlock(),
// // grpc.WithKeepaliveParams(keepalive.ClientParameters{
// // Time: defaultKeepAlive,
// // Timeout: defaultKeepAliveTimeout,
// // }),
// //)
// //
// //if c.proxy.Host() != "" {
// // opts = append(opts, grpc.WithAuthority(c.proxy.Host()))
// //}
// ////ctx, cancel := context.WithTimeout(context.TODO(), timeout)
// ////defer cancel()
// //conn, err := grpc.Dial(address, opts...)
// //if err != nil {
// // return nil, err
// //}
// //return conn, nil
//}

View File

@@ -29,6 +29,7 @@ var (
type ClientOption struct { type ClientOption struct {
ClientPoolConnSize int ClientPoolConnSize int
IsTls bool IsTls bool
Authority string
DialTimeOut time.Duration DialTimeOut time.Duration
KeepAlive time.Duration KeepAlive time.Duration
KeepAliveTimeout time.Duration KeepAliveTimeout time.Duration
@@ -120,18 +121,17 @@ func (cc *ClientPool) connect() (*grpc.ClientConn, error) {
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
} }
opts = append(opts, opts = append(opts,
grpc.WithBlock(), //grpc.WithBlock(),
grpc.WithKeepaliveParams(keepalive.ClientParameters{ grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: cc.option.KeepAlive, Time: cc.option.KeepAlive,
Timeout: cc.option.KeepAliveTimeout, Timeout: cc.option.KeepAliveTimeout,
}), }),
) )
conn, err := grpc.DialContext(ctx, cc.target, opts...) if cc.option.Authority != "" {
if err != nil { opts = append(opts, grpc.WithAuthority(cc.option.Authority))
return nil, err
} }
return conn, nil return grpc.DialContext(ctx, cc.target, opts...)
} }
func (cc *ClientPool) Close() { func (cc *ClientPool) Close() {