diff --git a/application/app.go b/application/app.go index a6bd4ac0..a08901b9 100644 --- a/application/app.go +++ b/application/app.go @@ -24,7 +24,6 @@ func init() { type IApp interface { Id() string - Name() string Labels() map[string]string Disable() bool IAppExecutor diff --git a/application/auth/factory.go b/application/auth/factory.go index 746175f3..0dc7bbd7 100644 --- a/application/auth/factory.go +++ b/application/auth/factory.go @@ -37,7 +37,7 @@ type IAuthFactoryRegister interface { //driverRegister 驱动注册器 type driverRegister struct { - register eosc.IRegister + register eosc.IRegister[IAuthFactory] keys []string driverAlias map[string]string render map[string]interface{} @@ -83,7 +83,7 @@ func (dm *driverRegister) ReadOnly() bool { //newAuthFactoryManager 创建auth工厂管理器 func newAuthFactoryManager() *driverRegister { return &driverRegister{ - register: eosc.NewRegister(), + register: eosc.NewRegister[IAuthFactory](), keys: make([]string, 0, 10), driverAlias: make(map[string]string), render: map[string]interface{}{}, @@ -92,12 +92,7 @@ func newAuthFactoryManager() *driverRegister { //GetFactoryByKey 获取指定auth工厂 func (dm *driverRegister) GetFactoryByKey(key string) (IAuthFactory, bool) { - o, has := dm.register.Get(key) - if has { - f, ok := o.(IAuthFactory) - return f, ok - } - return nil, false + return dm.register.Get(key) } //RegisterFactoryByKey 注册auth工厂 diff --git a/application/user.go b/application/user.go index 1dc2a9f0..e29bae9a 100644 --- a/application/user.go +++ b/application/user.go @@ -41,8 +41,8 @@ type IUserManager interface { type UserManager struct { // users map[string]IUser - users eosc.IUntyped - connApp eosc.IUntyped + users eosc.Untyped[string, *UserInfo] + connApp eosc.Untyped[string, []string] } func (u *UserManager) Check(appID string, driver string, users []IUser) error { @@ -67,16 +67,12 @@ func (u *UserManager) Count() int { } func (u *UserManager) List() []*UserInfo { - users := u.users.List() - us := make([]*UserInfo, 0, len(users)) - for _, user := range users { - us = append(us, user.(*UserInfo)) - } - return us + return u.users.List() + } func NewUserManager() *UserManager { - return &UserManager{users: eosc.NewUntyped(), connApp: eosc.NewUntyped()} + return &UserManager{users: eosc.BuildUntyped[string, *UserInfo](), connApp: eosc.BuildUntyped[string, []string]()} } func (u *UserManager) Get(name string) (*UserInfo, bool) { @@ -84,21 +80,16 @@ func (u *UserManager) Get(name string) (*UserInfo, bool) { } func (u *UserManager) get(name string) (*UserInfo, bool) { - user, has := u.users.Get(name) - if !has { - return nil, false - } - - return user.(*UserInfo), true + return u.users.Get(name) } func (u *UserManager) Set(appID string, users []*UserInfo) { - userMap := make(map[string]bool) + userMap := make(map[string]struct{}) names, has := u.getByAppID(appID) if has { for _, name := range names { - userMap[name] = true + userMap[name] = struct{}{} } } @@ -130,19 +121,11 @@ func (u *UserManager) DelByAppID(appID string) { } func (u *UserManager) delByAppID(appID string) ([]string, bool) { - names, has := u.connApp.Del(appID) - if !has { - return nil, false - } - return names.([]string), true + return u.connApp.Del(appID) } func (u *UserManager) getByAppID(appID string) ([]string, bool) { - names, has := u.connApp.Get(appID) - if !has { - return nil, false - } - return names.([]string), true + return u.connApp.Get(appID) } type Auth struct { diff --git a/build/resources/Dockerfile b/build/resources/Dockerfile index b982afc7..7a72c74a 100644 --- a/build/resources/Dockerfile +++ b/build/resources/Dockerfile @@ -1,23 +1,25 @@ # 名称:apinto镜像,携带了部署k8s集群所需要的脚本 # 创建时间:2022-3-30 -FROM centos:latest +FROM centos:7.9.2009 MAINTAINER eolink #声明端口 -EXPOSE 9400 8080 +EXPOSE 9400 8099 #定义数据卷 VOLUME /var/lib/apinto #设置环境变量 -ENV APP=APINTO +ENV APP=apinto #解压网关程序压缩包 COPY ./apinto.linux.x64.tar.gz / RUN tar -zxvf apinto.linux.x64.tar.gz && rm -rf apinto.linux.x64.tar.gz -#修改脚本权限以及复制程序默认配置文件 -RUN chmod 777 /apinto/start.sh && chmod 777 /apinto/join.sh && chmod 777 /apinto/leave.sh && cp /apinto/config.yml.tmp /apinto/config.yml +#复制程序默认配置文件以及修改脚本权限 +RUN mkdir /etc/apinto +RUN cp /apinto/apinto.yml.tpl /etc/apinto/apinto.yml && cp /apinto/config.yml.tpl /etc/apinto/config.yml +RUN chmod 777 /apinto/start.sh && chmod 777 /apinto/join.sh && chmod 777 /apinto/leave.sh WORKDIR /apinto diff --git a/build/resources/apinto.yml b/build/resources/apinto.yml deleted file mode 100644 index d490fb5a..00000000 --- a/build/resources/apinto.yml +++ /dev/null @@ -1,29 +0,0 @@ -# 数据文件放置目录 -# data_dir: /var/lib/goku - -# pid文件放置地址 -# pid_dir: /var/run/goku/ - -# 日志放置目录 -# log_dir: /var/log/goku - -# socket放置目录 -# socket_dir: /tmp/goku - -# goku运行配置地址 -# config: /etc/goku/config.yml - -# 扩展仓库目录 -# extends_dir: /var/lib/goku/extenders/ - -# 错误日志文件名 -# error_log_name: error.log - -# 错误日志等级 -# error_log_level: error - -# 错误日志过期时间,默认单位为天,d|天,h|小时 -# error_log_expire: 7d - -# 错误日志切割周期,仅支持day、hour -# error_log_period: day \ No newline at end of file diff --git a/build/resources/apinto.yml.tpl b/build/resources/apinto.yml.tpl new file mode 100644 index 00000000..2d9b4b7d --- /dev/null +++ b/build/resources/apinto.yml.tpl @@ -0,0 +1,29 @@ +# 数据文件放置目录 +data_dir: /var/lib/apinto + +# pid文件放置地址 +pid_dir: /var/run/apinto/ + +# 日志放置目录 +log_dir: /var/log/apinto + +# socket放置目录 +socket_dir: /tmp/apinto + +# apinto运行配置地址 +config: /etc/apinto/config.yml + +# 扩展仓库目录 +extends_dir: /var/lib/apinto/extenders/ + +# 错误日志文件名 +error_log_name: error.log + +# 错误日志等级 +error_log_level: error + +# 错误日志过期时间,默认单位为天,d|天,h|小时 +error_log_expire: 7d + +# 错误日志切割周期,仅支持day、hour +error_log_period: day \ No newline at end of file diff --git a/build/resources/config.yml.tmp b/build/resources/config.yml.tmp deleted file mode 100644 index f01a5a56..00000000 --- a/build/resources/config.yml.tmp +++ /dev/null @@ -1,7 +0,0 @@ -listen: # node listen port - - 8080 - -admin: # openAPI request info - scheme: http # listen scheme - listen: 9400 # listen port - ip: 0.0.0.0 # listen ip diff --git a/build/resources/config.yml.tpl b/build/resources/config.yml.tpl new file mode 100644 index 00000000..915de576 --- /dev/null +++ b/build/resources/config.yml.tpl @@ -0,0 +1,15 @@ +listen: # node listen port + - 8099 + +admin: # openAPI request info + scheme: http # listen scheme + listen: 9400 # listen port + ip: 0.0.0.0 # listen ip +#ssl: +# listen: +# - port: 443 #https端口 +# certificate: # 不配表示使用所有 cert_dir中的证书,默认pem文件后缀为pem,key后缀为key +# - cert: cert.pem +# key: cert.key +#certificate: +# dir: ./cert # 证书文件目录,不填则默认从cert目录下载 \ No newline at end of file diff --git a/build/resources/install.sh b/build/resources/install.sh new file mode 100644 index 00000000..3573927f --- /dev/null +++ b/build/resources/install.sh @@ -0,0 +1,36 @@ +#!/bin/bash +set -e + +CURRENT_PATH="$(pwd)" + +install() { + mkdir -p /etc/apinto + + #将模板配置文件复制到/etc/apinto目录, 若已存在则不覆盖 + cp -in ./apinto.yml.tpl /etc/apinto/apinto.yml + cp -in ./config.yml.tpl /etc/apinto/config.yml + + #将程序链接至/usr/sbin + ln -snf $CURRENT_PATH/apinto /usr/sbin/apinto +} + +upgrade() { + apinto stop + install + apinto start +} + +case "$1" in + install) + install + exit 0 + ;; + upgrade) + upgrade + exit 0 + ;; + **) + echo "Usage: $0 {install|upgrade} " 1>&2 + exit 1 + ;; +esac \ No newline at end of file diff --git a/discovery/node.go b/discovery/node.go index bd88a05f..fe9d76bc 100644 --- a/discovery/node.go +++ b/discovery/node.go @@ -85,39 +85,11 @@ type INodesData interface { } type NodesData struct { - data eosc.IUntyped + eosc.Untyped[string, map[string]INode] } -func NewNodesData() *NodesData { - return &NodesData{data: eosc.NewUntyped()} -} - -func (n *NodesData) Get(name string) (map[string]INode, bool) { - nodes, has := n.data.Get(name) - if !has { - return nil, false - } - ns, ok := nodes.(map[string]INode) - if !ok { - return nil, false - } - return ns, true -} - -func (n *NodesData) Set(name string, nodes map[string]INode) { - n.data.Set(name, nodes) -} - -func (n *NodesData) Del(name string) (map[string]INode, bool) { - nodes, has := n.data.Del(name) - if !has { - return nil, false - } - ns, ok := nodes.(map[string]INode) - if !ok { - return nil, false - } - return ns, true +func NewNodesData() INodesData { + return &NodesData{Untyped: eosc.BuildUntyped[string, map[string]INode]()} } type Nodes map[string]INode diff --git a/discovery/services.go b/discovery/services.go index c4d6f204..1108a20c 100644 --- a/discovery/services.go +++ b/discovery/services.go @@ -4,24 +4,22 @@ import ( "github.com/eolinker/eosc" ) +type serviceHandler eosc.Untyped[string, IApp] type services struct { - apps eosc.IUntyped - appNameOfID eosc.IUntyped + apps eosc.Untyped[string, serviceHandler] + appNameOfID eosc.Untyped[string, string] } //NewServices 创建服务发现的服务app集合 func NewServices() IServices { - return &services{apps: eosc.NewUntyped(), appNameOfID: eosc.NewUntyped()} + return &services{apps: eosc.BuildUntyped[string, serviceHandler](), appNameOfID: eosc.BuildUntyped[string, string]()} } //get 获取对应服务名的节点列表 -func (s *services) get(serviceName string) (eosc.IUntyped, bool) { +func (s *services) get(serviceName string) (serviceHandler, bool) { v, ok := s.apps.Get(serviceName) - if !ok { - return nil, ok - } - apps, ok := v.(eosc.IUntyped) - return apps, ok + + return v, ok } //func (s *services) GetStatus(serviceName string) (IApp, bool) { @@ -43,7 +41,7 @@ func (s *services) Set(serviceName string, id string, app IApp) error { apps.Set(id, app) return nil } - apps := eosc.NewUntyped() + apps := eosc.BuildUntyped[string, IApp]() apps.Set(id, app) s.apps.Set(serviceName, apps) return nil @@ -51,9 +49,8 @@ func (s *services) Set(serviceName string, id string, app IApp) error { //Remove 将目标app从其对应服务的app列表中删除,传入值为目标app的id func (s *services) Remove(appID string) (string, int) { - v, has := s.appNameOfID.Del(appID) + name, has := s.appNameOfID.Del(appID) if has { - name := v.(string) apps, ok := s.get(name) if ok { apps.Del(appID) @@ -67,11 +64,8 @@ func (s *services) Remove(appID string) (string, int) { //Update 更新目标服务所有app的节点列表 func (s *services) Update(serviceName string, nodes Nodes) error { if apps, ok := s.get(serviceName); ok { - for _, r := range apps.List() { - v, ok := r.(IApp) - if ok { - v.Reset(nodes) - } + for _, v := range apps.List() { + v.Reset(nodes) } } return nil diff --git a/drivers/app/app.go b/drivers/app/app.go index 914f0282..96d2eb2f 100644 --- a/drivers/app/app.go +++ b/drivers/app/app.go @@ -1,16 +1,17 @@ package app import ( - http_service "github.com/eolinker/eosc/eocontext/http-context" + "errors" "github.com/eolinker/apinto/application" "github.com/eolinker/apinto/application/auth" "github.com/eolinker/eosc" + http_service "github.com/eolinker/eosc/eocontext/http-context" ) type app struct { - id string - name string + id string + //name string driverIDs []string config *Config executor application.IAppExecutor @@ -23,10 +24,6 @@ func (a *app) Execute(ctx http_service.IHttpContext) error { return a.executor.Execute(ctx) } -func (a *app) Name() string { - return a.name -} - func (a *app) Labels() map[string]string { if a.config == nil { return nil @@ -70,14 +67,20 @@ func (a *app) Reset(conf interface{}, workers map[eosc.RequireId]eosc.IWorker) e } func (a *app) set(cfg *Config) error { - filters, users, err := createFilters(a.id, cfg.Auth) - if err != nil { - return err + + if cfg.Anonymous { + anonymousApp := appManager.AnonymousApp() + if anonymousApp != nil && anonymousApp.Id() != a.id { + return errors.New("anonymous app is already exists") + } + appManager.SetAnonymousApp(a) + } else { + filters, users, err := createFilters(a.id, cfg.Auth) + if err != nil { + return err + } + appManager.Set(a, filters, users) } - - //cfg.Labels["application"] = strings.TrimSuffix(app., "@app") - - appManager.Set(a, filters, users) e := newExecutor() e.append(newAdditionalParam(cfg.Additional)) a.executor = e diff --git a/drivers/app/config.go b/drivers/app/config.go index 4eaeaa09..c933db0c 100644 --- a/drivers/app/config.go +++ b/drivers/app/config.go @@ -14,10 +14,11 @@ import ( //Config App驱动配置 type Config struct { - Labels map[string]string `json:"labels" label:"应用标签"` + Anonymous bool `json:"anonymous" label:"匿名" ` Disable bool `json:"disable" label:"是否禁用"` Additional []*Additional `json:"additional" label:"额外参数"` - Auth []*Auth `json:"auth" label:"鉴权列表" eotype:"interface"` + Auth []*Auth `json:"auth" label:"鉴权列表" eotype:"interface" switch:"anonymous===false"` + Labels map[string]string `json:"labels" label:"应用标签"` } type Auth struct { diff --git a/drivers/app/driver.go b/drivers/app/driver.go index 5a58f783..a54df971 100644 --- a/drivers/app/driver.go +++ b/drivers/app/driver.go @@ -51,7 +51,7 @@ func (d *driver) Create(id, name string, v interface{}, workers map[eosc.Require } err = a.set(cfg) - return a, nil + return a, err } func checkConfig(v interface{}) (*Config, error) { @@ -59,6 +59,9 @@ func checkConfig(v interface{}) (*Config, error) { if !ok { return nil, errorConfigType } + if conf.Anonymous && len(conf.Auth) > 0 { + return nil, errors.New("it is anonymous app,auths should be empty") + } for _, a := range conf.Auth { err := application.CheckPosition(a.Position) if err != nil { @@ -71,5 +74,6 @@ func checkConfig(v interface{}) (*Config, error) { return nil, err } } + return conf, nil } diff --git a/drivers/app/manager/manager.go b/drivers/app/manager/manager.go index 66414a23..39255b18 100644 --- a/drivers/app/manager/manager.go +++ b/drivers/app/manager/manager.go @@ -19,36 +19,35 @@ type IManager interface { Set(app application.IApp, filters []application.IAuth, users map[string][]application.ITransformConfig) Del(appID string) Count() int + AnonymousApp() application.IApp + SetAnonymousApp(app application.IApp) } type Manager struct { // filters map[string]application.IAuthUser - filters eosc.IUntyped + eosc.Untyped[string, application.IAuth] appManager *AppManager driverAlias map[string]string drivers []string locker sync.RWMutex + app application.IApp } -func (m *Manager) Count() int { - return m.filters.Count() +func (m *Manager) AnonymousApp() application.IApp { + m.locker.RLock() + app := m.app + m.locker.RUnlock() + return app } -func NewManager(driverAlias map[string]string, drivers []string) *Manager { - return &Manager{filters: eosc.NewUntyped(), appManager: NewAppManager(), driverAlias: driverAlias, drivers: drivers} +func (m *Manager) SetAnonymousApp(app application.IApp) { + m.locker.Lock() + m.app = app + m.locker.Unlock() } -func (m *Manager) Get(id string) (application.IAuth, bool) { - return m.get(id) -} - -func (m *Manager) get(id string) (application.IAuth, bool) { - filter, has := m.filters.Get(id) - if !has { - return nil, false - } - f, ok := filter.(application.IAuth) - return f, ok +func NewManager(driverAlias map[string]string, drivers []string) IManager { + return &Manager{Untyped: eosc.BuildUntyped[string, application.IAuth](), appManager: NewAppManager(), driverAlias: driverAlias, drivers: drivers} } func (m *Manager) List() []application.IAuthUser { @@ -71,7 +70,7 @@ func (m *Manager) getByDriver(driver string) []application.IAuthUser { ids := m.appManager.GetByDriver(driver) filters := make([]application.IAuthUser, 0, len(ids)) for _, id := range ids { - filter, has := m.get(id) + filter, has := m.Get(id) if has { filters = append(filters, filter) } @@ -80,30 +79,18 @@ func (m *Manager) getByDriver(driver string) []application.IAuthUser { } func (m *Manager) all() []application.IAuthUser { - keys := m.filters.Keys() - filters := make([]application.IAuthUser, 0, 10*len(keys)) - for _, key := range keys { - filter, has := m.filters.Get(key) - if !has { - continue - } - f, ok := filter.(application.IAuthUser) - if !ok { - continue - } - filters = append(filters, f) + list := m.List() + filters := make([]application.IAuthUser, 0, len(list)) + for _, filter := range list { + filters = append(filters, filter) } return filters } -func (m *Manager) All() []application.IAuthUser { - return m.all() -} - func (m *Manager) Set(app application.IApp, filters []application.IAuth, users map[string][]application.ITransformConfig) { idMap := make(map[string][]string) for _, filter := range filters { - f, has := m.get(filter.ID()) + f, has := m.Untyped.Get(filter.ID()) if !has { f = filter } @@ -112,7 +99,7 @@ func (m *Manager) Set(app application.IApp, filters []application.IAuth, users m continue } f.Set(app, v) - m.filters.Set(filter.ID(), filter) + m.Untyped.Set(filter.ID(), filter) if _, ok := idMap[filter.Driver()]; !ok { idMap[filter.Driver()] = make([]string, 0, len(filters)) } @@ -128,11 +115,11 @@ func (m *Manager) Set(app application.IApp, filters []application.IAuth, users m func (m *Manager) Del(appID string) { ids := m.appManager.GetByAppID(appID) for _, id := range ids { - filter, has := m.get(id) + filter, has := m.Untyped.Get(id) if has { filter.Del(appID) if filter.UserCount() == 0 { - m.filters.Del(id) + m.Untyped.Del(id) } } } diff --git a/drivers/discovery/consul/consul.go b/drivers/discovery/consul/consul.go index d50dc2fc..25dcf8cc 100644 --- a/drivers/discovery/consul/consul.go +++ b/drivers/discovery/consul/consul.go @@ -88,9 +88,9 @@ func (c *consul) Stop() error { func (c *consul) Remove(id string) error { c.locker.Lock() defer c.locker.Unlock() - name, count := c.services.Remove(id) + n, count := c.services.Remove(id) if count == 0 { - c.nodes.Del(name) + c.nodes.Del(n) } return nil } diff --git a/drivers/plugin-manager/manager.go b/drivers/plugin-manager/manager.go index 925f40c3..21817c7f 100644 --- a/drivers/plugin-manager/manager.go +++ b/drivers/plugin-manager/manager.go @@ -25,7 +25,7 @@ type PluginManager struct { name string extenderDrivers eosc.IExtenderDrivers plugins Plugins - pluginObjs eosc.IUntyped + pluginObjs eosc.Untyped[string, *PluginObj] workers eosc.IWorkers } @@ -82,12 +82,7 @@ func (p *PluginManager) Reset(conf interface{}) error { p.plugins = plugins list := p.pluginObjs.List() // 遍历,全量更新 - for _, obj := range list { - v, ok := obj.(*PluginObj) - if !ok { - - continue - } + for _, v := range list { v.fs = p.createFilters(v.conf) } @@ -142,10 +137,10 @@ func (p *PluginManager) createChain(id string, conf map[string]*plugin.Config) * obj = NewPluginObj(chain, id, conf) p.pluginObjs.Set(id, obj) } else { - obj.(*PluginObj).fs = chain + obj.fs = chain } log.Debug("create chain len: ", len(chain)) - return obj.(*PluginObj) + return obj } func (p *PluginManager) check(conf interface{}) (Plugins, error) { @@ -185,7 +180,7 @@ func NewPluginManager() *PluginManager { pm := &PluginManager{ name: "plugin", plugins: make(Plugins, 0), - pluginObjs: eosc.NewUntyped(), + pluginObjs: eosc.BuildUntyped[string, *PluginObj](), } log.Debug("autowired extenderDrivers") bean.Autowired(&pm.extenderDrivers) diff --git a/drivers/plugins/app/app.go b/drivers/plugins/app/app.go index 71f87a77..a314d6b5 100644 --- a/drivers/plugins/app/app.go +++ b/drivers/plugins/app/app.go @@ -67,6 +67,11 @@ func (a *App) auth(ctx http_service.IHttpContext) error { return user.App.Execute(ctx) } } + if app := appManager.AnonymousApp(); app != nil && !app.Disable() { + setLabels(ctx, app.Labels()) + ctx.SetLabel("application", app.Id()) + return app.Execute(ctx) + } return errors.New("invalid user") } diff --git a/drivers/plugins/cors/cors.go b/drivers/plugins/cors/cors.go index 3f45b318..938599ba 100644 --- a/drivers/plugins/cors/cors.go +++ b/drivers/plugins/cors/cors.go @@ -6,6 +6,7 @@ import ( "github.com/eolinker/eosc" "github.com/eolinker/eosc/eocontext" http_service "github.com/eolinker/eosc/eocontext/http-context" + "net/http" "strconv" "strings" ) @@ -30,7 +31,7 @@ func (c *CorsFilter) DoFilter(ctx eocontext.EoContext, next eocontext.IChain) (e } func (c *CorsFilter) DoHttpFilter(ctx http_service.IHttpContext, next eocontext.IChain) (err error) { - if ctx.Request().Method() == "OPTION" { + if ctx.Request().Method() == http.MethodOptions { return c.doOption(ctx) } err = c.doFilter(ctx) @@ -125,17 +126,17 @@ func (c *CorsFilter) Start() error { } func (c *CorsFilter) Reset(conf interface{}, workers map[eosc.RequireId]eosc.IWorker) error { -cfg, err := c.check(conf) -if err != nil { -return err -} -c.option = cfg.genOptionHandler() -c.originChecker = NewChecker(cfg.AllowOrigins, "Access-Control-Allow-Origin") -c.methodChecker = NewChecker(cfg.AllowMethods, "Access-Control-Allow-Methods") -c.headerChecker = NewChecker(cfg.AllowHeaders, "Access-Control-Allow-Headers") -c.exposeChecker = NewChecker(cfg.ExposeHeaders, "Access-Control-Expose-Headers") -c.allowCredentials = cfg.AllowCredentials -return nil + cfg, err := c.check(conf) + if err != nil { + return err + } + c.option = cfg.genOptionHandler() + c.originChecker = NewChecker(cfg.AllowOrigins, "Access-Control-Allow-Origin") + c.methodChecker = NewChecker(cfg.AllowMethods, "Access-Control-Allow-Methods") + c.headerChecker = NewChecker(cfg.AllowHeaders, "Access-Control-Allow-Headers") + c.exposeChecker = NewChecker(cfg.ExposeHeaders, "Access-Control-Expose-Headers") + c.allowCredentials = cfg.AllowCredentials + return nil } func (c *CorsFilter) Stop() error { diff --git a/drivers/strategy/cache-strategy/actuator.go b/drivers/strategy/cache-strategy/actuator.go index cfaabd55..18e16e79 100644 --- a/drivers/strategy/cache-strategy/actuator.go +++ b/drivers/strategy/cache-strategy/actuator.go @@ -109,15 +109,15 @@ func (a *tActuator) Strategy(ctx eocontext.EoContext, next eocontext.IChain, iCa return nil } -type CacheGetCompleteHandler struct { +type CacheCompleteHandler struct { orgHandler eocontext.CompleteHandler validTime int uri string cache resources.ICache } -func NewCacheGetCompleteHandler(orgHandler eocontext.CompleteHandler, validTime int, uri string, cache resources.ICache) *CacheGetCompleteHandler { - return &CacheGetCompleteHandler{ +func NewCacheGetCompleteHandler(orgHandler eocontext.CompleteHandler, validTime int, uri string, cache resources.ICache) *CacheCompleteHandler { + return &CacheCompleteHandler{ orgHandler: orgHandler, validTime: validTime, uri: uri, @@ -125,7 +125,7 @@ func NewCacheGetCompleteHandler(orgHandler eocontext.CompleteHandler, validTime } } -func (c *CacheGetCompleteHandler) Complete(ctx eocontext.EoContext) error { +func (c *CacheCompleteHandler) Complete(ctx eocontext.EoContext) error { if c.orgHandler != nil { if err := c.orgHandler.Complete(ctx); err != nil { diff --git a/drivers/strategy/fuse-strategy/actuator.go b/drivers/strategy/fuse-strategy/actuator.go index 63da7934..3a1ad68e 100644 --- a/drivers/strategy/fuse-strategy/actuator.go +++ b/drivers/strategy/fuse-strategy/actuator.go @@ -5,14 +5,21 @@ import ( "github.com/eolinker/apinto/resources" "github.com/eolinker/eosc/eocontext" http_service "github.com/eolinker/eosc/eocontext/http-context" + "github.com/eolinker/eosc/log" "sort" + "strconv" "sync" + "time" ) var ( actuatorSet ActuatorSet ) +const ( + fuseStatusTime = time.Minute * 30 +) + func init() { actuator := newtActuator() actuatorSet = actuator @@ -81,7 +88,10 @@ func (a *tActuator) Strategy(ctx eocontext.EoContext, next eocontext.IChain, cac if !handler.filter.Check(httpCtx) { continue } - if handler.Fusing(ctx, cache) { + + metrics := handler.rule.metric.Metrics(ctx) + + if handler.IsFuse(ctx.Context(), metrics, cache) { res := handler.rule.response httpCtx.Response().SetStatus(res.statusCode, "") for _, h := range res.headers { @@ -90,38 +100,129 @@ func (a *tActuator) Strategy(ctx eocontext.EoContext, next eocontext.IChain, cac httpCtx.Response().SetHeader("Content-Type", fmt.Sprintf("%s; charset=%s", res.contentType, res.charset)) httpCtx.Response().SetBody([]byte(res.body)) return nil + } else { + ctx.SetFinish(newFuseFinishHandler(ctx.GetFinish(), cache, handler, metrics)) + break } + } if next != nil { - return next.DoChain(ctx) + if err = next.DoChain(ctx); err != nil { + return err + } } + return nil } -func (a *tActuator) DoFilter(ctx eocontext.EoContext, next eocontext.IChain) error { +type fuseFinishHandler struct { + orgHandler eocontext.FinishHandler + cache resources.ICache + fuseHandler *FuseHandler + metrics string +} - httpCtx, err := http_service.Assert(ctx) - if err != nil { - return err +func newFuseFinishHandler(orgHandler eocontext.FinishHandler, cache resources.ICache, fuseHandler *FuseHandler, metrics string) *fuseFinishHandler { + return &fuseFinishHandler{ + orgHandler: orgHandler, + cache: cache, + fuseHandler: fuseHandler, + metrics: metrics, } +} - a.lock.RLock() - handlers := a.handlers - a.lock.RUnlock() +func (f *fuseFinishHandler) Finish(eoCtx eocontext.EoContext) error { - for _, handler := range handlers { - //check筛选条件 - if handler.filter.Check(httpCtx) { + defer func() { + if f.orgHandler != nil { + f.orgHandler.Finish(eoCtx) + } + }() + + httpCtx, _ := http_service.Assert(eoCtx) + + fuseTime := f.fuseHandler.rule.fuseTime + + ctx := eoCtx.Context() + statusCode := httpCtx.Response().StatusCode() + + //熔断状态 + status := getFuseStatus(ctx, f.metrics, f.cache) + + switch f.fuseHandler.rule.codeStatusMap[statusCode] { + case codeStatusError: + //记录失败count + + tx := f.cache.Tx() + errCount, _ := tx.IncrBy(ctx, getErrorCountKey(f.metrics), 1, time.Second).Result() + //清除恢复的计数器 + tx.Del(ctx, getSuccessCountKey(f.metrics)) + tx.Exec(ctx) + + if errCount == f.fuseHandler.rule.fuseConditionCount { + + lockerKey := fmt.Sprintf("fuse_locker_%s", f.metrics) + ok, err := f.cache.SetNX(ctx, lockerKey, []byte(fuseStatusObserve), time.Second).Result() + if err != nil { + log.Infof("fuse strategy locker %s to %s fail:%s", status, fuseStatusFusing, err.Error()) + return err + } + if !ok { + return nil + } + + fuseCountKey := getFuseCountKey(f.metrics) + expUnix := int64(0) + + fuseCount, _ := f.cache.IncrBy(ctx, fuseCountKey, 1, time.Hour).Result() + txDone := f.cache.Tx() + if status == fuseStatusHealthy { + fuseCount = 1 + txDone.Set(ctx, fuseCountKey, []byte("1"), time.Hour) + } + + exp := time.Duration(fuseCount) * fuseTime.time + if exp >= fuseTime.maxTime { + exp = fuseTime.maxTime + } + + expUnix = time.Now().Add(exp).UnixNano() + + txDone.Set(ctx, getFuseStatusKey(f.metrics), []byte(strconv.FormatInt(expUnix, 16)), fuseStatusTime) + txDone.Del(ctx, lockerKey) + txDone.Exec(ctx) + } + + case codeStatusSuccess: + if status == fuseStatusObserve { + successCount, _ := f.cache.IncrBy(ctx, getSuccessCountKey(f.metrics), 1, time.Second).Result() + + //恢复正常期 + if successCount == f.fuseHandler.rule.recoverConditionCount { + lockerKey := fmt.Sprintf("fuse_locker_%s", f.metrics) + ok, err := f.cache.SetNX(ctx, lockerKey, []byte(fuseStatusObserve), time.Second).Result() + if err != nil { + log.Infof("fuse strategy locker %s to %s fail:%s", fuseStatusObserve, fuseStatusHealthy, err.Error()) + return err + } + if ok { + + tx := f.cache.Tx() + //删除熔断状态的key就是恢复正常期 + tx.Del(ctx, getFuseStatusKey(f.metrics)) + //删除已记录的熔断次数 + tx.Del(ctx, getFuseCountKey(f.metrics)) + tx.Del(ctx, lockerKey) + tx.Exec(ctx) + } + } - break } } - if next != nil { - return next.DoChain(ctx) - } return nil + } type handlerListSort []*FuseHandler diff --git a/drivers/strategy/fuse-strategy/config.go b/drivers/strategy/fuse-strategy/config.go index ad500f3f..49c0dbb2 100644 --- a/drivers/strategy/fuse-strategy/config.go +++ b/drivers/strategy/fuse-strategy/config.go @@ -10,7 +10,7 @@ type Config struct { Stop bool `json:"stop"` Priority int `json:"priority" label:"优先级" description:"1-999"` Filters strategy.FilterConfig `json:"filters" label:"过滤规则"` - Rule Rule `json:"rule" label:"熔断规则"` + Rule Rule `json:"fuse" label:"熔断规则"` } type Rule struct { diff --git a/drivers/strategy/fuse-strategy/handler.go b/drivers/strategy/fuse-strategy/handler.go index 09c483d5..cff9456d 100644 --- a/drivers/strategy/fuse-strategy/handler.go +++ b/drivers/strategy/fuse-strategy/handler.go @@ -1,138 +1,83 @@ package fuse_strategy import ( - "context" "fmt" - "github.com/coocood/freecache" "github.com/eolinker/apinto/metrics" "github.com/eolinker/apinto/resources" "github.com/eolinker/apinto/strategy" - "github.com/eolinker/eosc/eocontext" - http_service "github.com/eolinker/eosc/eocontext/http-context" - "github.com/eolinker/eosc/log" - "github.com/go-redis/redis/v8" + "golang.org/x/net/context" + "strconv" "time" ) type fuseStatus string const ( - fuseStatusNone fuseStatus = "none" //默认状态 fuseStatusHealthy fuseStatus = "healthy" //健康期间 fuseStatusFusing fuseStatus = "fusing" //熔断期间 fuseStatusObserve fuseStatus = "observe" //观察期 ) +type codeStatus int + +const ( + codeStatusSuccess codeStatus = 1 + codeStatusError codeStatus = 2 +) + type FuseHandler struct { name string filter strategy.IFilter priority int stop bool rule *ruleHandler - status fuseStatus //状态 } -func (f *FuseHandler) Fusing(eoCtx eocontext.EoContext, cache resources.ICache) bool { - httpCtx, _ := http_service.Assert(eoCtx) +func (f *FuseHandler) IsFuse(ctx context.Context, metrics string, cache resources.ICache) bool { + return getFuseStatus(ctx, metrics, cache) == fuseStatusFusing +} - fuseCondition := f.rule.fuseCondition - recoverCondition := f.rule.recoverCondition +//熔断次数的key +func getFuseCountKey(metrics string) string { + return fmt.Sprintf("fuse_count_%s_%d", metrics, time.Now().Unix()) +} - ctx := context.Background() - statusCode := httpCtx.Response().StatusCode() +//失败次数的key +func getErrorCountKey(metrics string) string { + return fmt.Sprintf("fuse_error_count_%s_%d", metrics, time.Now().Unix()) +} - for _, code := range fuseCondition.statusCodes { - if statusCode != code { - continue - } - tx := cache.Tx() - //记录失败count - countKey := f.getFuseCountKey(eoCtx) +func getSuccessCountKey(metrics string) string { + return fmt.Sprintf("fuse_success_count_%s_%d", metrics, time.Now().Unix()) +} +func getFuseStatusKey(metrics string) string { + return fmt.Sprintf("fuse_status_%s", metrics) +} - if f.status == fuseStatusFusing { - //缓存中拿不到数据 表示key过期 也就是熔断期已过 变成观察期 - if _, err := tx.Get(ctx, f.getFuseTimeKey(eoCtx)).Bytes(); err != nil && (err == freecache.ErrNotFound || err == redis.Nil) { - f.status = fuseStatusObserve - _ = tx.Exec(ctx) - return false - } - } +func getFuseStatus(ctx context.Context, metrics string, cache resources.ICache) fuseStatus { - result, err := tx.IncrBy(ctx, countKey, 1, time.Second).Result() - if err != nil { - log.Errorf("FuseHandler Fusing %v", err) - _ = tx.Exec(ctx) - return true - } - - //清除恢复的计数器 - tx.Del(ctx, f.getRecoverCountKey(eoCtx)) - - if result >= fuseCondition.count { - surplus := result % fuseCondition.count - if surplus == 0 { - //熔断持续时间=连续熔断次数*持续时间 - exp := time.Second * time.Duration((result/fuseCondition.count)*f.rule.fuseTime.time) - maxExp := time.Duration(f.rule.fuseTime.maxTime) * time.Second - if exp >= maxExp { - exp = maxExp - } - tx.Set(ctx, f.getFuseTimeKey(eoCtx), []byte(""), exp) - f.status = fuseStatusFusing - } - _ = tx.Exec(ctx) - return true - } - break + key := getFuseStatusKey(metrics) + expUnixStr, err := cache.Get(ctx, key).Result() + if err != nil { //拿不到默认健康期 + return fuseStatusHealthy } - for _, code := range recoverCondition.statusCodes { - if code != statusCode { - continue - } - if f.status == fuseStatusObserve || f.status == fuseStatusFusing { - tx := cache.Tx() - result, err := tx.IncrBy(ctx, f.getRecoverCountKey(eoCtx), 1, time.Second).Result() - if err != nil { - _ = tx.Exec(ctx) - log.Errorf("FuseHandler Fusing %v", err) - return true - } + expUnix, _ := strconv.ParseInt(expUnixStr, 16, 64) - //恢复正常期 - if result == recoverCondition.count { - f.status = fuseStatusHealthy - } - _ = tx.Exec(ctx) - } - break + //过了熔断期是观察期 + if time.Now().UnixNano() > expUnix { + return fuseStatusObserve } - - if f.status == fuseStatusHealthy || f.status == fuseStatusNone || f.status == fuseStatusObserve { - return false - } - - return true -} - -func (f *FuseHandler) getFuseCountKey(label metrics.LabelReader) string { - return fmt.Sprintf("fuse_%s_%s_%d", f.name, f.rule.metric.Metrics(label), time.Now().Second()) -} - -func (f *FuseHandler) getFuseTimeKey(label metrics.LabelReader) string { - return fmt.Sprintf("fuse_time_%s_%s", f.name, f.rule.metric.Metrics(label)) -} - -func (f *FuseHandler) getRecoverCountKey(label metrics.LabelReader) string { - return fmt.Sprintf("fuse_recover_%s_%s_%d", f.name, f.rule.metric.Metrics(label), time.Now().Second()) + return fuseStatusFusing } type ruleHandler struct { - metric metrics.Metrics //熔断维度 - fuseCondition statusConditionConf - fuseTime fuseTimeConf - recoverCondition statusConditionConf - response strategyResponseConf + metric metrics.Metrics //熔断维度 + fuseConditionCount int64 + fuseTime fuseTimeConf + recoverConditionCount int64 + response strategyResponseConf + codeStatusMap map[int]codeStatus } type statusConditionConf struct { @@ -141,8 +86,8 @@ type statusConditionConf struct { } type fuseTimeConf struct { - time int64 - maxTime int64 + time time.Duration + maxTime time.Duration } type strategyResponseConf struct { @@ -171,20 +116,24 @@ func NewFuseHandler(conf *Config) (*FuseHandler, error) { }) } + codeStatusMap := make(map[int]codeStatus) + for _, code := range conf.Rule.RecoverCondition.StatusCodes { + codeStatusMap[code] = codeStatusSuccess + } + + for _, code := range conf.Rule.FuseCondition.StatusCodes { + codeStatusMap[code] = codeStatusError + } rule := &ruleHandler{ - metric: metrics.Parse([]string{conf.Rule.Metric}), - fuseCondition: statusConditionConf{ - statusCodes: conf.Rule.FuseCondition.StatusCodes, - count: conf.Rule.FuseCondition.Count, - }, + metric: metrics.Parse([]string{conf.Rule.Metric}), + fuseConditionCount: conf.Rule.FuseCondition.Count, + fuseTime: fuseTimeConf{ - time: conf.Rule.FuseTime.Time, - maxTime: conf.Rule.FuseTime.MaxTime, - }, - recoverCondition: statusConditionConf{ - statusCodes: conf.Rule.RecoverCondition.StatusCodes, - count: conf.Rule.RecoverCondition.Count, + time: time.Duration(conf.Rule.FuseTime.Time) * time.Second, + maxTime: time.Duration(conf.Rule.FuseTime.MaxTime) * time.Second, }, + recoverConditionCount: conf.Rule.RecoverCondition.Count, + response: strategyResponseConf{ statusCode: conf.Rule.Response.StatusCode, contentType: conf.Rule.Response.ContentType, @@ -192,6 +141,7 @@ func NewFuseHandler(conf *Config) (*FuseHandler, error) { headers: headers, body: conf.Rule.Response.Body, }, + codeStatusMap: codeStatusMap, } return &FuseHandler{ name: conf.Name, diff --git a/drivers/strategy/grey-strategy/config.go b/drivers/strategy/grey-strategy/config.go index 9ce37b58..502240bb 100644 --- a/drivers/strategy/grey-strategy/config.go +++ b/drivers/strategy/grey-strategy/config.go @@ -21,7 +21,7 @@ type Config struct { Stop bool `json:"stop"` Priority int `json:"priority" label:"优先级" description:"1-999"` Filters strategy.FilterConfig `json:"filters" label:"过滤规则"` - Rule *Rule `json:"rule" label:"灰度规则"` + Rule Rule `json:"grey" label:"灰度规则"` } type Rule struct { diff --git a/drivers/strategy/visit-strategy/config.go b/drivers/strategy/visit-strategy/config.go index a79d0020..a697fe04 100644 --- a/drivers/strategy/visit-strategy/config.go +++ b/drivers/strategy/visit-strategy/config.go @@ -16,7 +16,7 @@ type Config struct { Stop bool `json:"stop"` Priority int `json:"priority" label:"优先级" description:"1-999"` Filters strategy.FilterConfig `json:"filters" label:"过滤规则"` - Rule Rule `json:"rule" label:"规则"` + Rule Rule `json:"visit" label:"规则"` } type Rule struct { diff --git a/go.mod b/go.mod index 38d7a9f3..3e93492e 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/eolinker/apinto -go 1.17 +go 1.19 require ( github.com/Shopify/sarama v1.32.0 @@ -18,6 +18,7 @@ require ( github.com/satori/go.uuid v1.2.0 github.com/valyala/fasthttp v1.33.0 golang.org/x/crypto v0.0.0-20220214200702-86341886e292 + golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd ) require ( @@ -98,7 +99,6 @@ require ( go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect go.uber.org/zap v1.19.1 // indirect - golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect golang.org/x/text v0.3.7 // indirect golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect diff --git a/node/http-context/context.go b/node/http-context/context.go index 8fd5d691..b7e71939 100644 --- a/node/http-context/context.go +++ b/node/http-context/context.go @@ -122,11 +122,10 @@ func (ctx *Context) Response() http_service.IResponse { } func (ctx *Context) SendTo(address string, timeout time.Duration) error { - clone := ctx.proxyRequest.clone() + _, host := readAddress(address) - clone.URI().SetHost(host) - ctx.proxyRequests = append(ctx.proxyRequests, clone) request := ctx.proxyRequest.Request() + ctx.proxyRequests = append(ctx.proxyRequests, newRequestAgent(ctx.proxyRequest, host)) passHost, targethost := ctx.GetUpstreamHostHandler().PassHost() switch passHost { @@ -202,12 +201,7 @@ func (ctx *Context) FastFinish() { ctx.requestReader.Finish() ctx.proxyRequest.Finish() - for _, request := range ctx.proxyRequests { - r, ok := request.(*ProxyRequest) - if ok { - r.Finish() - } - } + return } diff --git a/node/http-context/proxy-agent.go b/node/http-context/proxy-agent.go new file mode 100644 index 00000000..0bb643ec --- /dev/null +++ b/node/http-context/proxy-agent.go @@ -0,0 +1,36 @@ +package http_context + +import http_service "github.com/eolinker/eosc/eocontext/http-context" + +type requestAgent struct { + http_service.IRequest + host string + hostAgent *UrlAgent +} + +func newRequestAgent(IRequest http_service.IRequest, host string) *requestAgent { + return &requestAgent{IRequest: IRequest, host: host} +} +func (a *requestAgent) URI() http_service.IURIWriter { + if a.hostAgent == nil { + a.hostAgent = NewUrlAgent(a.IRequest.URI(), a.host) + } + return a.hostAgent +} + +type UrlAgent struct { + http_service.IURIWriter + host string +} + +func (u *UrlAgent) Host() string { + return u.host +} + +func (u *UrlAgent) SetHost(host string) { + u.host = host +} + +func NewUrlAgent(IURIWriter http_service.IURIWriter, host string) *UrlAgent { + return &UrlAgent{IURIWriter: IURIWriter, host: host} +} diff --git a/resources/cache-local.go b/resources/cache-local.go index 27a5ad4c..41e855db 100644 --- a/resources/cache-local.go +++ b/resources/cache-local.go @@ -2,8 +2,8 @@ package resources import ( "context" - "encoding/binary" "github.com/coocood/freecache" + "strconv" "sync" "time" ) @@ -120,12 +120,15 @@ func (n *cacheLocal) IncrBy(ctx context.Context, key string, incr int64, expirat return NewIntResult(value, nil) } func ToInt(b []byte) int64 { - return int64(binary.LittleEndian.Uint64(b)) + v, err := strconv.ParseInt(string(b), 10, 64) + if err != nil { + return 0 + } + return v } func ToBytes(v int64) []byte { - var b [8]byte - binary.LittleEndian.PutUint64(b[:], uint64(v)) - return b[:] + + return []byte(strconv.FormatInt(v, 10)) } func (n *cacheLocal) Get(ctx context.Context, key string) StringResult { data, err := n.client.Get([]byte(key)) diff --git a/upstream/balance/balance.go b/upstream/balance/balance.go index 476f6a12..dc6c7b55 100644 --- a/upstream/balance/balance.go +++ b/upstream/balance/balance.go @@ -30,14 +30,14 @@ type IBalanceFactoryRegister interface { //driverRegister 实现了IBalanceFactoryRegister接口 type driverRegister struct { - register eosc.IRegister + register eosc.IRegister[IBalanceFactory] keys []string } //newBalanceFactoryManager 创建负载均衡算法工厂管理器 func newBalanceFactoryManager() IBalanceFactoryRegister { return &driverRegister{ - register: eosc.NewRegister(), + register: eosc.NewRegister[IBalanceFactory](), keys: make([]string, 0, 10), } } @@ -47,10 +47,8 @@ func (dm *driverRegister) GetFactoryByKey(key string) (IBalanceFactory, bool) { o, has := dm.register.Get(key) if has { log.Debug("GetFactoryByKey:", key, ":has") - f, ok := o.(IBalanceFactory) - return f, ok } - return nil, false + return o, has } //RegisterFactoryByKey 注册balance工厂