diff --git a/app/apinto/worker.go b/app/apinto/worker.go index d7d5f4f4..6bee2a94 100644 --- a/app/apinto/worker.go +++ b/app/apinto/worker.go @@ -19,6 +19,7 @@ import ( circuit_breaker "github.com/eolinker/apinto/drivers/plugins/circuit-breaker" "github.com/eolinker/apinto/drivers/plugins/cors" extra_params "github.com/eolinker/apinto/drivers/plugins/extra-params" + grpc_proxy_rewrite "github.com/eolinker/apinto/drivers/plugins/grpc-proxy-rewrite" "github.com/eolinker/apinto/drivers/plugins/gzip" ip_restriction "github.com/eolinker/apinto/drivers/plugins/ip-restriction" "github.com/eolinker/apinto/drivers/plugins/monitor" @@ -126,4 +127,6 @@ func Register(extenderRegister eosc.IExtenderDriverRegister) { fuse.Register(extenderRegister) fuse_strategy.Register(extenderRegister) + grpc_proxy_rewrite.Register(extenderRegister) + } diff --git a/drivers/plugins/grpc-proxy-rewrite/config.go b/drivers/plugins/grpc-proxy-rewrite/config.go new file mode 100644 index 00000000..563a5e27 --- /dev/null +++ b/drivers/plugins/grpc-proxy-rewrite/config.go @@ -0,0 +1,10 @@ +package grpc_proxy_rewrite + +type Config struct { + Service string `json:"service" label:"服务名称"` + Method string `json:"method" label:"方法名称"` + Authority string `json:"authority" label:"虚拟主机域名(Authority)"` + Headers map[string]string `json:"headers" label:"请求头部"` + Tls bool `json:"tls" label:"TLS传输"` + SkipCertificate bool `json:"skip_certificate" label:"跳过证书检查"` +} diff --git a/drivers/plugins/grpc-proxy-rewrite/driver.go b/drivers/plugins/grpc-proxy-rewrite/driver.go new file mode 100644 index 00000000..3c6b59f2 --- /dev/null +++ b/drivers/plugins/grpc-proxy-rewrite/driver.go @@ -0,0 +1,31 @@ +package grpc_proxy_rewrite + +import ( + "strings" + + "github.com/eolinker/apinto/drivers" + "github.com/eolinker/eosc" +) + +func check(v interface{}) (*Config, error) { + conf, err := drivers.Assert[Config](v) + if err != nil { + return nil, err + } + conf.Authority = strings.TrimSpace(conf.Authority) + + return conf, nil +} + +func Create(id, name string, conf *Config, workers map[eosc.RequireId]eosc.IWorker) (eosc.IWorker, error) { + + pw := &ProxyRewrite{ + WorkerBase: drivers.Worker(id, name), + host: strings.TrimSpace(conf.Authority), + headers: conf.Headers, + tls: conf.Tls, + skipCertificate: conf.SkipCertificate, + } + + return pw, nil +} diff --git a/drivers/plugins/grpc-proxy-rewrite/factory.go b/drivers/plugins/grpc-proxy-rewrite/factory.go new file mode 100644 index 00000000..106fa1fc --- /dev/null +++ b/drivers/plugins/grpc-proxy-rewrite/factory.go @@ -0,0 +1,18 @@ +package grpc_proxy_rewrite + +import ( + "github.com/eolinker/apinto/drivers" + "github.com/eolinker/eosc" +) + +const ( + Name = "grpc-proxy_write" +) + +func Register(register eosc.IExtenderDriverRegister) { + register.RegisterExtenderDriver(Name, NewFactory()) +} + +func NewFactory() eosc.IExtenderDriverFactory { + return drivers.NewFactory[Config](Create) +} diff --git a/drivers/plugins/grpc-proxy-rewrite/proxy-rewrite.go b/drivers/plugins/grpc-proxy-rewrite/proxy-rewrite.go new file mode 100644 index 00000000..0cf890b7 --- /dev/null +++ b/drivers/plugins/grpc-proxy-rewrite/proxy-rewrite.go @@ -0,0 +1,77 @@ +package grpc_proxy_rewrite + +import ( + "strings" + + grpc_context "github.com/eolinker/eosc/eocontext/grpc-context" + + "github.com/eolinker/apinto/drivers" + "github.com/eolinker/eosc" + "github.com/eolinker/eosc/eocontext" + http_service "github.com/eolinker/eosc/eocontext/http-context" +) + +var _ eocontext.IFilter = (*ProxyRewrite)(nil) +var _ grpc_context.GrpcFilter = (*ProxyRewrite)(nil) + +var ( + regexpErrInfo = `[plugin proxy-rewrite2 config err] Compile regexp fail. err regexp: %s ` + notMatchErrInfo = `[plugin proxy-rewrite2 err] Proxy path rewrite fail. Request path can't match any rewrite-path. request path: %s ` +) + +type ProxyRewrite struct { + drivers.WorkerBase + + host string + headers map[string]string + tls bool + skipCertificate bool +} + +func (p *ProxyRewrite) DoFilter(ctx eocontext.EoContext, next eocontext.IChain) (err error) { + return grpc_context.DoGrpcFilter(p, ctx, next) +} + +func (p *ProxyRewrite) DoGrpcFilter(ctx grpc_context.IGrpcContext, next eocontext.IChain) (err error) { + if p.host != "" { + ctx.Proxy().SetHost(p.host) + } + ctx.EnableTls(p.tls) + ctx.InsecureCertificateVerify(p.skipCertificate) + for key, value := range p.headers { + ctx.Proxy().Headers().Set(key, value) + } + if next != nil { + return next.DoChain(ctx) + } + return nil +} + +func (p *ProxyRewrite) Start() error { + return nil +} + +func (p *ProxyRewrite) Reset(v interface{}, workers map[eosc.RequireId]eosc.IWorker) error { + conf, err := check(v) + if err != nil { + return err + } + p.skipCertificate = conf.SkipCertificate + p.headers = conf.Headers + p.tls = conf.Tls + p.host = strings.TrimSpace(conf.Authority) + return nil +} + +func (p *ProxyRewrite) Stop() error { + return nil +} + +func (p *ProxyRewrite) Destroy() { + + p.headers = nil +} + +func (p *ProxyRewrite) CheckSkill(skill string) bool { + return http_service.FilterSkillName == skill +} diff --git a/drivers/router/grpc-router/manager/complete.go b/drivers/router/grpc-router/manager/complete.go index b750d54f..fcf45c23 100644 --- a/drivers/router/grpc-router/manager/complete.go +++ b/drivers/router/grpc-router/manager/complete.go @@ -42,7 +42,6 @@ func (h *Complete) Complete(org eocontext.EoContext) error { ctx.Response().SetErr(lastErr) ctx.SetLabel("handler", "proxy") }() - timeOut := app.TimeOut() for index := 0; index <= h.retry; index++ { diff --git a/node/grpc-context/context.go b/node/grpc-context/context.go index 7ca49cce..613e9014 100644 --- a/node/grpc-context/context.go +++ b/node/grpc-context/context.go @@ -203,6 +203,15 @@ func (c *Context) Invoke(address string, timeout time.Duration) error { if err != nil { return err } + passHost, targetHost := c.GetUpstreamHostHandler().PassHost() + switch passHost { + case eocontext.PassHost: + + case eocontext.NodeHost: + c.proxy.Headers().Set(":authority", address) + case eocontext.ReWriteHost: + c.proxy.Headers().Set(":authority", targetHost) + } c.proxy.Headers().Set("grpc-timeout", fmt.Sprintf("%dn", timeout)) clientCtx, _ := context.WithCancel(metadata.NewOutgoingContext(c.Context(), c.proxy.Headers().Copy())) clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, clientConn, c.proxy.FullMethodName()) diff --git a/node/grpc-context/pool.go b/node/grpc-context/pool.go index 19face6b..ad9f4c83 100644 --- a/node/grpc-context/pool.go +++ b/node/grpc-context/pool.go @@ -126,7 +126,6 @@ func (cc *ClientPool) connect() (*grpc.ClientConn, error) { Timeout: cc.option.KeepAliveTimeout, }), ) - conn, err := grpc.DialContext(ctx, cc.target, opts...) if err != nil { return nil, err diff --git a/node/grpc-context/request.go b/node/grpc-context/request.go index 92724c2f..a29b97b7 100644 --- a/node/grpc-context/request.go +++ b/node/grpc-context/request.go @@ -30,6 +30,10 @@ type Request struct { realIP string } +func (r *Request) SetHost(s string) { + r.host = s +} + func (r *Request) SetService(service string) { r.service = service } diff --git a/router/grpc-router/matcher.go b/router/grpc-router/matcher.go index 6df3532f..d64b8ea4 100644 --- a/router/grpc-router/matcher.go +++ b/router/grpc-router/matcher.go @@ -4,6 +4,7 @@ import ( "fmt" "sort" "strconv" + "strings" grpc_context "github.com/eolinker/eosc/eocontext/grpc-context" @@ -30,7 +31,11 @@ func newHostMatcher(children map[string]router.IMatcher) router.IMatcher { children: children, name: "host", read: func(port int, request grpc_context.IRequest) (string, bool) { - return request.Host(), true + orgHost := request.Host() + if i := strings.Index(orgHost, ":"); i > 0 { + return orgHost[:i], true + } + return orgHost, true }, } }