限流策略实现

This commit is contained in:
黄孟柱
2022-09-15 17:29:12 +08:00
parent 426746a686
commit 268b569675
10 changed files with 434 additions and 25 deletions

View File

@@ -0,0 +1,29 @@
package limiting_stragety
import (
"github.com/eolinker/apinto/drivers/strategy/limiting-stragety/http"
"github.com/eolinker/apinto/drivers/strategy/limiting-stragety/scalar"
"github.com/eolinker/eosc/eocontext"
)
func init() {
registerActuator(http.NewActuator())
}
type ActuatorsHandler interface {
Assert(ctx eocontext.EoContext) bool
Check(ctx eocontext.EoContext, handlers []*LimitingHandler, queryScalar scalar.Manager, traffics scalar.Manager, name string) error
}
var (
actuatorsHandlers []ActuatorsHandler
)
func registerActuator(handler ActuatorsHandler) {
actuatorsHandlers = append(actuatorsHandlers, handler)
}
func getActuatorsHandlers() []ActuatorsHandler {
return actuatorsHandlers
}

View File

@@ -1,8 +1,11 @@
package limiting_stragety
import (
"github.com/eolinker/apinto/drivers/strategy/limiting-stragety/scalar"
"github.com/eolinker/apinto/strategy"
"github.com/eolinker/eosc/eocontext"
"sort"
"sync"
)
var (
@@ -18,26 +21,81 @@ func init() {
}
type ActuatorSet interface {
Set(id string, limiting *Limiting)
Set(id string, limiting *LimitingHandler)
Del(id string)
}
type tActuator struct {
lock sync.RWMutex
all map[string]*LimitingHandler
handlers []*LimitingHandler
queryScalar scalar.Manager
traffics scalar.Manager
}
func (a *tActuator) Set(id string, limiting *Limiting) {
func (a *tActuator) Set(id string, limiting *LimitingHandler) {
// 调用来源有锁
a.all[id] = limiting
a.rebuild()
}
func (a *tActuator) Del(id string) {
// 调用来源有锁
delete(a.all, id)
a.rebuild()
}
func (a *tActuator) rebuild() {
handlers := make([]*LimitingHandler, 0, len(a.all))
for _, h := range a.all {
if !h.stop {
handlers = append(handlers, h)
}
}
sort.Sort(handlerListSort(handlers))
a.lock.Lock()
defer a.lock.Unlock()
a.handlers = handlers
}
func newActuator() *tActuator {
return &tActuator{}
}
func (a *tActuator) Strategy(ctx eocontext.EoContext, next eocontext.IChain) error {
//TODO implement me
panic("implement me")
a.lock.RLock()
handlers := a.handlers
a.lock.RUnlock()
acs := getActuatorsHandlers()
for _, ach := range acs {
if ach.Assert(ctx) {
err := ach.Check(ctx, handlers, a.queryScalar, a.traffics)
if err != nil {
return err
}
break
}
}
if next != nil {
return next.DoChain(ctx)
}
return nil
}
type handlerListSort []*LimitingHandler
func (hs handlerListSort) Len() int {
return len(hs)
}
func (hs handlerListSort) Less(i, j int) bool {
return hs[i].priority < hs[j].priority
}
func (hs handlerListSort) Swap(i, j int) {
hs[i], hs[j] = hs[j], hs[i]
}

View File

@@ -3,22 +3,25 @@ package limiting_stragety
import "github.com/eolinker/apinto/strategy"
type Threshold struct {
Second int64 `json:"second" label:"每秒限制"`
Minute int64 `json:"minute" label:"每分钟限制"`
Hour int64 `json:"hour" label:"每小时限制"`
Second uint64 `json:"second" label:"每秒限制"`
Minute uint64 `json:"minute" label:"每分钟限制"`
Hour uint64 `json:"hour" label:"每小时限制"`
}
type Rule struct {
Metrics []string `json:"metrics" label:"限流计数器名"`
Query Threshold `json:"query" label:"请求限制" description:"按请求次数"`
Traffic Threshold `json:"traffic" label:"流量限制" description:"按请求内容大小"`
}
type Config struct {
Profession string `json:"profession" skip:"skip"`
Name string `json:"name" skip:"skip"`
Driver string `json:"driver" skip:"skip"`
Description string `json:"description" skip:"skip"`
Priority int `json:"priority" label:"优先级" description:"1-999"`
Filters strategy.FilterConfig `json:"filters" label:"过滤规则"`
Rule Rule `json:"limiting" label:"限流规则" description:"限流规则"`
type ConfigCore struct {
Stop bool `json:"stop" `
Priority int `json:"priority" label:"优先级" description:"1-999"`
Filters strategy.FilterConfig `json:"filters" label:"过滤规则"`
Rule Rule `json:"limiting" label:"限流规则" description:"限流规则"`
}
type Config struct {
Profession string `json:"profession" skip:"skip"`
Name string `json:"name" skip:"skip"`
Driver string `json:"driver" skip:"skip"`
Description string `json:"description" skip:"skip"`
ConfigCore
}

View File

@@ -42,7 +42,7 @@ func (d *driver) Create(id, name string, v interface{}, workers map[eosc.Require
lg := &Limiting{
id: id,
name: Name,
name: name,
}
controller.Store(id)
return lg, nil

View File

@@ -0,0 +1,57 @@
package limiting_stragety
import (
"github.com/eolinker/apinto/metrics"
"github.com/eolinker/apinto/strategy"
)
type LimitingHandler struct {
filter strategy.IFilter
metrics metrics.Metrics
query Threshold
traffic Threshold
priority int
stop bool
}
func (l *LimitingHandler) Filter() strategy.IFilter {
return l.filter
}
func (l *LimitingHandler) Metrics() metrics.Metrics {
return l.metrics
}
func (l *LimitingHandler) Query() Threshold {
return l.query
}
func (l *LimitingHandler) Traffic() Threshold {
return l.traffic
}
func (l *LimitingHandler) Priority() int {
return l.priority
}
func (l *LimitingHandler) Stop() bool {
return l.stop
}
func NewLimitingHandler(conf *ConfigCore) (*LimitingHandler, error) {
filter, err := strategy.ParseFilter(conf.Filters)
if err != nil {
return nil, err
}
mts := metrics.Parse(conf.Rule.Metrics)
return &LimitingHandler{
filter: filter,
metrics: mts,
stop: conf.Stop,
query: conf.Rule.Query,
traffic: conf.Rule.Traffic,
priority: conf.Priority,
}, nil
}

View File

@@ -0,0 +1,82 @@
package http
import (
limiting_stragety "github.com/eolinker/apinto/drivers/strategy/limiting-stragety"
"github.com/eolinker/apinto/drivers/strategy/limiting-stragety/scalar"
"github.com/eolinker/eosc/eocontext"
http_service "github.com/eolinker/eosc/eocontext/http-context"
"strconv"
)
type actuatorHttp struct {
}
func NewActuator() *actuatorHttp {
return &actuatorHttp{}
}
func (hd *actuatorHttp) Assert(ctx eocontext.EoContext) bool {
_, err := http_service.Assert(ctx)
if err != nil {
return false
}
return true
}
func (hd *actuatorHttp) Check(ctx eocontext.EoContext, handlers []*limiting_stragety.LimitingHandler, queryScalars scalar.Manager, trafficScalars scalar.Manager, name string) error {
httpContext, err := http_service.Assert(ctx)
if err != nil {
return err
}
length, _ := strconv.ParseUint(httpContext.Request().Header().GetHeader("content-length"), 10, 64)
metricsAlready := newSet(len(handlers))
for _, h := range handlers {
if h.Filter().Check(ctx) {
key := h.Metrics().Key()
if metricsAlready.Has(key) {
continue
}
metricsAlready.Add(key)
metricsValue := h.Metrics().Metrics(ctx)
queryScalar := queryScalars.Get(metricsValue)
trafficScalar := trafficScalars.Get(metricsValue)
if !queryScalar.Second().CompareAndAdd(h.Query().Second, 1) {
}
if !queryScalar.Minute().CompareAndAdd(h.Query().Minute, 1) {
}
if !queryScalar.Hour().CompareAndAdd(h.Query().Hour, 1) {
}
if !trafficScalar.Second().CompareAndAdd(h.Traffic().Second, length) {
}
if !trafficScalar.Minute().CompareAndAdd(h.Traffic().Minute, length) {
}
if !trafficScalar.Hour().CompareAndAdd(h.Traffic().Hour, length) {
}
}
}
return nil
}
type Set map[string]struct{}
func newSet(l int) Set {
return make(Set, l)
}
func (s Set) Has(key string) bool {
_, has := s[key]
return has
}
func (s Set) Add(key string) {
s[key] = struct{}{}
}

View File

@@ -1,8 +1,9 @@
package limiting_stragety
import (
"github.com/eolinker/apinto/strategy"
"fmt"
"github.com/eolinker/eosc"
"reflect"
)
var (
@@ -11,9 +12,11 @@ var (
)
type Limiting struct {
id string
name string
filter strategy.IFilter
id string
name string
handler *LimitingHandler
config *ConfigCore
isRunning int
}
func (l *Limiting) Destroy() error {
@@ -26,16 +29,44 @@ func (l *Limiting) Id() string {
}
func (l *Limiting) Start() error {
actuatorSet.Set(l.id, l)
if l.isRunning == 0 {
l.isRunning = 1
actuatorSet.Set(l.id, l.handler)
}
return nil
}
func (l *Limiting) Reset(conf interface{}, workers map[eosc.RequireId]eosc.IWorker) error {
func (l *Limiting) Reset(v interface{}, workers map[eosc.RequireId]eosc.IWorker) error {
conf, ok := v.(*Config)
if !ok {
return eosc.ErrorConfigIsNil
}
if conf.Priority > 999 || conf.Priority < 1 {
return fmt.Errorf("priority value %d not allow ", conf.Priority)
}
confCore := &conf.ConfigCore
if reflect.DeepEqual(l.config, confCore) {
return nil
}
handler, err := NewLimitingHandler(confCore)
if err != nil {
return err
}
l.config = confCore
l.handler = handler
if l.isRunning != 0 {
actuatorSet.Set(l.id, l.handler)
}
return nil
}
func (l *Limiting) Stop() error {
actuatorSet.Del(l.id)
if l.isRunning != 0 {
l.isRunning = 0
actuatorSet.Del(l.id)
}
return nil
}

View File

@@ -0,0 +1,64 @@
package scalar
import "sync"
type Manager interface {
Get(key string) Scalar
}
type Scalar interface {
Second() Vectors
Minute() Vectors
Hour() Vectors
}
type _Scalar struct {
second Vectors
minute Vectors
hour Vectors
}
func (s *_Scalar) Second() Vectors {
return s.second
}
func (s _Scalar) Minute() Vectors {
return s.minute
}
func (s _Scalar) Hour() Vectors {
return s.hour
}
type _Manager struct {
lock sync.RWMutex
values map[string]Scalar
}
func (m *_Manager) Get(key string) Scalar {
m.lock.RLock()
scalar, has := m.values[key]
m.lock.RUnlock()
if has {
return scalar
}
m.lock.Lock()
defer m.lock.Unlock()
scalar, has = m.values[key]
if has {
return scalar
}
scalar = &_Scalar{
second: newVectors(1000, 500),
minute: newVectors(60000, 5000),
hour: newVectors(3600000, 360000),
}
m.values[key] = scalar
return scalar
}
func NewManager() *_Manager {
return &_Manager{
values: make(map[string]Scalar),
}
}

View File

@@ -0,0 +1,67 @@
package scalar
import (
"sync/atomic"
"time"
)
type Vectors interface {
CompareAndAdd(threshold, delta uint64) (added bool)
}
type _Vectors struct {
vectors []uint64
size uint64
lastIndex uint64
step uint64
}
//newVectors 统计时长,滑动窗口步进,单位都是 毫秒,
func newVectors(uni, step uint64) *_Vectors {
if uni < 1000 {
uni = 1000
}
if step < 500 {
step = 500
}
size := uni / step
if size > 20 {
size = 20
step = uni / size
}
return &_Vectors{size: size, step: step, vectors: make([]uint64, size)}
}
func (s *_Vectors) CompareAndAdd(threshold, delta uint64) bool {
index := s.refresh()
value := uint64(0)
for i := range s.vectors {
value += atomic.LoadUint64(&s.vectors[i])
}
if 0 == threshold || value < threshold {
atomic.AddUint64(&s.vectors[index%s.size], delta)
return true
}
return false
}
func (s *_Vectors) refresh() uint64 {
seconds := uint64(time.Now().Unix())
index := seconds / s.step
last := atomic.SwapUint64(&s.lastIndex, index)
if index > last {
if index-last > s.step {
last = index - s.step - 1
}
for i := index; i > last; i-- {
atomic.StoreUint64(&s.vectors[i%s.size], 0)
}
}
return index
}

View File

@@ -8,10 +8,12 @@ type LabelReader interface {
GetLabel(name string) string
}
type metricsReader interface {
key() string
reader(labels LabelReader) string
}
type Metrics interface {
Metrics(ctx LabelReader) string
Key() string
}
func Parse(metrics []string) Metrics {
@@ -41,18 +43,34 @@ func Parse(metrics []string) Metrics {
type metricsLabelReader string
func (m metricsLabelReader) key() string {
return string(m)
}
func (m metricsLabelReader) reader(labels LabelReader) string {
return labels.GetLabel(string(m))
}
type metricsConst string
func (m metricsConst) key() string {
return string(m)
}
func (m metricsConst) reader(labels LabelReader) string {
return string(m)
}
type metricsList []metricsReader
func (ms metricsList) Key() string {
vs := make([]string, len(ms))
for i, r := range ms {
vs[i] = r.key()
}
return strings.Join(vs, "-")
}
func (ms metricsList) Metrics(ctx LabelReader) string {
vs := make([]string, len(ms))
for i, r := range ms {