mirror of
https://github.com/eolinker/apinto
synced 2025-10-28 02:51:34 +08:00
完善熔断策略
This commit is contained in:
@@ -109,15 +109,15 @@ func (a *tActuator) Strategy(ctx eocontext.EoContext, next eocontext.IChain, iCa
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type CacheGetCompleteHandler struct {
|
type CacheCompleteHandler struct {
|
||||||
orgHandler eocontext.CompleteHandler
|
orgHandler eocontext.CompleteHandler
|
||||||
validTime int
|
validTime int
|
||||||
uri string
|
uri string
|
||||||
cache resources.ICache
|
cache resources.ICache
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewCacheGetCompleteHandler(orgHandler eocontext.CompleteHandler, validTime int, uri string, cache resources.ICache) *CacheGetCompleteHandler {
|
func NewCacheGetCompleteHandler(orgHandler eocontext.CompleteHandler, validTime int, uri string, cache resources.ICache) *CacheCompleteHandler {
|
||||||
return &CacheGetCompleteHandler{
|
return &CacheCompleteHandler{
|
||||||
orgHandler: orgHandler,
|
orgHandler: orgHandler,
|
||||||
validTime: validTime,
|
validTime: validTime,
|
||||||
uri: uri,
|
uri: uri,
|
||||||
@@ -125,7 +125,7 @@ func NewCacheGetCompleteHandler(orgHandler eocontext.CompleteHandler, validTime
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *CacheGetCompleteHandler) Complete(ctx eocontext.EoContext) error {
|
func (c *CacheCompleteHandler) Complete(ctx eocontext.EoContext) error {
|
||||||
|
|
||||||
if c.orgHandler != nil {
|
if c.orgHandler != nil {
|
||||||
if err := c.orgHandler.Complete(ctx); err != nil {
|
if err := c.orgHandler.Complete(ctx); err != nil {
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import (
|
|||||||
http_service "github.com/eolinker/eosc/eocontext/http-context"
|
http_service "github.com/eolinker/eosc/eocontext/http-context"
|
||||||
"sort"
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@@ -76,12 +77,13 @@ func (a *tActuator) Strategy(ctx eocontext.EoContext, next eocontext.IChain, cac
|
|||||||
handlers := a.handlers
|
handlers := a.handlers
|
||||||
a.lock.RUnlock()
|
a.lock.RUnlock()
|
||||||
|
|
||||||
|
var fuseHandler *FuseHandler
|
||||||
for _, handler := range handlers {
|
for _, handler := range handlers {
|
||||||
//check筛选条件
|
//check筛选条件
|
||||||
if !handler.filter.Check(httpCtx) {
|
if !handler.filter.Check(httpCtx) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if handler.Fusing(ctx, cache) {
|
if handler.IsFuse(ctx, cache) {
|
||||||
res := handler.rule.response
|
res := handler.rule.response
|
||||||
httpCtx.Response().SetStatus(res.statusCode, "")
|
httpCtx.Response().SetStatus(res.statusCode, "")
|
||||||
for _, h := range res.headers {
|
for _, h := range res.headers {
|
||||||
@@ -90,38 +92,107 @@ func (a *tActuator) Strategy(ctx eocontext.EoContext, next eocontext.IChain, cac
|
|||||||
httpCtx.Response().SetHeader("Content-Type", fmt.Sprintf("%s; charset=%s", res.contentType, res.charset))
|
httpCtx.Response().SetHeader("Content-Type", fmt.Sprintf("%s; charset=%s", res.contentType, res.charset))
|
||||||
httpCtx.Response().SetBody([]byte(res.body))
|
httpCtx.Response().SetBody([]byte(res.body))
|
||||||
return nil
|
return nil
|
||||||
|
} else {
|
||||||
|
fuseHandler = handler
|
||||||
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if next != nil {
|
if next != nil {
|
||||||
return next.DoChain(ctx)
|
if err = next.DoChain(ctx); err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if fuseHandler != nil {
|
||||||
|
ctx.SetFinish(newFuseFinishHandler(ctx.GetFinish(), cache, fuseHandler))
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *tActuator) DoFilter(ctx eocontext.EoContext, next eocontext.IChain) error {
|
type fuseFinishHandler struct {
|
||||||
|
orgHandler eocontext.FinishHandler
|
||||||
|
cache resources.ICache
|
||||||
|
fuseHandler *FuseHandler
|
||||||
|
}
|
||||||
|
|
||||||
httpCtx, err := http_service.Assert(ctx)
|
func newFuseFinishHandler(orgHandler eocontext.FinishHandler, cache resources.ICache, fuseHandler *FuseHandler) *fuseFinishHandler {
|
||||||
if err != nil {
|
return &fuseFinishHandler{orgHandler: orgHandler, cache: cache, fuseHandler: fuseHandler}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *fuseFinishHandler) Finish(eoCtx eocontext.EoContext) error {
|
||||||
|
if f.orgHandler != nil {
|
||||||
|
if err := f.orgHandler.Finish(eoCtx); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
a.lock.RLock()
|
httpCtx, _ := http_service.Assert(eoCtx)
|
||||||
handlers := a.handlers
|
|
||||||
a.lock.RUnlock()
|
|
||||||
|
|
||||||
for _, handler := range handlers {
|
fuseCondition := f.fuseHandler.rule.fuseCondition
|
||||||
//check筛选条件
|
recoverCondition := f.fuseHandler.rule.recoverCondition
|
||||||
if handler.filter.Check(httpCtx) {
|
fuseTime := f.fuseHandler.rule.fuseTime
|
||||||
|
|
||||||
|
ctx := eoCtx.Context()
|
||||||
|
statusCode := httpCtx.Response().StatusCode()
|
||||||
|
|
||||||
|
//熔断状态
|
||||||
|
status := f.fuseHandler.getFuseStatus(eoCtx, f.cache)
|
||||||
|
|
||||||
|
for _, code := range fuseCondition.statusCodes {
|
||||||
|
if statusCode != code {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
//记录失败count
|
||||||
|
countKey := f.fuseHandler.getFuseCountKey(eoCtx)
|
||||||
|
|
||||||
|
errCount, _ := f.cache.IncrBy(ctx, countKey, 1, time.Second).Result()
|
||||||
|
|
||||||
|
//清除恢复的计数器
|
||||||
|
f.cache.Del(ctx, f.fuseHandler.getRecoverCountKey(eoCtx))
|
||||||
|
|
||||||
|
if errCount >= fuseCondition.count {
|
||||||
|
surplus := errCount % fuseCondition.count
|
||||||
|
if surplus == 0 {
|
||||||
|
//熔断持续时间=连续熔断次数*持续时间
|
||||||
|
exp := time.Second * time.Duration((errCount/fuseCondition.count)*fuseTime.time)
|
||||||
|
maxExp := time.Duration(fuseTime.maxTime) * time.Second
|
||||||
|
if exp >= maxExp {
|
||||||
|
exp = maxExp
|
||||||
|
}
|
||||||
|
|
||||||
|
f.fuseHandler.setFuseStatus(eoCtx, f.cache, fuseStatusFusing, exp+time.Minute)
|
||||||
|
|
||||||
|
//因为观察期是熔断时间结束后的一秒内才算观察期,所以多设置个key用来做观察期状态的判断 判断逻辑在getFuseStatus中
|
||||||
|
f.cache.Set(ctx, f.fuseHandler.getFuseTimeKey(eoCtx), []byte(fuseStatusFusing), exp)
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for _, code := range recoverCondition.statusCodes {
|
||||||
|
if code != statusCode {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if status == fuseStatusObserve {
|
||||||
|
successCount, _ := f.cache.IncrBy(ctx, f.fuseHandler.getRecoverCountKey(eoCtx), 1, time.Second).Result()
|
||||||
|
|
||||||
|
//恢复正常期
|
||||||
|
if successCount == recoverCondition.count {
|
||||||
|
exp := time.Duration(fuseTime.maxTime) * time.Second
|
||||||
|
f.fuseHandler.setFuseStatus(eoCtx, f.cache, fuseStatusHealthy, exp+time.Minute)
|
||||||
}
|
}
|
||||||
|
|
||||||
if next != nil {
|
|
||||||
return next.DoChain(ctx)
|
|
||||||
}
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type handlerListSort []*FuseHandler
|
type handlerListSort []*FuseHandler
|
||||||
|
|||||||
@@ -1,15 +1,12 @@
|
|||||||
package fuse_strategy
|
package fuse_strategy
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/coocood/freecache"
|
"github.com/coocood/freecache"
|
||||||
"github.com/eolinker/apinto/metrics"
|
"github.com/eolinker/apinto/metrics"
|
||||||
"github.com/eolinker/apinto/resources"
|
"github.com/eolinker/apinto/resources"
|
||||||
"github.com/eolinker/apinto/strategy"
|
"github.com/eolinker/apinto/strategy"
|
||||||
"github.com/eolinker/eosc/eocontext"
|
"github.com/eolinker/eosc/eocontext"
|
||||||
http_service "github.com/eolinker/eosc/eocontext/http-context"
|
|
||||||
"github.com/eolinker/eosc/log"
|
|
||||||
"github.com/go-redis/redis/v8"
|
"github.com/go-redis/redis/v8"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
@@ -17,7 +14,6 @@ import (
|
|||||||
type fuseStatus string
|
type fuseStatus string
|
||||||
|
|
||||||
const (
|
const (
|
||||||
fuseStatusNone fuseStatus = "none" //默认状态
|
|
||||||
fuseStatusHealthy fuseStatus = "healthy" //健康期间
|
fuseStatusHealthy fuseStatus = "healthy" //健康期间
|
||||||
fuseStatusFusing fuseStatus = "fusing" //熔断期间
|
fuseStatusFusing fuseStatus = "fusing" //熔断期间
|
||||||
fuseStatusObserve fuseStatus = "observe" //观察期
|
fuseStatusObserve fuseStatus = "observe" //观察期
|
||||||
@@ -32,99 +28,43 @@ type FuseHandler struct {
|
|||||||
status fuseStatus //状态
|
status fuseStatus //状态
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *FuseHandler) Fusing(eoCtx eocontext.EoContext, cache resources.ICache) bool {
|
func (f *FuseHandler) IsFuse(eoCtx eocontext.EoContext, cache resources.ICache) bool {
|
||||||
httpCtx, _ := http_service.Assert(eoCtx)
|
return f.getFuseStatus(eoCtx, cache) == fuseStatusFusing
|
||||||
|
|
||||||
fuseCondition := f.rule.fuseCondition
|
|
||||||
recoverCondition := f.rule.recoverCondition
|
|
||||||
|
|
||||||
ctx := context.Background()
|
|
||||||
statusCode := httpCtx.Response().StatusCode()
|
|
||||||
|
|
||||||
for _, code := range fuseCondition.statusCodes {
|
|
||||||
if statusCode != code {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
tx := cache.Tx()
|
|
||||||
//记录失败count
|
|
||||||
countKey := f.getFuseCountKey(eoCtx)
|
|
||||||
|
|
||||||
if f.status == fuseStatusFusing {
|
|
||||||
//缓存中拿不到数据 表示key过期 也就是熔断期已过 变成观察期
|
|
||||||
if _, err := tx.Get(ctx, f.getFuseTimeKey(eoCtx)).Bytes(); err != nil && (err == freecache.ErrNotFound || err == redis.Nil) {
|
|
||||||
f.status = fuseStatusObserve
|
|
||||||
_ = tx.Exec(ctx)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
result, err := tx.IncrBy(ctx, countKey, 1, time.Second).Result()
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("FuseHandler Fusing %v", err)
|
|
||||||
_ = tx.Exec(ctx)
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
//清除恢复的计数器
|
|
||||||
tx.Del(ctx, f.getRecoverCountKey(eoCtx))
|
|
||||||
|
|
||||||
if result >= fuseCondition.count {
|
|
||||||
surplus := result % fuseCondition.count
|
|
||||||
if surplus == 0 {
|
|
||||||
//熔断持续时间=连续熔断次数*持续时间
|
|
||||||
exp := time.Second * time.Duration((result/fuseCondition.count)*f.rule.fuseTime.time)
|
|
||||||
maxExp := time.Duration(f.rule.fuseTime.maxTime) * time.Second
|
|
||||||
if exp >= maxExp {
|
|
||||||
exp = maxExp
|
|
||||||
}
|
|
||||||
tx.Set(ctx, f.getFuseTimeKey(eoCtx), []byte(""), exp)
|
|
||||||
f.status = fuseStatusFusing
|
|
||||||
}
|
|
||||||
_ = tx.Exec(ctx)
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, code := range recoverCondition.statusCodes {
|
|
||||||
if code != statusCode {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if f.status == fuseStatusObserve || f.status == fuseStatusFusing {
|
|
||||||
tx := cache.Tx()
|
|
||||||
result, err := tx.IncrBy(ctx, f.getRecoverCountKey(eoCtx), 1, time.Second).Result()
|
|
||||||
if err != nil {
|
|
||||||
_ = tx.Exec(ctx)
|
|
||||||
log.Errorf("FuseHandler Fusing %v", err)
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
//恢复正常期
|
|
||||||
if result == recoverCondition.count {
|
|
||||||
f.status = fuseStatusHealthy
|
|
||||||
}
|
|
||||||
_ = tx.Exec(ctx)
|
|
||||||
}
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
if f.status == fuseStatusHealthy || f.status == fuseStatusNone || f.status == fuseStatusObserve {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
return true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *FuseHandler) getFuseCountKey(label metrics.LabelReader) string {
|
func (f *FuseHandler) getFuseCountKey(eoCtx eocontext.EoContext) string {
|
||||||
return fmt.Sprintf("fuse_%s_%s_%d", f.name, f.rule.metric.Metrics(label), time.Now().Second())
|
return fmt.Sprintf("fuse_%s_%d", f.rule.metric.Metrics(eoCtx), time.Now().Second())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *FuseHandler) getFuseTimeKey(label metrics.LabelReader) string {
|
func (f *FuseHandler) getRecoverCountKey(eoCtx eocontext.EoContext) string {
|
||||||
return fmt.Sprintf("fuse_time_%s_%s", f.name, f.rule.metric.Metrics(label))
|
return fmt.Sprintf("fuse_recover_%s_%d", f.rule.metric.Metrics(eoCtx), time.Now().Second())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *FuseHandler) getRecoverCountKey(label metrics.LabelReader) string {
|
func (f *FuseHandler) getFuseTimeKey(eoCtx eocontext.EoContext) string {
|
||||||
return fmt.Sprintf("fuse_recover_%s_%s_%d", f.name, f.rule.metric.Metrics(label), time.Now().Second())
|
return fmt.Sprintf("fuse_time_%s", f.rule.metric.Metrics(eoCtx))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *FuseHandler) getFuseStatus(eoCtx eocontext.EoContext, cache resources.ICache) fuseStatus {
|
||||||
|
|
||||||
|
ctx := eoCtx.Context()
|
||||||
|
|
||||||
|
key := fmt.Sprintf("fuse_status_%s", f.rule.metric.Metrics(eoCtx))
|
||||||
|
status, err := cache.Get(ctx, key).Result()
|
||||||
|
if err != nil { //拿不到默认健康期
|
||||||
|
return fuseStatusHealthy
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = cache.Get(ctx, f.getFuseTimeKey(eoCtx)).Result()
|
||||||
|
if err != nil && (err == freecache.ErrNotFound || err == redis.Nil) && fuseStatus(status) == fuseStatusFusing { //记录的状态如果是熔断期,此时熔断时间又过期了,则返回观察期
|
||||||
|
return fuseStatusObserve
|
||||||
|
}
|
||||||
|
|
||||||
|
return fuseStatus(status)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *FuseHandler) setFuseStatus(eoCtx eocontext.EoContext, cache resources.ICache, status fuseStatus, expiration time.Duration) {
|
||||||
|
key := fmt.Sprintf("fuse_status_%s", f.rule.metric.Metrics(eoCtx))
|
||||||
|
cache.Set(eoCtx.Context(), key, []byte(status), expiration)
|
||||||
}
|
}
|
||||||
|
|
||||||
type ruleHandler struct {
|
type ruleHandler struct {
|
||||||
|
|||||||
Reference in New Issue
Block a user