feat: ratelimit

This commit is contained in:
chengjian1997
2022-03-25 10:58:40 +08:00
parent 37e8816dc9
commit 2f8ad94b4f
12 changed files with 1428 additions and 1 deletions

6
go.mod
View File

@@ -8,6 +8,7 @@ require (
github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394
github.com/bitly/go-simplejson v0.5.0
github.com/btcsuite/winsvc v1.0.0
github.com/garyburd/redigo v1.6.3
github.com/go-redis/redis/v8 v8.4.11
github.com/go-sql-driver/mysql v1.5.0
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0
@@ -16,6 +17,7 @@ require (
github.com/jander/golog v0.0.0-20150917071935-954a5be801fc
github.com/jinzhu/gorm v1.9.12
github.com/jroimartin/gocui v0.4.0
github.com/juju/ratelimit v1.0.1
github.com/kardianos/service v1.0.0
github.com/muesli/cache2go v0.0.0-20200423001931-a100c5aac93f
github.com/nicksnyder/go-i18n/v2 v2.0.3
@@ -25,10 +27,14 @@ require (
github.com/spf13/cobra v1.0.0
github.com/syndtr/goleveldb v1.0.0
github.com/xxjwxc/gowp v0.0.0-20200603130651-4d7368b0e285
github.com/yudeguang/iox v0.0.0-20180519090448-bffdb29c87c0 // indirect
github.com/yudeguang/ratelimit v0.0.0-20220109125206-af2bdcdaf64a
go.uber.org/ratelimit v0.2.0
go.uber.org/zap v1.10.0
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb
golang.org/x/text v0.3.3
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
google.golang.org/grpc v1.29.1
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/eapache/queue.v1 v1.1.0

16
go.sum
View File

@@ -5,6 +5,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 h1:MzBOUgng9orim59UnfUTLRjMpd09C5uEVQ6RPGeCaVI=
github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129/go.mod h1:rFgpPQZYZ8vdbc+48xibu8ALc3yeyd64IhHS+PU6Yyg=
github.com/ant0ine/go-json-rest v3.3.2+incompatible h1:nBixrkLFiDNAW0hauKDLc8yJI6XfrQumWvytE1Hk14E=
github.com/ant0ine/go-json-rest v3.3.2+incompatible/go.mod h1:q6aCt0GfU6LhpBsnZ/2U+mwe+0XB5WStbmwyoPfc+sk=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
@@ -52,6 +54,8 @@ github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHqu
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/garyburd/redigo v1.6.3 h1:HCeeRluvAgMusMomi1+6Y5dmFOdYV/JzoRrrbFlkGIc=
github.com/garyburd/redigo v1.6.3/go.mod h1:rTb6epsqigu3kYKBnaF028A7Tf/Aw5s0cqA47doKKqw=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
@@ -116,6 +120,8 @@ github.com/jinzhu/now v1.1.1/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/jroimartin/gocui v0.4.0 h1:52jnalstgmc25FmtGcWqa0tcbMEWS6RpFLsOIO+I+E8=
github.com/jroimartin/gocui v0.4.0/go.mod h1:7i7bbj99OgFHzo7kB2zPb8pXLqMBSQegY7azfqXMkyY=
github.com/juju/ratelimit v1.0.1 h1:+7AIFJVQ0EQgq/K9+0Krm7m530Du7tIz0METWzN0RgY=
github.com/juju/ratelimit v1.0.1/go.mod h1:qapgC/Gy+xNh9UxzV13HGGl/6UXNN+ct+vwSgWNm/qk=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/kardianos/service v1.0.0 h1:HgQS3mFfOlyntWX8Oke98JcJLqt1DBcHR4kxShpYef0=
github.com/kardianos/service v1.0.0/go.mod h1:8CzDhVuCuugtsHyZoTvsOBuvonN/UDBvl0kH+BUxvbo=
@@ -211,13 +217,20 @@ github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:
github.com/xxjwxc/gowp v0.0.0-20200603130651-4d7368b0e285 h1:gbdax2ZvHZwe8zxu7by/HMuDUS47iHR2zmEzlgAHBMw=
github.com/xxjwxc/gowp v0.0.0-20200603130651-4d7368b0e285/go.mod h1:yJ/fY5BorWARfDDsxBU/MyQTHc5MVyNcqBQQYD6MN0k=
github.com/xxjwxc/public v0.0.0-20200603115833-341beff27850/go.mod h1:fp3M+FEQrCgWD1fZ/PLwZkCTglf086OEhC9LcydAUnc=
github.com/yudeguang/iox v0.0.0-20180519090448-bffdb29c87c0 h1:EIjQmYpnyudINP5M6Y3hFT/AA9SEaZ6La0MtHRkb0X0=
github.com/yudeguang/iox v0.0.0-20180519090448-bffdb29c87c0/go.mod h1:/yeZ8yPyE9g4jM7Z8LPKwi1L9lDGmLGQ0ywR4rtdNdY=
github.com/yudeguang/ratelimit v0.0.0-20220109125206-af2bdcdaf64a h1:z/xYclBL+mgRK5R8RI/jkUuLFXFYjxWI4aaRLwxi85c=
github.com/yudeguang/ratelimit v0.0.0-20220109125206-af2bdcdaf64a/go.mod h1:NcFk/p88iJxUWYrlDIat7mJLufpsHExnYvxUkApkhJc=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.opentelemetry.io/otel v0.16.0 h1:uIWEbdeb4vpKPGITLsRVUS44L5oDbDUCZxn8lkxhmgw=
go.opentelemetry.io/otel v0.16.0/go.mod h1:e4GKElweB8W2gWUqbghw0B8t5MCTccc9212eNHnOHwA=
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/ratelimit v0.2.0 h1:UQE2Bgi7p2B85uP5dC2bbRtig0C+OeNRnNEafLjsLPA=
go.uber.org/ratelimit v0.2.0/go.mod h1:YYBV4e4naJvhpitQrWJu1vCpgB7CboMe0qhltKt6mUg=
go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
@@ -272,6 +285,7 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 h1:SvFZT6jyqRaOeXpc5h/JSfZenJ2O330aBsf7JfSUXmQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

344
ratelimit/demo0/bucket.go Normal file
View File

@@ -0,0 +1,344 @@
// Copyright 2014 Canonical Ltd.
// Licensed under the LGPLv3 with static-linking exception.
// See LICENCE file for details.
// Package ratelimit provides an efficient token bucket implementation
// that can be used to limit the rate of arbitrary things.
// See http://en.wikipedia.org/wiki/Token_bucket.
package demo0
import (
"math"
"sync"
"time"
)
// The algorithm that this implementation uses does computational work
// only when tokens are removed from the bucket, and that work completes
// in short, bounded-constant time (Bucket.Wait benchmarks at 175ns on
// my laptop).
//
// Time is measured in equal measured ticks, a given interval
// (fillInterval) apart. On each tick a number of tokens (quantum) are
// added to the bucket.
//
// When any of the methods are called the bucket updates the number of
// tokens that are in the bucket, and it records the current tick
// number too. Note that it doesn't record the current time - by
// keeping things in units of whole ticks, it's easy to dish out tokens
// at exactly the right intervals as measured from the start time.
//
// This allows us to calculate the number of tokens that will be
// available at some time in the future with a few simple arithmetic
// operations.
//
// The main reason for being able to transfer multiple tokens on each tick
// is so that we can represent rates greater than 1e9 (the resolution of the Go
// time package) tokens per second, but it's also useful because
// it means we can easily represent situations like "a person gets
// five tokens an hour, replenished on the hour".
// Bucket represents a token bucket that fills at a predetermined rate.
// Methods on Bucket may be called concurrently.
type Bucket struct {
Clock Clock
// startTime holds the moment when the bucket was
// first created and ticks began.
StartTime time.Time
// capacity holds the overall capacity of the bucket.
Capacity int64
// quantum holds how many tokens are added on
// each tick.
Quantum int64
// fillInterval holds the interval between each tick.
FillInterval time.Duration
// mu guards the fields below it.
Mu sync.Mutex
// availableTokens holds the number of available
// tokens as of the associated latestTick.
// It will be negative when there are consumers
// waiting for tokens.
AvailableTokens int64
// latestTick holds the latest tick for which
// we know the number of tokens in the bucket.
LatestTick int64
}
// NewBucket returns a new token bucket that fills at the
// rate of one token every fillInterval, up to the given
// maximum capacity. Both arguments must be
// positive. The bucket is initially full.
func NewBucket(fillInterval time.Duration, capacity int64) *Bucket {
return NewBucketWithClock(fillInterval, capacity, nil)
}
// NewBucketWithClock is identical to NewBucket but injects a testable clock
// interface.
func NewBucketWithClock(fillInterval time.Duration, capacity int64, clock Clock) *Bucket {
return NewBucketWithQuantumAndClock(fillInterval, capacity, 1, clock)
}
// rateMargin specifes the allowed variance of actual
// rate from specified rate. 1% seems reasonable.
const rateMargin = 0.01
// NewBucketWithRate returns a token bucket that fills the bucket
// at the rate of rate tokens per second up to the given
// maximum capacity. Because of limited clock resolution,
// at high rates, the actual rate may be up to 1% different from the
// specified rate.
func NewBucketWithRate(rate float64, capacity int64) *Bucket {
return NewBucketWithRateAndClock(rate, capacity, nil)
}
// NewBucketWithRateAndClock is identical to NewBucketWithRate but injects a
// testable clock interface.
func NewBucketWithRateAndClock(rate float64, capacity int64, clock Clock) *Bucket {
// Use the same bucket each time through the loop
// to save allocations.
tb := NewBucketWithQuantumAndClock(1, capacity, 1, clock)
for quantum := int64(1); quantum < 1<<50; quantum = nextQuantum(quantum) {
fillInterval := time.Duration(1e9 * float64(quantum) / rate)
if fillInterval <= 0 {
continue
}
tb.FillInterval = fillInterval
tb.Quantum = quantum
if diff := math.Abs(tb.Rate() - rate); diff/rate <= rateMargin {
return tb
}
}
//panic("cannot find suitable quantum for " + strconv.FormatFloat(rate, 'g', -1, 64))
return tb
}
// nextQuantum returns the next quantum to try after q.
// We grow the quantum exponentially, but slowly, so we
// get a good fit in the lower numbers.
func nextQuantum(q int64) int64 {
q1 := q * 11 / 10
if q1 == q {
q1++
}
return q1
}
// NewBucketWithQuantum is similar to NewBucket, but allows
// the specification of the quantum size - quantum tokens
// are added every fillInterval.
func NewBucketWithQuantum(fillInterval time.Duration, capacity, quantum int64) *Bucket {
return NewBucketWithQuantumAndClock(fillInterval, capacity, quantum, nil)
}
// NewBucketWithQuantumAndClock is like NewBucketWithQuantum, but
// also has a clock argument that allows clients to fake the passing
// of time. If clock is nil, the system clock will be used.
func NewBucketWithQuantumAndClock(fillInterval time.Duration, capacity, quantum int64, clock Clock) *Bucket {
if clock == nil {
clock = realClock{}
}
if fillInterval <= 0 {
//panic("token bucket fill interval is not > 0")
}
if capacity <= 0 {
//panic("token bucket capacity is not > 0")
}
if quantum <= 0 {
//panic("token bucket quantum is not > 0")
}
return &Bucket{
Clock: clock,
StartTime: clock.Now(),
LatestTick: 0,
FillInterval: fillInterval,
Capacity: capacity,
Quantum: quantum,
AvailableTokens: capacity,
}
}
// Wait takes count tokens from the bucket, waiting until they are
// available.
func (tb *Bucket) Wait(count int64) {
if d := tb.Take(count); d > 0 {
tb.Clock.Sleep(d)
}
}
// WaitMaxDuration is like Wait except that it will
// only take tokens from the bucket if it needs to wait
// for no greater than maxWait. It reports whether
// any tokens have been removed from the bucket
// If no tokens have been removed, it returns immediately.
func (tb *Bucket) WaitMaxDuration(count int64, maxWait time.Duration) bool {
d, ok := tb.TakeMaxDuration(count, maxWait)
if d > 0 {
tb.Clock.Sleep(d)
}
return ok
}
const infinityDuration time.Duration = 0x7fffffffffffffff
// Take takes count tokens from the bucket without blocking. It returns
// the time that the caller should wait until the tokens are actually
// available.
//
// Note that if the request is irrevocable - there is no way to return
// tokens to the bucket once this method commits us to taking them.
func (tb *Bucket) Take(count int64) time.Duration {
tb.Mu.Lock()
defer tb.Mu.Unlock()
d, _ := tb.take(tb.Clock.Now(), count, infinityDuration)
return d
}
// TakeMaxDuration is like Take, except that
// it will only take tokens from the bucket if the wait
// time for the tokens is no greater than maxWait.
//
// If it would take longer than maxWait for the tokens
// to become available, it does nothing and reports false,
// otherwise it returns the time that the caller should
// wait until the tokens are actually available, and reports
// true.
func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool) {
tb.Mu.Lock()
defer tb.Mu.Unlock()
return tb.take(tb.Clock.Now(), count, maxWait)
}
// TakeAvailable takes up to count immediately available tokens from the
// bucket. It returns the number of tokens removed, or zero if there are
// no available tokens. It does not block.
func (tb *Bucket) TakeAvailable(count int64) int64 {
tb.Mu.Lock()
defer tb.Mu.Unlock()
return tb.takeAvailable(tb.Clock.Now(), count)
}
// takeAvailable is the internal version of TakeAvailable - it takes the
// current time as an argument to enable easy testing.
func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 {
if count <= 0 {
return 0
}
tb.adjustavailableTokens(tb.currentTick(now))
if tb.AvailableTokens <= 0 {
return 0
}
if count > tb.AvailableTokens {
count = tb.AvailableTokens
}
tb.AvailableTokens -= count
return count
}
// Available returns the number of available tokens. It will be negative
// when there are consumers waiting for tokens. Note that if this
// returns greater than zero, it does not guarantee that calls that take
// tokens from the buffer will succeed, as the number of available
// tokens could have changed in the meantime. This method is intended
// primarily for metrics reporting and debugging.
func (tb *Bucket) Available() int64 {
return tb.available(tb.Clock.Now())
}
// available is the internal version of available - it takes the current time as
// an argument to enable easy testing.
func (tb *Bucket) available(now time.Time) int64 {
tb.Mu.Lock()
defer tb.Mu.Unlock()
tb.adjustavailableTokens(tb.currentTick(now))
return tb.AvailableTokens
}
// GetCapacity returns the capacity that the bucket was created with.
func (tb *Bucket) GetCapacity() int64 {
return tb.Capacity
}
// Rate returns the fill rate of the bucket, in tokens per second.
func (tb *Bucket) Rate() float64 {
return 1e9 * float64(tb.Quantum) / float64(tb.FillInterval)
}
// take is the internal version of Take - it takes the current time as
// an argument to enable easy testing.
func (tb *Bucket) take(now time.Time, count int64, maxWait time.Duration) (time.Duration, bool) {
if count <= 0 {
return 0, true
}
tick := tb.currentTick(now)
tb.adjustavailableTokens(tick)
avail := tb.AvailableTokens - count
if avail >= 0 {
tb.AvailableTokens = avail
return 0, true
}
// Round up the missing tokens to the nearest multiple
// of quantum - the tokens won't be available until
// that tick.
// endTick holds the tick when all the requested tokens will
// become available.
endTick := tick + (-avail+tb.Quantum-1)/tb.Quantum
endTime := tb.StartTime.Add(time.Duration(endTick) * tb.FillInterval)
waitTime := endTime.Sub(now)
if waitTime > maxWait {
return 0, false
}
tb.AvailableTokens = avail
return waitTime, true
}
// currentTick returns the current time tick, measured
// from tb.startTime.
func (tb *Bucket) currentTick(now time.Time) int64 {
return int64(now.Sub(tb.StartTime) / tb.FillInterval)
}
// adjustavailableTokens adjusts the current number of tokens
// available in the bucket at the given time, which must
// be in the future (positive) with respect to tb.latestTick.
func (tb *Bucket) adjustavailableTokens(tick int64) {
if tb.AvailableTokens >= tb.Capacity {
return
}
tb.AvailableTokens += (tick - tb.LatestTick) * tb.Quantum
if tb.AvailableTokens > tb.Capacity {
tb.AvailableTokens = tb.Capacity
}
tb.LatestTick = tick
return
}
// Clock represents the passage of time in a way that
// can be faked out for tests.
type Clock interface {
// Now returns the current time.
Now() time.Time
// Sleep sleeps for at least the given duration.
Sleep(d time.Duration)
}
// realClock implements Clock in terms of standard time functions.
type realClock struct{}
// Now implements Clock.Now by calling time.Now.
func (realClock) Now() time.Time {
return time.Now()
}
// Now implements Clock.Sleep by calling time.Sleep.
func (realClock) Sleep(d time.Duration) {
time.Sleep(d)
}

View File

@@ -0,0 +1,63 @@
package demo0
import (
"fmt"
"github.com/xxjwxc/public/myredis"
"time"
)
var _redis myredis.RedisDial
func init() {
_redis = InitRedis()
}
func InitRedis() myredis.RedisDial {
conf := myredis.InitRedis(myredis.WithAddr("127.0.0.1:6379"), myredis.WithClientName(""),
myredis.WithPool(2, 2),
myredis.WithTimeout(10*time.Second), myredis.WithReadTimeout(10*time.Second), myredis.WithWriteTimeout(10*time.Second),
myredis.WithPwd(""), myredis.WithGroupName("gggg"), myredis.WithDB(0))
//获取
conn, err := myredis.NewRedis(conf)
if err != nil {
fmt.Printf("Redis init error: %v\n", err.Error())
}
return conn
}
// NewRateLimit 创建一个桶
func NewRateLimit(bucketName string, fillInterval time.Duration, capacity int64) *Bucket {
var limiter *Bucket
if !_redis.IsExist(bucketName) { // 不存在 创建一个新的桶
limiter = NewBucket(fillInterval, capacity) // 新的桶
err := _redis.Add(bucketName, &limiter, 0) // 添加redis
if err != nil {
fmt.Printf("Redis add error: %v\n", err.Error())
}
} else { // 存在 直接取
err := _redis.Value(bucketName, &limiter)
if err != nil {
fmt.Printf("Redis get value error: %v\n", err.Error())
}
}
return limiter
}
// GetAvailable 取出桶中count个令牌
func (b *Bucket) GetAvailable(bucketName string, fillInterval time.Duration, count int64) int64 {
if fillInterval != 0 {
if time.Now().Sub(b.StartTime) - fillInterval > 0 { // 大于周期 清除redis重新建桶
err := _redis.Delete(bucketName)
if err != nil {
fmt.Printf("Redis delete error: %v\n", err.Error())
}
limit := NewRateLimit(bucketName, fillInterval, count)
return limit.TakeAvailable(1)
}
}
// 正常取令牌
return b.TakeAvailable(count)
}

View File

@@ -0,0 +1,105 @@
package demo0
import (
"fmt"
"github.com/garyburd/redigo/redis"
ratelimit2 "github.com/juju/ratelimit"
"github.com/xxjwxc/public/myredis"
ratelimit1 "go.uber.org/ratelimit"
"golang.org/x/time/rate"
"testing"
"time"
)
func TestTimeRate(t *testing.T) {
limiter := rate.NewLimiter(rate.Every(2*time.Second), 1)
for i := 0; i < 10; i++ {
prev := time.Now()
now := limiter.Reserve()
if !now.OK() {
fmt.Println("no")
}
fmt.Println(i, prev)
time.Sleep(time.Second)
}
}
func TestRateLimit(t *testing.T) {
limiter := ratelimit1.New(1, ratelimit1.Per(time.Second*2))
prev := time.Now()
for i := 0; i < 10; i++ {
now := limiter.Take()
fmt.Println(i, now.Sub(prev))
prev = now
}
}
func TestRateLimit2(t *testing.T) {
bucket := ratelimit2.NewBucket(time.Minute*2, 100)
fmt.Println(bucket.Available())
fmt.Println(bucket.TakeAvailable(1))
fmt.Println(bucket.Available())
for i := 0; i < 120; i++ {
before := bucket.Available()
tokenGet := bucket.TakeAvailable(1)
if tokenGet != 0 {
fmt.Println("获取到令牌 index=", i+1, "前后数量-> 前:", before, ", 后: ", bucket.Available(), ", tokenGet=", tokenGet)
} else {
fmt.Println("未获取到令牌,拒绝", i+1)
}
time.Sleep(1 * time.Second)
}
}
func TestRedis(t *testing.T) {
//通过go向redis写入数据和读取数据
//1. 链接到redis
conf := myredis.InitRedis(myredis.WithAddr("127.0.0.1:6379"), myredis.WithClientName(""),
myredis.WithPool(2, 2),
myredis.WithTimeout(10*time.Second), myredis.WithReadTimeout(10*time.Second), myredis.WithWriteTimeout(10*time.Second),
myredis.WithPwd(""), myredis.WithGroupName("gggg"), myredis.WithDB(0))
//获取
conn, err := myredis.NewRedis(conf)
//2. 通过go向redis写入数据strinf [key-val]
_, err = conn.Do("Set", "name", "tomjerry 猫猫")
if err != nil {
fmt.Println("set err = ", err)
return
}
//3. 通过go 向redis读取数据string [key-val]
r, err := redis.String(conn.Do("Get", "name"))
if err != nil {
fmt.Println("set err = ", err)
return
}
//因为返回r是interface{}
//因为name对应的值是string ,因此我们需要转换
//nameString := r.(string)
fmt.Println("操作OK", r)
}
func TestNewRateLimit(t *testing.T) {
bucketName, fillInterval, count := "test1", time.Second*30, 10
_redis.Delete(bucketName)
//bucket := NewBucket(fillInterval, 10)
//
//_redis.Add(bucketName, &bucket, 0) // 添加redis
//
limit := NewRateLimit(bucketName, fillInterval, int64(count))
for i := 0; i < 130; i++ {
//var limiter Bucket
//_redis.Value(bucketName, &limiter)
//fmt.Println("----", limiter.Available())
before := limit.Available()
tokenGet := limit.GetAvailable(bucketName, fillInterval, 1)
if tokenGet != 0 {
fmt.Println("获取到令牌 index=", i+1, "前后数量-> 前:", before, ", 后: ", limit.Available(), ", tokenGet=", tokenGet)
} else {
fmt.Println("未获取到令牌,拒绝", i+1)
}
time.Sleep(1 * time.Second)
}
}

View File

@@ -0,0 +1,192 @@
package richie
import (
"errors"
"sync"
"time"
)
/*
用环形队列做为底层数据结构来存储用户访问数据,并能实现自动增长以及收缩
*/
//使用切片实现的队列
type autoGrowCircleQueueInt64 struct {
key interface{}
//注意maxSize比实际存储长度大1
maxSize int
//maxSizeTemp与visitorRecord长度相同,visitorRecord长度设计根据实际情况成自动增长
maxSizeTemp int
visitorRecord []int64
head int //头
tail int //尾
//存盘时临时用到的虚拟队列的头和尾
headForCopy int
tailForCopy int
locker *sync.Mutex
}
//初始化环形队列,长度超过1023的队列暂时只分配1023的空间
func newAutoGrowCircleQueueInt64(size int) *autoGrowCircleQueueInt64 {
var c autoGrowCircleQueueInt64
c.maxSize = size + 1
if c.maxSize > 1024 {
c.maxSizeTemp = 1024
} else {
c.maxSizeTemp = c.maxSize
}
c.visitorRecord = make([]int64, c.maxSizeTemp)
c.locker = new(sync.Mutex)
return &c
}
//队列无人使用时,对于队列实际使用空间长度大于1023的需要对此队列做收缩操作以节省空间
func (q *autoGrowCircleQueueInt64) reSet() {
q.locker.Lock()
defer q.locker.Unlock()
if q.maxSize > 1024 && q.maxSizeTemp > 1024 {
newVisitorRecord := make([]int64, 1024)
q.visitorRecord = newVisitorRecord
q.maxSizeTemp = 1024
q.head = 0
q.tail = 0
}
}
//队列是否需要扩容
func (q *autoGrowCircleQueueInt64) needGrow() bool {
if q.maxSizeTemp == q.maxSize {
return false
}
if q.tempQueueIsFull() {
return true
}
return false
}
//对队列进行扩容操作
func (q *autoGrowCircleQueueInt64) grow() {
newVisitorRecordLen := len(q.visitorRecord) * 2
if newVisitorRecordLen > q.maxSize {
newVisitorRecordLen = q.maxSize
}
newVisitorRecord := make([]int64, newVisitorRecordLen)
//复制数据
oldQueueLen := q.tempQueueLen()
for i := 0; i < oldQueueLen; i++ {
newVisitorRecord[i] = q.visitorRecord[q.head]
q.head = (q.head + 1) % q.maxSizeTemp
}
//新旧数据替换
q.visitorRecord = newVisitorRecord
q.maxSizeTemp = newVisitorRecordLen
q.head = 0
q.tail = oldQueueLen
}
//访问时间入对列,只用于从本地备份文件加载历史访问数据,本身是线性访问,无并发安全问题
func (q *autoGrowCircleQueueInt64) push(val int64) (err error) {
q.locker.Lock()
defer q.locker.Unlock()
if q.needGrow() {
q.grow()
}
if q.tempQueueIsFull() {
return errors.New("queue is full")
}
q.visitorRecord[q.tail] = val
q.tail = (q.tail + 1) % q.maxSizeTemp
return
}
//访问时间入对列,并发安全,由于不同协程在高并发的时候,极端情况下,也即前后两次访问的时间差,与两协程的系统切换时间非常接近的情况下
//由调用者自己生成时间容易出现紊乱的情况,所以访问时间只能到这个地方来统一生成,也即有极小的概率,先访问的时间比后访问的时间大
func (q *autoGrowCircleQueueInt64) pushWithConcurrencysafety(defaultExpiration time.Duration) (err error) {
q.locker.Lock()
defer q.locker.Unlock()
if q.needGrow() {
q.grow()
}
if q.tempQueueIsFull() {
return errors.New("queue is full")
}
q.visitorRecord[q.tail] = time.Now().Add(defaultExpiration).UnixNano()
q.tail = (q.tail + 1) % q.maxSizeTemp
return
}
//出对列
func (q *autoGrowCircleQueueInt64) pop() (val int64, err error) {
q.locker.Lock()
defer q.locker.Unlock()
if q.tempQueueIsEmpty() {
return 0, errors.New("queue is empty")
}
val = q.visitorRecord[q.head]
q.head = (q.head + 1) % q.maxSizeTemp
return
}
//用于备份数据的时候,虚拟队列的出队列操作,但实际未进行出队列操作
func (q *autoGrowCircleQueueInt64) tempQueuePopForCopy() (val int64, err error) {
if q.tempQueueIsEmptyForCopy() {
return 0, errors.New("queue is empty")
}
val = q.visitorRecord[q.headForCopy]
q.headForCopy = (q.headForCopy + 1) % q.maxSizeTemp
return
}
//用于备份数据的时候,判断虚拟队列是否已满
func (q *autoGrowCircleQueueInt64) tempQueueIsFull() bool {
return (q.tail+1)%q.maxSizeTemp == q.head
}
//判断队列是否为空
func (q *autoGrowCircleQueueInt64) tempQueueIsEmpty() bool {
return q.tail == q.head
}
//用于备份数据的时候,判断虚拟队列是否为空
func (q *autoGrowCircleQueueInt64) tempQueueIsEmptyForCopy() bool {
return q.tailForCopy == q.headForCopy
}
//判断队列已使用多少个元素
func (q *autoGrowCircleQueueInt64) usedSize() int {
return (q.tail + q.maxSizeTemp - q.head) % q.maxSizeTemp
}
//判断队列中还有多少空间未使用
func (q *autoGrowCircleQueueInt64) tempQueueUnUsedSize() int {
return q.maxSizeTemp - 1 - q.usedSize()
}
//判断队列中还有多少空间未使用
func (q *autoGrowCircleQueueInt64) unUsedSize() int {
return q.maxSize - 1 - ((q.tail + q.maxSizeTemp - q.head) % q.maxSizeTemp)
}
//队列总的可用空间长度
func (q *autoGrowCircleQueueInt64) tempQueueLen() int {
return q.maxSizeTemp - 1
}
//删除过期数据
func (q *autoGrowCircleQueueInt64) deleteExpired(key interface{}) {
q.locker.Lock()
defer q.locker.Unlock()
now := time.Now().UnixNano()
size := q.usedSize()
if size == 0 {
return
}
//依次删除过期数据
for i := 0; i < size; i++ {
if now > q.visitorRecord[q.head] {
q.head = (q.head + 1) % q.maxSizeTemp
} else {
return
}
}
}

View File

@@ -0,0 +1,13 @@
package richie
import (
"log"
)
func (r *Rule) RateLimit(username string) {
if r.AllowVisit(username) {
log.Println(username, "访问1次,剩余:", r.RemainingVisits(username))
} else {
log.Println(username, "访问过多,稍后再试")
}
}

View File

@@ -0,0 +1,127 @@
package richie
import (
"fmt"
"github.com/yudeguang/ratelimit"
"log"
"strconv"
"sync"
"testing"
"time"
)
func TestRateLimit(t *testing.T) {
r := NewRule()
//步骤二:增加一条或者多条规则组成复合规则,规则必须至少包含一条规则
r.AddRule(time.Second*3, 1)
r.AddRule(time.Second*40, 10)
for i := 0; i < 120; i++ {
r.RateLimit("Richie")
time.Sleep(time.Second)
}
}
func Test1(t *testing.T) {
var Visits int //因并发问题num比实际数量稍小
fmt.Println("\r\n测试1,性能测试预计耗时1分钟请耐心等待:")
//步骤一:初始化
r := NewRule()
//步骤二:增加一条或者多条规则组成复合规则,规则必须至少包含一条规则
//此处对于性能测试,为方便准确计数,只需要添加一条规则
r.AddRule(time.Second*10, 1000) //每10秒只允许访问1000次
/*
r.AddRule(time.Second*10, 10) //每10秒只允许访问10次
r.AddRule(time.Minute*30, 1000) //每30分钟只允许访问1000次
r.AddRule(time.Hour*24, 5000) //每天只允许访问500次
*/
//步骤三(可选):从本地磁盘加载历史访问数据
//r.LoadingAndAutoSaveToDisc("test1", time.Second*10) //设置10秒备份一次(不填写则默认60秒备份一次)备份到程序当前文件夹下文件名为test1.ratelimit
log.Println("性能测试正式开始")
//步骤四:调用函数判断某用户是否允许访问
/*
allow:= r.AllowVisit(user)
*/
//构建若干个用户,模拟用户访问
var users = make(map[string]bool)
for i := 1; i < 1000; i++ {
users["user_"+strconv.Itoa(i)] = true
}
begin := time.Now()
//模拟多个协程访问
chanNum := 200
var wg sync.WaitGroup
wg.Add(chanNum)
for i := 0; i < chanNum; i++ {
go func(i int, wg *sync.WaitGroup) {
for ii := 0; ii < 5000; ii++ {
for user := range users {
for {
Visits++
if !r.AllowVisit(user) {
break
}
}
}
}
wg.Done()
}(i, &wg)
}
//所有线程结束,完工
wg.Wait()
t1 := int(time.Now().Sub(begin).Seconds())
log.Println("性能测试完成:共计访问", Visits, "次,", "耗时", t, "秒,即每秒约完成", Visits/t1, "次操作")
//步骤五(可选):程序退出前主动手动存盘
//err := r.SaveToDiscOnce() //在自动备份的同时,还支持手动备份,一般在程序要退出时调用此函数
//if err == nil {
// log.Println("完成手动数据备份")
//} else {
// log.Println(err)
//}
}
func Test2(t *testing.T) {
fmt.Println("\r\n测试2模拟用户访问并打印:")
//步骤一:初始化
r := ratelimit.NewRule()
//步骤二:增加一条或者多条规则组成复合规则,规则必须至少包含一条规则
r.AddRule(time.Second*10, 5) //每10秒只允许访问5次
r.AddRule(time.Minute*30, 50) //每30分钟只允许访问50次
r.AddRule(time.Hour*24, 500) //每天只允许访问500次
//步骤三:调用函数判断某用户是否允许访问
/*
allow:= r.AllowVisit(user)
*/
//构建若干个用户,模拟用户访问
users := []string{"andyyu", "tony", "chery"}
for _, user := range users {
fmt.Println("\r\n开始模拟以下用户访问:", user)
for {
if r.AllowVisit(user) {
log.Println(user, "访问1次,剩余:", r.RemainingVisits(user))
} else {
log.Println(user, "访问过多,稍后再试")
break
}
time.Sleep(time.Second * 1)
}
}
//打印所有用户访问数据情况
fmt.Println("开始打印所有用户在相关时间段内详细的剩余访问次数情况:")
for _, user := range users {
fmt.Println(user)
fmt.Println(" 概述:", r.RemainingVisits(user))
fmt.Println(" 具体:")
r.PrintRemainingVisits(user)
fmt.Println("")
}
/*
在实际的平台运行过程中,往往会因为各种原因,某个客户的访问量过大,被系统临时禁止访问,这时候
这个客户就可能会投诉之类的,根据运营的实际需要,就需要手动清除掉某用户的访问记录,让其可以再继续访问。
对于函数ManualEmptyVisitorRecordsOf(),一般需要自行通过合理的方式,比如自行封装一个HTTP服务来间接调用
*/
log.Println("开始测试手动清楚某用户访问记录.")
log.Println("chery清空访问记录前,剩余:", r.RemainingVisits("chery"))
r.ManualEmptyVisitorRecordsOf("chery")
log.Println("chery清空访问记录后,剩余:", r.RemainingVisits("chery"))
}

104
ratelimit/demo1/rule.go Normal file
View File

@@ -0,0 +1,104 @@
package richie
import (
"math"
"sort"
"sync"
"time"
)
//用户访问控制策略,可由一个或多个访问控制规则组成
type Rule struct {
rules []*singleRule
//以下用于备份数据,在需要备份时才在作用
needBackup bool //是否需要把数据备份到硬盘开启备份之后不允许再临时增加规则singleRule
backupFileName string //缓存存到硬盘上的文件名
backUpInterval time.Duration //默认多长时间需要执行一次数据备份操作
lockerForBackup *sync.Mutex //用于数据备份
loadBackupFileOnce sync.Once
}
/*
初始化一个多重规则的频率控制策略,例:
r := NewRule()
初始化之后紧跟着需要调用AddRule方法增加一条或若干条用户访问控制策略增加用户访问控制策略后才可以正式使用
*/
func NewRule() *Rule {
return new(Rule)
}
/*
增加用户访问控制策略,例:
r.AddRule(time.Minute*5, 20)
r.AddRule(time.Minute*30, 50)
r.AddRule(time.Hour*24, 200)
它表示:
在5分钟内每个用户最多允许访问20次
在30分钟内每个用户最多允许访问50次
在24小时内每个用户最多允许访问200次
其中:
defaultExpiration 表示在某个时间段内
numberOfAllowedAccesses 表示允许访问的次数
estimatedNumberOfOnlineUserNum 表示预计可能有多少人访问,此参数为可变参数,可不填写
以上任何一条用户访问控制策略没通过,都不允许访问注意单条规则中不宜设定监控时间段过大的规则比如设定监控某个用户一个月甚至是1年的访问规则它会占用大多的内存
*/
func (r *Rule) AddRule(defaultExpiration time.Duration, numberOfAllowedAccesses int, estimatedNumberOfOnlineUserNum ...int) {
//开启备份之后,不下允许添加规则
if r.needBackup {
//panic("cant't use AddRule after LoadingAndAutoSaveToDisc")
}
r.rules = append(r.rules, newsingleRule(defaultExpiration, numberOfAllowedAccesses, estimatedNumberOfOnlineUserNum...))
//把时间控制调整为从小到大排列,防止用户在实例化的时候,未按照预期的时间顺序添加,导致某些规则失效
sort.Slice(r.rules, func(i int, j int) bool {
return r.rules[i].defaultExpiration < r.rules[j].defaultExpiration
})
//如果有多条规则,单位时间内所承载的访问量需要有递进关系,否则则非法
if len(r.rules) > 1 {
var pre = math.MaxFloat64
for _, v := range r.rules {
cur := float64(v.numberOfAllowedAccesses) / float64(v.defaultExpiration.Nanoseconds())
if cur > pre {
// panic(`This rule is illegal,please modify the relevant rules:"allow ` + strconv.Itoa(v.numberOfAllowedAccesses) + ` visits within ` + v.defaultExpiration.String() +
// `" can't be bigger than "allow ` + strconv.Itoa(r.rules[i-1].numberOfAllowedAccesses) + ` visits within ` + r.rules[i-1].defaultExpiration.String() + `"`)
}
pre = cur
}
}
}
/*
是否还允许某用户访问,如果访问量过多,超出各细分规则中任何一条规则规定的访问量,则不允许访问
无论是否允许访问都会尝试在各细分访问规则记录中增加一条访问日志记录函数AllowVisit也可以认为
是AddRecords
例:
AllowVisit("username")
*/
func (r *Rule) AllowVisit(key interface{}) bool {
if len(r.rules) == 0 {
//panic("rule is emptyplease add rule by AddRule")
}
//这个地方需要注意,如果前面的某些策略通过,但是后面的策略不通过。这时候,在前面允许访问的策略中,
//允许访问次数是会减少的,我们这里并没有严格的做回滚操作。
//原因在于一方面是性能,另外一方面是随着
//时间流逝,前面的策略中允许访问的次数很快就会自动增长。
for i := range r.rules {
if !r.rules[i].allowVisit(key) {
return false
}
}
return true
}
/*
人工清空某用户的访问数据,主要针对某些特定客户的个性化需求,比如某个客户要求临时允许其访问更多的页面,
此时,调用出函数,清空其历史访问数据,间接实现这个目的,例:
ManualEmptyVisitorRecordsOf("andyyu")
*/
func (r *Rule) ManualEmptyVisitorRecordsOf(key interface{}) {
if len(r.rules) == 0 {
//panic("rule is emptyplease add rule by AddRule")
}
for i := range r.rules {
r.rules[i].manualEmptyVisitorRecordsOf(key)
}
}

202
ratelimit/demo1/save.go Normal file
View File

@@ -0,0 +1,202 @@
package richie
//
//import (
//"bufio"
//"bytes"
//"encoding/binary"
//"io"
//"os"
//"path/filepath"
//"strings"
//"sync"
//"time"
//)
//
//// LoadingAndAutoSaveToDisc 如果有历史备份文件则加载无历史备份文件则后续自动生成并且开启自动保存默认60秒完成一次存盘
//func (r *Rule) LoadingAndAutoSaveToDisc(backupFileName string, backUpInterval ...time.Duration) {
// r.loadBackupFileOnce.Do(func() {
// r.lockerForBackup = new(sync.Mutex)
// if len(r.rules) == 0 {
// //panic("rule is emptyplease add rule by AddRule")
// }
// r.needBackup = true
// r.backupFileName = strings.Split(backupFileName, ".")[0]
// if len(backUpInterval) == 0 {
// //默认60秒存盘一次
// r.backUpInterval = time.Second * 60
// } else {
// r.backUpInterval = backUpInterval[0]
// }
// if r.backupFileName == "" {
// //panic("backupFileName err:" + backupFileName)
// }
// //初次运行程序时,无备份文件,不认为是错误
// // err := r.loading()
// // if err != nil {
// // if !strings.HasPrefix(err.Error(), "Open backup file fail") {
// // panic(err.Error() + ` please repair or remove the backup file:"` + r.backupFileName + `.ratelimit" and then restart this program.`)
// // }
// // }
// var err error
// go func() {
// finished := true
// for range time.Tick(r.backUpInterval) {
// //如果数据量较大,那么在一个时间周期内不一定会完成存盘操作,所以要判断上一轮次的存盘是否完成
// if finished {
// finished = false
// err = r.SaveToDiscOnce()
// finished = true
// }
// }
// }()
// })
//}
//
////把数据保存到硬盘上,仅支持key为string,int,int64等类型数据的缓存
//func (r *Rule) SaveToDiscOnce() (err error) {
// r.lockerForBackup.Lock()
// defer r.lockerForBackup.Unlock()
// if len(r.rules) == 0 {
// //panic("rule is emptyplease add rule by AddRule")
// }
// if !r.needBackup {
// //panic("If you want't to SaveToDiscOnce,you should use LoadingAndAutoSaveToDisc after AddRule.")
// }
// f, err := os.Create(r.backupFileName + ".ratelimit_temp")
// if err != nil {
// return err
// }
// defer os.Remove(r.backupFileName + ".ratelimit_temp")
// buf := bufio.NewWriterSize(f, 40960)
// //1 先写规则数量
// _, err = buf.Write(uint64ToByte(uint64(len(r.rules))))
// if err != nil {
// return err
// }
// //2 依次写入每一组数据
// for i := range r.rules {
// curRuleData := new(bytes.Buffer)
// tempBuf := bufio.NewWriterSize(curRuleData, 40960)
// curRuleKeyNum := 0
// r.rules[i].usedVisitorRecordsIndex.Range(func(key, Index interface{}) bool {
// index := Index.(int)
// //备份过程中,不允许其它操作,加锁
// r.rules[i].visitorRecords[index].locker.Lock()
// //有效的才能加进去
// //2.3.1 写入keykey指用户名IP等只能是数字或string
// switch key.(type) {
// case string:
// //与其它类型不同KEY长度是不定长的
// tempBuf.Write([]byte{0x00})
// tempBuf.Write(uint64ToByte(uint64(len(key.(string)))))
// tempBuf.WriteString(key.(string))
// case int:
// tempBuf.Write([]byte{0x01})
// tempBuf.Write(uint64ToByte(uint64(key.(int))))
// case int8:
// tempBuf.Write([]byte{0x02})
// tempBuf.Write(uint64ToByte(uint64(key.(int8))))
// case int16:
// tempBuf.Write([]byte{0x03})
// tempBuf.Write(uint64ToByte(uint64(key.(int16))))
// case int32:
// tempBuf.Write([]byte{0x04})
// tempBuf.Write(uint64ToByte(uint64(key.(int32))))
// case int64:
// tempBuf.Write([]byte{0x05})
// tempBuf.Write(uint64ToByte(uint64(key.(int64))))
// case uint:
// tempBuf.Write([]byte{0x06})
// tempBuf.Write(uint64ToByte(uint64(key.(uint))))
// case uint8:
// tempBuf.Write([]byte{0x07})
// tempBuf.Write(uint64ToByte(uint64(key.(uint8))))
// case uint16:
// tempBuf.Write([]byte{0x08})
// tempBuf.Write(uint64ToByte(uint64(key.(uint16))))
// case uint32:
// tempBuf.Write([]byte{0x09})
// tempBuf.Write(uint64ToByte(uint64(key.(uint32))))
// case uint64:
// tempBuf.Write([]byte{0x0A})
// tempBuf.Write(uint64ToByte(key.(uint64)))
// default:
// //panic("key type can only be string,int,int8,int16,int32,int64,uint,uint8,uint16,uint32,uint64")
// }
// r.rules[i].visitorRecords[index].tailForCopy = r.rules[i].visitorRecords[index].tail
// r.rules[i].visitorRecords[index].headForCopy = r.rules[i].visitorRecords[index].head
// size := r.rules[i].visitorRecords[index].usedSize()
// //2.3.2写下当前key对应的有效访问记录数,为了简单,不判断其是否过期
// tempBuf.Write(uint64ToByte(uint64(size)))
// if size > 0 {
// for ii := 0; ii < size; ii++ {
// val, _ := r.rules[i].visitorRecords[index].tempQueuePopForCopy()
// //2.3.3写下每条访问数据的时间点
// tempBuf.Write(uint64ToByte(uint64(val)))
// }
// }
// curRuleKeyNum++
// r.rules[i].visitorRecords[index].locker.Unlock()
// return true
// })
// //2.1 //先写当前下标
// buf.Write(uint64ToByte(uint64(i)))
// //2.2 再写当前键的个数
// buf.Write(uint64ToByte(uint64(curRuleKeyNum)))
// //2.3再写某个键下面的所有数据,如果无数据,则不写
// //tempBuf由上面提前算出
// if curRuleKeyNum > 0 {
// tempBuf.Flush()
// b := curRuleData.Bytes()
// buf.Write(b)
// }
// }
// buf.Flush()
// err = f.Close()
// if err != nil {
// return
// }
// //成功生成临时文件后,成替换正式文件
// _, err = copyFile(r.backupFileName+".ratelimit", r.backupFileName+".ratelimit_temp")
// return
//}
//func uint64ToByte(i uint64) []byte {
// b := make([]byte, 8)
// binary.LittleEndian.PutUint64(b, i)
// return b
//}
//
////复制文件,目标文件所在目录不存在,则创建目录后再复制
////Copy(`d:\test\hello.txt`,`c:\test\hello.txt`)
//func copyFile(dstFileName, srcFileName string) (w int64, err error) {
// //打开源文件
// srcFile, err := os.Open(srcFileName)
// if err != nil {
// return 0, err
// }
// defer srcFile.Close()
// // 创建新的文件作为目标文件
// dstFile, err := os.Create(dstFileName)
// if err != nil {
// //如果出错,很可能是目标目录不存在,需要先创建目标目录
// err = os.MkdirAll(filepath.Dir(dstFileName), 0666)
// if err != nil {
// return 0, err
// }
// //再次尝试创建
// dstFile, err = os.Create(dstFileName)
// if err != nil {
// return 0, err
// }
// }
// defer dstFile.Close()
// //通过bufio实现对大文件复制的自动支持
// dst := bufio.NewWriter(dstFile)
// defer dst.Flush()
// src := bufio.NewReader(srcFile)
// w, err = io.Copy(dst, src)
// if err != nil {
// return 0, err
// }
// return w, err
//}

View File

@@ -0,0 +1,184 @@
package richie
import (
"sync"
"time"
)
//单组用户访问控制策略
type singleRule struct {
defaultExpiration time.Duration //表示计时周期,每条访问记录需要保存的时长,超过这个时长的数据记录将会被清除
numberOfAllowedAccesses int //在计时周期内最多允许访问的次数
estimatedNumberOfOnlineUsers int //在计时周期内预计有多少个用户会访问网站,建议选用一个稍大于实际值的值,以减少内存分配次数
cleanupInterval time.Duration //默认多长时间需要执行一次清除过期数据操作
visitorRecords []*autoGrowCircleQueueInt64 //用于存储用户的每一条访问记录
usedVisitorRecordsIndex sync.Map //存储visitorRecords中已使用的数据索引,key代表用户名或IP,为文本或数字类型,value代表visitorRecords中的下标位置
notUsedVisitorRecordsIndex map[int]struct{} //对应visitorRecords中未使用的数据的下标位置其自身非并发安全其并发安全由locker实现,因sync.Map计算长度不优
lockerForKeyIndex *sync.RWMutex //只用于分配用户KEY即只需保证用户KEY正确的分配在usedVisitorRecordsIndex与notUsedVisitorRecordsIndex
}
/*
初始化一个条单组用户访问控制控制策略,例:
vc := newsingleRule(time.Minute*30, 50)
或者 vc := newsingleRule(time.Minute*30, 50, 1000)
它表示:
在30分钟内每个用户最多允许访问50次,并且我们预计在这30分钟内大致有1000个用户会访问我们的网站
1000为可选字段此参数可默认不填写主要是用于提升性能类似于声明切片时的cap,绝大部分情况下无需关注此参数。
*/
func newsingleRule(defaultExpiration time.Duration, numberOfAllowedAccesses int, estimatedNumberOfOnlineUserNum ...int) *singleRule {
//规范化numberOfAllowedAccesses
//若参数numberOfAllowedAccesses设置是否合理在此被强行修改为1
if numberOfAllowedAccesses <= 0 {
numberOfAllowedAccesses = 1
}
//规范化estimatedNumberOfOnlineUsers
//estimatedNumberOfOnlineUsers没填写,或者是乱填写的,就默认用numberOfAllowedAccesses
estimatedNumberOfOnlineUsers := 0
if len(estimatedNumberOfOnlineUserNum) > 0 {
estimatedNumberOfOnlineUsers = estimatedNumberOfOnlineUserNum[0]
}
if estimatedNumberOfOnlineUsers <= 0 {
estimatedNumberOfOnlineUsers = numberOfAllowedAccesses
//普遍而言某一段时间内在线用户数达到1000已经较大所以除非用户指定estimatedNumberOfOnlineUserNum否则最大值定义为1000
//在线用户数是指在某一段时间内访问过的唯一用户总数
if estimatedNumberOfOnlineUsers > 1000 {
estimatedNumberOfOnlineUsers = 1000
}
}
//规范化defaultExpiration
cleanupInterval := defaultExpiration / 100
//强行修正清除过期数据的最长时间间隔与最短时间间隔
if cleanupInterval < time.Second*1 {
cleanupInterval = time.Second * 1
}
if cleanupInterval > time.Second*60 {
cleanupInterval = time.Second * 60
}
vc := createsingleRule(defaultExpiration, cleanupInterval, numberOfAllowedAccesses, estimatedNumberOfOnlineUsers)
//定期清除过期数据,并定期清理内存
go vc.deleteExpired()
return vc
}
func createsingleRule(defaultExpiration, cleanupInterval time.Duration, numberOfAllowedAccesses, estimatedNumberOfOnlineUsers int) *singleRule {
var vc singleRule
vc.defaultExpiration = defaultExpiration
vc.cleanupInterval = cleanupInterval
vc.numberOfAllowedAccesses = numberOfAllowedAccesses
vc.estimatedNumberOfOnlineUsers = estimatedNumberOfOnlineUsers
vc.notUsedVisitorRecordsIndex = make(map[int]struct{})
vc.lockerForKeyIndex = new(sync.RWMutex)
//根据在线用户数量初始化用户访问记录数据
vc.visitorRecords = make([]*autoGrowCircleQueueInt64, vc.estimatedNumberOfOnlineUsers)
for i := range vc.visitorRecords {
vc.visitorRecords[i] = newAutoGrowCircleQueueInt64(vc.numberOfAllowedAccesses)
//刚刚开始时,所有数据都未使用,放入未使用索引中
vc.notUsedVisitorRecordsIndex[i] = struct{}{}
}
return &vc
}
//根据用户key返回其数据在visitorRecords中的下标
func (s *singleRule) getIndexFrom(key interface{}) int {
//大部分情况下是读只有少部分情况下是写这里本业务测试中读写锁的的测试性能大概是互斥锁的5倍
//只需要用到读锁
s.lockerForKeyIndex.RLock()
//现有访问记录中有,则直接返回
if index, exist := s.usedVisitorRecordsIndex.Load(key); exist {
s.lockerForKeyIndex.RUnlock()
return index.(int)
}
s.lockerForKeyIndex.RUnlock()
//以下需要用到互斥锁
s.lockerForKeyIndex.Lock()
defer s.lockerForKeyIndex.Unlock()
//visitorRecords有闲置空间则从闲置空间中获取一条来返回
if len(s.notUsedVisitorRecordsIndex) > 0 {
for index := range s.notUsedVisitorRecordsIndex {
delete(s.notUsedVisitorRecordsIndex, index)
s.usedVisitorRecordsIndex.Store(key, index)
s.visitorRecords[index].key = key
return index
}
}
//visitorRecords没有闲置空间时则需要插入一条新数据到visitorRecords中
queue := newAutoGrowCircleQueueInt64(s.numberOfAllowedAccesses)
queue.key = key
s.visitorRecords = append(s.visitorRecords, queue)
index := len(s.visitorRecords) - 1 //最后一条的位置即为新的索引位置
s.usedVisitorRecordsIndex.Store(key, index)
return index
}
//经过一段时间无访问数据时从usedVisitorRecordsIndex中删除用户Key
func (s *singleRule) updateIndexOf(key interface{}) {
s.lockerForKeyIndex.Lock()
defer s.lockerForKeyIndex.Unlock()
if index, exist := s.usedVisitorRecordsIndex.Load(key); exist {
s.usedVisitorRecordsIndex.Delete(key) //删除完过期数据之后,如果该用户的所有访问记录均过期了,那么就删除该用户
s.notUsedVisitorRecordsIndex[index.(int)] = struct{}{} //并把该空间返还给notUsedVisitorRecordsIndex以便下次重复使用
}
}
//是否允许访问,允许访问则往访问记录中加入一条访问记录
func (s *singleRule) allowVisit(key interface{}) bool {
return s.add(key) == nil
}
//剩余访问次数
func (s *singleRule) remainingVisits(key interface{}) int {
index := s.getIndexFrom(key)
return s.visitorRecords[index].unUsedSize()
}
//增加一条访问记录
func (s *singleRule) add(key interface{}) (err error) {
index := s.getIndexFrom(key)
s.visitorRecords[index].deleteExpired(key)
return s.visitorRecords[index].pushWithConcurrencysafety(s.defaultExpiration)
}
//增加一条访问记录,从备份文件中增加,从备份文件中过来的数据不可信,有可能被不小心修改过,需要做校检
func (s *singleRule) addFromBackUpFile(key interface{}, reordFromBackUpFile int64) (err error) {
index := s.getIndexFrom(key)
s.visitorRecords[index].deleteExpired(key)
return s.visitorRecords[index].push(reordFromBackUpFile)
}
//清除访问记录
func (s *singleRule) manualEmptyVisitorRecordsOf(key interface{}) {
index := s.getIndexFrom(key)
for {
_, err := s.visitorRecords[index].pop()
if err != nil {
break
}
}
}
//删除过期数据
func (s *singleRule) deleteExpired() {
finished := true
for range time.Tick(s.cleanupInterval) {
//如果数据量较大,那么在一个清除周期内不一定会把所有数据全部清除,所以要判断上一轮次的清除是否完成
if finished {
finished = false
s.deleteExpiredOnce()
finished = true
}
}
}
//在特定时间间隔内执行一次删除过期数据操作
func (s *singleRule) deleteExpiredOnce() {
s.usedVisitorRecordsIndex.Range(func(key, indexVal interface{}) bool {
index := s.getIndexFrom(key)
s.visitorRecords[index].deleteExpired(key)
if s.visitorRecords[index].usedSize() == 0 {
//返回数据前,检察空间大小,太大的话,需要清理空间,把空间缩小到默认大小
s.visitorRecords[index].reSet()
s.updateIndexOf(key)
}
return true
})
}

View File

@@ -0,0 +1,73 @@
package richie
import (
"fmt"
"sort"
)
const (
Chinese = iota
English
)
/*
某用户剩余访问次数,例:
RemainingVisits("username")
*/
func (r *Rule) RemainingVisits(key interface{}) []int {
arr := make([]int, 0, len(r.rules))
for i := range r.rules {
arr = append(arr, r.rules[i].remainingVisits(key))
}
return arr
}
/*
打印各细分规则下的剩余访问次数
*/
func (r *Rule) PrintRemainingVisits(key interface{}, language ...int) {
//先确定语言,默认为中文,目前只支持中文,英文两种语言
lan := 0
if len(language) == 1 && language[0] == 1 {
lan = 1
}
for i := range r.rules {
if lan == 0 {
fmt.Println(key, "在", r.rules[i].defaultExpiration, "内共允许访问", r.rules[i].numberOfAllowedAccesses, "次,剩余", r.rules[i].remainingVisits(key))
} else {
fmt.Println(key, "allowed", r.rules[i].numberOfAllowedAccesses, "visits within", r.rules[i].defaultExpiration, ",with", r.rules[i].remainingVisits(key), "remaining")
}
}
}
//获得当前所有的在线用户,注意所有用int64存储的用户会被默认认为是IP地址会被自动转换为IP的字符串形式输出以方便查看
//如果不是本身就是以int64形式存储而不是IP4那么可以用ip4StringToInt64自己再转换回去
func (r *Rule) GetCurOnlineUsers() []string {
//向切片Sli中插入没出现过的元素V如果切片中有V则不插入
var insertIgnoreString = func(s []string, v string) []string {
for _, val := range s {
if val == v {
return s
}
}
s = append(s, v)
return s
}
var users []string
for i := range r.rules {
f := func(k, v interface{}) bool {
var user string
switch k.(type) {
case int64:
//user = int64ToIp4String(k.(int64))
default:
user = fmt.Sprint(k)
}
users = insertIgnoreString(users, user)
return true
}
r.rules[i].usedVisitorRecordsIndex.Range(f)
}
sort.Strings(users)
return users
}