mirror of
https://github.com/eolinker/apinto
synced 2025-09-26 21:01:19 +08:00
解决filter 临时拼接的问题
This commit is contained in:
@@ -57,7 +57,7 @@ func (p *PluginManager) ConfigType() reflect.Type {
|
||||
return reflect.TypeOf(new(PluginWorkerConfig))
|
||||
}
|
||||
|
||||
func (p *PluginManager) CreateRequest(id string, conf map[string]*plugin.Config) eocontext.IChain {
|
||||
func (p *PluginManager) CreateRequest(id string, conf map[string]*plugin.Config) eocontext.IChainPro {
|
||||
|
||||
return p.createChain(id, conf)
|
||||
}
|
||||
@@ -88,7 +88,7 @@ func (p *PluginManager) Reset(conf interface{}) error {
|
||||
|
||||
continue
|
||||
}
|
||||
v.Filters = p.createFilters(v.conf)
|
||||
v.fs = p.createFilters(v.conf)
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -142,7 +142,7 @@ func (p *PluginManager) createChain(id string, conf map[string]*plugin.Config) *
|
||||
obj = NewPluginObj(chain, id, conf)
|
||||
p.pluginObjs.Set(id, obj)
|
||||
} else {
|
||||
obj.(*PluginObj).Filters = chain
|
||||
obj.(*PluginObj).fs = chain
|
||||
}
|
||||
log.Debug("create chain len: ", len(chain))
|
||||
return obj.(*PluginObj)
|
||||
|
@@ -6,20 +6,23 @@ import (
|
||||
)
|
||||
|
||||
type PluginObj struct {
|
||||
eoscContext.Filters
|
||||
fs eoscContext.Filters
|
||||
id string
|
||||
conf map[string]*plugin.Config
|
||||
}
|
||||
|
||||
func NewPluginObj(filters eoscContext.Filters, id string, conf map[string]*plugin.Config) *PluginObj {
|
||||
obj := &PluginObj{Filters: filters, id: id, conf: conf}
|
||||
obj := &PluginObj{fs: filters, id: id, conf: conf}
|
||||
|
||||
return obj
|
||||
}
|
||||
|
||||
func (p *PluginObj) Chain(ctx eoscContext.EoContext, append ...eoscContext.IFilter) error {
|
||||
return eoscContext.DoChain(ctx, p.fs, append...)
|
||||
}
|
||||
func (p *PluginObj) Destroy() {
|
||||
|
||||
handler := p.Filters
|
||||
handler := p.fs
|
||||
if handler != nil {
|
||||
handler.Destroy()
|
||||
}
|
||||
|
@@ -1,4 +1,4 @@
|
||||
package http_router
|
||||
package router
|
||||
|
||||
import (
|
||||
"errors"
|
||||
@@ -19,6 +19,10 @@ type HttpComplete struct {
|
||||
timeOut time.Duration
|
||||
}
|
||||
|
||||
func NewHttpComplete(retry int, timeOut time.Duration) *HttpComplete {
|
||||
return &HttpComplete{retry: retry, timeOut: timeOut}
|
||||
}
|
||||
|
||||
func (h *HttpComplete) Complete(org eocontext.EoContext) error {
|
||||
|
||||
ctx, err := http_service.Assert(org)
|
||||
@@ -72,3 +76,18 @@ func (h *HttpComplete) Complete(org eocontext.EoContext) error {
|
||||
|
||||
return lastErr
|
||||
}
|
||||
|
||||
type httpCompleteCaller struct {
|
||||
}
|
||||
|
||||
func NewHttpCompleteCaller() *httpCompleteCaller {
|
||||
return &httpCompleteCaller{}
|
||||
}
|
||||
|
||||
func (h *httpCompleteCaller) DoFilter(ctx eocontext.EoContext, next eocontext.IChain) (err error) {
|
||||
return ctx.GetComplete().Complete(ctx)
|
||||
}
|
||||
|
||||
func (h *httpCompleteCaller) Destroy() {
|
||||
|
||||
}
|
@@ -1,17 +1,20 @@
|
||||
package http_router
|
||||
|
||||
import (
|
||||
"github.com/eolinker/apinto/drivers/router"
|
||||
"github.com/eolinker/apinto/service"
|
||||
"github.com/eolinker/eosc/eocontext"
|
||||
http_context "github.com/eolinker/eosc/eocontext/http-context"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
var completeCaller = router.NewHttpCompleteCaller()
|
||||
|
||||
type Handler struct {
|
||||
completeHandler HttpComplete
|
||||
completeHandler *router.HttpComplete
|
||||
finisher Finisher
|
||||
service service.IService
|
||||
filters eocontext.IChain
|
||||
filters eocontext.IChainPro
|
||||
disable bool
|
||||
}
|
||||
|
||||
@@ -27,8 +30,8 @@ func (h *Handler) ServeHTTP(ctx eocontext.EoContext) {
|
||||
return
|
||||
}
|
||||
ctx.SetFinish(&h.finisher)
|
||||
ctx.SetCompleteHandler(&h.completeHandler)
|
||||
ctx.SetCompleteHandler(h.completeHandler)
|
||||
ctx.SetApp(h.service)
|
||||
ctx.SetBalance(h.service)
|
||||
h.filters.DoChain(ctx)
|
||||
h.filters.Chain(ctx, completeCaller)
|
||||
}
|
||||
|
@@ -3,6 +3,7 @@ package manager
|
||||
import (
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"github.com/eolinker/apinto/drivers/router"
|
||||
http_context "github.com/eolinker/apinto/node/http-context"
|
||||
http_router "github.com/eolinker/apinto/router/http-router"
|
||||
"github.com/eolinker/eosc/config"
|
||||
@@ -17,6 +18,7 @@ import (
|
||||
|
||||
var _ IManger = (*Manager)(nil)
|
||||
var notFound = new(NotFoundHandler)
|
||||
var completeCaller = router.NewHttpCompleteCaller()
|
||||
|
||||
type IManger interface {
|
||||
Set(id string, port int, hosts []string, method []string, path string, append []AppendRule, router http_router.IRouterHandler) error
|
||||
@@ -28,7 +30,7 @@ type Manager struct {
|
||||
matcher http_router.IMatcher
|
||||
|
||||
routersData IRouterData
|
||||
globalFilters eoscContext.IChain
|
||||
globalFilters eoscContext.IChainPro
|
||||
}
|
||||
|
||||
func (m *Manager) Set(id string, port int, hosts []string, method []string, path string, append []AppendRule, router http_router.IRouterHandler) error {
|
||||
@@ -61,7 +63,7 @@ func (m *Manager) Delete(id string) {
|
||||
var errNoCertificates = errors.New("tls: no certificates configured")
|
||||
|
||||
//NewManager 创建路由管理器
|
||||
func NewManager(tf traffic.ITraffic, listenCfg *config.ListensMsg, globalFilters eoscContext.IChain) *Manager {
|
||||
func NewManager(tf traffic.ITraffic, listenCfg *config.ListensMsg, globalFilters eoscContext.IChainPro) *Manager {
|
||||
log.Debug("new router manager")
|
||||
m := &Manager{
|
||||
globalFilters: globalFilters,
|
||||
@@ -118,7 +120,7 @@ func (m *Manager) FastHandler(port int, ctx *fasthttp.RequestCtx) {
|
||||
if !has {
|
||||
httpContext.SetFinish(notFound)
|
||||
httpContext.SetCompleteHandler(notFound)
|
||||
m.globalFilters.DoChain(httpContext)
|
||||
m.globalFilters.Chain(httpContext, completeCaller)
|
||||
} else {
|
||||
log.Debug("match has:", port)
|
||||
r.ServeHTTP(httpContext)
|
||||
|
@@ -1,6 +1,7 @@
|
||||
package http_router
|
||||
|
||||
import (
|
||||
"github.com/eolinker/apinto/drivers/router"
|
||||
"time"
|
||||
|
||||
"github.com/eolinker/apinto/drivers/router/http-router/manager"
|
||||
@@ -49,14 +50,11 @@ func (h *HttpRouter) reset(conf interface{}, workers map[eosc.RequireId]eosc.IWo
|
||||
return eosc.ErrorConfigFieldUnknown
|
||||
}
|
||||
handler := &Handler{
|
||||
completeHandler: HttpComplete{
|
||||
retry: cfg.Retry,
|
||||
timeOut: time.Duration(cfg.TimeOut) * time.Millisecond,
|
||||
},
|
||||
finisher: Finisher{},
|
||||
service: nil,
|
||||
filters: nil,
|
||||
disable: cfg.Disable,
|
||||
completeHandler: router.NewHttpComplete(cfg.Retry, time.Duration(cfg.TimeOut)*time.Millisecond),
|
||||
finisher: Finisher{},
|
||||
service: nil,
|
||||
filters: nil,
|
||||
disable: cfg.Disable,
|
||||
}
|
||||
if !cfg.Disable {
|
||||
|
||||
@@ -68,7 +66,7 @@ func (h *HttpRouter) reset(conf interface{}, workers map[eosc.RequireId]eosc.IWo
|
||||
if cfg.Plugins == nil {
|
||||
cfg.Plugins = map[string]*plugin.Config{}
|
||||
}
|
||||
var plugins eocontext.IChain
|
||||
var plugins eocontext.IChainPro
|
||||
if cfg.Template != "" {
|
||||
templateWorker, has := workers[cfg.Template]
|
||||
if !has || !templateWorker.CheckSkill(template.TemplateSkill) {
|
||||
|
@@ -12,7 +12,7 @@ func init() {
|
||||
|
||||
type ActuatorsHandler interface {
|
||||
Assert(ctx eocontext.EoContext) bool
|
||||
Check(ctx eocontext.EoContext, handlers []*LimitingHandler, queryScalar scalar.Manager, traffics scalar.Manager, name string) error
|
||||
Check(ctx eocontext.EoContext, handlers []*LimitingHandler, queryScalar scalar.Manager, traffics scalar.Manager) error
|
||||
}
|
||||
|
||||
var (
|
||||
|
@@ -9,15 +9,13 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
actuatorSet ActuatorSet
|
||||
actuatorHandler strategy.IStrategyHandler
|
||||
actuatorSet ActuatorSet
|
||||
)
|
||||
|
||||
func init() {
|
||||
actuator := newActuator()
|
||||
actuatorSet = actuator
|
||||
|
||||
strategy.AddStrategyHandler(actuatorHandler)
|
||||
strategy.AddStrategyHandler(actuator)
|
||||
}
|
||||
|
||||
type ActuatorSet interface {
|
||||
@@ -33,6 +31,10 @@ type tActuator struct {
|
||||
traffics scalar.Manager
|
||||
}
|
||||
|
||||
func (a *tActuator) Destroy() {
|
||||
|
||||
}
|
||||
|
||||
func (a *tActuator) Set(id string, limiting *LimitingHandler) {
|
||||
// 调用来源有锁
|
||||
a.all[id] = limiting
|
||||
@@ -63,7 +65,7 @@ func newActuator() *tActuator {
|
||||
return &tActuator{}
|
||||
}
|
||||
|
||||
func (a *tActuator) Strategy(ctx eocontext.EoContext, next eocontext.IChain) error {
|
||||
func (a *tActuator) DoFilter(ctx eocontext.EoContext, next eocontext.IChain) error {
|
||||
|
||||
a.lock.RLock()
|
||||
handlers := a.handlers
|
||||
|
@@ -6,6 +6,7 @@ import (
|
||||
)
|
||||
|
||||
type LimitingHandler struct {
|
||||
name string
|
||||
filter strategy.IFilter
|
||||
metrics metrics.Metrics
|
||||
query Threshold
|
||||
@@ -14,6 +15,10 @@ type LimitingHandler struct {
|
||||
stop bool
|
||||
}
|
||||
|
||||
func (l *LimitingHandler) Name() string {
|
||||
return l.name
|
||||
}
|
||||
|
||||
func (l *LimitingHandler) Filter() strategy.IFilter {
|
||||
return l.filter
|
||||
}
|
||||
|
@@ -1,13 +1,19 @@
|
||||
package http
|
||||
|
||||
import (
|
||||
"errors"
|
||||
limiting_stragety "github.com/eolinker/apinto/drivers/strategy/limiting-stragety"
|
||||
"github.com/eolinker/apinto/drivers/strategy/limiting-stragety/scalar"
|
||||
"github.com/eolinker/eosc/eocontext"
|
||||
http_service "github.com/eolinker/eosc/eocontext/http-context"
|
||||
"net/http"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrorLimitingRefuse = errors.New("refuse by limiting strategy")
|
||||
)
|
||||
|
||||
type actuatorHttp struct {
|
||||
}
|
||||
|
||||
@@ -23,7 +29,7 @@ func (hd *actuatorHttp) Assert(ctx eocontext.EoContext) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (hd *actuatorHttp) Check(ctx eocontext.EoContext, handlers []*limiting_stragety.LimitingHandler, queryScalars scalar.Manager, trafficScalars scalar.Manager, name string) error {
|
||||
func (hd *actuatorHttp) Check(ctx eocontext.EoContext, handlers []*limiting_stragety.LimitingHandler, queryScalars scalar.Manager, trafficScalars scalar.Manager) error {
|
||||
httpContext, err := http_service.Assert(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -43,7 +49,9 @@ func (hd *actuatorHttp) Check(ctx eocontext.EoContext, handlers []*limiting_stra
|
||||
queryScalar := queryScalars.Get(metricsValue)
|
||||
trafficScalar := trafficScalars.Get(metricsValue)
|
||||
if !queryScalar.Second().CompareAndAdd(h.Query().Second, 1) {
|
||||
|
||||
httpContext.Response().SetStatus(http.StatusForbidden, http.StatusText(http.StatusForbidden))
|
||||
httpContext.Response().SetHeader("strategy", h.Name())
|
||||
return ErrorLimitingRefuse
|
||||
}
|
||||
if !queryScalar.Minute().CompareAndAdd(h.Query().Minute, 1) {
|
||||
|
||||
|
@@ -9,7 +9,7 @@ import (
|
||||
var _ iProxyDatas = (*ProxyDatas)(nil)
|
||||
|
||||
type Proxy struct {
|
||||
eoscContext.IChain
|
||||
eoscContext.IChainPro
|
||||
id string
|
||||
org map[string]*plugin.Config
|
||||
|
||||
@@ -22,11 +22,11 @@ func (p *Proxy) Destroy() {
|
||||
p.parent = nil
|
||||
parent.Del(p.id)
|
||||
}
|
||||
p.IChain.Destroy()
|
||||
p.IChainPro.Destroy()
|
||||
}
|
||||
|
||||
type iProxyDatas interface {
|
||||
Set(id string, plugins map[string]*plugin.Config) eoscContext.IChain
|
||||
Set(id string, plugins map[string]*plugin.Config) eoscContext.IChainPro
|
||||
Del(id string)
|
||||
}
|
||||
|
||||
@@ -36,7 +36,7 @@ type ProxyDatas struct {
|
||||
plugins map[string]*plugin.Config
|
||||
}
|
||||
|
||||
func (p *ProxyDatas) Set(id string, conf map[string]*plugin.Config) eoscContext.IChain {
|
||||
func (p *ProxyDatas) Set(id string, conf map[string]*plugin.Config) eoscContext.IChainPro {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
cf := plugin.MergeConfig(conf, p.plugins)
|
||||
@@ -44,14 +44,14 @@ func (p *ProxyDatas) Set(id string, conf map[string]*plugin.Config) eoscContext.
|
||||
v, has := p.datas[id]
|
||||
if !has {
|
||||
v = &Proxy{
|
||||
IChain: filters,
|
||||
id: id,
|
||||
org: conf,
|
||||
parent: p,
|
||||
IChainPro: filters,
|
||||
id: id,
|
||||
org: conf,
|
||||
parent: p,
|
||||
}
|
||||
p.datas[id] = v
|
||||
} else {
|
||||
v.IChain = filters
|
||||
v.IChainPro = filters
|
||||
v.org = conf
|
||||
}
|
||||
|
||||
@@ -70,7 +70,7 @@ func (p *ProxyDatas) Reset(conf map[string]*plugin.Config) {
|
||||
p.plugins = conf
|
||||
for _, proxy := range p.datas {
|
||||
cf := plugin.MergeConfig(proxy.org, conf)
|
||||
proxy.IChain = pluginManger.CreateRequest(proxy.id, cf)
|
||||
proxy.IChainPro = pluginManger.CreateRequest(proxy.id, cf)
|
||||
}
|
||||
}
|
||||
func (p *ProxyDatas) Destroy() {
|
||||
@@ -80,7 +80,7 @@ func (p *ProxyDatas) Destroy() {
|
||||
p.lock.Unlock()
|
||||
for _, proxy := range data {
|
||||
proxy.parent = nil
|
||||
proxy.IChain.Destroy()
|
||||
proxy.IChainPro.Destroy()
|
||||
}
|
||||
}
|
||||
func NewProxyDatas() *ProxyDatas {
|
||||
|
@@ -54,6 +54,6 @@ func (t *Template) CheckSkill(skill string) bool {
|
||||
return template.CheckSkill(skill)
|
||||
}
|
||||
|
||||
func (t *Template) Create(id string, conf map[string]*plugin.Config) eocontext.IChain {
|
||||
func (t *Template) Create(id string, conf map[string]*plugin.Config) eocontext.IChainPro {
|
||||
return t.proxyDatas.Set(id, conf)
|
||||
}
|
||||
|
@@ -12,7 +12,7 @@ type Config struct {
|
||||
}
|
||||
|
||||
type IPluginManager interface {
|
||||
CreateRequest(id string, conf map[string]*Config) eocontext.IChain
|
||||
CreateRequest(id string, conf map[string]*Config) eocontext.IChainPro
|
||||
GetConfigType(name string) (reflect.Type, bool)
|
||||
}
|
||||
|
||||
|
@@ -1,16 +1,20 @@
|
||||
package strategy
|
||||
|
||||
import eoscContext "github.com/eolinker/eosc/eocontext"
|
||||
import "github.com/eolinker/eosc/eocontext"
|
||||
|
||||
type IStrategyManager interface {
|
||||
AddStrategyHandler(handler IStrategyHandler)
|
||||
AddStrategyHandler(handler eocontext.IFilter)
|
||||
Strategy(ctx eocontext.EoContext, next eocontext.IChain) error
|
||||
}
|
||||
|
||||
func AddStrategyHandler(handler IStrategyHandler) {
|
||||
var (
|
||||
handlers eocontext.Filters
|
||||
)
|
||||
|
||||
func AddStrategyHandler(handler eocontext.IFilter) {
|
||||
handlers = append(handlers, handler)
|
||||
}
|
||||
|
||||
func Strategy(ctx eoscContext.EoContext, next eoscContext.IChain) error {
|
||||
|
||||
return nil
|
||||
func Strategy(ctx eocontext.EoContext, next eocontext.IChain) error {
|
||||
return eocontext.DoChain(ctx, handlers, eocontext.ToFilter(next))
|
||||
}
|
||||
|
@@ -1,2 +0,0 @@
|
||||
package strategy
|
||||
|
@@ -12,7 +12,7 @@ const (
|
||||
|
||||
type ITemplate interface {
|
||||
eosc.IWorker
|
||||
Create(id string, conf map[string]*plugin.Config) eoscContext.IChain
|
||||
Create(id string, conf map[string]*plugin.Config) eoscContext.IChainPro
|
||||
}
|
||||
|
||||
func CheckSkill(skill string) bool {
|
||||
|
Reference in New Issue
Block a user