From ea67c0336b5fd58a32d65f4ac96fc1ca16d33c7b Mon Sep 17 00:00:00 2001 From: xh <11675084@qq.com> Date: Mon, 24 Nov 2025 02:47:23 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96ws,corn?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- admin/vite.config.ts | 6 +-- server/app/controller/ws.go | 11 +++-- server/app/corn/init.go | 35 ++++++++++++++++ server/app/jobs/init.go | 19 --------- server/core/corn.go | 67 +++++++++++++++++++++++++++++++ server/main.go | 2 +- server/util/redis.go | 28 +++++++++++++ server/util/ws_util/client.go | 24 +++++++---- server/util/ws_util/manager.go | 73 +++++++++++++++++++++++++--------- 9 files changed, 210 insertions(+), 55 deletions(-) create mode 100644 server/app/corn/init.go delete mode 100644 server/app/jobs/init.go create mode 100644 server/core/corn.go diff --git a/admin/vite.config.ts b/admin/vite.config.ts index 537f7ea..3e546a4 100644 --- a/admin/vite.config.ts +++ b/admin/vite.config.ts @@ -2,7 +2,7 @@ import { fileURLToPath, URL } from 'url' import { defineConfig, loadEnv } from 'vite' import vue from '@vitejs/plugin-vue' -import vueJsx from '@vitejs/plugin-vue-jsx' +// import vueJsx from '@vitejs/plugin-vue-jsx' // import AutoImport from 'unplugin-auto-import/vite' import Components from 'unplugin-vue-components/vite' @@ -11,7 +11,7 @@ import Components from 'unplugin-vue-components/vite' import { createSvgIconsPlugin } from 'vite-plugin-svg-icons' // import viteCompression from 'vite-plugin-compression' -import { visualizer } from 'rollup-plugin-visualizer' +// import { visualizer } from 'rollup-plugin-visualizer' // https://vitejs.dev/config/ export default ({ mode }) => { @@ -106,7 +106,7 @@ export default ({ mode }) => { server: { open: true, host: '0.0.0.0', - port: 5174, + port: 5180, proxy: { '/api': { target: env.VITE_APP_BASE_URL, diff --git a/server/app/controller/ws.go b/server/app/controller/ws.go index 08fdaba..bbb1096 100644 --- a/server/app/controller/ws.go +++ b/server/app/controller/ws.go @@ -4,6 +4,7 @@ import ( "log" "net/http" "x_admin/core" + "x_admin/util" "x_admin/util/ws_util" "github.com/gin-gonic/gin" @@ -17,10 +18,11 @@ var upgrader = websocket.Upgrader{ } func WsHandler(c *gin.Context) { + uuid := util.ToolsUtil.MakeUuidV7() // 从查询参数获取用户ID和房间ID(实际项目中应通过认证获取) - clientID := c.Query("id") + uid := c.Query("uid") roomID := c.Query("room") - if clientID == "" { + if uid == "" { c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{"error": "id is required"}) return } @@ -31,13 +33,10 @@ func WsHandler(c *gin.Context) { return } - client := ws_util.NewClient(clientID, roomID, conn, core.Ws) + client := ws_util.NewClient(uuid, uid, roomID, conn, core.Ws) core.Ws.Register <- client // 启动读写协程 go client.Write() go client.Read() } -func init() { - go core.Ws.Start() -} diff --git a/server/app/corn/init.go b/server/app/corn/init.go new file mode 100644 index 0000000..3a7d572 --- /dev/null +++ b/server/app/corn/init.go @@ -0,0 +1,35 @@ +package corn + +import ( + "x_admin/core" + "x_admin/util" +) + +// func init() { +// c := cron.New(cron.WithSeconds()) +// c.AddFunc("*/5 * * * * *", func() { +// fmt.Println("定时任务:每5秒执行一次") +// core.Ws.SendToRoom("room1", []byte("hello room1")) +// core.Ws.SendToAll([]byte("hello all")) +// }) +// // 启动定时任务 +// c.Start() +// } +func init() { + tm := core.NewCronManager() + tm.Start() + // defer tm.Stop() + // 添加一个每5秒执行的任务 + tm.AddTask("broadcast", "*/5 * * * * *", func() { + // core.Ws.SendToRoom("room1", map[string]any{ + // "message": "Hello Room1!", + // }) + // 存入redis + util.RedisUtil.RPush("onlineCount", []any{core.Ws.GetOnlineCount()}, 10) + + // 广播当前在线用户数 + core.Ws.SendToAll(map[string]any{ + "onlineCount": core.Ws.GetOnlineCount(), + }) + }) +} diff --git a/server/app/jobs/init.go b/server/app/jobs/init.go deleted file mode 100644 index ae1b8fe..0000000 --- a/server/app/jobs/init.go +++ /dev/null @@ -1,19 +0,0 @@ -package jobs - -import ( - "fmt" - "x_admin/core" - - "github.com/robfig/cron/v3" -) - -func init() { - c := cron.New(cron.WithSeconds()) - c.AddFunc("*/5 * * * * *", func() { - fmt.Println("定时任务:每5秒执行一次") - core.Ws.SendToRoom("room1", []byte("hello room1")) - core.Ws.SendToAll([]byte("hello all")) - }) - // 启动定时任务 - c.Start() -} diff --git a/server/core/corn.go b/server/core/corn.go new file mode 100644 index 0000000..524a939 --- /dev/null +++ b/server/core/corn.go @@ -0,0 +1,67 @@ +package core + +import ( + "fmt" + "sync" + + "github.com/robfig/cron/v3" +) + +// CronManager 任务管理器 +type CronManager struct { + cron *cron.Cron + taskIDs map[string]cron.EntryID // 通过字符串ID映射到cron的内部任务ID + mutex sync.RWMutex +} + +// NewTaskManager 创建一个新的任务管理器 +func NewCronManager() *CronManager { + return &CronManager{ + cron: cron.New(cron.WithSeconds()), // 启用秒级别的调度 + taskIDs: make(map[string]cron.EntryID), + } +} + +// AddTask 添加任务 +func (tm *CronManager) AddTask(taskID, spec string, cmd func()) error { + tm.mutex.Lock() + defer tm.mutex.Unlock() + + // 如果任务已存在,先移除 + if id, exists := tm.taskIDs[taskID]; exists { + tm.cron.Remove(id) + } + + // 添加新任务 + id, err := tm.cron.AddFunc(spec, cmd) + if err != nil { + return fmt.Errorf("添加任务失败: %w", err) + } + tm.taskIDs[taskID] = id + fmt.Printf("任务 '%s' 已添加/更新\n", taskID) + return nil +} + +// RemoveTask 删除任务 +func (tm *CronManager) RemoveTask(taskID string) { + tm.mutex.Lock() + defer tm.mutex.Unlock() + + if id, exists := tm.taskIDs[taskID]; exists { + tm.cron.Remove(id) + delete(tm.taskIDs, taskID) + fmt.Printf("任务 '%s' 已移除\n", taskID) + } else { + fmt.Printf("任务 '%s' 不存在\n", taskID) + } +} + +// Start 启动任务调度器 +func (tm *CronManager) Start() { + tm.cron.Start() +} + +// Stop 停止任务调度器 +func (tm *CronManager) Stop() { + tm.cron.Stop() +} diff --git a/server/main.go b/server/main.go index 82ba280..f6efb44 100644 --- a/server/main.go +++ b/server/main.go @@ -13,7 +13,7 @@ import ( "x_admin/middleware" "x_admin/routes" - _ "x_admin/app/jobs" + _ "x_admin/app/corn" _ "x_admin/docs" swaggerfiles "github.com/swaggo/files" diff --git a/server/util/redis.go b/server/util/redis.go index 0dcca95..4bdaac9 100644 --- a/server/util/redis.go +++ b/server/util/redis.go @@ -205,6 +205,34 @@ func (ru redisUtil) Del(keys ...string) bool { return true } +// Push 向列表中添加元素,并保留最新的count个元素 +func (ru redisUtil) RPush(key string, value []any, count int64) bool { + var ctx = context.Background() + pipe := core.Redis.TxPipeline() + pipe.RPush(ctx, config.RedisConfig.RedisPrefix+key, value...) // 推到右侧(尾部) + if count != 0 { + pipe.LTrim(ctx, config.RedisConfig.RedisPrefix+key, -count, -1) // 保留最新的count个元素 + } + + _, err := pipe.Exec(ctx) + + if err != nil { + core.Logger.Errorf("redisUtil.Push err: err=[%+v]", err) + return false + } + return true +} + +// LIndex 获取列表中指定索引的元素 +func (ru redisUtil) LRange(key string, start, stop int64) []string { + res, err := ru.redis.LRange(context.Background(), config.RedisConfig.RedisPrefix+key, start, stop).Result() + if err != nil { + core.Logger.Errorf("redisUtil.LRange err: err=[%+v]", err) + return []string{} + } + return res +} + // toFullKeys 为keys批量增加前缀 func (ru redisUtil) toFullKeys(keys []string) (fullKeys []string) { for _, k := range keys { diff --git a/server/util/ws_util/client.go b/server/util/ws_util/client.go index 1504fcd..5f2b479 100644 --- a/server/util/ws_util/client.go +++ b/server/util/ws_util/client.go @@ -15,27 +15,30 @@ const ( ) type Client struct { - ID string + UUID string // 用于单推 + Uid string // 用于单推 RoomID string // 用于群推 conn *websocket.Conn send chan []byte - manager *Manager //持有对 Manager 的引用:显式的依赖注入,结构清晰、可测试、可扩展 + Manager *Manager //持有对 Manager 的引用:显式的依赖注入,结构清晰、可测试、可扩展 } -func NewClient(id, roomID string, conn *websocket.Conn, manager *Manager) *Client { +func NewClient(uuid, Uid, roomID string, conn *websocket.Conn, manager *Manager) *Client { return &Client{ - ID: id, + UUID: uuid, + Uid: Uid, RoomID: roomID, conn: conn, send: make(chan []byte, 256), - manager: manager, + Manager: manager, } } // 读取消息(通常用于接收客户端消息,此处简化) func (c *Client) Read() { defer func() { - c.manager.UnRegister <- c + // 从 Manager 中注销客户端 + c.Manager.UnRegister <- c c.conn.Close() }() @@ -50,7 +53,12 @@ func (c *Client) Read() { for { _, _, err := c.conn.ReadMessage() if err != nil { - if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + // 处理浏览器主动关闭的情况 + if websocket.IsCloseError(err, + websocket.CloseGoingAway, // 客户端正在关闭连接 + websocket.CloseAbnormalClosure) { // 异常关闭 + log.Printf("WebSocket client %s closed: %v", c.UUID, err) + } else { log.Printf("WebSocket read error: %v", err) } break @@ -108,6 +116,6 @@ func (c *Client) Write() { // 主动关闭连接 func (c *Client) Close() { - c.manager.UnRegister <- c + c.Manager.UnRegister <- c c.conn.Close() } diff --git a/server/util/ws_util/manager.go b/server/util/ws_util/manager.go index 5ca3545..24fdf6e 100644 --- a/server/util/ws_util/manager.go +++ b/server/util/ws_util/manager.go @@ -1,11 +1,13 @@ package ws_util import ( + "encoding/json" "sync" ) type Manager struct { clients map[string]*Client // 所有客户端:key=clientID + uid_uuids map[string][]string // uid -> [uuid1, uuid2, ...] rooms map[string]map[string]bool // 群组:roomID -> {clientID: true} Register chan *Client UnRegister chan *Client @@ -13,12 +15,16 @@ type Manager struct { } func NewManager() *Manager { - return &Manager{ + var manager = Manager{ clients: make(map[string]*Client), + uid_uuids: make(map[string][]string), rooms: make(map[string]map[string]bool), Register: make(chan *Client), UnRegister: make(chan *Client), } + go manager.Start() + + return &manager } // 启动管理器协程 @@ -27,27 +33,40 @@ func (m *Manager) Start() { select { case client := <-m.Register: m.mutex.Lock() - m.clients[client.ID] = client + m.clients[client.UUID] = client + // 加入用户 + if client.Uid != "" { + m.uid_uuids[client.Uid] = append(m.uid_uuids[client.Uid], client.UUID) + } // 加入房间 if client.RoomID != "" { if _, ok := m.rooms[client.RoomID]; !ok { m.rooms[client.RoomID] = make(map[string]bool) } - m.rooms[client.RoomID][client.ID] = true + m.rooms[client.RoomID][client.UUID] = true } m.mutex.Unlock() case client := <-m.UnRegister: m.mutex.Lock() - if _, ok := m.clients[client.ID]; ok { + if _, ok := m.clients[client.UUID]; ok { // 删除客户端 - delete(m.clients, client.ID) + delete(m.clients, client.UUID) + // 删除用户对应的UUID + if client.Uid != "" { + for i, uuid := range m.uid_uuids[client.Uid] { + if uuid == client.UUID { + m.uid_uuids[client.Uid] = append(m.uid_uuids[client.Uid][:i], m.uid_uuids[client.Uid][i+1:]...) + break + } + } + } // 退出房间 if client.RoomID != "" { if room, ok := m.rooms[client.RoomID]; ok { - delete(room, client.ID) + delete(room, client.UUID) if len(room) == 0 { delete(m.rooms, client.RoomID) } @@ -60,23 +79,37 @@ func (m *Manager) Start() { } // 单推:向指定 clientID 发送消息 -func (m *Manager) SendToUser(clientID string, message []byte) { +func (m *Manager) SendToUser(uid string, message any) { + jsonMsg, err := json.Marshal(message) + if err != nil { + return + } + m.mutex.RLock() - client, exists := m.clients[clientID] + clientIDs, exists := m.uid_uuids[uid] m.mutex.RUnlock() - if exists && client != nil { - select { - case client.send <- message: - default: - // 通道满,丢弃或记录日志 - close(client.send) + if exists && clientIDs != nil { + for _, clientID := range clientIDs { + if client, ok := m.clients[clientID]; ok { + select { + case client.send <- jsonMsg: + default: + // 通道满,丢弃或记录日志 + close(client.send) + } + } } } } // 群推:向指定 roomID 的所有成员发送消息 -func (m *Manager) SendToRoom(roomID string, message []byte) { +func (m *Manager) SendToRoom(roomID string, message any) { + jsonMsg, err := json.Marshal(message) + if err != nil { + return + } + m.mutex.RLock() room, exists := m.rooms[roomID] if !exists { @@ -94,7 +127,7 @@ func (m *Manager) SendToRoom(roomID string, message []byte) { for _, client := range clientsToSend { select { - case client.send <- message: + case client.send <- jsonMsg: default: close(client.send) } @@ -102,13 +135,17 @@ func (m *Manager) SendToRoom(roomID string, message []byte) { } // 全推:向所有在线用户发送消息 -func (m *Manager) SendToAll(message []byte) { +func (m *Manager) SendToAll(message any) { + jsonMsg, err := json.Marshal(message) + if err != nil { + return + } m.mutex.RLock() defer m.mutex.RUnlock() for _, client := range m.clients { select { - case client.send <- message: + case client.send <- jsonMsg: default: close(client.send) }