优化ws,corn

This commit is contained in:
xh
2025-11-24 02:47:23 +08:00
parent e44c925972
commit ea67c0336b
9 changed files with 210 additions and 55 deletions

View File

@@ -2,7 +2,7 @@ import { fileURLToPath, URL } from 'url'
import { defineConfig, loadEnv } from 'vite'
import vue from '@vitejs/plugin-vue'
import vueJsx from '@vitejs/plugin-vue-jsx'
// import vueJsx from '@vitejs/plugin-vue-jsx'
// import AutoImport from 'unplugin-auto-import/vite'
import Components from 'unplugin-vue-components/vite'
@@ -11,7 +11,7 @@ import Components from 'unplugin-vue-components/vite'
import { createSvgIconsPlugin } from 'vite-plugin-svg-icons'
// import viteCompression from 'vite-plugin-compression'
import { visualizer } from 'rollup-plugin-visualizer'
// import { visualizer } from 'rollup-plugin-visualizer'
// https://vitejs.dev/config/
export default ({ mode }) => {
@@ -106,7 +106,7 @@ export default ({ mode }) => {
server: {
open: true,
host: '0.0.0.0',
port: 5174,
port: 5180,
proxy: {
'/api': {
target: env.VITE_APP_BASE_URL,

View File

@@ -4,6 +4,7 @@ import (
"log"
"net/http"
"x_admin/core"
"x_admin/util"
"x_admin/util/ws_util"
"github.com/gin-gonic/gin"
@@ -17,10 +18,11 @@ var upgrader = websocket.Upgrader{
}
func WsHandler(c *gin.Context) {
uuid := util.ToolsUtil.MakeUuidV7()
// 从查询参数获取用户ID和房间ID实际项目中应通过认证获取
clientID := c.Query("id")
uid := c.Query("uid")
roomID := c.Query("room")
if clientID == "" {
if uid == "" {
c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{"error": "id is required"})
return
}
@@ -31,13 +33,10 @@ func WsHandler(c *gin.Context) {
return
}
client := ws_util.NewClient(clientID, roomID, conn, core.Ws)
client := ws_util.NewClient(uuid, uid, roomID, conn, core.Ws)
core.Ws.Register <- client
// 启动读写协程
go client.Write()
go client.Read()
}
func init() {
go core.Ws.Start()
}

35
server/app/corn/init.go Normal file
View File

@@ -0,0 +1,35 @@
package corn
import (
"x_admin/core"
"x_admin/util"
)
// func init() {
// c := cron.New(cron.WithSeconds())
// c.AddFunc("*/5 * * * * *", func() {
// fmt.Println("定时任务每5秒执行一次")
// core.Ws.SendToRoom("room1", []byte("hello room1"))
// core.Ws.SendToAll([]byte("hello all"))
// })
// // 启动定时任务
// c.Start()
// }
func init() {
tm := core.NewCronManager()
tm.Start()
// defer tm.Stop()
// 添加一个每5秒执行的任务
tm.AddTask("broadcast", "*/5 * * * * *", func() {
// core.Ws.SendToRoom("room1", map[string]any{
// "message": "Hello Room1!",
// })
// 存入redis
util.RedisUtil.RPush("onlineCount", []any{core.Ws.GetOnlineCount()}, 10)
// 广播当前在线用户数
core.Ws.SendToAll(map[string]any{
"onlineCount": core.Ws.GetOnlineCount(),
})
})
}

View File

@@ -1,19 +0,0 @@
package jobs
import (
"fmt"
"x_admin/core"
"github.com/robfig/cron/v3"
)
func init() {
c := cron.New(cron.WithSeconds())
c.AddFunc("*/5 * * * * *", func() {
fmt.Println("定时任务每5秒执行一次")
core.Ws.SendToRoom("room1", []byte("hello room1"))
core.Ws.SendToAll([]byte("hello all"))
})
// 启动定时任务
c.Start()
}

67
server/core/corn.go Normal file
View File

@@ -0,0 +1,67 @@
package core
import (
"fmt"
"sync"
"github.com/robfig/cron/v3"
)
// CronManager 任务管理器
type CronManager struct {
cron *cron.Cron
taskIDs map[string]cron.EntryID // 通过字符串ID映射到cron的内部任务ID
mutex sync.RWMutex
}
// NewTaskManager 创建一个新的任务管理器
func NewCronManager() *CronManager {
return &CronManager{
cron: cron.New(cron.WithSeconds()), // 启用秒级别的调度
taskIDs: make(map[string]cron.EntryID),
}
}
// AddTask 添加任务
func (tm *CronManager) AddTask(taskID, spec string, cmd func()) error {
tm.mutex.Lock()
defer tm.mutex.Unlock()
// 如果任务已存在,先移除
if id, exists := tm.taskIDs[taskID]; exists {
tm.cron.Remove(id)
}
// 添加新任务
id, err := tm.cron.AddFunc(spec, cmd)
if err != nil {
return fmt.Errorf("添加任务失败: %w", err)
}
tm.taskIDs[taskID] = id
fmt.Printf("任务 '%s' 已添加/更新\n", taskID)
return nil
}
// RemoveTask 删除任务
func (tm *CronManager) RemoveTask(taskID string) {
tm.mutex.Lock()
defer tm.mutex.Unlock()
if id, exists := tm.taskIDs[taskID]; exists {
tm.cron.Remove(id)
delete(tm.taskIDs, taskID)
fmt.Printf("任务 '%s' 已移除\n", taskID)
} else {
fmt.Printf("任务 '%s' 不存在\n", taskID)
}
}
// Start 启动任务调度器
func (tm *CronManager) Start() {
tm.cron.Start()
}
// Stop 停止任务调度器
func (tm *CronManager) Stop() {
tm.cron.Stop()
}

View File

@@ -13,7 +13,7 @@ import (
"x_admin/middleware"
"x_admin/routes"
_ "x_admin/app/jobs"
_ "x_admin/app/corn"
_ "x_admin/docs"
swaggerfiles "github.com/swaggo/files"

View File

@@ -205,6 +205,34 @@ func (ru redisUtil) Del(keys ...string) bool {
return true
}
// Push 向列表中添加元素,并保留最新的count个元素
func (ru redisUtil) RPush(key string, value []any, count int64) bool {
var ctx = context.Background()
pipe := core.Redis.TxPipeline()
pipe.RPush(ctx, config.RedisConfig.RedisPrefix+key, value...) // 推到右侧(尾部)
if count != 0 {
pipe.LTrim(ctx, config.RedisConfig.RedisPrefix+key, -count, -1) // 保留最新的count个元素
}
_, err := pipe.Exec(ctx)
if err != nil {
core.Logger.Errorf("redisUtil.Push err: err=[%+v]", err)
return false
}
return true
}
// LIndex 获取列表中指定索引的元素
func (ru redisUtil) LRange(key string, start, stop int64) []string {
res, err := ru.redis.LRange(context.Background(), config.RedisConfig.RedisPrefix+key, start, stop).Result()
if err != nil {
core.Logger.Errorf("redisUtil.LRange err: err=[%+v]", err)
return []string{}
}
return res
}
// toFullKeys 为keys批量增加前缀
func (ru redisUtil) toFullKeys(keys []string) (fullKeys []string) {
for _, k := range keys {

View File

@@ -15,27 +15,30 @@ const (
)
type Client struct {
ID string
UUID string // 用于单推
Uid string // 用于单推
RoomID string // 用于群推
conn *websocket.Conn
send chan []byte
manager *Manager //持有对 Manager 的引用:显式的依赖注入,结构清晰、可测试、可扩展
Manager *Manager //持有对 Manager 的引用:显式的依赖注入,结构清晰、可测试、可扩展
}
func NewClient(id, roomID string, conn *websocket.Conn, manager *Manager) *Client {
func NewClient(uuid, Uid, roomID string, conn *websocket.Conn, manager *Manager) *Client {
return &Client{
ID: id,
UUID: uuid,
Uid: Uid,
RoomID: roomID,
conn: conn,
send: make(chan []byte, 256),
manager: manager,
Manager: manager,
}
}
// 读取消息(通常用于接收客户端消息,此处简化)
func (c *Client) Read() {
defer func() {
c.manager.UnRegister <- c
// 从 Manager 中注销客户端
c.Manager.UnRegister <- c
c.conn.Close()
}()
@@ -50,7 +53,12 @@ func (c *Client) Read() {
for {
_, _, err := c.conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
// 处理浏览器主动关闭的情况
if websocket.IsCloseError(err,
websocket.CloseGoingAway, // 客户端正在关闭连接
websocket.CloseAbnormalClosure) { // 异常关闭
log.Printf("WebSocket client %s closed: %v", c.UUID, err)
} else {
log.Printf("WebSocket read error: %v", err)
}
break
@@ -108,6 +116,6 @@ func (c *Client) Write() {
// 主动关闭连接
func (c *Client) Close() {
c.manager.UnRegister <- c
c.Manager.UnRegister <- c
c.conn.Close()
}

View File

@@ -1,11 +1,13 @@
package ws_util
import (
"encoding/json"
"sync"
)
type Manager struct {
clients map[string]*Client // 所有客户端key=clientID
uid_uuids map[string][]string // uid -> [uuid1, uuid2, ...]
rooms map[string]map[string]bool // 群组roomID -> {clientID: true}
Register chan *Client
UnRegister chan *Client
@@ -13,12 +15,16 @@ type Manager struct {
}
func NewManager() *Manager {
return &Manager{
var manager = Manager{
clients: make(map[string]*Client),
uid_uuids: make(map[string][]string),
rooms: make(map[string]map[string]bool),
Register: make(chan *Client),
UnRegister: make(chan *Client),
}
go manager.Start()
return &manager
}
// 启动管理器协程
@@ -27,27 +33,40 @@ func (m *Manager) Start() {
select {
case client := <-m.Register:
m.mutex.Lock()
m.clients[client.ID] = client
m.clients[client.UUID] = client
// 加入用户
if client.Uid != "" {
m.uid_uuids[client.Uid] = append(m.uid_uuids[client.Uid], client.UUID)
}
// 加入房间
if client.RoomID != "" {
if _, ok := m.rooms[client.RoomID]; !ok {
m.rooms[client.RoomID] = make(map[string]bool)
}
m.rooms[client.RoomID][client.ID] = true
m.rooms[client.RoomID][client.UUID] = true
}
m.mutex.Unlock()
case client := <-m.UnRegister:
m.mutex.Lock()
if _, ok := m.clients[client.ID]; ok {
if _, ok := m.clients[client.UUID]; ok {
// 删除客户端
delete(m.clients, client.ID)
delete(m.clients, client.UUID)
// 删除用户对应的UUID
if client.Uid != "" {
for i, uuid := range m.uid_uuids[client.Uid] {
if uuid == client.UUID {
m.uid_uuids[client.Uid] = append(m.uid_uuids[client.Uid][:i], m.uid_uuids[client.Uid][i+1:]...)
break
}
}
}
// 退出房间
if client.RoomID != "" {
if room, ok := m.rooms[client.RoomID]; ok {
delete(room, client.ID)
delete(room, client.UUID)
if len(room) == 0 {
delete(m.rooms, client.RoomID)
}
@@ -60,23 +79,37 @@ func (m *Manager) Start() {
}
// 单推:向指定 clientID 发送消息
func (m *Manager) SendToUser(clientID string, message []byte) {
func (m *Manager) SendToUser(uid string, message any) {
jsonMsg, err := json.Marshal(message)
if err != nil {
return
}
m.mutex.RLock()
client, exists := m.clients[clientID]
clientIDs, exists := m.uid_uuids[uid]
m.mutex.RUnlock()
if exists && client != nil {
select {
case client.send <- message:
default:
// 通道满,丢弃或记录日志
close(client.send)
if exists && clientIDs != nil {
for _, clientID := range clientIDs {
if client, ok := m.clients[clientID]; ok {
select {
case client.send <- jsonMsg:
default:
// 通道满,丢弃或记录日志
close(client.send)
}
}
}
}
}
// 群推:向指定 roomID 的所有成员发送消息
func (m *Manager) SendToRoom(roomID string, message []byte) {
func (m *Manager) SendToRoom(roomID string, message any) {
jsonMsg, err := json.Marshal(message)
if err != nil {
return
}
m.mutex.RLock()
room, exists := m.rooms[roomID]
if !exists {
@@ -94,7 +127,7 @@ func (m *Manager) SendToRoom(roomID string, message []byte) {
for _, client := range clientsToSend {
select {
case client.send <- message:
case client.send <- jsonMsg:
default:
close(client.send)
}
@@ -102,13 +135,17 @@ func (m *Manager) SendToRoom(roomID string, message []byte) {
}
// 全推:向所有在线用户发送消息
func (m *Manager) SendToAll(message []byte) {
func (m *Manager) SendToAll(message any) {
jsonMsg, err := json.Marshal(message)
if err != nil {
return
}
m.mutex.RLock()
defer m.mutex.RUnlock()
for _, client := range m.clients {
select {
case client.send <- message:
case client.send <- jsonMsg:
default:
close(client.send)
}