From fd89f3ef88132797b9fe6f888c228efeb97d840b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E5=AD=9F=E6=9F=B1?= Date: Fri, 12 Nov 2021 16:52:50 +0800 Subject: [PATCH] =?UTF-8?q?upstream=20=E5=AF=B9=E6=8E=A5=E6=8F=92=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- drivers/upstream/upstream-http/config.go | 12 ++-- drivers/upstream/upstream-http/factory.go | 13 ++++ drivers/upstream/upstream-http/handler.go | 77 ++++++++++++++++++++-- drivers/upstream/upstream-http/upstream.go | 68 +++++++++---------- drivers/upstream/upstream-http/worker.go | 54 ++++++++++----- plugin/plugin.go | 39 +++++++++++ upstream/upstream.go | 9 +-- 7 files changed, 207 insertions(+), 65 deletions(-) diff --git a/drivers/upstream/upstream-http/config.go b/drivers/upstream/upstream-http/config.go index 8ca77660..3188cd89 100644 --- a/drivers/upstream/upstream-http/config.go +++ b/drivers/upstream/upstream-http/config.go @@ -2,13 +2,15 @@ package upstream_http import ( "github.com/eolinker/eosc" + "github.com/eolinker/goku/plugin" ) //Config http-service-proxy驱动配置结构体 type Config struct { - Desc string `json:"desc"` - Scheme string `json:"scheme"` - Type string `json:"type"` - Config string `json:"config"` - Discovery eosc.RequireId `json:"discovery" skill:"github.com/eolinker/goku/discovery.discovery.IDiscovery"` + Desc string `json:"desc"` + Scheme string `json:"scheme"` + Type string `json:"type"` + Config string `json:"config"` + Discovery eosc.RequireId `json:"discovery" skill:"github.com/eolinker/goku/discovery.discovery.IDiscovery"` + Plugins map[string]*plugin.Config `json:"plugins"` } diff --git a/drivers/upstream/upstream-http/factory.go b/drivers/upstream/upstream-http/factory.go index 7157829c..4a731cd3 100644 --- a/drivers/upstream/upstream-http/factory.go +++ b/drivers/upstream/upstream-http/factory.go @@ -2,6 +2,10 @@ package upstream_http import ( "reflect" + "sync" + + "github.com/eolinker/eosc/common/bean" + "github.com/eolinker/goku/plugin" round_robin "github.com/eolinker/goku/upstream/round-robin" @@ -9,10 +13,15 @@ import ( ) var name = "upstream_http_proxy" +var ( + pluginManager plugin.IPluginManager + once sync.Once +) //Register 注册http_proxy驱动工厂 func Register(register eosc.IExtenderDriverRegister) { register.RegisterExtenderDriver(name, NewFactory()) + } type factory struct { @@ -26,6 +35,10 @@ func NewFactory() eosc.IExtenderDriverFactory { //Create 创建http_proxy驱动 func (f *factory) Create(profession string, name string, label string, desc string, params map[string]interface{}) (eosc.IExtenderDriver, error) { + + once.Do(func() { + bean.Autowired(&pluginManager) + }) return &driver{ profession: profession, name: name, diff --git a/drivers/upstream/upstream-http/handler.go b/drivers/upstream/upstream-http/handler.go index 7ba38ab9..06baab44 100644 --- a/drivers/upstream/upstream-http/handler.go +++ b/drivers/upstream/upstream-http/handler.go @@ -1,13 +1,82 @@ package upstream_http import ( - http_context "github.com/eolinker/goku/node/http-context" - "github.com/valyala/fasthttp" + "fmt" + "time" + + http_service "github.com/eolinker/eosc/http-service" + "github.com/eolinker/goku/plugin" ) +var _ http_service.IChain = (*UpstreamHandler)(nil) + type UpstreamHandler struct { + id string + upstrem *Upstream + retry int + timeout time.Duration + pluginsSource map[string]*plugin.Config + orgFilter plugin.IPlugin } -func (u *UpstreamHandler) Send(ctx *http_context.Context) (*fasthttp.Response, error) { - panic("implement me") +func NewUpstreamHandler(id string, upstream *Upstream, retry int, timeout time.Duration, pluginsSource map[string]*plugin.Config) *UpstreamHandler { + uh := &UpstreamHandler{ + id: id, + upstrem: upstream, + retry: retry, + timeout: timeout, + pluginsSource: pluginsSource, + orgFilter: nil, + } + uh.reset() + return uh +} + +func (u *UpstreamHandler) reset() { + + configs := u.upstrem.pluginConfig(u.pluginsSource) + + iPlugin := pluginManager.CreateUpstream(u.id, configs) + + u.orgFilter = iPlugin +} + +//Send 请求发送 +func (u *UpstreamHandler) DoChain(ctx http_service.IHttpContext) error { + + var lastErr error + for doTrice := u.retry + 1; doTrice > 0; doTrice-- { + + node, err := u.upstrem.handler.Next() + if err != nil { + return err + } + scheme := node.Scheme() + if scheme != "http" && scheme != "https" { + scheme = u.upstrem.scheme + } + + addr := fmt.Sprintf("%s://%s", scheme, node.Addr()) + filterSender := NewSendAddr(addr, u.timeout) + if u.orgFilter == nil { + lastErr = filterSender.DoFilter(ctx, nil) + } else { + u.orgFilter.Append(filterSender) + } + } + + return lastErr +} + +type SendAddr struct { + timeout time.Duration + addr string +} + +func NewSendAddr(addr string, timeout time.Duration) *SendAddr { + return &SendAddr{timeout: timeout, addr: addr} +} + +func (s *SendAddr) DoFilter(ctx http_service.IHttpContext, next http_service.IChain) (err error) { + return ctx.SendTo(s.addr, s.timeout) } diff --git a/drivers/upstream/upstream-http/upstream.go b/drivers/upstream/upstream-http/upstream.go index 05cc428d..a0c26080 100644 --- a/drivers/upstream/upstream-http/upstream.go +++ b/drivers/upstream/upstream-http/upstream.go @@ -1,62 +1,58 @@ package upstream_http import ( - "fmt" "time" - "github.com/eolinker/goku/plugin" - "github.com/eolinker/goku/upstream" + "github.com/eolinker/eosc" http_service "github.com/eolinker/eosc/http-service" "github.com/eolinker/goku/discovery" + "github.com/eolinker/goku/plugin" "github.com/eolinker/goku/upstream/balance" ) -var ( - _ upstream.IUpstreamCreate = (*Upstream)(nil) -) - type Upstream struct { scheme string app discovery.IApp handler balance.IBalanceHandler - retry int - timeout time.Duration + + handlers eosc.IUntyped + + pluginConf map[string]*plugin.Config } -func (up *Upstream) Create(id string, configs map[string]*plugin.Config) upstream.IUpstream { - panic("implement me") +func (up *Upstream) Create(id string, configs map[string]*plugin.Config, retry int, timeout time.Duration) http_service.IChain { + nh := NewUpstreamHandler(id, up, retry, timeout, configs) + up.handlers.Set(id, nh) + return nh } -func NewUpstream(scheme string, app discovery.IApp, handler balance.IBalanceHandler) *Upstream { - return &Upstream{scheme: scheme, app: app, handler: handler} +func (up *Upstream) pluginConfig(configs map[string]*plugin.Config) map[string]*plugin.Config { + return plugin.MergeConfig(configs, up.pluginConf) +} +func NewUpstream(scheme string, app discovery.IApp, handler balance.IBalanceHandler, pluginConf map[string]*plugin.Config) *Upstream { + return &Upstream{scheme: scheme, app: app, handler: handler, handlers: eosc.NewUntyped(), pluginConf: pluginConf} } -//Send 请求发送,忽略重试 -func (up *Upstream) Send(ctx http_service.IHttpContext) error { +//Reset reset +func (up *Upstream) Reset(scheme string, app discovery.IApp, handler balance.IBalanceHandler, pluginConf map[string]*plugin.Config) { + up.scheme = scheme + up.app = app + up.handler = handler + up.pluginConf = pluginConf - var lastErr error - for doTrice := up.retry + 1; doTrice > 0; doTrice-- { + for _, h := range up.handlers.List() { + hd := h.(*UpstreamHandler) + hd.reset() + } +} - node, err := up.handler.Next() - if err != nil { - return err - } - scheme := node.Scheme() - if scheme != "http" && scheme != "https" { - scheme = up.scheme - } - - addr := fmt.Sprintf("%s://%s", scheme, node.Addr()) - lastErr = ctx.SendTo(addr, up.timeout) - if lastErr != nil { - node.Down() - //处理不可用节点 - up.app.NodeError(node.ID()) - - continue - } +func (up *Upstream) destroy() { + handlers := up.handlers.List() + up.handlers = eosc.NewUntyped() + for _, h := range handlers { + hd := h.(*UpstreamHandler) + hd.orgFilter.Destroy() } - return lastErr } diff --git a/drivers/upstream/upstream-http/worker.go b/drivers/upstream/upstream-http/worker.go index a800944f..9c6cc34e 100644 --- a/drivers/upstream/upstream-http/worker.go +++ b/drivers/upstream/upstream-http/worker.go @@ -3,6 +3,10 @@ package upstream_http import ( "errors" "fmt" + "time" + + http_service "github.com/eolinker/eosc/http-service" + "github.com/eolinker/goku/plugin" "github.com/eolinker/eosc/log" @@ -15,17 +19,27 @@ import ( ) var ( - errorScheme = errors.New("error scheme.only support http-service or https") - ErrorStructType = errors.New("error struct type") - errorCreateWorker = errors.New("fail to create upstream worker") + errorScheme = errors.New("error scheme.only support http-service or https") + ErrorStructType = errors.New("error struct type") + errorCreateWorker = errors.New("fail to create upstream worker") + ErrorUpstreamNotInit = errors.New("upstream not init") ) +var _ upstream.IUpstreamCreate = (*httpUpstream)(nil) //Http org type httpUpstream struct { - *Upstream - id string - name string - desc string + upstream *Upstream + id string + name string + desc string + lastError error +} + +func (h *httpUpstream) Create(id string, configs map[string]*plugin.Config, retry int, time time.Duration) (http_service.IChain, error) { + if h.upstream == nil { + return nil, ErrorUpstreamNotInit + } + return h.upstream.Create(id, configs, retry, time), nil } //Id 返回worker id @@ -43,6 +57,7 @@ func (h *httpUpstream) Reset(conf interface{}, workers map[eosc.RequireId]interf if !ok || cfg == nil { return fmt.Errorf("need %s,now %s:%w", eosc.TypeNameOf((*Config)(nil)), eosc.TypeNameOf(conf), ErrorStructType) } + if factory, has := workers[cfg.Discovery]; has { discoveryFactory, ok := factory.(discovery.IDiscovery) if ok { @@ -58,20 +73,25 @@ func (h *httpUpstream) Reset(conf interface{}, workers map[eosc.RequireId]interf if err != nil { return err } - balanceHandler, err := balanceFactory.Create(h.app) + balanceHandler, err := balanceFactory.Create(app) if err != nil { return err } h.desc = cfg.Desc - old := h.Upstream - h.Upstream = NewUpstream(cfg.Scheme, app, balanceHandler) - closeError := old.app.Close() - if closeError != nil { + if h.upstream == nil { + old := h.upstream.app + h.upstream = NewUpstream(cfg.Scheme, app, balanceHandler, cfg.Plugins) + closeError := old.Close() + if closeError != nil { - log.Warn("close app:", closeError) + log.Warn("close app:", closeError) + } + } else { + h.upstream.Reset(cfg.Scheme, app, balanceHandler, cfg.Plugins) } + return nil } } @@ -80,9 +100,11 @@ func (h *httpUpstream) Reset(conf interface{}, workers map[eosc.RequireId]interf //Stop 停止http_proxy负载,并关闭相应的app func (h *httpUpstream) Stop() error { - if h.Upstream != nil { - h.Upstream.app.Close() - h.Upstream = nil + if h.upstream != nil { + h.upstream.app.Close() + + h.upstream.destroy() + h.upstream = nil } return nil diff --git a/plugin/plugin.go b/plugin/plugin.go index 42ba43e2..aa1c8922 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -16,3 +16,42 @@ type IPluginManager interface { CreateService(id string, conf map[string]*Config) IPlugin CreateUpstream(id string, conf map[string]*Config) IPlugin } + +func MergeConfig(high, low map[string]*Config) map[string]*Config { + if high == nil && low == nil { + return make(map[string]*Config) + } + if high == nil { + return clone(low) + } + if low == nil { + return clone(low) + } + + mv := clone(low) + + for k, hv := range high { + lv, has := mv[k] + if has { + *lv = *hv + } else { + c := new(Config) + *c = *hv + mv[k] = c + } + } + return mv + +} +func clone(v map[string]*Config) map[string]*Config { + cv := make(map[string]*Config) + if v == nil { + return cv + } + for k, v := range v { + c := new(Config) + *c = *v + cv[k] = c + } + return cv +} diff --git a/upstream/upstream.go b/upstream/upstream.go index b69baf97..7a06201e 100644 --- a/upstream/upstream.go +++ b/upstream/upstream.go @@ -1,9 +1,10 @@ package upstream import ( - http_context "github.com/eolinker/goku/node/http-context" + "time" + + http_service "github.com/eolinker/eosc/http-service" "github.com/eolinker/goku/plugin" - "github.com/valyala/fasthttp" ) //CheckSkill 检测目标技能是否符合 @@ -13,9 +14,9 @@ func CheckSkill(skill string) bool { //IUpstream 实现了负载发送请求方法 type IUpstream interface { - Send(ctx *http_context.Context) (*fasthttp.Response, error) + Send(ctx http_service.IHttpContext, retry int, timeout time.Duration) error } type IUpstreamCreate interface { - Create(id string, configs map[string]*plugin.Config) IUpstream + Create(id string, configs map[string]*plugin.Config, retry int, time time.Duration) (http_service.IChain, error) }