limiter && request queue

This commit is contained in:
ideaa
2025-04-09 12:31:01 +08:00
parent 75d413da42
commit 631fc1b452
3 changed files with 50 additions and 31 deletions

View File

@@ -3,6 +3,7 @@ package ws
import (
"fmt"
"github.com/gobwas/ws"
"golang.org/x/time/rate"
"net"
"net/http"
"strings"
@@ -32,26 +33,29 @@ type Client struct {
ErrorCount int `json:"-"` //错误次数
Closed bool `json:"-"` //是否已经关闭
Limiter *rate.Limiter `json:"-"` //限速器
RequestQueue chan string `json:"-"` //处理队列
HttpRequest *http.Request `json:"-"`
HttpWriter http.ResponseWriter `json:"-"`
User *User `json:"user,omitempty"` //关联用户
Scope string `json:"scope"` //登录jwt scope, 用于判断用户从哪里登录的
AppId string `json:"appId"` //登录应用Id
StoreId uint `json:"storeId"` //店铺ID
MerchantId uint `json:"merchantId"` //商户ID
TenantId uint `json:"tenantId"` //租户ID
Platform string `json:"platform"` //登录平台
GroupId string `json:"groupId"` //用户分组Id
IsLogin bool `json:"isLogin"` //是否已登录
LoginAction string `json:"loginAction"` //登录动作
ForceDialogId string `json:"forceDialogId"` //打开聊天界面的会话ID
IpAddress string `json:"ipAddress"` //IP地址
IpAddressPort string `json:"IpAddressPort"` //IP地址和端口
IpLocation string `json:"ipLocation"` //通过IP转换获得的地理位置
ConnectionTime time.Time `json:"connectionTime"`
LastRequestTime time.Time `json:"lastRequestTime"`
LastHeartbeatTime time.Time `json:"lastHeartbeatTime"`
User *User `json:"user,omitempty"` //关联用户
Scope string `json:"scope"` //登录jwt scope, 用于判断用户从哪里登录的
AppId string `json:"appId"` //登录应用Id
StoreId uint `json:"storeId"` //店铺ID
MerchantId uint `json:"merchantId"` //商户ID
TenantId uint `json:"tenantId"` //租户ID
Platform string `json:"platform"` //登录平台
GroupId string `json:"groupId"` //用户分组Id
IsLogin bool `json:"isLogin"` //是否已登录
LoginAction string `json:"loginAction"` //登录动作
ForceDialogId string `json:"forceDialogId"` //打开聊天界面的会话ID
IpAddress string `json:"ipAddress"` //IP地址
IpAddressPort string `json:"IpAddressPort"` //IP地址和端口
IpLocation string `json:"ipLocation"` //通过IP转换获得的地理位置
ConnectionTime time.Time `json:"connectionTime"` //连接时间
LastRequestTime time.Time `json:"lastRequestTime"` //最后请求时间
LastHeartbeatTime time.Time `json:"lastHeartbeatTime"` //最后发送心跳时间
mu sync.RWMutex
Keys map[string]any
@@ -70,10 +74,10 @@ func (c *Client) Reader() {
return
}
if op == ws.OpText {
if op == ws.OpText && request != nil {
req := string(request)
c.Log("<-", req)
go Dispatcher(c, req)
c.RequestQueue <- req
} else if op == ws.OpPing {
err = wsutil.WriteServerMessage(c.Conn, ws.OpPong, nil)
if err != nil {
@@ -85,6 +89,23 @@ func (c *Client) Reader() {
}
}
// Request 处理请求
func (c *Client) Request() {
for req := range c.RequestQueue {
if !c.Limiter.Allow() {
c.Log("!!", "Too many requests, please retry later")
c.SendActionMsg(&Action{
Action: "sys.rateLimit",
Code: -1003,
Msg: "too many requests, please retry later",
})
continue
}
Dispatcher(c, req)
}
}
// Write 发送
func (c *Client) Write() {
timer := time.NewTicker(5 * time.Second)

View File

@@ -34,19 +34,13 @@ func Dispatcher(c *Client, request string) {
}
}
//请求频率限制5毫秒
if t.Sub(c.LastRequestTime).Microseconds() <= 2 {
c.SendActionMsg(&Action{Action: "sys.requestLimit", Code: -1003, Msg: "requests are too frequent"})
return
} else {
//更新最后请求时间
c.LastRequestTime = t
//更新最后请求时间
c.LastRequestTime = t
//如果心跳时间为0设置为当前时间
//防止在连接瞬间被哨兵扫描而断开
if c.LastHeartbeatTime.IsZero() {
c.LastHeartbeatTime = t
}
//如果心跳时间为0设置为当前时间
//防止在连接瞬间被哨兵扫描而断开
if c.LastHeartbeatTime.IsZero() {
c.LastHeartbeatTime = t
}
handlers := InitManager().Handlers(req.Action)

View File

@@ -2,6 +2,7 @@ package ws
import (
"fmt"
"golang.org/x/time/rate"
"net"
"net/http"
"time"
@@ -43,6 +44,8 @@ func HttpHandler(w http.ResponseWriter, r *http.Request) {
Hub: Hub,
Conn: conn,
Send: make(chan []byte, 32),
RequestQueue: make(chan string, 128),
Limiter: rate.NewLimiter(50, 100),
IpAddress: ipAddr,
IpAddressPort: fmt.Sprintf("%s:%d", ipAddr, addr.Port),
ConnectionTime: time.Now(),
@@ -54,4 +57,5 @@ func HttpHandler(w http.ResponseWriter, r *http.Request) {
go c.Reader()
go c.Write()
go c.Request()
}