From c46bcef1cae0781335c9918a5eb59e3e812e650c Mon Sep 17 00:00:00 2001 From: zhangzeyi Date: Mon, 24 Oct 2022 11:22:25 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84=E7=86=94=E6=96=AD=E7=AD=96?= =?UTF-8?q?=E7=95=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- drivers/strategy/cache-strategy/actuator.go | 8 +- drivers/strategy/fuse-strategy/actuator.go | 107 ++++++++++++++--- drivers/strategy/fuse-strategy/handler.go | 122 +++++--------------- 3 files changed, 124 insertions(+), 113 deletions(-) diff --git a/drivers/strategy/cache-strategy/actuator.go b/drivers/strategy/cache-strategy/actuator.go index cfaabd55..18e16e79 100644 --- a/drivers/strategy/cache-strategy/actuator.go +++ b/drivers/strategy/cache-strategy/actuator.go @@ -109,15 +109,15 @@ func (a *tActuator) Strategy(ctx eocontext.EoContext, next eocontext.IChain, iCa return nil } -type CacheGetCompleteHandler struct { +type CacheCompleteHandler struct { orgHandler eocontext.CompleteHandler validTime int uri string cache resources.ICache } -func NewCacheGetCompleteHandler(orgHandler eocontext.CompleteHandler, validTime int, uri string, cache resources.ICache) *CacheGetCompleteHandler { - return &CacheGetCompleteHandler{ +func NewCacheGetCompleteHandler(orgHandler eocontext.CompleteHandler, validTime int, uri string, cache resources.ICache) *CacheCompleteHandler { + return &CacheCompleteHandler{ orgHandler: orgHandler, validTime: validTime, uri: uri, @@ -125,7 +125,7 @@ func NewCacheGetCompleteHandler(orgHandler eocontext.CompleteHandler, validTime } } -func (c *CacheGetCompleteHandler) Complete(ctx eocontext.EoContext) error { +func (c *CacheCompleteHandler) Complete(ctx eocontext.EoContext) error { if c.orgHandler != nil { if err := c.orgHandler.Complete(ctx); err != nil { diff --git a/drivers/strategy/fuse-strategy/actuator.go b/drivers/strategy/fuse-strategy/actuator.go index 63da7934..f1deaa93 100644 --- a/drivers/strategy/fuse-strategy/actuator.go +++ b/drivers/strategy/fuse-strategy/actuator.go @@ -7,6 +7,7 @@ import ( http_service "github.com/eolinker/eosc/eocontext/http-context" "sort" "sync" + "time" ) var ( @@ -76,12 +77,13 @@ func (a *tActuator) Strategy(ctx eocontext.EoContext, next eocontext.IChain, cac handlers := a.handlers a.lock.RUnlock() + var fuseHandler *FuseHandler for _, handler := range handlers { //check筛选条件 if !handler.filter.Check(httpCtx) { continue } - if handler.Fusing(ctx, cache) { + if handler.IsFuse(ctx, cache) { res := handler.rule.response httpCtx.Response().SetStatus(res.statusCode, "") for _, h := range res.headers { @@ -90,38 +92,107 @@ func (a *tActuator) Strategy(ctx eocontext.EoContext, next eocontext.IChain, cac httpCtx.Response().SetHeader("Content-Type", fmt.Sprintf("%s; charset=%s", res.contentType, res.charset)) httpCtx.Response().SetBody([]byte(res.body)) return nil + } else { + fuseHandler = handler + break } + } if next != nil { - return next.DoChain(ctx) + if err = next.DoChain(ctx); err != nil { + return err + } } + + if fuseHandler != nil { + ctx.SetFinish(newFuseFinishHandler(ctx.GetFinish(), cache, fuseHandler)) + } + return nil } -func (a *tActuator) DoFilter(ctx eocontext.EoContext, next eocontext.IChain) error { +type fuseFinishHandler struct { + orgHandler eocontext.FinishHandler + cache resources.ICache + fuseHandler *FuseHandler +} - httpCtx, err := http_service.Assert(ctx) - if err != nil { - return err - } +func newFuseFinishHandler(orgHandler eocontext.FinishHandler, cache resources.ICache, fuseHandler *FuseHandler) *fuseFinishHandler { + return &fuseFinishHandler{orgHandler: orgHandler, cache: cache, fuseHandler: fuseHandler} +} - a.lock.RLock() - handlers := a.handlers - a.lock.RUnlock() - - for _, handler := range handlers { - //check筛选条件 - if handler.filter.Check(httpCtx) { - - break +func (f *fuseFinishHandler) Finish(eoCtx eocontext.EoContext) error { + if f.orgHandler != nil { + if err := f.orgHandler.Finish(eoCtx); err != nil { + return err } } - if next != nil { - return next.DoChain(ctx) + httpCtx, _ := http_service.Assert(eoCtx) + + fuseCondition := f.fuseHandler.rule.fuseCondition + recoverCondition := f.fuseHandler.rule.recoverCondition + fuseTime := f.fuseHandler.rule.fuseTime + + ctx := eoCtx.Context() + statusCode := httpCtx.Response().StatusCode() + + //熔断状态 + status := f.fuseHandler.getFuseStatus(eoCtx, f.cache) + + for _, code := range fuseCondition.statusCodes { + if statusCode != code { + continue + } + + //记录失败count + countKey := f.fuseHandler.getFuseCountKey(eoCtx) + + errCount, _ := f.cache.IncrBy(ctx, countKey, 1, time.Second).Result() + + //清除恢复的计数器 + f.cache.Del(ctx, f.fuseHandler.getRecoverCountKey(eoCtx)) + + if errCount >= fuseCondition.count { + surplus := errCount % fuseCondition.count + if surplus == 0 { + //熔断持续时间=连续熔断次数*持续时间 + exp := time.Second * time.Duration((errCount/fuseCondition.count)*fuseTime.time) + maxExp := time.Duration(fuseTime.maxTime) * time.Second + if exp >= maxExp { + exp = maxExp + } + + f.fuseHandler.setFuseStatus(eoCtx, f.cache, fuseStatusFusing, exp+time.Minute) + + //因为观察期是熔断时间结束后的一秒内才算观察期,所以多设置个key用来做观察期状态的判断 判断逻辑在getFuseStatus中 + f.cache.Set(ctx, f.fuseHandler.getFuseTimeKey(eoCtx), []byte(fuseStatusFusing), exp) + + } + } + break } + + for _, code := range recoverCondition.statusCodes { + if code != statusCode { + continue + } + if status == fuseStatusObserve { + successCount, _ := f.cache.IncrBy(ctx, f.fuseHandler.getRecoverCountKey(eoCtx), 1, time.Second).Result() + + //恢复正常期 + if successCount == recoverCondition.count { + exp := time.Duration(fuseTime.maxTime) * time.Second + f.fuseHandler.setFuseStatus(eoCtx, f.cache, fuseStatusHealthy, exp+time.Minute) + } + + } + break + } + return nil + } type handlerListSort []*FuseHandler diff --git a/drivers/strategy/fuse-strategy/handler.go b/drivers/strategy/fuse-strategy/handler.go index 09c483d5..6571ed0d 100644 --- a/drivers/strategy/fuse-strategy/handler.go +++ b/drivers/strategy/fuse-strategy/handler.go @@ -1,15 +1,12 @@ package fuse_strategy import ( - "context" "fmt" "github.com/coocood/freecache" "github.com/eolinker/apinto/metrics" "github.com/eolinker/apinto/resources" "github.com/eolinker/apinto/strategy" "github.com/eolinker/eosc/eocontext" - http_service "github.com/eolinker/eosc/eocontext/http-context" - "github.com/eolinker/eosc/log" "github.com/go-redis/redis/v8" "time" ) @@ -17,7 +14,6 @@ import ( type fuseStatus string const ( - fuseStatusNone fuseStatus = "none" //默认状态 fuseStatusHealthy fuseStatus = "healthy" //健康期间 fuseStatusFusing fuseStatus = "fusing" //熔断期间 fuseStatusObserve fuseStatus = "observe" //观察期 @@ -32,99 +28,43 @@ type FuseHandler struct { status fuseStatus //状态 } -func (f *FuseHandler) Fusing(eoCtx eocontext.EoContext, cache resources.ICache) bool { - httpCtx, _ := http_service.Assert(eoCtx) - - fuseCondition := f.rule.fuseCondition - recoverCondition := f.rule.recoverCondition - - ctx := context.Background() - statusCode := httpCtx.Response().StatusCode() - - for _, code := range fuseCondition.statusCodes { - if statusCode != code { - continue - } - tx := cache.Tx() - //记录失败count - countKey := f.getFuseCountKey(eoCtx) - - if f.status == fuseStatusFusing { - //缓存中拿不到数据 表示key过期 也就是熔断期已过 变成观察期 - if _, err := tx.Get(ctx, f.getFuseTimeKey(eoCtx)).Bytes(); err != nil && (err == freecache.ErrNotFound || err == redis.Nil) { - f.status = fuseStatusObserve - _ = tx.Exec(ctx) - return false - } - } - - result, err := tx.IncrBy(ctx, countKey, 1, time.Second).Result() - if err != nil { - log.Errorf("FuseHandler Fusing %v", err) - _ = tx.Exec(ctx) - return true - } - - //清除恢复的计数器 - tx.Del(ctx, f.getRecoverCountKey(eoCtx)) - - if result >= fuseCondition.count { - surplus := result % fuseCondition.count - if surplus == 0 { - //熔断持续时间=连续熔断次数*持续时间 - exp := time.Second * time.Duration((result/fuseCondition.count)*f.rule.fuseTime.time) - maxExp := time.Duration(f.rule.fuseTime.maxTime) * time.Second - if exp >= maxExp { - exp = maxExp - } - tx.Set(ctx, f.getFuseTimeKey(eoCtx), []byte(""), exp) - f.status = fuseStatusFusing - } - _ = tx.Exec(ctx) - return true - } - break - } - - for _, code := range recoverCondition.statusCodes { - if code != statusCode { - continue - } - if f.status == fuseStatusObserve || f.status == fuseStatusFusing { - tx := cache.Tx() - result, err := tx.IncrBy(ctx, f.getRecoverCountKey(eoCtx), 1, time.Second).Result() - if err != nil { - _ = tx.Exec(ctx) - log.Errorf("FuseHandler Fusing %v", err) - return true - } - - //恢复正常期 - if result == recoverCondition.count { - f.status = fuseStatusHealthy - } - _ = tx.Exec(ctx) - } - break - } - - if f.status == fuseStatusHealthy || f.status == fuseStatusNone || f.status == fuseStatusObserve { - return false - } - - return true +func (f *FuseHandler) IsFuse(eoCtx eocontext.EoContext, cache resources.ICache) bool { + return f.getFuseStatus(eoCtx, cache) == fuseStatusFusing } -func (f *FuseHandler) getFuseCountKey(label metrics.LabelReader) string { - return fmt.Sprintf("fuse_%s_%s_%d", f.name, f.rule.metric.Metrics(label), time.Now().Second()) +func (f *FuseHandler) getFuseCountKey(eoCtx eocontext.EoContext) string { + return fmt.Sprintf("fuse_%s_%d", f.rule.metric.Metrics(eoCtx), time.Now().Second()) } -func (f *FuseHandler) getFuseTimeKey(label metrics.LabelReader) string { - return fmt.Sprintf("fuse_time_%s_%s", f.name, f.rule.metric.Metrics(label)) +func (f *FuseHandler) getRecoverCountKey(eoCtx eocontext.EoContext) string { + return fmt.Sprintf("fuse_recover_%s_%d", f.rule.metric.Metrics(eoCtx), time.Now().Second()) } -func (f *FuseHandler) getRecoverCountKey(label metrics.LabelReader) string { - return fmt.Sprintf("fuse_recover_%s_%s_%d", f.name, f.rule.metric.Metrics(label), time.Now().Second()) +func (f *FuseHandler) getFuseTimeKey(eoCtx eocontext.EoContext) string { + return fmt.Sprintf("fuse_time_%s", f.rule.metric.Metrics(eoCtx)) +} + +func (f *FuseHandler) getFuseStatus(eoCtx eocontext.EoContext, cache resources.ICache) fuseStatus { + + ctx := eoCtx.Context() + + key := fmt.Sprintf("fuse_status_%s", f.rule.metric.Metrics(eoCtx)) + status, err := cache.Get(ctx, key).Result() + if err != nil { //拿不到默认健康期 + return fuseStatusHealthy + } + + _, err = cache.Get(ctx, f.getFuseTimeKey(eoCtx)).Result() + if err != nil && (err == freecache.ErrNotFound || err == redis.Nil) && fuseStatus(status) == fuseStatusFusing { //记录的状态如果是熔断期,此时熔断时间又过期了,则返回观察期 + return fuseStatusObserve + } + + return fuseStatus(status) +} + +func (f *FuseHandler) setFuseStatus(eoCtx eocontext.EoContext, cache resources.ICache, status fuseStatus, expiration time.Duration) { + key := fmt.Sprintf("fuse_status_%s", f.rule.metric.Metrics(eoCtx)) + cache.Set(eoCtx.Context(), key, []byte(status), expiration) } type ruleHandler struct {