feat: add infra kit

This commit is contained in:
quanlong
2021-04-30 19:22:15 +08:00
parent 6b6b3d8c21
commit 5ae21eda0a
10 changed files with 697 additions and 0 deletions

1
pkg/breaker/README.md Normal file
View File

@@ -0,0 +1 @@
# 熔断器

73
pkg/breaker/breaker.go Normal file
View File

@@ -0,0 +1,73 @@
package breaker
import (
"sync"
"time"
xtime "github.com/1024casts/snake/pkg/time"
)
// Breaker is the circuit breaker contract used throughout this package.
// A caller asks Allow before issuing a request and reports the outcome of
// the request with MarkSuccess or MarkFailed.
// FIXME on int32 atomic.LoadInt32(&b.on) == _switchOn
type Breaker interface {
// Allow returns nil when the request may proceed and a non-nil error when
// the breaker rejects it.
Allow() error
// MarkSuccess records one successful request.
MarkSuccess()
// MarkFailed records one failed request.
MarkFailed()
}
// Breaker states, stored as int32 so they can be used with sync/atomic.
const (
// StateOpen means the circuit breaker is open: requests are not allowed.
// After sleeping for some duration a single request is let through to
// probe health; if it succeeds the state is reset to closed, otherwise
// the cycle repeats.
StateOpen int32 = iota
// StateClosed means the circuit breaker is closed: requests are allowed.
// The breaker computes the success ratio, and when the number of requests
// exceeds the configured request threshold and the ratio drops below the
// configured ratio, the state is reset to open.
StateClosed
// StateHalfopen means the circuit breaker was open and, after sleeping
// for some duration, lets one request through without yet moving to
// closed.
StateHalfopen
//_switchOn int32 = iota
// _switchOff
)
// Package-level defaults shared by Init, NewGroup and Go.
var (
// _mu guards reads and writes of _conf.
_mu sync.RWMutex
// _conf is the default configuration handed to groups created with a nil
// config.
_conf = &Config{
Window: xtime.Duration(3 * time.Second),
Bucket: 10,
Request: 100,
// With K = 1.5 requests pass freely while total < 1.5 * success, i.e.
// the percentage of failures must be lower than 33.33%.
K: 1.5,
// Pattern: "",
}
// _group is the default group used by the package-level Go function. It
// is built once from the initial _conf.
_group = NewGroup(_conf)
)
// Init installs conf as the package default configuration. It may be called
// again later to replace the default. A nil conf is ignored.
//
// Zero-valued fields of conf are filled with defaults before it is stored,
// so that a group created afterwards with NewGroup(nil) never sees an
// unusable configuration (for example Bucket == 0, which newSRE divides by).
//
// Init only replaces the stored default; the default group that was already
// built at package initialisation is not reloaded by this call.
func Init(conf *Config) {
	if conf == nil {
		return
	}
	// Normalise outside the lock: fix only touches the caller's struct.
	conf.fix()
	_mu.Lock()
	_conf = conf
	_mu.Unlock()
}
// newBreaker is the factory for Breaker values. It currently always builds
// the SRE implementation from c.
func newBreaker(c *Config) (b Breaker) {
	b = newSRE(c)
	return b
}
// Go looks up the breaker called name in the default group and, depending on
// whether that breaker allows the request, returns the result of run (allowed)
// or of fallback (rejected).
//
// NOTE(review): neither MarkSuccess nor MarkFailed is called here, so the
// outcome of run is not fed back into the breaker by this function.
func Go(name string, run, fallback func() error) error {
	if rejected := _group.Get(name).Allow(); rejected != nil {
		return fallback()
	}
	return run()
}

View File

@@ -0,0 +1,94 @@
package breaker
import (
"errors"
"testing"
"time"
xtime "github.com/1024casts/snake/pkg/time"
)
// TestGroup checks config sharing between groups, per-key breaker identity,
// and that Reload swaps the group's configuration.
func TestGroup(t *testing.T) {
	// A nil config and the package default must resolve to the same *Config.
	byDefault := NewGroup(nil)
	byConf := NewGroup(_conf)
	if byDefault.conf != byConf.conf {
		t.FailNow()
	}

	// Different keys yield different breakers, the same key the same one.
	first := byConf.Get("key")
	other := byConf.Get("key1")
	if first == other {
		t.FailNow()
	}
	if again := byConf.Get("key"); first != again {
		t.FailNow()
	}

	// Reload must replace the configuration held by the group.
	reloaded := NewGroup(_conf)
	reloaded.Reload(&Config{
		Window:    xtime.Duration(1 * time.Second),
		Bucket:    10,
		Request:   100,
		SwitchOff: !_conf.SwitchOff,
	})
	if reloaded.conf.SwitchOff == _conf.SwitchOff {
		t.FailNow()
	}
}
// TestInit verifies that Init replaces the package default configuration by
// flipping SwitchOff and observing the change through _conf.
func TestInit(t *testing.T) {
	before := _conf.SwitchOff
	Init(&Config{
		Window:    xtime.Duration(3 * time.Second),
		Bucket:    10,
		Request:   100,
		SwitchOff: !before,
	})
	if _conf.SwitchOff == before {
		t.FailNow()
	}
}
// TestGo drives the package-level Go helper twice: once against the initial
// default group and once after reloading that group with SwitchOff set.
// In both rounds a non-nil result is reported as a test error.
func TestGo(t *testing.T) {
	run := func() error {
		t.Log("breaker allow,callback run()")
		return nil
	}

	failingFallback := func() error {
		t.Log("breaker not allow,callback fallback()")
		return errors.New("breaker not allow")
	}
	if err := Go("test_run", run, failingFallback); err != nil {
		t.Error(err)
	}

	_group.Reload(&Config{
		Window:    xtime.Duration(3 * time.Second),
		Bucket:    10,
		Request:   100,
		SwitchOff: true,
	})

	quietFallback := func() error {
		t.Log("breaker not allow,callback fallback()")
		return nil
	}
	if err := Go("test_fallback", run, quietFallback); err != nil {
		t.Error(err)
	}
}
// markSuccess reports count successful requests to b. A non-positive count
// reports nothing.
func markSuccess(b Breaker, count int) {
	for remaining := count; remaining > 0; remaining-- {
		b.MarkSuccess()
	}
}
// markFailed reports count failed requests to b. A non-positive count
// reports nothing.
func markFailed(b Breaker, count int) {
	for remaining := count; remaining > 0; remaining-- {
		b.MarkFailed()
	}
}

34
pkg/breaker/config.go Normal file
View File

@@ -0,0 +1,34 @@
package breaker
import (
"time"
xtime "github.com/1024casts/snake/pkg/time"
)
// Config is the breaker configuration.
type Config struct {
// SwitchOff is the breaker switch, off by default.
// NOTE(review): no code visible in this file reads SwitchOff — confirm
// where it is honoured.
SwitchOff bool
// Settings of the Google SRE style breaker.
// K is the multiplier applied to the success count when deciding whether
// requests are throttled; fix defaults it to 1.5.
K float64
// Window is the total statistics window; it is divided by Bucket to get
// the duration of one bucket. fix defaults it to 3s.
Window xtime.Duration
// Bucket is the number of buckets in the window; fix defaults it to 10.
Bucket int
// Request is the request count below which Allow never rejects; fix
// defaults it to 100.
Request int64
}
// fix fills every unset or invalid field of conf with its default, in place:
// K = 1.5, Request = 100, Bucket = 10, Window = 3s.
//
// Non-positive values are treated as unset. A negative Bucket or Window
// would otherwise reach newSRE (which sizes the rolling counter from them),
// and a non-positive K would make the breaker throttle as soon as the
// request threshold is reached.
func (conf *Config) fix() {
	if conf.K <= 0 {
		conf.K = 1.5
	}
	if conf.Request <= 0 {
		conf.Request = 100
	}
	if conf.Bucket <= 0 {
		conf.Bucket = 10
	}
	if conf.Window <= 0 {
		conf.Window = xtime.Duration(3 * time.Second)
	}
}

View File

@@ -0,0 +1,60 @@
package breaker
import (
"fmt"
"time"
"github.com/1024casts/snake/pkg/breaker"
xtime "github.com/1024casts/snake/pkg/time"
)
// ExampleGroup shows how to install a default config, create a group,
// reload it with changed settings and fetch a breaker by key.
//
// NOTE(review): this file declares package breaker yet imports
// github.com/1024casts/snake/pkg/breaker; if it lives in that directory it
// probably needs to be package breaker_test — confirm.
func ExampleGroup() {
	conf := &breaker.Config{
		Window:  xtime.Duration(3 * time.Second),
		K:       1.5,
		Bucket:  10,
		Request: 100,
	}

	// Install conf as the package default.
	breaker.Init(conf)

	// Build a dedicated group from the same config.
	group := breaker.NewGroup(conf)

	// Change the settings and reload the group with them.
	conf.Bucket, conf.Request = 100, 200
	group.Reload(conf)

	// Fetch (and lazily create) the breaker registered under "key".
	group.Get("key")
}
// ExampleBreaker shows the three Breaker operations on a breaker taken from
// a group that uses the default configuration.
func ExampleBreaker() {
	// A nil config selects the package default.
	brk := breaker.NewGroup(nil).Get("key")

	// Report one success and one failure.
	brk.MarkSuccess()
	brk.MarkFailed()

	// Ask whether the next request may proceed.
	if err := brk.Allow(); err != nil {
		fmt.Println("breaker not allow")
		return
	}
	fmt.Println("breaker allow")
}
// ExampleGo uses the default group through breaker.Go: run is invoked when
// the breaker allows the request, fallback when it does not, and any error
// that comes back is printed.
func ExampleGo() {
	err := breaker.Go(
		"example_go",
		func() error { return nil },
		func() error { return fmt.Errorf("unknown error") },
	)
	if err != nil {
		fmt.Println(err)
	}
}

67
pkg/breaker/group.go Normal file
View File

@@ -0,0 +1,67 @@
package breaker
import "sync"
// Group is a namespace of circuit breakers that share one configuration.
// Breakers are created lazily, one per key.
type Group struct {
// mu guards brks and conf.
mu sync.RWMutex
// brks maps a key to its breaker.
brks map[string]Breaker
// conf is the configuration used to build new breakers.
conf *Config
}
// NewGroup creates an empty breaker group. A nil conf selects the package
// default configuration; a non-nil conf has its unset fields filled in place
// before it is stored.
func NewGroup(conf *Config) *Group {
	if conf != nil {
		conf.fix()
	} else {
		_mu.RLock()
		conf = _conf
		_mu.RUnlock()
	}
	g := new(Group)
	g.conf = conf
	g.brks = make(map[string]Breaker)
	return g
}
// Get returns the breaker registered under key, creating and registering one
// from the group's current configuration when none exists yet.
//
// Every caller asking for the same key receives the same Breaker, even when
// several goroutines miss at the same time: the lookup is repeated under the
// write lock and only the first creator's breaker is stored and returned.
func (g *Group) Get(key string) Breaker {
	// Fast path: shared lock only.
	g.mu.RLock()
	brk, ok := g.brks[key]
	g.mu.RUnlock()
	if ok {
		return brk
	}

	g.mu.Lock()
	defer g.mu.Unlock()
	// Re-check: another goroutine may have registered the key, or Reload may
	// have swapped the map and config, between the two locks.
	if brk, ok = g.brks[key]; ok {
		return brk
	}
	brk = newBreaker(g.conf)
	g.brks[key] = brk
	return brk
}
// Reload switches the group to conf and discards every breaker created so
// far, so later Get calls build fresh breakers from the new configuration.
// Unset fields of conf are filled in place first. A nil conf is ignored.
func (g *Group) Reload(conf *Config) {
	if conf != nil {
		conf.fix()

		g.mu.Lock()
		defer g.mu.Unlock()
		// Size the new map like the old one as a capacity hint.
		fresh := make(map[string]Breaker, len(g.brks))
		g.conf, g.brks = conf, fresh
	}
}
// Go looks up the breaker called name in this group and returns the result
// of run when the breaker allows the request, or of fallback when it does
// not.
//
// NOTE(review): neither MarkSuccess nor MarkFailed is called here, so the
// outcome of run is not fed back into the breaker by this method.
func (g *Group) Go(name string, run, fallback func() error) error {
	chosen := run
	if rejected := g.Get(name).Allow(); rejected != nil {
		chosen = fallback
	}
	return chosen()
}

100
pkg/breaker/sre_breaker.go Normal file
View File

@@ -0,0 +1,100 @@
package breaker
import (
"math"
"math/rand"
"sync"
"sync/atomic"
"time"
"github.com/1024casts/snake/pkg/errno"
"github.com/1024casts/snake/pkg/log"
"github.com/1024casts/snake/pkg/stat/metric"
)
// sreBreaker is a Breaker that throttles probabilistically from the ratio of
// total to successful requests seen in a rolling window (see Allow).
type sreBreaker struct {
// stat is the rolling window of request outcomes: MarkSuccess adds 1,
// MarkFailed adds 0.
stat metric.RollingCounter
// r is the random source used to decide whether a request is dropped.
r *rand.Rand
// rand.New(...) returns a non thread safe object, so randLock serialises
// access to r.
randLock sync.Mutex
// k is the multiplier applied to the success count (Config.K).
k float64
// request is the request count below which Allow never rejects
// (Config.Request).
request int64
// state holds StateOpen or StateClosed and is accessed with sync/atomic.
state int32
}
// newSRE builds an SRE breaker from c. The rolling counter gets c.Bucket
// buckets, each lasting c.Window / c.Bucket, and the breaker starts in
// StateClosed with a time-seeded random source.
//
// c.Bucket must not be zero, as it is used as a divisor.
func newSRE(c *Config) Breaker {
	perBucket := time.Duration(int64(c.Window) / int64(c.Bucket))
	counter := metric.NewRollingCounter(metric.RollingCounterOpts{
		Size:           c.Bucket,
		BucketDuration: perBucket,
	})
	seed := time.Now().UnixNano()

	brk := &sreBreaker{
		stat:    counter,
		r:       rand.New(rand.NewSource(seed)),
		request: c.Request,
		k:       c.K,
		state:   StateClosed,
	}
	return brk
}
// summary walks every bucket of the rolling counter and returns the number
// of successful requests (sum of all point values) and the total number of
// requests (sum of all bucket counts).
func (b *sreBreaker) summary() (success int64, total int64) {
	accumulate := func(it metric.Iterator) float64 {
		for it.Next() {
			bkt := it.Bucket()
			total += bkt.Count
			for _, point := range bkt.Points {
				success += int64(point)
			}
		}
		// The reduced value is unused; results travel via the captured
		// named results.
		return 0
	}
	b.stat.Reduce(accumulate)
	return success, total
}
// Allow decides whether the next request may proceed.
//
// While total < request threshold, or total < K * success, the request is
// always allowed and an open breaker is moved back to closed. Otherwise the
// breaker is moved to open and the request is dropped with probability
// max(0, (total - K*success) / (total + 1)); a dropped request yields
// errno.ErrServiceUnavailable, a surviving one nil.
//
// NOTE(review): log.Info is handed printf-style verbs here — confirm that it
// formats them, otherwise a formatting variant may be intended.
func (b *sreBreaker) Allow() error {
	success, total := b.summary()
	weighted := b.k * float64(success)
	log.Info("breaker: request: %d, succee: %d, fail: %d", total, success, total-success)

	// Not enough traffic yet, or still within the accepted ratio.
	withinBudget := total < b.request || float64(total) < weighted
	if withinBudget {
		if atomic.LoadInt32(&b.state) == StateOpen {
			atomic.CompareAndSwapInt32(&b.state, StateOpen, StateClosed)
		}
		return nil
	}

	if atomic.LoadInt32(&b.state) == StateClosed {
		atomic.CompareAndSwapInt32(&b.state, StateClosed, StateOpen)
	}

	excess := (float64(total) - weighted) / float64(total+1)
	dr := math.Max(0, excess)
	drop := b.trueOnProba(dr)
	log.Info("breaker: drop ratio: %f, drop: %t", dr, drop)
	if !drop {
		return nil
	}
	return errno.ErrServiceUnavailable
}
// MarkSuccess records one successful request by adding a point of value 1 to
// the rolling counter.
func (b *sreBreaker) MarkSuccess() {
	const succeeded = 1
	b.stat.Add(succeeded)
}
// MarkFailed records one failed request by adding a point of value 0 to the
// rolling counter.
func (b *sreBreaker) MarkFailed() {
	// NOTE: requests rejected locally by the client are still added to the
	// counter, which pushes the drop ratio higher.
	const failed = 0
	b.stat.Add(failed)
}
// trueOnProba draws a random float in [0, 1) and reports whether it is below
// proba. The draw happens under randLock because the random source is not
// safe for concurrent use.
func (b *sreBreaker) trueOnProba(proba float64) (truth bool) {
	b.randLock.Lock()
	defer b.randLock.Unlock()
	return b.r.Float64() < proba
}

View File

@@ -0,0 +1,177 @@
package breaker
import (
"math"
"math/rand"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/1024casts/snake/pkg/stat/metric"
xtime "github.com/1024casts/snake/pkg/time"
)
// getSRE returns a fresh breaker from a new group configured with a 1s
// window, 10 buckets, a request threshold of 100 and K = 2.
func getSRE() Breaker {
	conf := &Config{
		Window:  xtime.Duration(1 * time.Second),
		Bucket:  10,
		Request: 100,
		K:       2,
	}
	return NewGroup(conf).Get("")
}
// getSREBreaker builds a concrete *sreBreaker directly (10 buckets of 100ms,
// request threshold 100, K = 2) so tests can reach unexported methods.
func getSREBreaker() *sreBreaker {
	counter := metric.NewRollingCounter(metric.RollingCounterOpts{
		Size:           10,
		BucketDuration: time.Millisecond * 100,
	})
	source := rand.NewSource(time.Now().UnixNano())
	return &sreBreaker{
		stat:    counter,
		r:       rand.New(source),
		request: 100,
		k:       2,
		state:   StateClosed,
	}
}
// markSuccessWithDuration reports count successes to b, sleeping for sleep
// after each one.
func markSuccessWithDuration(b Breaker, count int, sleep time.Duration) {
	for left := count; left > 0; left-- {
		b.MarkSuccess()
		time.Sleep(sleep)
	}
}
// markFailedWithDuration reports count failures to b, sleeping for sleep
// after each one.
func markFailedWithDuration(b Breaker, count int, sleep time.Duration) {
	for left := count; left > 0; left-- {
		b.MarkFailed()
		time.Sleep(sleep)
	}
}
// testSREClose feeds only successes (80, then 120 more) and expects the
// breaker to allow requests after each batch.
func testSREClose(t *testing.T, b Breaker) {
	for _, batch := range []int{80, 120} {
		markSuccess(b, batch)
		assert.Equal(t, b.Allow(), nil)
	}
}
// testSREOpen expects requests to be allowed after 100 successes and to be
// rejected after a flood of failures.
func testSREOpen(t *testing.T, b Breaker) {
	markSuccess(b, 100)
	healthy := b.Allow()
	assert.Equal(t, healthy, nil)

	markFailed(b, 10000000)
	flooded := b.Allow()
	assert.NotEqual(t, flooded, nil)
}
// testSREHalfOpen trips the breaker with failures, waits two seconds, and
// then expects it to allow requests again before and after a run of
// successes.
func testSREHalfOpen(t *testing.T, b Breaker) {
	// failback
	initial := b.Allow()
	assert.Equal(t, initial, nil)

	t.Run("allow single failed", func(t *testing.T) {
		markFailed(b, 10000000)
		tripped := b.Allow()
		assert.NotEqual(t, tripped, nil)
	})

	time.Sleep(2 * time.Second)

	t.Run("allow single succeed", func(t *testing.T) {
		recovered := b.Allow()
		assert.Equal(t, recovered, nil)
		markSuccess(b, 10000000)
		steady := b.Allow()
		assert.Equal(t, steady, nil)
	})
}
func TestSRE(t *testing.T) {
b := getSRE()
testSREClose(t, b)
b = getSRE()
testSREOpen(t, b)
b = getSRE()
testSREHalfOpen(t, b)
}
// TestSRESelfProtection covers the two conditions under which the breaker
// must let requests through even though failures were recorded: too few
// requests in the window, and a total that stays below K * success.
func TestSRESelfProtection(t *testing.T) {
	t.Run("total request < 100", func(t *testing.T) {
		b := getSRE()
		markFailed(b, 99)
		assert.Equal(t, b.Allow(), nil)
	})
	t.Run("total request > 100, total < 2 * success", func(t *testing.T) {
		b := getSRE()
		// Keep the total above the request threshold of 100, and small
		// enough that all marks land well inside the 1s window.
		size := 101 + rand.Intn(10000)
		// Just over half of the requests succeed, so that
		// total < 2 * success while real failures are recorded.
		succ := int(math.Ceil(float64(size)/2)) + 1
		markSuccess(b, succ)
		markFailed(b, size-succ)
		assert.Equal(t, b.Allow(), nil)
	})
}
// TestSRESummary checks the success/total figures returned by summary for
// all-success, all-failure and mixed input, and that the figures drop back to
// zero once the window has passed.
func TestSRESummary(t *testing.T) {
	const pause = 50 * time.Millisecond

	// last is the breaker of the most recent sub-test; the final sub-test
	// inspects it after the window has elapsed.
	var last *sreBreaker

	cases := []struct {
		name      string
		feed      func(b *sreBreaker)
		wantSucc  int64
		wantTotal int64
	}{
		{
			name:      "succ == total",
			feed:      func(b *sreBreaker) { markSuccessWithDuration(b, 10, pause) },
			wantSucc:  10,
			wantTotal: 10,
		},
		{
			name:      "fail == total",
			feed:      func(b *sreBreaker) { markFailedWithDuration(b, 10, pause) },
			wantSucc:  0,
			wantTotal: 10,
		},
		{
			name: "succ = 1/2 * total, fail = 1/2 * total",
			feed: func(b *sreBreaker) {
				markFailedWithDuration(b, 5, pause)
				markSuccessWithDuration(b, 5, pause)
			},
			wantSucc:  5,
			wantTotal: 10,
		},
	}
	for _, tc := range cases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			last = getSREBreaker()
			tc.feed(last)
			gotSucc, gotTotal := last.summary()
			assert.Equal(t, gotSucc, tc.wantSucc)
			assert.Equal(t, gotTotal, tc.wantTotal)
		})
	}

	t.Run("auto reset rolling counter", func(t *testing.T) {
		time.Sleep(time.Second)
		gotSucc, gotTotal := last.summary()
		assert.Equal(t, gotSucc, int64(0))
		assert.Equal(t, gotTotal, int64(0))
	})
}
// TestTrueOnProba samples trueOnProba 100000 times with probability pi/10 and
// expects the observed hit ratio to be within a relative error of 5%.
func TestTrueOnProba(t *testing.T) {
	const (
		proba   = math.Pi / 10
		total   = 100000
		epsilon = 0.05
	)
	b := getSREBreaker()
	hits := 0
	for left := total; left > 0; left-- {
		if b.trueOnProba(proba) {
			hits++
		}
	}
	assert.InEpsilon(t, proba, float64(hits)/float64(total), epsilon)
}
// BenchmarkSreBreakerAllow measures one Allow call followed by one mark,
// alternating between success and failure.
func BenchmarkSreBreakerAllow(b *testing.B) {
	breaker := getSRE()
	b.ResetTimer()
	// Exactly b.N iterations: the benchmark contract is i < b.N.
	for i := 0; i < b.N; i++ {
		// The verdict itself is irrelevant to the measurement.
		_ = breaker.Allow()
		if i%2 == 0 {
			breaker.MarkSuccess()
		} else {
			breaker.MarkFailed()
		}
	}
}

31
pkg/time/time.go Normal file
View File

@@ -0,0 +1,31 @@
package time
import (
"context"
xtime "time"
)
// Duration is a time duration that can be decoded from text such as "1s" or
// "500ms" (for example by a TOML decoder honouring UnmarshalText).
type Duration xtime.Duration

// UnmarshalText parses text as a duration string and stores the result in d.
// On a parse error d is left untouched and the error is returned.
func (d *Duration) UnmarshalText(text []byte) error {
	parsed, err := xtime.ParseDuration(string(text))
	if err != nil {
		return err
	}
	*d = Duration(parsed)
	return nil
}

// Shrink reconciles d with the deadline already carried by c and returns the
// effective timeout, the context to use and its cancel function.
//
// When c expires sooner than d, the remaining time is returned together with
// c itself and a no-op cancel function. Otherwise a child context with
// timeout d is derived from c and d is returned unchanged.
func (d Duration) Shrink(c context.Context) (Duration, context.Context, context.CancelFunc) {
	own := xtime.Duration(d)
	deadline, hasDeadline := c.Deadline()
	if hasDeadline {
		remaining := xtime.Until(deadline)
		if remaining < own {
			// The parent is stricter: deliver its smaller timeout.
			return Duration(remaining), c, func() {}
		}
	}
	child, cancel := context.WithTimeout(c, own)
	return d, child, cancel
}

60
pkg/time/time_test.go Normal file
View File

@@ -0,0 +1,60 @@
package time
import (
"context"
"testing"
"time"
)
// TestShrink checks Shrink against a context without a deadline: the timeout
// stays at 1s and the derived context expires in roughly one second.
func TestShrink(t *testing.T) {
	var d Duration
	if err := d.UnmarshalText([]byte("1s")); err != nil {
		t.Fatalf("TestShrink: d.UnmarshalText failed!err:=%v", err)
	}

	to, ctx, cancel := d.Shrink(context.Background())
	defer cancel()

	if time.Duration(to) != time.Second {
		t.Fatalf("new timeout must be equal 1 second")
	}

	deadline, ok := ctx.Deadline()
	if !ok {
		t.Fatalf("ctx deadline must be less than 1s and greater than 500ms")
	}
	if left := time.Until(deadline); left > time.Second || left < time.Millisecond*500 {
		t.Fatalf("ctx deadline must be less than 1s and greater than 500ms")
	}
}
// TestShrinkWithTimeout checks Shrink against a parent context whose 2s
// timeout is longer than the 1s duration: the duration wins, so the timeout
// stays at 1s and the derived context expires in roughly one second.
func TestShrinkWithTimeout(t *testing.T) {
	var d Duration
	err := d.UnmarshalText([]byte("1s"))
	if err != nil {
		t.Fatalf("TestShrinkWithTimeout: d.UnmarshalText failed!err:=%v", err)
	}
	parent, cancelParent := context.WithTimeout(context.Background(), time.Second*2)
	defer cancelParent()
	to, ctx, cancel := d.Shrink(parent)
	defer cancel()
	if time.Duration(to) != time.Second {
		t.Fatalf("new timeout must be equal 1 second")
	}
	if deadline, ok := ctx.Deadline(); !ok || time.Until(deadline) > time.Second || time.Until(deadline) < time.Millisecond*500 {
		t.Fatalf("ctx deadline must be less than 1s and greater than 500ms")
	}
}
// TestShrinkWithDeadline checks Shrink against a parent context whose 500ms
// timeout is shorter than the 1s duration: the parent wins, so the returned
// timeout is below 500ms and the returned context keeps the parent deadline.
func TestShrinkWithDeadline(t *testing.T) {
	var d Duration
	err := d.UnmarshalText([]byte("1s"))
	if err != nil {
		t.Fatalf("TestShrinkWithDeadline: d.UnmarshalText failed!err:=%v", err)
	}
	parent, cancelParent := context.WithTimeout(context.Background(), time.Millisecond*500)
	defer cancelParent()
	to, ctx, cancel := d.Shrink(parent)
	defer cancel()
	if time.Duration(to) >= time.Millisecond*500 {
		t.Fatalf("new timeout must be less than 500 ms")
	}
	if deadline, ok := ctx.Deadline(); !ok || time.Until(deadline) > time.Millisecond*500 || time.Until(deadline) < time.Millisecond*200 {
		t.Fatalf("ctx deadline must be less than 500ms and greater than 200ms")
	}
}