From 8f7e5456e54e15d5cc2d1bd056e086d1a370bcd2 Mon Sep 17 00:00:00 2001 From: JustSong Date: Fri, 12 May 2023 18:28:54 +0800 Subject: [PATCH] feat: use sse to fetch new messages (close #70) --- controller/message-sse.go | 56 +++++++++++++++++++++++++++++ controller/message.go | 2 ++ main.go | 3 +- middleware/sse.go | 13 +++++++ router/api-router.go | 1 + web/src/components/MessagesTable.js | 33 ++++++++++++----- 6 files changed, 97 insertions(+), 11 deletions(-) create mode 100644 controller/message-sse.go create mode 100644 middleware/sse.go diff --git a/controller/message-sse.go b/controller/message-sse.go new file mode 100644 index 0000000..7dee676 --- /dev/null +++ b/controller/message-sse.go @@ -0,0 +1,56 @@ +package controller + +import ( + "github.com/gin-gonic/gin" + "io" + "message-pusher/model" + "sync" +) + +var messageChanBufferSize = 10 + +var messageChanStore struct { + Map map[int]*chan *model.Message + Mutex sync.RWMutex +} + +func messageChanStoreAdd(messageChan *chan *model.Message, userId int) { + messageChanStore.Mutex.Lock() + defer messageChanStore.Mutex.Unlock() + messageChanStore.Map[userId] = messageChan +} + +func messageChanStoreRemove(userId int) { + messageChanStore.Mutex.Lock() + defer messageChanStore.Mutex.Unlock() + delete(messageChanStore.Map, userId) +} + +func init() { + messageChanStore.Map = make(map[int]*chan *model.Message) +} + +func syncMessageToUser(message *model.Message, userId int) { + messageChanStore.Mutex.RLock() + defer messageChanStore.Mutex.RUnlock() + messageChan, ok := messageChanStore.Map[userId] + if !ok { + return + } + *messageChan <- message +} + +func GetNewMessages(c *gin.Context) { + userId := c.GetInt("id") + messageChan := make(chan *model.Message, messageChanBufferSize) + messageChanStoreAdd(&messageChan, userId) + c.Stream(func(w io.Writer) bool { + if msg, ok := <-messageChan; ok { + c.SSEvent("message", *msg) + return true + } + return false + }) + messageChanStoreRemove(userId) + close(messageChan) +} diff --git a/controller/message.go b/controller/message.go index 89ad9ab..e731fc5 100644 --- a/controller/message.go +++ b/controller/message.go @@ -185,11 +185,13 @@ func saveAndSendMessage(user *model.User, message *model.Message, channel_ *mode if err != nil { return err } + go syncMessageToUser(message, user.Id) } else { if message.Async { return errors.New("异步发送消息需要用户具备消息持久化的权限") } message.Link = "unsaved" // This is for user to identify whether the message is saved + go syncMessageToUser(message, user.Id) } if !message.Async { err := channel.SendMessage(message, user, channel_) diff --git a/main.go b/main.go index 53a9144..a1bba94 100644 --- a/main.go +++ b/main.go @@ -2,7 +2,6 @@ package main import ( "embed" - "github.com/gin-contrib/gzip" "github.com/gin-contrib/sessions" "github.com/gin-contrib/sessions/cookie" "github.com/gin-contrib/sessions/redis" @@ -56,7 +55,7 @@ func main() { // Initialize HTTP server server := gin.Default() server.SetHTMLTemplate(common.LoadTemplate()) - server.Use(gzip.Gzip(gzip.DefaultCompression)) + //server.Use(gzip.Gzip(gzip.DefaultCompression)) // conflict with sse // Initialize session store if common.RedisEnabled { diff --git a/middleware/sse.go b/middleware/sse.go new file mode 100644 index 0000000..0f2bffb --- /dev/null +++ b/middleware/sse.go @@ -0,0 +1,13 @@ +package middleware + +import "github.com/gin-gonic/gin" + +func SetSSEHeaders() func(c *gin.Context) { + return func(c *gin.Context) { + c.Writer.Header().Set("Content-Type", "text/event-stream") + c.Writer.Header().Set("Cache-Control", "no-cache") + c.Writer.Header().Set("Connection", "keep-alive") + c.Writer.Header().Set("Transfer-Encoding", "chunked") + c.Next() + } +} diff --git a/router/api-router.go b/router/api-router.go index 5786d87..8f7edb9 100644 --- a/router/api-router.go +++ b/router/api-router.go @@ -58,6 +58,7 @@ func SetApiRouter(router *gin.Engine) { messageRoute := apiRouter.Group("/message") { messageRoute.GET("/", middleware.UserAuth(), controller.GetUserMessages) + messageRoute.GET("/stream", middleware.UserAuth(), middleware.SetSSEHeaders(), controller.GetNewMessages) messageRoute.GET("/search", middleware.UserAuth(), controller.SearchMessages) messageRoute.GET("/status/:link", controller.GetMessageStatus) messageRoute.POST("/resend/:id", middleware.UserAuth(), controller.ResendMessage) diff --git a/web/src/components/MessagesTable.js b/web/src/components/MessagesTable.js index 6194a16..ac3de2e 100644 --- a/web/src/components/MessagesTable.js +++ b/web/src/components/MessagesTable.js @@ -1,12 +1,5 @@ import React, { useEffect, useRef, useState } from 'react'; -import { - Button, - Form, - Label, - Modal, - Pagination, - Table, -} from 'semantic-ui-react'; +import { Button, Form, Label, Modal, Pagination, Table } from 'semantic-ui-react'; import { API, openPage, showError, showSuccess, showWarning } from '../helpers'; import { ITEMS_PER_PAGE } from '../constants'; @@ -61,7 +54,7 @@ const MessagesTable = () => { title: '消息标题', description: '消息描述', content: '消息内容', - link: '', + link: '' }); // Message to be viewed const [viewModalOpen, setViewModalOpen] = useState(false); @@ -123,6 +116,17 @@ const MessagesTable = () => { showError(reason); }); checkPermission().then(); + const eventSource = new EventSource('/api/message/stream'); + eventSource.onerror = (e) => { + showError('服务端消息推送流连接出错!'); + }; + eventSource.onmessage = (e) => { + let newMessage = JSON.parse(e.data); + insertNewMessage(newMessage); + }; + return () => { + eventSource.close(); + }; }, []); const viewMessage = async (id) => { @@ -203,6 +207,17 @@ const MessagesTable = () => { setLoading(false); }; + const insertNewMessage = (message) => { + console.log(messages); + setMessages(messages => { + let newMessages = [message]; + newMessages.push(...messages); + return newMessages; + } + ); + setActivePage(1); + }; + const refresh = async () => { await loadMessages(0); setActivePage(1);