Merge remote-tracking branch 'gitlab/develop' into feature/websocket

This commit is contained in:
Liujian
2022-10-27 11:11:37 +08:00
30 changed files with 424 additions and 357 deletions

View File

@@ -24,7 +24,6 @@ func init() {
type IApp interface {
Id() string
Name() string
Labels() map[string]string
Disable() bool
IAppExecutor

View File

@@ -37,7 +37,7 @@ type IAuthFactoryRegister interface {
//driverRegister 驱动注册器
type driverRegister struct {
register eosc.IRegister
register eosc.IRegister[IAuthFactory]
keys []string
driverAlias map[string]string
render map[string]interface{}
@@ -83,7 +83,7 @@ func (dm *driverRegister) ReadOnly() bool {
//newAuthFactoryManager 创建auth工厂管理器
func newAuthFactoryManager() *driverRegister {
return &driverRegister{
register: eosc.NewRegister(),
register: eosc.NewRegister[IAuthFactory](),
keys: make([]string, 0, 10),
driverAlias: make(map[string]string),
render: map[string]interface{}{},
@@ -92,12 +92,7 @@ func newAuthFactoryManager() *driverRegister {
//GetFactoryByKey 获取指定auth工厂
func (dm *driverRegister) GetFactoryByKey(key string) (IAuthFactory, bool) {
o, has := dm.register.Get(key)
if has {
f, ok := o.(IAuthFactory)
return f, ok
}
return nil, false
return dm.register.Get(key)
}
//RegisterFactoryByKey 注册auth工厂

View File

@@ -41,8 +41,8 @@ type IUserManager interface {
type UserManager struct {
// users map[string]IUser
users eosc.IUntyped
connApp eosc.IUntyped
users eosc.Untyped[string, *UserInfo]
connApp eosc.Untyped[string, []string]
}
func (u *UserManager) Check(appID string, driver string, users []IUser) error {
@@ -67,16 +67,12 @@ func (u *UserManager) Count() int {
}
func (u *UserManager) List() []*UserInfo {
users := u.users.List()
us := make([]*UserInfo, 0, len(users))
for _, user := range users {
us = append(us, user.(*UserInfo))
}
return us
return u.users.List()
}
func NewUserManager() *UserManager {
return &UserManager{users: eosc.NewUntyped(), connApp: eosc.NewUntyped()}
return &UserManager{users: eosc.BuildUntyped[string, *UserInfo](), connApp: eosc.BuildUntyped[string, []string]()}
}
func (u *UserManager) Get(name string) (*UserInfo, bool) {
@@ -84,21 +80,16 @@ func (u *UserManager) Get(name string) (*UserInfo, bool) {
}
func (u *UserManager) get(name string) (*UserInfo, bool) {
user, has := u.users.Get(name)
if !has {
return nil, false
}
return user.(*UserInfo), true
return u.users.Get(name)
}
func (u *UserManager) Set(appID string, users []*UserInfo) {
userMap := make(map[string]bool)
userMap := make(map[string]struct{})
names, has := u.getByAppID(appID)
if has {
for _, name := range names {
userMap[name] = true
userMap[name] = struct{}{}
}
}
@@ -130,19 +121,11 @@ func (u *UserManager) DelByAppID(appID string) {
}
func (u *UserManager) delByAppID(appID string) ([]string, bool) {
names, has := u.connApp.Del(appID)
if !has {
return nil, false
}
return names.([]string), true
return u.connApp.Del(appID)
}
func (u *UserManager) getByAppID(appID string) ([]string, bool) {
names, has := u.connApp.Get(appID)
if !has {
return nil, false
}
return names.([]string), true
return u.connApp.Get(appID)
}
type Auth struct {

View File

@@ -1,23 +1,25 @@
# 名称apinto镜像携带了部署k8s集群所需要的脚本
# 创建时间2022-3-30
FROM centos:latest
FROM centos:7.9.2009
MAINTAINER eolink
#声明端口
EXPOSE 9400 8080
EXPOSE 9400 8099
#定义数据卷
VOLUME /var/lib/apinto
#设置环境变量
ENV APP=APINTO
ENV APP=apinto
#解压网关程序压缩包
COPY ./apinto.linux.x64.tar.gz /
RUN tar -zxvf apinto.linux.x64.tar.gz && rm -rf apinto.linux.x64.tar.gz
#修改脚本权限以及复制程序默认配置文件
RUN chmod 777 /apinto/start.sh && chmod 777 /apinto/join.sh && chmod 777 /apinto/leave.sh && cp /apinto/config.yml.tmp /apinto/config.yml
#复制程序默认配置文件以及修改脚本权限
RUN mkdir /etc/apinto
RUN cp /apinto/apinto.yml.tpl /etc/apinto/apinto.yml && cp /apinto/config.yml.tpl /etc/apinto/config.yml
RUN chmod 777 /apinto/start.sh && chmod 777 /apinto/join.sh && chmod 777 /apinto/leave.sh
WORKDIR /apinto

View File

@@ -1,29 +0,0 @@
# 数据文件放置目录
# data_dir: /var/lib/goku
# pid文件放置地址
# pid_dir: /var/run/goku/
# 日志放置目录
# log_dir: /var/log/goku
# socket放置目录
# socket_dir: /tmp/goku
# goku运行配置地址
# config: /etc/goku/config.yml
# 扩展仓库目录
# extends_dir: /var/lib/goku/extenders/
# 错误日志文件名
# error_log_name: error.log
# 错误日志等级
# error_log_level: error
# 错误日志过期时间默认单位为天d|天h|小时
# error_log_expire: 7d
# 错误日志切割周期仅支持day、hour
# error_log_period: day

View File

@@ -0,0 +1,29 @@
# 数据文件放置目录
data_dir: /var/lib/apinto
# pid文件放置地址
pid_dir: /var/run/apinto/
# 日志放置目录
log_dir: /var/log/apinto
# socket放置目录
socket_dir: /tmp/apinto
# apinto运行配置地址
config: /etc/apinto/config.yml
# 扩展仓库目录
extends_dir: /var/lib/apinto/extenders/
# 错误日志文件名
error_log_name: error.log
# 错误日志等级
error_log_level: error
# 错误日志过期时间默认单位为天d|天h|小时
error_log_expire: 7d
# 错误日志切割周期仅支持day、hour
error_log_period: day

View File

@@ -1,7 +0,0 @@
listen: # node listen port
- 8080
admin: # openAPI request info
scheme: http # listen scheme
listen: 9400 # listen port
ip: 0.0.0.0 # listen ip

View File

@@ -0,0 +1,15 @@
listen: # node listen port
- 8099
admin: # openAPI request info
scheme: http # listen scheme
listen: 9400 # listen port
ip: 0.0.0.0 # listen ip
#ssl:
# listen:
# - port: 443 #https端口
# certificate: # 不配表示使用所有 cert_dir中的证书默认pem文件后缀为pemkey后缀为key
# - cert: cert.pem
# key: cert.key
#certificate:
# dir: ./cert # 证书文件目录不填则默认从cert目录下载

View File

@@ -0,0 +1,36 @@
#!/bin/bash
set -e
CURRENT_PATH="$(pwd)"
install() {
mkdir -p /etc/apinto
#将模板配置文件复制到/etc/apinto目录, 若已存在则不覆盖
cp -in ./apinto.yml.tpl /etc/apinto/apinto.yml
cp -in ./config.yml.tpl /etc/apinto/config.yml
#将程序链接至/usr/sbin
ln -snf $CURRENT_PATH/apinto /usr/sbin/apinto
}
upgrade() {
apinto stop
install
apinto start
}
case "$1" in
install)
install
exit 0
;;
upgrade)
upgrade
exit 0
;;
**)
echo "Usage: $0 {install|upgrade} " 1>&2
exit 1
;;
esac

View File

@@ -85,39 +85,11 @@ type INodesData interface {
}
type NodesData struct {
data eosc.IUntyped
eosc.Untyped[string, map[string]INode]
}
func NewNodesData() *NodesData {
return &NodesData{data: eosc.NewUntyped()}
}
func (n *NodesData) Get(name string) (map[string]INode, bool) {
nodes, has := n.data.Get(name)
if !has {
return nil, false
}
ns, ok := nodes.(map[string]INode)
if !ok {
return nil, false
}
return ns, true
}
func (n *NodesData) Set(name string, nodes map[string]INode) {
n.data.Set(name, nodes)
}
func (n *NodesData) Del(name string) (map[string]INode, bool) {
nodes, has := n.data.Del(name)
if !has {
return nil, false
}
ns, ok := nodes.(map[string]INode)
if !ok {
return nil, false
}
return ns, true
func NewNodesData() INodesData {
return &NodesData{Untyped: eosc.BuildUntyped[string, map[string]INode]()}
}
type Nodes map[string]INode

View File

@@ -4,24 +4,22 @@ import (
"github.com/eolinker/eosc"
)
type serviceHandler eosc.Untyped[string, IApp]
type services struct {
apps eosc.IUntyped
appNameOfID eosc.IUntyped
apps eosc.Untyped[string, serviceHandler]
appNameOfID eosc.Untyped[string, string]
}
//NewServices 创建服务发现的服务app集合
func NewServices() IServices {
return &services{apps: eosc.NewUntyped(), appNameOfID: eosc.NewUntyped()}
return &services{apps: eosc.BuildUntyped[string, serviceHandler](), appNameOfID: eosc.BuildUntyped[string, string]()}
}
//get 获取对应服务名的节点列表
func (s *services) get(serviceName string) (eosc.IUntyped, bool) {
func (s *services) get(serviceName string) (serviceHandler, bool) {
v, ok := s.apps.Get(serviceName)
if !ok {
return nil, ok
}
apps, ok := v.(eosc.IUntyped)
return apps, ok
return v, ok
}
//func (s *services) GetStatus(serviceName string) (IApp, bool) {
@@ -43,7 +41,7 @@ func (s *services) Set(serviceName string, id string, app IApp) error {
apps.Set(id, app)
return nil
}
apps := eosc.NewUntyped()
apps := eosc.BuildUntyped[string, IApp]()
apps.Set(id, app)
s.apps.Set(serviceName, apps)
return nil
@@ -51,9 +49,8 @@ func (s *services) Set(serviceName string, id string, app IApp) error {
//Remove 将目标app从其对应服务的app列表中删除传入值为目标app的id
func (s *services) Remove(appID string) (string, int) {
v, has := s.appNameOfID.Del(appID)
name, has := s.appNameOfID.Del(appID)
if has {
name := v.(string)
apps, ok := s.get(name)
if ok {
apps.Del(appID)
@@ -67,11 +64,8 @@ func (s *services) Remove(appID string) (string, int) {
//Update 更新目标服务所有app的节点列表
func (s *services) Update(serviceName string, nodes Nodes) error {
if apps, ok := s.get(serviceName); ok {
for _, r := range apps.List() {
v, ok := r.(IApp)
if ok {
v.Reset(nodes)
}
for _, v := range apps.List() {
v.Reset(nodes)
}
}
return nil

View File

@@ -1,16 +1,17 @@
package app
import (
http_service "github.com/eolinker/eosc/eocontext/http-context"
"errors"
"github.com/eolinker/apinto/application"
"github.com/eolinker/apinto/application/auth"
"github.com/eolinker/eosc"
http_service "github.com/eolinker/eosc/eocontext/http-context"
)
type app struct {
id string
name string
id string
//name string
driverIDs []string
config *Config
executor application.IAppExecutor
@@ -23,10 +24,6 @@ func (a *app) Execute(ctx http_service.IHttpContext) error {
return a.executor.Execute(ctx)
}
func (a *app) Name() string {
return a.name
}
func (a *app) Labels() map[string]string {
if a.config == nil {
return nil
@@ -70,14 +67,20 @@ func (a *app) Reset(conf interface{}, workers map[eosc.RequireId]eosc.IWorker) e
}
func (a *app) set(cfg *Config) error {
filters, users, err := createFilters(a.id, cfg.Auth)
if err != nil {
return err
if cfg.Anonymous {
anonymousApp := appManager.AnonymousApp()
if anonymousApp != nil && anonymousApp.Id() != a.id {
return errors.New("anonymous app is already exists")
}
appManager.SetAnonymousApp(a)
} else {
filters, users, err := createFilters(a.id, cfg.Auth)
if err != nil {
return err
}
appManager.Set(a, filters, users)
}
//cfg.Labels["application"] = strings.TrimSuffix(app., "@app")
appManager.Set(a, filters, users)
e := newExecutor()
e.append(newAdditionalParam(cfg.Additional))
a.executor = e

View File

@@ -14,10 +14,11 @@ import (
//Config App驱动配置
type Config struct {
Labels map[string]string `json:"labels" label:"应用标签"`
Anonymous bool `json:"anonymous" label:"匿名" `
Disable bool `json:"disable" label:"是否禁用"`
Additional []*Additional `json:"additional" label:"额外参数"`
Auth []*Auth `json:"auth" label:"鉴权列表" eotype:"interface"`
Auth []*Auth `json:"auth" label:"鉴权列表" eotype:"interface" switch:"anonymous===false"`
Labels map[string]string `json:"labels" label:"应用标签"`
}
type Auth struct {

View File

@@ -51,7 +51,7 @@ func (d *driver) Create(id, name string, v interface{}, workers map[eosc.Require
}
err = a.set(cfg)
return a, nil
return a, err
}
func checkConfig(v interface{}) (*Config, error) {
@@ -59,6 +59,9 @@ func checkConfig(v interface{}) (*Config, error) {
if !ok {
return nil, errorConfigType
}
if conf.Anonymous && len(conf.Auth) > 0 {
return nil, errors.New("it is anonymous app,auths should be empty")
}
for _, a := range conf.Auth {
err := application.CheckPosition(a.Position)
if err != nil {
@@ -71,5 +74,6 @@ func checkConfig(v interface{}) (*Config, error) {
return nil, err
}
}
return conf, nil
}

View File

@@ -19,36 +19,35 @@ type IManager interface {
Set(app application.IApp, filters []application.IAuth, users map[string][]application.ITransformConfig)
Del(appID string)
Count() int
AnonymousApp() application.IApp
SetAnonymousApp(app application.IApp)
}
type Manager struct {
// filters map[string]application.IAuthUser
filters eosc.IUntyped
eosc.Untyped[string, application.IAuth]
appManager *AppManager
driverAlias map[string]string
drivers []string
locker sync.RWMutex
app application.IApp
}
func (m *Manager) Count() int {
return m.filters.Count()
func (m *Manager) AnonymousApp() application.IApp {
m.locker.RLock()
app := m.app
m.locker.RUnlock()
return app
}
func NewManager(driverAlias map[string]string, drivers []string) *Manager {
return &Manager{filters: eosc.NewUntyped(), appManager: NewAppManager(), driverAlias: driverAlias, drivers: drivers}
func (m *Manager) SetAnonymousApp(app application.IApp) {
m.locker.Lock()
m.app = app
m.locker.Unlock()
}
func (m *Manager) Get(id string) (application.IAuth, bool) {
return m.get(id)
}
func (m *Manager) get(id string) (application.IAuth, bool) {
filter, has := m.filters.Get(id)
if !has {
return nil, false
}
f, ok := filter.(application.IAuth)
return f, ok
func NewManager(driverAlias map[string]string, drivers []string) IManager {
return &Manager{Untyped: eosc.BuildUntyped[string, application.IAuth](), appManager: NewAppManager(), driverAlias: driverAlias, drivers: drivers}
}
func (m *Manager) List() []application.IAuthUser {
@@ -71,7 +70,7 @@ func (m *Manager) getByDriver(driver string) []application.IAuthUser {
ids := m.appManager.GetByDriver(driver)
filters := make([]application.IAuthUser, 0, len(ids))
for _, id := range ids {
filter, has := m.get(id)
filter, has := m.Get(id)
if has {
filters = append(filters, filter)
}
@@ -80,30 +79,18 @@ func (m *Manager) getByDriver(driver string) []application.IAuthUser {
}
func (m *Manager) all() []application.IAuthUser {
keys := m.filters.Keys()
filters := make([]application.IAuthUser, 0, 10*len(keys))
for _, key := range keys {
filter, has := m.filters.Get(key)
if !has {
continue
}
f, ok := filter.(application.IAuthUser)
if !ok {
continue
}
filters = append(filters, f)
list := m.List()
filters := make([]application.IAuthUser, 0, len(list))
for _, filter := range list {
filters = append(filters, filter)
}
return filters
}
func (m *Manager) All() []application.IAuthUser {
return m.all()
}
func (m *Manager) Set(app application.IApp, filters []application.IAuth, users map[string][]application.ITransformConfig) {
idMap := make(map[string][]string)
for _, filter := range filters {
f, has := m.get(filter.ID())
f, has := m.Untyped.Get(filter.ID())
if !has {
f = filter
}
@@ -112,7 +99,7 @@ func (m *Manager) Set(app application.IApp, filters []application.IAuth, users m
continue
}
f.Set(app, v)
m.filters.Set(filter.ID(), filter)
m.Untyped.Set(filter.ID(), filter)
if _, ok := idMap[filter.Driver()]; !ok {
idMap[filter.Driver()] = make([]string, 0, len(filters))
}
@@ -128,11 +115,11 @@ func (m *Manager) Set(app application.IApp, filters []application.IAuth, users m
func (m *Manager) Del(appID string) {
ids := m.appManager.GetByAppID(appID)
for _, id := range ids {
filter, has := m.get(id)
filter, has := m.Untyped.Get(id)
if has {
filter.Del(appID)
if filter.UserCount() == 0 {
m.filters.Del(id)
m.Untyped.Del(id)
}
}
}

View File

@@ -88,9 +88,9 @@ func (c *consul) Stop() error {
func (c *consul) Remove(id string) error {
c.locker.Lock()
defer c.locker.Unlock()
name, count := c.services.Remove(id)
n, count := c.services.Remove(id)
if count == 0 {
c.nodes.Del(name)
c.nodes.Del(n)
}
return nil
}

View File

@@ -25,7 +25,7 @@ type PluginManager struct {
name string
extenderDrivers eosc.IExtenderDrivers
plugins Plugins
pluginObjs eosc.IUntyped
pluginObjs eosc.Untyped[string, *PluginObj]
workers eosc.IWorkers
}
@@ -82,12 +82,7 @@ func (p *PluginManager) Reset(conf interface{}) error {
p.plugins = plugins
list := p.pluginObjs.List()
// 遍历,全量更新
for _, obj := range list {
v, ok := obj.(*PluginObj)
if !ok {
continue
}
for _, v := range list {
v.fs = p.createFilters(v.conf)
}
@@ -142,10 +137,10 @@ func (p *PluginManager) createChain(id string, conf map[string]*plugin.Config) *
obj = NewPluginObj(chain, id, conf)
p.pluginObjs.Set(id, obj)
} else {
obj.(*PluginObj).fs = chain
obj.fs = chain
}
log.Debug("create chain len: ", len(chain))
return obj.(*PluginObj)
return obj
}
func (p *PluginManager) check(conf interface{}) (Plugins, error) {
@@ -185,7 +180,7 @@ func NewPluginManager() *PluginManager {
pm := &PluginManager{
name: "plugin",
plugins: make(Plugins, 0),
pluginObjs: eosc.NewUntyped(),
pluginObjs: eosc.BuildUntyped[string, *PluginObj](),
}
log.Debug("autowired extenderDrivers")
bean.Autowired(&pm.extenderDrivers)

View File

@@ -67,6 +67,11 @@ func (a *App) auth(ctx http_service.IHttpContext) error {
return user.App.Execute(ctx)
}
}
if app := appManager.AnonymousApp(); app != nil && !app.Disable() {
setLabels(ctx, app.Labels())
ctx.SetLabel("application", app.Id())
return app.Execute(ctx)
}
return errors.New("invalid user")
}

View File

@@ -6,6 +6,7 @@ import (
"github.com/eolinker/eosc"
"github.com/eolinker/eosc/eocontext"
http_service "github.com/eolinker/eosc/eocontext/http-context"
"net/http"
"strconv"
"strings"
)
@@ -30,7 +31,7 @@ func (c *CorsFilter) DoFilter(ctx eocontext.EoContext, next eocontext.IChain) (e
}
func (c *CorsFilter) DoHttpFilter(ctx http_service.IHttpContext, next eocontext.IChain) (err error) {
if ctx.Request().Method() == "OPTION" {
if ctx.Request().Method() == http.MethodOptions {
return c.doOption(ctx)
}
err = c.doFilter(ctx)
@@ -125,17 +126,17 @@ func (c *CorsFilter) Start() error {
}
func (c *CorsFilter) Reset(conf interface{}, workers map[eosc.RequireId]eosc.IWorker) error {
cfg, err := c.check(conf)
if err != nil {
return err
}
c.option = cfg.genOptionHandler()
c.originChecker = NewChecker(cfg.AllowOrigins, "Access-Control-Allow-Origin")
c.methodChecker = NewChecker(cfg.AllowMethods, "Access-Control-Allow-Methods")
c.headerChecker = NewChecker(cfg.AllowHeaders, "Access-Control-Allow-Headers")
c.exposeChecker = NewChecker(cfg.ExposeHeaders, "Access-Control-Expose-Headers")
c.allowCredentials = cfg.AllowCredentials
return nil
cfg, err := c.check(conf)
if err != nil {
return err
}
c.option = cfg.genOptionHandler()
c.originChecker = NewChecker(cfg.AllowOrigins, "Access-Control-Allow-Origin")
c.methodChecker = NewChecker(cfg.AllowMethods, "Access-Control-Allow-Methods")
c.headerChecker = NewChecker(cfg.AllowHeaders, "Access-Control-Allow-Headers")
c.exposeChecker = NewChecker(cfg.ExposeHeaders, "Access-Control-Expose-Headers")
c.allowCredentials = cfg.AllowCredentials
return nil
}
func (c *CorsFilter) Stop() error {

View File

@@ -109,15 +109,15 @@ func (a *tActuator) Strategy(ctx eocontext.EoContext, next eocontext.IChain, iCa
return nil
}
type CacheGetCompleteHandler struct {
type CacheCompleteHandler struct {
orgHandler eocontext.CompleteHandler
validTime int
uri string
cache resources.ICache
}
func NewCacheGetCompleteHandler(orgHandler eocontext.CompleteHandler, validTime int, uri string, cache resources.ICache) *CacheGetCompleteHandler {
return &CacheGetCompleteHandler{
func NewCacheGetCompleteHandler(orgHandler eocontext.CompleteHandler, validTime int, uri string, cache resources.ICache) *CacheCompleteHandler {
return &CacheCompleteHandler{
orgHandler: orgHandler,
validTime: validTime,
uri: uri,
@@ -125,7 +125,7 @@ func NewCacheGetCompleteHandler(orgHandler eocontext.CompleteHandler, validTime
}
}
func (c *CacheGetCompleteHandler) Complete(ctx eocontext.EoContext) error {
func (c *CacheCompleteHandler) Complete(ctx eocontext.EoContext) error {
if c.orgHandler != nil {
if err := c.orgHandler.Complete(ctx); err != nil {

View File

@@ -5,14 +5,21 @@ import (
"github.com/eolinker/apinto/resources"
"github.com/eolinker/eosc/eocontext"
http_service "github.com/eolinker/eosc/eocontext/http-context"
"github.com/eolinker/eosc/log"
"sort"
"strconv"
"sync"
"time"
)
var (
actuatorSet ActuatorSet
)
const (
fuseStatusTime = time.Minute * 30
)
func init() {
actuator := newtActuator()
actuatorSet = actuator
@@ -81,7 +88,10 @@ func (a *tActuator) Strategy(ctx eocontext.EoContext, next eocontext.IChain, cac
if !handler.filter.Check(httpCtx) {
continue
}
if handler.Fusing(ctx, cache) {
metrics := handler.rule.metric.Metrics(ctx)
if handler.IsFuse(ctx.Context(), metrics, cache) {
res := handler.rule.response
httpCtx.Response().SetStatus(res.statusCode, "")
for _, h := range res.headers {
@@ -90,38 +100,129 @@ func (a *tActuator) Strategy(ctx eocontext.EoContext, next eocontext.IChain, cac
httpCtx.Response().SetHeader("Content-Type", fmt.Sprintf("%s; charset=%s", res.contentType, res.charset))
httpCtx.Response().SetBody([]byte(res.body))
return nil
} else {
ctx.SetFinish(newFuseFinishHandler(ctx.GetFinish(), cache, handler, metrics))
break
}
}
if next != nil {
return next.DoChain(ctx)
if err = next.DoChain(ctx); err != nil {
return err
}
}
return nil
}
func (a *tActuator) DoFilter(ctx eocontext.EoContext, next eocontext.IChain) error {
type fuseFinishHandler struct {
orgHandler eocontext.FinishHandler
cache resources.ICache
fuseHandler *FuseHandler
metrics string
}
httpCtx, err := http_service.Assert(ctx)
if err != nil {
return err
func newFuseFinishHandler(orgHandler eocontext.FinishHandler, cache resources.ICache, fuseHandler *FuseHandler, metrics string) *fuseFinishHandler {
return &fuseFinishHandler{
orgHandler: orgHandler,
cache: cache,
fuseHandler: fuseHandler,
metrics: metrics,
}
}
a.lock.RLock()
handlers := a.handlers
a.lock.RUnlock()
func (f *fuseFinishHandler) Finish(eoCtx eocontext.EoContext) error {
for _, handler := range handlers {
//check筛选条件
if handler.filter.Check(httpCtx) {
defer func() {
if f.orgHandler != nil {
f.orgHandler.Finish(eoCtx)
}
}()
httpCtx, _ := http_service.Assert(eoCtx)
fuseTime := f.fuseHandler.rule.fuseTime
ctx := eoCtx.Context()
statusCode := httpCtx.Response().StatusCode()
//熔断状态
status := getFuseStatus(ctx, f.metrics, f.cache)
switch f.fuseHandler.rule.codeStatusMap[statusCode] {
case codeStatusError:
//记录失败count
tx := f.cache.Tx()
errCount, _ := tx.IncrBy(ctx, getErrorCountKey(f.metrics), 1, time.Second).Result()
//清除恢复的计数器
tx.Del(ctx, getSuccessCountKey(f.metrics))
tx.Exec(ctx)
if errCount == f.fuseHandler.rule.fuseConditionCount {
lockerKey := fmt.Sprintf("fuse_locker_%s", f.metrics)
ok, err := f.cache.SetNX(ctx, lockerKey, []byte(fuseStatusObserve), time.Second).Result()
if err != nil {
log.Infof("fuse strategy locker %s to %s fail:%s", status, fuseStatusFusing, err.Error())
return err
}
if !ok {
return nil
}
fuseCountKey := getFuseCountKey(f.metrics)
expUnix := int64(0)
fuseCount, _ := f.cache.IncrBy(ctx, fuseCountKey, 1, time.Hour).Result()
txDone := f.cache.Tx()
if status == fuseStatusHealthy {
fuseCount = 1
txDone.Set(ctx, fuseCountKey, []byte("1"), time.Hour)
}
exp := time.Duration(fuseCount) * fuseTime.time
if exp >= fuseTime.maxTime {
exp = fuseTime.maxTime
}
expUnix = time.Now().Add(exp).UnixNano()
txDone.Set(ctx, getFuseStatusKey(f.metrics), []byte(strconv.FormatInt(expUnix, 16)), fuseStatusTime)
txDone.Del(ctx, lockerKey)
txDone.Exec(ctx)
}
case codeStatusSuccess:
if status == fuseStatusObserve {
successCount, _ := f.cache.IncrBy(ctx, getSuccessCountKey(f.metrics), 1, time.Second).Result()
//恢复正常期
if successCount == f.fuseHandler.rule.recoverConditionCount {
lockerKey := fmt.Sprintf("fuse_locker_%s", f.metrics)
ok, err := f.cache.SetNX(ctx, lockerKey, []byte(fuseStatusObserve), time.Second).Result()
if err != nil {
log.Infof("fuse strategy locker %s to %s fail:%s", fuseStatusObserve, fuseStatusHealthy, err.Error())
return err
}
if ok {
tx := f.cache.Tx()
//删除熔断状态的key就是恢复正常期
tx.Del(ctx, getFuseStatusKey(f.metrics))
//删除已记录的熔断次数
tx.Del(ctx, getFuseCountKey(f.metrics))
tx.Del(ctx, lockerKey)
tx.Exec(ctx)
}
}
break
}
}
if next != nil {
return next.DoChain(ctx)
}
return nil
}
type handlerListSort []*FuseHandler

View File

@@ -10,7 +10,7 @@ type Config struct {
Stop bool `json:"stop"`
Priority int `json:"priority" label:"优先级" description:"1-999"`
Filters strategy.FilterConfig `json:"filters" label:"过滤规则"`
Rule Rule `json:"rule" label:"熔断规则"`
Rule Rule `json:"fuse" label:"熔断规则"`
}
type Rule struct {

View File

@@ -1,138 +1,83 @@
package fuse_strategy
import (
"context"
"fmt"
"github.com/coocood/freecache"
"github.com/eolinker/apinto/metrics"
"github.com/eolinker/apinto/resources"
"github.com/eolinker/apinto/strategy"
"github.com/eolinker/eosc/eocontext"
http_service "github.com/eolinker/eosc/eocontext/http-context"
"github.com/eolinker/eosc/log"
"github.com/go-redis/redis/v8"
"golang.org/x/net/context"
"strconv"
"time"
)
type fuseStatus string
const (
fuseStatusNone fuseStatus = "none" //默认状态
fuseStatusHealthy fuseStatus = "healthy" //健康期间
fuseStatusFusing fuseStatus = "fusing" //熔断期间
fuseStatusObserve fuseStatus = "observe" //观察期
)
type codeStatus int
const (
codeStatusSuccess codeStatus = 1
codeStatusError codeStatus = 2
)
type FuseHandler struct {
name string
filter strategy.IFilter
priority int
stop bool
rule *ruleHandler
status fuseStatus //状态
}
func (f *FuseHandler) Fusing(eoCtx eocontext.EoContext, cache resources.ICache) bool {
httpCtx, _ := http_service.Assert(eoCtx)
func (f *FuseHandler) IsFuse(ctx context.Context, metrics string, cache resources.ICache) bool {
return getFuseStatus(ctx, metrics, cache) == fuseStatusFusing
}
fuseCondition := f.rule.fuseCondition
recoverCondition := f.rule.recoverCondition
//熔断次数的key
func getFuseCountKey(metrics string) string {
return fmt.Sprintf("fuse_count_%s_%d", metrics, time.Now().Unix())
}
ctx := context.Background()
statusCode := httpCtx.Response().StatusCode()
//失败次数的key
func getErrorCountKey(metrics string) string {
return fmt.Sprintf("fuse_error_count_%s_%d", metrics, time.Now().Unix())
}
for _, code := range fuseCondition.statusCodes {
if statusCode != code {
continue
}
tx := cache.Tx()
//记录失败count
countKey := f.getFuseCountKey(eoCtx)
func getSuccessCountKey(metrics string) string {
return fmt.Sprintf("fuse_success_count_%s_%d", metrics, time.Now().Unix())
}
func getFuseStatusKey(metrics string) string {
return fmt.Sprintf("fuse_status_%s", metrics)
}
if f.status == fuseStatusFusing {
//缓存中拿不到数据 表示key过期 也就是熔断期已过 变成观察期
if _, err := tx.Get(ctx, f.getFuseTimeKey(eoCtx)).Bytes(); err != nil && (err == freecache.ErrNotFound || err == redis.Nil) {
f.status = fuseStatusObserve
_ = tx.Exec(ctx)
return false
}
}
func getFuseStatus(ctx context.Context, metrics string, cache resources.ICache) fuseStatus {
result, err := tx.IncrBy(ctx, countKey, 1, time.Second).Result()
if err != nil {
log.Errorf("FuseHandler Fusing %v", err)
_ = tx.Exec(ctx)
return true
}
//清除恢复的计数器
tx.Del(ctx, f.getRecoverCountKey(eoCtx))
if result >= fuseCondition.count {
surplus := result % fuseCondition.count
if surplus == 0 {
//熔断持续时间=连续熔断次数*持续时间
exp := time.Second * time.Duration((result/fuseCondition.count)*f.rule.fuseTime.time)
maxExp := time.Duration(f.rule.fuseTime.maxTime) * time.Second
if exp >= maxExp {
exp = maxExp
}
tx.Set(ctx, f.getFuseTimeKey(eoCtx), []byte(""), exp)
f.status = fuseStatusFusing
}
_ = tx.Exec(ctx)
return true
}
break
key := getFuseStatusKey(metrics)
expUnixStr, err := cache.Get(ctx, key).Result()
if err != nil { //拿不到默认健康期
return fuseStatusHealthy
}
for _, code := range recoverCondition.statusCodes {
if code != statusCode {
continue
}
if f.status == fuseStatusObserve || f.status == fuseStatusFusing {
tx := cache.Tx()
result, err := tx.IncrBy(ctx, f.getRecoverCountKey(eoCtx), 1, time.Second).Result()
if err != nil {
_ = tx.Exec(ctx)
log.Errorf("FuseHandler Fusing %v", err)
return true
}
expUnix, _ := strconv.ParseInt(expUnixStr, 16, 64)
//恢复正常
if result == recoverCondition.count {
f.status = fuseStatusHealthy
}
_ = tx.Exec(ctx)
}
break
//过了熔断期是观察
if time.Now().UnixNano() > expUnix {
return fuseStatusObserve
}
if f.status == fuseStatusHealthy || f.status == fuseStatusNone || f.status == fuseStatusObserve {
return false
}
return true
}
func (f *FuseHandler) getFuseCountKey(label metrics.LabelReader) string {
return fmt.Sprintf("fuse_%s_%s_%d", f.name, f.rule.metric.Metrics(label), time.Now().Second())
}
func (f *FuseHandler) getFuseTimeKey(label metrics.LabelReader) string {
return fmt.Sprintf("fuse_time_%s_%s", f.name, f.rule.metric.Metrics(label))
}
func (f *FuseHandler) getRecoverCountKey(label metrics.LabelReader) string {
return fmt.Sprintf("fuse_recover_%s_%s_%d", f.name, f.rule.metric.Metrics(label), time.Now().Second())
return fuseStatusFusing
}
type ruleHandler struct {
metric metrics.Metrics //熔断维度
fuseCondition statusConditionConf
fuseTime fuseTimeConf
recoverCondition statusConditionConf
response strategyResponseConf
metric metrics.Metrics //熔断维度
fuseConditionCount int64
fuseTime fuseTimeConf
recoverConditionCount int64
response strategyResponseConf
codeStatusMap map[int]codeStatus
}
type statusConditionConf struct {
@@ -141,8 +86,8 @@ type statusConditionConf struct {
}
type fuseTimeConf struct {
time int64
maxTime int64
time time.Duration
maxTime time.Duration
}
type strategyResponseConf struct {
@@ -171,20 +116,24 @@ func NewFuseHandler(conf *Config) (*FuseHandler, error) {
})
}
codeStatusMap := make(map[int]codeStatus)
for _, code := range conf.Rule.RecoverCondition.StatusCodes {
codeStatusMap[code] = codeStatusSuccess
}
for _, code := range conf.Rule.FuseCondition.StatusCodes {
codeStatusMap[code] = codeStatusError
}
rule := &ruleHandler{
metric: metrics.Parse([]string{conf.Rule.Metric}),
fuseCondition: statusConditionConf{
statusCodes: conf.Rule.FuseCondition.StatusCodes,
count: conf.Rule.FuseCondition.Count,
},
metric: metrics.Parse([]string{conf.Rule.Metric}),
fuseConditionCount: conf.Rule.FuseCondition.Count,
fuseTime: fuseTimeConf{
time: conf.Rule.FuseTime.Time,
maxTime: conf.Rule.FuseTime.MaxTime,
},
recoverCondition: statusConditionConf{
statusCodes: conf.Rule.RecoverCondition.StatusCodes,
count: conf.Rule.RecoverCondition.Count,
time: time.Duration(conf.Rule.FuseTime.Time) * time.Second,
maxTime: time.Duration(conf.Rule.FuseTime.MaxTime) * time.Second,
},
recoverConditionCount: conf.Rule.RecoverCondition.Count,
response: strategyResponseConf{
statusCode: conf.Rule.Response.StatusCode,
contentType: conf.Rule.Response.ContentType,
@@ -192,6 +141,7 @@ func NewFuseHandler(conf *Config) (*FuseHandler, error) {
headers: headers,
body: conf.Rule.Response.Body,
},
codeStatusMap: codeStatusMap,
}
return &FuseHandler{
name: conf.Name,

View File

@@ -21,7 +21,7 @@ type Config struct {
Stop bool `json:"stop"`
Priority int `json:"priority" label:"优先级" description:"1-999"`
Filters strategy.FilterConfig `json:"filters" label:"过滤规则"`
Rule *Rule `json:"rule" label:"灰度规则"`
Rule Rule `json:"grey" label:"灰度规则"`
}
type Rule struct {

View File

@@ -16,7 +16,7 @@ type Config struct {
Stop bool `json:"stop"`
Priority int `json:"priority" label:"优先级" description:"1-999"`
Filters strategy.FilterConfig `json:"filters" label:"过滤规则"`
Rule Rule `json:"rule" label:"规则"`
Rule Rule `json:"visit" label:"规则"`
}
type Rule struct {

4
go.mod
View File

@@ -1,6 +1,6 @@
module github.com/eolinker/apinto
go 1.17
go 1.19
require (
github.com/Shopify/sarama v1.32.0
@@ -18,6 +18,7 @@ require (
github.com/satori/go.uuid v1.2.0
github.com/valyala/fasthttp v1.33.0
golang.org/x/crypto v0.0.0-20220214200702-86341886e292
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd
)
require (
@@ -98,7 +99,6 @@ require (
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.19.1 // indirect
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect

View File

@@ -122,11 +122,10 @@ func (ctx *Context) Response() http_service.IResponse {
}
func (ctx *Context) SendTo(address string, timeout time.Duration) error {
clone := ctx.proxyRequest.clone()
_, host := readAddress(address)
clone.URI().SetHost(host)
ctx.proxyRequests = append(ctx.proxyRequests, clone)
request := ctx.proxyRequest.Request()
ctx.proxyRequests = append(ctx.proxyRequests, newRequestAgent(ctx.proxyRequest, host))
passHost, targethost := ctx.GetUpstreamHostHandler().PassHost()
switch passHost {
@@ -202,12 +201,7 @@ func (ctx *Context) FastFinish() {
ctx.requestReader.Finish()
ctx.proxyRequest.Finish()
for _, request := range ctx.proxyRequests {
r, ok := request.(*ProxyRequest)
if ok {
r.Finish()
}
}
return
}

View File

@@ -0,0 +1,36 @@
package http_context
import http_service "github.com/eolinker/eosc/eocontext/http-context"
type requestAgent struct {
http_service.IRequest
host string
hostAgent *UrlAgent
}
func newRequestAgent(IRequest http_service.IRequest, host string) *requestAgent {
return &requestAgent{IRequest: IRequest, host: host}
}
func (a *requestAgent) URI() http_service.IURIWriter {
if a.hostAgent == nil {
a.hostAgent = NewUrlAgent(a.IRequest.URI(), a.host)
}
return a.hostAgent
}
type UrlAgent struct {
http_service.IURIWriter
host string
}
func (u *UrlAgent) Host() string {
return u.host
}
func (u *UrlAgent) SetHost(host string) {
u.host = host
}
func NewUrlAgent(IURIWriter http_service.IURIWriter, host string) *UrlAgent {
return &UrlAgent{IURIWriter: IURIWriter, host: host}
}

View File

@@ -2,8 +2,8 @@ package resources
import (
"context"
"encoding/binary"
"github.com/coocood/freecache"
"strconv"
"sync"
"time"
)
@@ -120,12 +120,15 @@ func (n *cacheLocal) IncrBy(ctx context.Context, key string, incr int64, expirat
return NewIntResult(value, nil)
}
func ToInt(b []byte) int64 {
return int64(binary.LittleEndian.Uint64(b))
v, err := strconv.ParseInt(string(b), 10, 64)
if err != nil {
return 0
}
return v
}
func ToBytes(v int64) []byte {
var b [8]byte
binary.LittleEndian.PutUint64(b[:], uint64(v))
return b[:]
return []byte(strconv.FormatInt(v, 10))
}
func (n *cacheLocal) Get(ctx context.Context, key string) StringResult {
data, err := n.client.Get([]byte(key))

View File

@@ -30,14 +30,14 @@ type IBalanceFactoryRegister interface {
//driverRegister 实现了IBalanceFactoryRegister接口
type driverRegister struct {
register eosc.IRegister
register eosc.IRegister[IBalanceFactory]
keys []string
}
//newBalanceFactoryManager 创建负载均衡算法工厂管理器
func newBalanceFactoryManager() IBalanceFactoryRegister {
return &driverRegister{
register: eosc.NewRegister(),
register: eosc.NewRegister[IBalanceFactory](),
keys: make([]string, 0, 10),
}
}
@@ -47,10 +47,8 @@ func (dm *driverRegister) GetFactoryByKey(key string) (IBalanceFactory, bool) {
o, has := dm.register.Get(key)
if has {
log.Debug("GetFactoryByKey:", key, ":has")
f, ok := o.(IBalanceFactory)
return f, ok
}
return nil, false
return o, has
}
//RegisterFactoryByKey 注册balance工厂