feat: ratelimit rule

This commit is contained in:
chengjian1997
2022-03-29 15:40:56 +08:00
parent 2f8ad94b4f
commit 69c92de603
4 changed files with 590 additions and 0 deletions

192
ratelimit/circle_queue.go Normal file
View File

@@ -0,0 +1,192 @@
package ratelimit
import (
"errors"
"sync"
"time"
)
/*
用环形队列做为底层数据结构来存储用户访问数据,并能实现自动增长以及收缩
*/
// autoGrowCircleQueueInt64 使用切片实现的队列
type autoGrowCircleQueueInt64 struct {
key interface{}
// 注意maxSize比实际存储长度大1
maxSize int
// maxSizeTemp与visitorRecord长度相同,visitorRecord长度设计根据实际情况成自动增长
maxSizeTemp int
visitorRecord []int64
head int //头
tail int //尾
// 存盘时临时用到的虚拟队列的头和尾
headForCopy int
tailForCopy int
locker *sync.Mutex
}
// 初始化环形队列,长度超过1023的队列暂时只分配1023的空间
func newAutoGrowCircleQueueInt64(size int) *autoGrowCircleQueueInt64 {
var c autoGrowCircleQueueInt64
c.maxSize = size + 1
if c.maxSize > 1024 {
c.maxSizeTemp = 1024
} else {
c.maxSizeTemp = c.maxSize
}
c.visitorRecord = make([]int64, c.maxSizeTemp)
c.locker = new(sync.Mutex)
return &c
}
// 队列无人使用时,对于队列实际使用空间长度大于1023的需要对此队列做收缩操作以节省空间
func (q *autoGrowCircleQueueInt64) reSet() {
q.locker.Lock()
defer q.locker.Unlock()
if q.maxSize > 1024 && q.maxSizeTemp > 1024 {
newVisitorRecord := make([]int64, 1024)
q.visitorRecord = newVisitorRecord
q.maxSizeTemp = 1024
q.head = 0
q.tail = 0
}
}
// 队列是否需要扩容
func (q *autoGrowCircleQueueInt64) needGrow() bool {
if q.maxSizeTemp == q.maxSize {
return false
}
if q.tempQueueIsFull() {
return true
}
return false
}
// 对队列进行扩容操作
func (q *autoGrowCircleQueueInt64) grow() {
newVisitorRecordLen := len(q.visitorRecord) * 2
if newVisitorRecordLen > q.maxSize {
newVisitorRecordLen = q.maxSize
}
newVisitorRecord := make([]int64, newVisitorRecordLen)
// 复制数据
oldQueueLen := q.tempQueueLen()
for i := 0; i < oldQueueLen; i++ {
newVisitorRecord[i] = q.visitorRecord[q.head]
q.head = (q.head + 1) % q.maxSizeTemp
}
// 新旧数据替换
q.visitorRecord = newVisitorRecord
q.maxSizeTemp = newVisitorRecordLen
q.head = 0
q.tail = oldQueueLen
}
// 访问时间入对列,只用于从本地备份文件加载历史访问数据,本身是线性访问,无并发安全问题
func (q *autoGrowCircleQueueInt64) push(val int64) (err error) {
q.locker.Lock()
defer q.locker.Unlock()
if q.needGrow() {
q.grow()
}
if q.tempQueueIsFull() {
return errors.New("queue is full")
}
q.visitorRecord[q.tail] = val
q.tail = (q.tail + 1) % q.maxSizeTemp
return
}
// 访问时间入对列,并发安全,由于不同协程在高并发的时候,极端情况下,也即前后两次访问的时间差,与两协程的系统切换时间非常接近的情况下
// 由调用者自己生成时间容易出现紊乱的情况,所以访问时间只能到这个地方来统一生成,也即有极小的概率,先访问的时间比后访问的时间大
func (q *autoGrowCircleQueueInt64) pushWithConcurrencysafety(defaultExpiration time.Duration) (err error) {
q.locker.Lock()
defer q.locker.Unlock()
if q.needGrow() {
q.grow()
}
if q.tempQueueIsFull() {
return errors.New("queue is full")
}
q.visitorRecord[q.tail] = time.Now().Add(defaultExpiration).UnixNano()
q.tail = (q.tail + 1) % q.maxSizeTemp
return
}
// 出对列
func (q *autoGrowCircleQueueInt64) pop() (val int64, err error) {
q.locker.Lock()
defer q.locker.Unlock()
if q.tempQueueIsEmpty() {
return 0, errors.New("queue is empty")
}
val = q.visitorRecord[q.head]
q.head = (q.head + 1) % q.maxSizeTemp
return
}
// 用于备份数据的时候,虚拟队列的出队列操作,但实际未进行出队列操作
func (q *autoGrowCircleQueueInt64) tempQueuePopForCopy() (val int64, err error) {
if q.tempQueueIsEmptyForCopy() {
return 0, errors.New("queue is empty")
}
val = q.visitorRecord[q.headForCopy]
q.headForCopy = (q.headForCopy + 1) % q.maxSizeTemp
return
}
// 用于备份数据的时候,判断虚拟队列是否已满
func (q *autoGrowCircleQueueInt64) tempQueueIsFull() bool {
return (q.tail+1)%q.maxSizeTemp == q.head
}
// 判断队列是否为空
func (q *autoGrowCircleQueueInt64) tempQueueIsEmpty() bool {
return q.tail == q.head
}
// 用于备份数据的时候,判断虚拟队列是否为空
func (q *autoGrowCircleQueueInt64) tempQueueIsEmptyForCopy() bool {
return q.tailForCopy == q.headForCopy
}
// 判断队列已使用多少个元素
func (q *autoGrowCircleQueueInt64) usedSize() int {
return (q.tail + q.maxSizeTemp - q.head) % q.maxSizeTemp
}
// 判断队列中还有多少空间未使用
func (q *autoGrowCircleQueueInt64) tempQueueUnUsedSize() int {
return q.maxSizeTemp - 1 - q.usedSize()
}
// 判断队列中还有多少空间未使用
func (q *autoGrowCircleQueueInt64) unUsedSize() int {
return q.maxSize - 1 - ((q.tail + q.maxSizeTemp - q.head) % q.maxSizeTemp)
}
// 队列总的可用空间长度
func (q *autoGrowCircleQueueInt64) tempQueueLen() int {
return q.maxSizeTemp - 1
}
// 删除过期数据
func (q *autoGrowCircleQueueInt64) deleteExpired(key interface{}) {
q.locker.Lock()
defer q.locker.Unlock()
now := time.Now().UnixNano()
size := q.usedSize()
if size == 0 {
return
}
//依次删除过期数据
for i := 0; i < size; i++ {
if now > q.visitorRecord[q.head] {
q.head = (q.head + 1) % q.maxSizeTemp
} else {
return
}
}
}

94
ratelimit/rule.go Normal file
View File

@@ -0,0 +1,94 @@
package ratelimit
import (
"github.com/xxjwxc/public/errors"
"math"
"sort"
"strconv"
"time"
)
// Rule 用户访问控制策略,可由一个或多个访问控制规则组成
type Rule struct {
rules []*singleRule
}
/*
NewRule 初始化一个多重规则的频率控制策略,例:
r := NewRule()
初始化之后紧跟着需要调用AddRule方法增加一条或若干条用户访问控制策略增加用户访问控制策略后才可以正式使用
*/
func NewRule() *Rule {
return new(Rule)
}
/*
AddRule 增加用户访问控制策略,例:
r.AddRule(time.Minute*5, 20) 在5分钟内每个用户最多允许访问20次
r.AddRule(time.Minute*30, 50) 在30分钟内每个用户最多允许访问50次
r.AddRule(time.Hour*24, 200) 在24小时内每个用户最多允许访问200次
其中:
defaultExpiration 表示在某个时间段内
numberOfAllowedAccesses 表示允许访问的次数
estimatedNumberOfOnlineUserNum 表示预计可能有多少人访问,此参数为可变参数,可不填写
以上任何一条用户访问控制策略没通过,都不允许访问注意单条规则中不宜设定监控时间段过大的规则比如设定监控某个用户一个月甚至是1年的访问规则它会占用大多的内存
*/
func (r *Rule) AddRule(defaultExpiration time.Duration, numberOfAllowedAccesses int, estimatedNumberOfOnlineUserNum ...int) error {
r.rules = append(r.rules, newSingleRule(defaultExpiration, numberOfAllowedAccesses, estimatedNumberOfOnlineUserNum...))
// 把时间控制调整为从小到大排列,防止用户在实例化的时候,未按照预期的时间顺序添加,导致某些规则失效
sort.Slice(r.rules, func(i int, j int) bool {
return r.rules[i].defaultExpiration < r.rules[j].defaultExpiration
})
// 如果有多条规则,单位时间内所承载的访问量需要有递进关系,否则则非法
if len(r.rules) > 1 {
var pre = math.MaxFloat64
for i, v := range r.rules {
cur := float64(v.numberOfAllowedAccesses) / float64(v.defaultExpiration.Nanoseconds())
if cur > pre {
return errors.New(`This rule is illegal,please modify the relevant rules:"allow ` + strconv.Itoa(v.numberOfAllowedAccesses) + ` visits within ` + v.defaultExpiration.String() +
`" can't be bigger than "allow ` + strconv.Itoa(r.rules[i-1].numberOfAllowedAccesses) + ` visits within ` + r.rules[i-1].defaultExpiration.String() + `"`)
}
pre = cur
}
}
return nil
}
/*
AllowVisit 是否还允许某用户访问,如果访问量过多,超出各细分规则中任何一条规则规定的访问量,则不允许访问
无论是否允许访问都会尝试在各细分访问规则记录中增加一条访问日志记录函数AllowVisit也可以认为
是AddRecords
例:
AllowVisit("username")
*/
func (r *Rule) AllowVisit(key interface{}) (bool, error) {
if len(r.rules) == 0 {
return false, errors.New("rule is emptyplease add rule by AddRule")
}
// 这个地方需要注意,如果前面的某些策略通过,但是后面的策略不通过。这时候,在前面允许访问的策略中,
// 允许访问次数是会减少的,这里并没有严格的做回滚操作。
// 原因在于一方面是性能,另外一方面是随着时间流逝,前面的策略中允许访问的次数很快就会自动增长。
for i := range r.rules {
if !r.rules[i].allowVisit(key) {
return false, nil
}
}
return true, nil
}
/*
ManualEmptyVisitorRecordsOf 人工清空某用户的访问数据,主要针对某些特定客户的个性化需求,比如某个客户要求临时允许其访问更多的页面,
此时,调用出函数,清空其历史访问数据,间接实现这个目的,例:ManualEmptyVisitorRecordsOf("andyyu")
*/
func (r *Rule) ManualEmptyVisitorRecordsOf(key interface{}) error {
if len(r.rules) == 0 {
return errors.New("rule is emptyplease add rule by AddRule")
}
for i := range r.rules {
r.rules[i].manualEmptyVisitorRecordsOf(key)
}
return nil
}

115
ratelimit/rule_test.go Normal file
View File

@@ -0,0 +1,115 @@
package ratelimit
import (
"fmt"
"github.com/yudeguang/ratelimit"
"log"
"strconv"
"sync"
"testing"
"time"
)
func Test1(t *testing.T) {
var Visits int //因并发问题num比实际数量稍小
fmt.Println("\r\n测试1,性能测试预计耗时1分钟请耐心等待:")
//步骤一:初始化
r := NewRule()
//步骤二:增加一条或者多条规则组成复合规则,规则必须至少包含一条规则
//此处对于性能测试,为方便准确计数,只需要添加一条规则
r.AddRule(time.Second*10, 1000) //每10秒只允许访问1000次
/*
r.AddRule(time.Second*10, 10) //每10秒只允许访问10次
r.AddRule(time.Minute*30, 1000) //每30分钟只允许访问1000次
r.AddRule(time.Hour*24, 5000) //每天只允许访问500次
*/
//步骤三(可选):从本地磁盘加载历史访问数据
//r.LoadingAndAutoSaveToDisc("test1", time.Second*10) //设置10秒备份一次(不填写则默认60秒备份一次)备份到程序当前文件夹下文件名为test1.ratelimit
log.Println("性能测试正式开始")
//步骤四:调用函数判断某用户是否允许访问
/*
allow:= r.AllowVisit(user)
*/
//构建若干个用户,模拟用户访问
var users = make(map[string]bool)
for i := 1; i < 1000; i++ {
users["user_"+strconv.Itoa(i)] = true
}
begin := time.Now()
//模拟多个协程访问
chanNum := 200
var wg sync.WaitGroup
wg.Add(chanNum)
for i := 0; i < chanNum; i++ {
go func(i int, wg *sync.WaitGroup) {
for ii := 0; ii < 5000; ii++ {
for user := range users {
for {
Visits++
if !r.AllowVisit(user) {
break
}
}
}
}
wg.Done()
}(i, &wg)
}
//所有线程结束,完工
wg.Wait()
t1 := int(time.Now().Sub(begin).Seconds())
log.Println("性能测试完成:共计访问", Visits, "次,", "耗时", t, "秒,即每秒约完成", Visits/t1, "次操作")
//步骤五(可选):程序退出前主动手动存盘
//err := r.SaveToDiscOnce() //在自动备份的同时,还支持手动备份,一般在程序要退出时调用此函数
//if err == nil {
// log.Println("完成手动数据备份")
//} else {
// log.Println(err)
//}
}
func Test2(t *testing.T) {
fmt.Println("\r\n测试2模拟用户访问并打印:")
//步骤一:初始化
r := ratelimit.NewRule()
//步骤二:增加一条或者多条规则组成复合规则,规则必须至少包含一条规则
r.AddRule(time.Second*10, 5) //每10秒只允许访问5次
r.AddRule(time.Minute*30, 50) //每30分钟只允许访问50次
r.AddRule(time.Hour*24, 500) //每天只允许访问500次
//步骤三:调用函数判断某用户是否允许访问
/*
allow:= r.AllowVisit(user)
*/
//构建若干个用户,模拟用户访问
users := []string{"andyyu", "tony", "chery"}
for _, user := range users {
fmt.Println("\r\n开始模拟以下用户访问:", user)
for {
if r.AllowVisit(user) {
log.Println(user, "访问1次,剩余:", r.RemainingVisits(user))
} else {
log.Println(user, "访问过多,稍后再试")
break
}
time.Sleep(time.Second * 1)
}
}
//打印所有用户访问数据情况
fmt.Println("开始打印所有用户在相关时间段内详细的剩余访问次数情况:")
for _, user := range users {
fmt.Println(user)
fmt.Println(" 概述:", r.RemainingVisits(user))
fmt.Println(" 具体:")
r.PrintRemainingVisits(user)
fmt.Println("")
}
/*
在实际的平台运行过程中,往往会因为各种原因,某个客户的访问量过大,被系统临时禁止访问,这时候
这个客户就可能会投诉之类的,根据运营的实际需要,就需要手动清除掉某用户的访问记录,让其可以再继续访问。
对于函数ManualEmptyVisitorRecordsOf(),一般需要自行通过合理的方式,比如自行封装一个HTTP服务来间接调用
*/
log.Println("开始测试手动清楚某用户访问记录.")
log.Println("chery清空访问记录前,剩余:", r.RemainingVisits("chery"))
r.ManualEmptyVisitorRecordsOf("chery")
log.Println("chery清空访问记录后,剩余:", r.RemainingVisits("chery"))
}

189
ratelimit/single_rule.go Normal file
View File

@@ -0,0 +1,189 @@
package ratelimit
import (
"sync"
"time"
)
// singleRule 单组用户访问控制策略
type singleRule struct {
defaultExpiration time.Duration // 表示计时周期,每条访问记录需要保存的时长,超过这个时长的数据记录将会被清除
numberOfAllowedAccesses int // 在计时周期内最多允许访问的次数
estimatedNumberOfOnlineUsers int // 在计时周期内预计有多少个用户会访问网站,建议选用一个稍大于实际值的值,以减少内存分配次数
cleanupInterval time.Duration // 默认多长时间需要执行一次清除过期数据操作
visitorRecords []*autoGrowCircleQueueInt64 // 用于存储用户的每一条访问记录
usedVisitorRecordsIndex sync.Map // 存储visitorRecords中已使用的数据索引,key代表用户名或IP,为文本或数字类型,value代表visitorRecords中的下标位置
notUsedVisitorRecordsIndex map[int]struct{} // 对应visitorRecords中未使用的数据的下标位置其自身非并发安全其并发安全由locker实现,因sync.Map计算长度不优
lockerForKeyIndex *sync.RWMutex // 只用于分配用户KEY即只需保证用户KEY正确的分配在usedVisitorRecordsIndex与notUsedVisitorRecordsIndex
}
/*
newSingleRule 初始化一个条单组用户访问控制控制策略,例:
vc := newSingleRule(time.Minute*30, 50) 或者 vc := newSingleRule(time.Minute*30, 50, 1000)
它表示:
在30分钟内每个用户最多允许访问50次,并且我们预计在这30分钟内大致有1000个用户会访问我们的网站
1000为可选字段此参数可默认不填写主要是用于提升性能类似于声明切片时的cap,绝大部分情况下无需关注此参数。
*/
func newSingleRule(defaultExpiration time.Duration, numberOfAllowedAccesses int, estimatedNumberOfOnlineUserNum ...int) *singleRule {
// 规范化numberOfAllowedAccesses
// 若参数numberOfAllowedAccesses设置是否合理在此被强行修改为1
if numberOfAllowedAccesses <= 0 {
numberOfAllowedAccesses = 1
}
// 规范化estimatedNumberOfOnlineUsers
// estimatedNumberOfOnlineUsers没填写,或者是乱填写的,就默认用numberOfAllowedAccesses
estimatedNumberOfOnlineUsers := 0
if len(estimatedNumberOfOnlineUserNum) > 0 {
estimatedNumberOfOnlineUsers = estimatedNumberOfOnlineUserNum[0]
}
if estimatedNumberOfOnlineUsers <= 0 {
estimatedNumberOfOnlineUsers = numberOfAllowedAccesses
// 普遍而言某一段时间内在线用户数达到1000已经较大所以除非用户指定estimatedNumberOfOnlineUserNum否则最大值定义为1000
// 在线用户数是指在某一段时间内访问过的唯一用户总数
if estimatedNumberOfOnlineUsers > 1000 {
estimatedNumberOfOnlineUsers = 1000
}
}
// 规范化defaultExpiration
cleanupInterval := defaultExpiration / 100
//强行修正清除过期数据的最长时间间隔与最短时间间隔
if cleanupInterval < time.Second*1 {
cleanupInterval = time.Second * 1
}
if cleanupInterval > time.Second*60 {
cleanupInterval = time.Second * 60
}
vc := createSingleRule(defaultExpiration, cleanupInterval, numberOfAllowedAccesses, estimatedNumberOfOnlineUsers)
// 定期清除过期数据,并定期清理内存
go vc.deleteExpired()
return vc
}
func createSingleRule(defaultExpiration, cleanupInterval time.Duration, numberOfAllowedAccesses, estimatedNumberOfOnlineUsers int) *singleRule {
var vc singleRule
vc.defaultExpiration = defaultExpiration
vc.cleanupInterval = cleanupInterval
vc.numberOfAllowedAccesses = numberOfAllowedAccesses
vc.estimatedNumberOfOnlineUsers = estimatedNumberOfOnlineUsers
vc.notUsedVisitorRecordsIndex = make(map[int]struct{})
vc.lockerForKeyIndex = new(sync.RWMutex)
// 根据在线用户数量初始化用户访问记录数据
vc.visitorRecords = make([]*autoGrowCircleQueueInt64, vc.estimatedNumberOfOnlineUsers)
for i := range vc.visitorRecords {
vc.visitorRecords[i] = newAutoGrowCircleQueueInt64(vc.numberOfAllowedAccesses)
// 刚刚开始时,所有数据都未使用,放入未使用索引中
vc.notUsedVisitorRecordsIndex[i] = struct{}{}
}
return &vc
}
// getIndexFrom 根据用户key返回其数据在visitorRecords中的下标
func (s *singleRule) getIndexFrom(key interface{}) int {
// 大部分情况下是读,只有少部分情况下是写
// 只需要用到读锁
s.lockerForKeyIndex.RLock()
// 现有访问记录中有,则直接返回
if index, exist := s.usedVisitorRecordsIndex.Load(key); exist {
s.lockerForKeyIndex.RUnlock()
return index.(int)
}
s.lockerForKeyIndex.RUnlock()
// 以下需要用到互斥锁
s.lockerForKeyIndex.Lock()
defer s.lockerForKeyIndex.Unlock()
// visitorRecords有闲置空间则从闲置空间中获取一条来返回
if len(s.notUsedVisitorRecordsIndex) > 0 {
for index := range s.notUsedVisitorRecordsIndex {
delete(s.notUsedVisitorRecordsIndex, index)
s.usedVisitorRecordsIndex.Store(key, index)
s.visitorRecords[index].key = key
return index
}
}
// visitorRecords没有闲置空间时则需要插入一条新数据到visitorRecords中
queue := newAutoGrowCircleQueueInt64(s.numberOfAllowedAccesses)
queue.key = key
s.visitorRecords = append(s.visitorRecords, queue)
index := len(s.visitorRecords) - 1 // 最后一条的位置即为新的索引位置
s.usedVisitorRecordsIndex.Store(key, index)
return index
}
// updateIndexOf 经过一段时间无访问数据时从usedVisitorRecordsIndex中删除用户Key
func (s *singleRule) updateIndexOf(key interface{}) {
s.lockerForKeyIndex.Lock()
defer s.lockerForKeyIndex.Unlock()
if index, exist := s.usedVisitorRecordsIndex.Load(key); exist {
s.usedVisitorRecordsIndex.Delete(key) // 删除完过期数据之后,如果该用户的所有访问记录均过期了,那么就删除该用户
s.notUsedVisitorRecordsIndex[index.(int)] = struct{}{} // 并把该空间返还给notUsedVisitorRecordsIndex以便下次重复使用
}
}
// allowVisit 是否允许访问,允许访问则往访问记录中加入一条访问记录
func (s *singleRule) allowVisit(key interface{}) bool {
return s.add(key) == nil
}
// remainingVisits 剩余访问次数
func (s *singleRule) remainingVisits(key interface{}) int {
index := s.getIndexFrom(key)
return s.visitorRecords[index].unUsedSize()
}
// add 增加一条访问记录
func (s *singleRule) add(key interface{}) (err error) {
index := s.getIndexFrom(key)
s.visitorRecords[index].deleteExpired(key)
return s.visitorRecords[index].pushWithConcurrencysafety(s.defaultExpiration)
}
// addFromBackUpFile 增加一条访问记录,从备份文件中增加,从备份文件中过来的数据不可信,有可能被不小心修改过,需要做校检
func (s *singleRule) addFromBackUpFile(key interface{}, reordFromBackUpFile int64) (err error) {
index := s.getIndexFrom(key)
s.visitorRecords[index].deleteExpired(key)
return s.visitorRecords[index].push(reordFromBackUpFile)
}
// manualEmptyVisitorRecordsOf 清除访问记录
func (s *singleRule) manualEmptyVisitorRecordsOf(key interface{}) {
index := s.getIndexFrom(key)
for {
_, err := s.visitorRecords[index].pop()
if err != nil {
break
}
}
}
// deleteExpired 删除过期数据
func (s *singleRule) deleteExpired() {
finished := true
for range time.Tick(s.cleanupInterval) {
// 如果数据量较大,那么在一个清除周期内不一定会把所有数据全部清除,所以要判断上一轮次的清除是否完成
if finished {
finished = false
s.deleteExpiredOnce()
finished = true
}
}
}
// deleteExpiredOnce 在特定时间间隔内执行一次删除过期数据操作
func (s *singleRule) deleteExpiredOnce() {
s.usedVisitorRecordsIndex.Range(func(key, indexVal interface{}) bool {
index := s.getIndexFrom(key)
s.visitorRecords[index].deleteExpired(key)
if s.visitorRecords[index].usedSize() == 0 {
// 返回数据前,检察空间大小,太大的话,需要清理空间,把空间缩小到默认大小
s.visitorRecords[index].reSet()
s.updateIndexOf(key)
}
return true
})
}