diff --git a/README.md b/README.md index f91d998..8ce31b5 100644 --- a/README.md +++ b/README.md @@ -58,16 +58,17 @@ _✨ 搭建专属于你的消息推送服务,支持多种消息推送方式, + Telegram 机器人, + Discord 群机器人, + 群组消息,可以将多个推送通道组合成一个群组,然后向群组发送消息,可以实现一次性推送到多个渠道的功能。 -2. 多种用户登录注册方式: +2. 支持在 Web 端管理发送的消息,支持自动刷新。 +3. 支持异步消息。 +4. 多种用户登录注册方式: + 邮箱登录注册以及通过邮箱进行密码重置。 + [GitHub 开放授权](https://github.com/settings/applications/new)。 + 微信公众号授权(需要额外部署 [WeChat Server](https://github.com/songquanpeng/wechat-server))。 -3. 支持 Markdown。 -4. 支持用户管理。 -5. Cloudflare Turnstile 用户校验。 -6. 支持在线发布公告,设置关于界面以及页脚。 -7. 支持在 Web 端管理发送的消息,支持自动刷新。 -8. API 兼容其他消息推送服务,例如 [Server 酱](https://sct.ftqq.com/)。 +5. 支持 Markdown。 +6. 支持用户管理。 +7. Cloudflare Turnstile 用户校验。 +8. 支持在线发布公告,设置关于界面以及页脚。 +9. API 兼容其他消息推送服务,例如 [Server 酱](https://sct.ftqq.com/)。 ## 用途 1. [整合进自己的博客系统,每当有人登录时发微信消息提醒](https://github.com/songquanpeng/blog/blob/486d63e96ef7906a6c767653a20ec2d3278e9a4a/routes/user.js#L27)。 @@ -180,6 +181,7 @@ proxy_send_timeout 300s; 7. `to`:选填,推送给指定用户,如果不填则默认推送给自己,受限于具体的消息推送方式,有些推送方式不支持此项。 1. `@all`:推送给所有用户。 2. `user1|user2|user3`:推送给多个用户,用户之间使用 `|` 分隔。 + 8. `async`:选填,如果设置为 `true` 则消息推送将在后台异步进行,返回结果包含 `uuid` 字段,可用于后续[获取消息发送状态](./docs/API.md#通过消息 UUID 获取消息发送状态)。 3. `POST` 请求方式:字段与上面 `GET` 请求方式保持一致。 + 注意:请求体编码格式为 `application/json`,`v0.3.2` 版本起支持 Post Form。 diff --git a/channel/message-queue.go b/channel/message-queue.go new file mode 100644 index 0000000..b738d39 --- /dev/null +++ b/channel/message-queue.go @@ -0,0 +1,63 @@ +package channel + +import ( + "message-pusher/common" + "message-pusher/model" +) + +var AsyncMessageQueue chan int +var AsyncMessageQueueSize = 128 +var AsyncMessageSenderNum = 2 + +func init() { + AsyncMessageQueue = make(chan int, AsyncMessageQueueSize) + for i := 0; i < AsyncMessageSenderNum; i++ { + go asyncMessageSender() + } +} + +// LoadAsyncMessages loads async pending messages from database. +// We have to wait the database connection is ready. +func LoadAsyncMessages() { + ids, err := model.GetAsyncPendingMessageIds() + if err != nil { + common.FatalLog("failed to load async pending messages: " + err.Error()) + } + for _, id := range ids { + AsyncMessageQueue <- id + } +} + +func asyncMessageSenderHelper(message *model.Message) error { + user, err := model.GetUserById(message.UserId, false) + if err != nil { + return err + } + channel_, err := model.GetChannelByName(message.Channel, user.Id) + if err != nil { + return err + } + return SendMessage(message, user, channel_) +} + +func asyncMessageSender() { + for { + id := <-AsyncMessageQueue + message, err := model.GetMessageById(id) + if err != nil { + common.SysError("async message sender error: " + err.Error()) + continue + } + err = asyncMessageSenderHelper(message) + status := common.MessageSendStatusFailed + if err != nil { + common.SysError("async message sender error: " + err.Error()) + } else { + status = common.MessageSendStatusSent + } + err = message.UpdateStatus(status) + if err != nil { + common.SysError("async message sender error: " + err.Error()) + } + } +} diff --git a/common/constants.go b/common/constants.go index cc8fd5f..01a00ee 100644 --- a/common/constants.go +++ b/common/constants.go @@ -101,10 +101,11 @@ const ( ) const ( - MessageSendStatusUnknown = 0 - MessageSendStatusPending = 1 - MessageSendStatusSent = 2 - MessageSendStatusFailed = 3 + MessageSendStatusUnknown = 0 + MessageSendStatusPending = 1 + MessageSendStatusSent = 2 + MessageSendStatusFailed = 3 + MessageSendStatusAsyncPending = 4 ) const ( diff --git a/controller/message.go b/controller/message.go index 19a638b..c22c7eb 100644 --- a/controller/message.go +++ b/controller/message.go @@ -38,6 +38,7 @@ func GetPushMessage(c *gin.Context) { Desp: c.Query("desp"), Short: c.Query("short"), OpenId: c.Query("openid"), + Async: c.Query("async") == "true", } keepCompatible(&message) pushMessageHelper(c, &message) @@ -55,6 +56,7 @@ func PostPushMessage(c *gin.Context) { Desp: c.PostForm("desp"), Short: c.PostForm("short"), OpenId: c.PostForm("openid"), + Async: c.PostForm("async") == "true", } if message == (model.Message{}) { // Looks like the user is using JSON @@ -142,6 +144,7 @@ func pushMessageHelper(c *gin.Context, message *model.Message) { c.JSON(http.StatusOK, gin.H{ "success": true, "message": "", + "uuid": message.Link, }) } @@ -149,6 +152,7 @@ func saveAndSendMessage(user *model.User, message *model.Message, channel_ *mode if channel_.Status != common.ChannelStatusEnabled { return errors.New("该渠道已被禁用") } + common.MessageCount += 1 // We don't need to use atomic here because it's not a critical value message.Link = common.GetUUID() if message.URL == "" { message.URL = fmt.Sprintf("%s/message/%s", common.ServerAddress, message.Link) @@ -158,25 +162,36 @@ func saveAndSendMessage(user *model.User, message *model.Message, channel_ *mode defer func() { // Update the status of the message status := common.MessageSendStatusFailed - if success { - status = common.MessageSendStatusSent + if message.Async { + status = common.MessageSendStatusAsyncPending + } else { + if success { + status = common.MessageSendStatusSent + } } err := message.UpdateStatus(status) if err != nil { common.SysError("failed to update the status of the message: " + err.Error()) } + if message.Async { + channel.AsyncMessageQueue <- message.Id + } }() err := message.UpdateAndInsert(user.Id) if err != nil { return err } } else { + if message.Async { + return errors.New("异步发送消息需要用户具备消息持久化的权限") + } message.Link = "unsaved" // This is for user to identify whether the message is saved } - err := channel.SendMessage(message, user, channel_) - common.MessageCount += 1 // We don't need to use atomic here because it's not a critical value - if err != nil { - return err + if !message.Async { + err := channel.SendMessage(message, user, channel_) + if err != nil { + return err + } } success = true return nil // After this line, the message status will be updated @@ -258,7 +273,7 @@ func GetUserMessages(c *gin.Context) { func GetMessage(c *gin.Context) { messageId, _ := strconv.Atoi(c.Param("id")) userId := c.GetInt("id") - message, err := model.GetMessageById(messageId, userId) + message, err := model.GetMessageByIds(messageId, userId) if err != nil { c.JSON(http.StatusOK, gin.H{ "success": false, @@ -274,6 +289,24 @@ func GetMessage(c *gin.Context) { return } +func GetMessageStatus(c *gin.Context) { + link := c.Param("link") + status, err := model.GetMessageStatusByLink(link) + if err != nil { + c.JSON(http.StatusOK, gin.H{ + "success": false, + "message": err.Error(), + }) + return + } + c.JSON(http.StatusOK, gin.H{ + "success": true, + "message": "", + "status": status, + }) + return +} + func SearchMessages(c *gin.Context) { keyword := c.Query("keyword") messages, err := model.SearchMessages(keyword) @@ -296,7 +329,7 @@ func ResendMessage(c *gin.Context) { messageId, _ := strconv.Atoi(c.Param("id")) userId := c.GetInt("id") helper := func() error { - message, err := model.GetMessageById(messageId, userId) + message, err := model.GetMessageByIds(messageId, userId) message.Id = 0 if err != nil { return err diff --git a/docs/API.md b/docs/API.md index 3d73e64..b3ff8ce 100644 --- a/docs/API.md +++ b/docs/API.md @@ -33,4 +33,29 @@ 1. 官方 WebSocket 桌面客户端实现:https://github.com/songquanpeng/personal-assistant 2. 待补充 -欢迎在此提交你的客户端实现。 \ No newline at end of file +欢迎在此提交你的客户端实现。 + + +## 通过消息 UUID 获取消息发送状态 +1. API 端点为:`https://:/api/message/status/` +2. 由于使用的是消息的 UUID 而非 ID,因此此处不需要鉴权, +3. 返回内容示例: + ```json + { + "success": true, + "message": "", + "status": 2 + } + ``` +4. 返回内容字段: + 1. `success`:本次请求是否成功 + 2. `message`:错误信息 + 3. `status`:消息状态码。 +5. 消息状态码定义如下: + ``` + MessageSendStatusUnknown = 0 + MessageSendStatusPending = 1 + MessageSendStatusSent = 2 + MessageSendStatusFailed = 3 + MessageSendStatusAsyncPending = 4 + ``` \ No newline at end of file diff --git a/main.go b/main.go index 7907ac6..53a9144 100644 --- a/main.go +++ b/main.go @@ -33,6 +33,7 @@ func main() { if err != nil { common.FatalLog(err) } + go channel.LoadAsyncMessages() defer func() { err := model.CloseDB() if err != nil { diff --git a/model/message.go b/model/message.go index c7cf9b9..a9d84a7 100644 --- a/model/message.go +++ b/model/message.go @@ -18,14 +18,15 @@ type Message struct { HTMLContent string `json:"html_content" gorm:"-:all"` Timestamp int64 `json:"timestamp" gorm:"type:bigint"` Link string `json:"link" gorm:"unique;index"` - To string `json:"to" gorm:"column:to"` // if specified, will send to this user(s) - Status int `json:"status" gorm:"default:0"` // pending, sent, failed - OpenId string `json:"openid" gorm:"-:all"` // alias for to - Desp string `json:"desp" gorm:"-:all"` // alias for content - Short string `json:"short" gorm:"-:all"` // alias for description + To string `json:"to" gorm:"column:to"` // if specified, will send to this user(s) + Status int `json:"status" gorm:"default:0;index"` // pending, sent, failed + OpenId string `json:"openid" gorm:"-:all"` // alias for to + Desp string `json:"desp" gorm:"-:all"` // alias for content + Short string `json:"short" gorm:"-:all"` // alias for description + Async bool `json:"async" gorm:"-"` // if true, will send message asynchronously } -func GetMessageById(id int, userId int) (*Message, error) { +func GetMessageByIds(id int, userId int) (*Message, error) { if id == 0 || userId == 0 { return nil, errors.New("id 或 userId 为空!") } @@ -34,6 +35,20 @@ func GetMessageById(id int, userId int) (*Message, error) { return &message, err } +func GetMessageById(id int) (*Message, error) { + if id == 0 { + return nil, errors.New("id 为空!") + } + message := Message{Id: id} + err := DB.Where(message).First(&message).Error + return &message, err +} + +func GetAsyncPendingMessageIds() (ids []int, err error) { + err = DB.Model(&Message{}).Where("status = ?", common.MessageSendStatusAsyncPending).Pluck("id", &ids).Error + return ids, err +} + func GetMessageByLink(link string) (*Message, error) { if link == "" { return nil, errors.New("link 为空!") @@ -43,6 +58,15 @@ func GetMessageByLink(link string) (*Message, error) { return &message, err } +func GetMessageStatusByLink(link string) (int, error) { + if link == "" { + return common.MessageSendStatusUnknown, errors.New("link 为空!") + } + message := Message{} + err := DB.Where("link = ?", link).Select("status").First(&message).Error + return message.Status, err +} + func GetMessagesByUserId(userId int, startIdx int, num int) (messages []*Message, err error) { err = DB.Where("user_id = ?", userId).Order("id desc").Limit(num).Offset(startIdx).Find(&messages).Error return messages, err diff --git a/router/api-router.go b/router/api-router.go index 515c1f3..6570741 100644 --- a/router/api-router.go +++ b/router/api-router.go @@ -59,6 +59,7 @@ func SetApiRouter(router *gin.Engine) { { messageRoute.GET("/", middleware.UserAuth(), controller.GetUserMessages) messageRoute.GET("/search", middleware.UserAuth(), controller.SearchMessages) + messageRoute.GET("/status/:link", controller.GetMessageStatus) messageRoute.POST("/resend/:id", middleware.UserAuth(), controller.ResendMessage) messageRoute.GET("/:id", middleware.UserAuth(), controller.GetMessage) messageRoute.DELETE("/", middleware.RootAuth(), controller.DeleteAllMessages) diff --git a/web/src/components/MessagesTable.js b/web/src/components/MessagesTable.js index 7ab584f..8f8dbbd 100644 --- a/web/src/components/MessagesTable.js +++ b/web/src/components/MessagesTable.js @@ -17,7 +17,7 @@ function renderStatus(status) { case 1: return ( ); case 2: @@ -32,6 +32,12 @@ function renderStatus(status) { 发送失败 ); + case 4: + return ( + + ); default: return (