diff --git a/README.md b/README.md index b125bdc..f580805 100644 --- a/README.md +++ b/README.md @@ -50,7 +50,7 @@ _✨ 搭建专属于你的消息推送服务,支持多种消息推送方式, + 飞书群机器人, + 钉钉群机器人, + Bark App, - + [桌面客户端](https://github.com/songquanpeng/personal-assistant)(WIP) + + WebSocket 客户端([官方客户端](https://github.com/songquanpeng/personal-assistant),[接入文档](./docs/API.md#WebSocket%20客户端)), 2. 多种用户登录注册方式: + 邮箱登录注册以及通过邮箱进行密码重置。 + [GitHub 开放授权](https://github.com/settings/applications/new)。 diff --git a/channel/client.go b/channel/client.go index 97d0aeb..62524eb 100644 --- a/channel/client.go +++ b/channel/client.go @@ -6,62 +6,147 @@ import ( "message-pusher/common" "message-pusher/model" "sync" + "time" ) -var clientConnMap map[int]*websocket.Conn +const ( + writeWait = 10 * time.Second + pongWait = 60 * time.Second + pingPeriod = (pongWait * 9) / 10 + maxMessageSize = 512 +) + +type webSocketClient struct { + userId int + conn *websocket.Conn + message chan *Message + pong chan bool + stop chan bool + timestamp int64 +} + +func (c *webSocketClient) handleDataReading() { + c.conn.SetReadLimit(maxMessageSize) + _ = c.conn.SetReadDeadline(time.Now().Add(pongWait)) + c.conn.SetPongHandler(func(string) error { + return c.conn.SetReadDeadline(time.Now().Add(pongWait)) + }) + for { + messageType, _, err := c.conn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNoStatusReceived, websocket.CloseAbnormalClosure) { + common.SysError("error read WebSocket client: " + err.Error()) + } + c.close() + break + } + switch messageType { + case websocket.PingMessage: + c.pong <- true + case websocket.CloseMessage: + c.close() + break + } + } +} + +func (c *webSocketClient) handleDataWriting() { + pingTicker := time.NewTicker(pingPeriod) + defer func() { + pingTicker.Stop() + clientConnMapMutex.Lock() + client, ok := clientMap[c.userId] + // otherwise we may delete the new added client! + if ok && client.timestamp == c.timestamp { + delete(clientMap, c.userId) + } + clientConnMapMutex.Unlock() + err := c.conn.Close() + if err != nil { + common.SysError("error close WebSocket client: " + err.Error()) + } + }() + for { + select { + case message := <-c.message: + _ = c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + err := c.conn.WriteJSON(message) + if err != nil { + common.SysError("error write data to WebSocket client: " + err.Error()) + return + } + case <-c.pong: + err := c.conn.WriteMessage(websocket.PongMessage, nil) + if err != nil { + common.SysError("error send pong to WebSocket client: " + err.Error()) + return + } + case <-pingTicker.C: + _ = c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + err := c.conn.WriteMessage(websocket.PingMessage, nil) + if err != nil { + common.SysError("error write data to WebSocket client: " + err.Error()) + return + } + case <-c.stop: + err := c.conn.WriteMessage(websocket.CloseMessage, nil) + if err != nil { + common.SysError("error write data to WebSocket client: " + err.Error()) + } + return + } + } +} + +func (c *webSocketClient) sendMessage(message *Message) { + c.message <- message +} + +func (c *webSocketClient) close() { + // should only be called once + c.stop <- true + // the defer function in handleDataWriting will do the cleanup +} + +var clientMap map[int]*webSocketClient var clientConnMapMutex sync.Mutex func init() { clientConnMapMutex.Lock() - clientConnMap = make(map[int]*websocket.Conn) - clientConnMapMutex.Unlock() -} - -func SendMessageWithConn(message *Message, conn *websocket.Conn) error { - return conn.WriteJSON(message) -} - -func LogoutClient(userId int) { - clientConnMapMutex.Lock() - delete(clientConnMap, userId) + clientMap = make(map[int]*webSocketClient) clientConnMapMutex.Unlock() } func RegisterClient(userId int, conn *websocket.Conn) { clientConnMapMutex.Lock() - oldConn, existed := clientConnMap[userId] + oldClient, existed := clientMap[userId] clientConnMapMutex.Unlock() if existed { byeMessage := &Message{ Title: common.SystemName, Description: "其他客户端已连接服务器,本客户端已被挤下线!", } - err := SendMessageWithConn(byeMessage, oldConn) - if err != nil { - common.SysError("error send message to client: " + err.Error()) - } - err = oldConn.Close() - if err != nil { - common.SysError("error close WebSocket connection: " + err.Error()) - } + oldClient.sendMessage(byeMessage) + oldClient.close() } helloMessage := &Message{ Title: common.SystemName, Description: "客户端连接成功!", } - err := SendMessageWithConn(helloMessage, conn) - if err != nil { - common.SysError("error send message to client: " + err.Error()) - return - } else { - clientConnMapMutex.Lock() - clientConnMap[userId] = conn - clientConnMapMutex.Unlock() - conn.SetCloseHandler(func(code int, text string) error { - LogoutClient(userId) - return nil - }) + newClient := &webSocketClient{ + userId: userId, + conn: conn, + message: make(chan *Message), + pong: make(chan bool), + stop: make(chan bool), + timestamp: time.Now().UnixMilli(), } + go newClient.handleDataWriting() + go newClient.handleDataReading() + defer newClient.sendMessage(helloMessage) + clientConnMapMutex.Lock() + clientMap[userId] = newClient + clientConnMapMutex.Unlock() } func SendClientMessage(message *Message, user *model.User) error { @@ -69,14 +154,11 @@ func SendClientMessage(message *Message, user *model.User) error { return errors.New("未配置 WebSocket 客户端消息推送方式") } clientConnMapMutex.Lock() - conn, existed := clientConnMap[user.Id] + client, existed := clientMap[user.Id] clientConnMapMutex.Unlock() if !existed { return errors.New("客户端未连接") } - err := SendMessageWithConn(message, conn) - if err != nil { - LogoutClient(user.Id) - } - return err + client.sendMessage(message) + return nil } diff --git a/controller/websocket.go b/controller/websocket.go index 58000f6..4004e07 100644 --- a/controller/websocket.go +++ b/controller/websocket.go @@ -8,7 +8,13 @@ import ( "net/http" ) -var upgrader = websocket.Upgrader{} // use default options +var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + CheckOrigin: func(r *http.Request) bool { + return true + }, +} func RegisterClient(c *gin.Context) { secret := c.Query("secret") diff --git a/docs/API.md b/docs/API.md new file mode 100644 index 0000000..537ce90 --- /dev/null +++ b/docs/API.md @@ -0,0 +1,36 @@ +# API 文档 + +## WebSocket 客户端 +你可以使用 WebSocket 客户端连接服务器,具体的客户端的类型可以是桌面应用,手机应用或 Web 应用等,只要遵循下述协议即可。 + +目前同一时间一个用户只能有一个客户端连接到服务器,之前已连接的客户端将被断开连接。 + +### 连接协议 +1. API 端点为:`ws://:/api/register_client/?secret=` +2. 如果启用了 HTTPS,则需要将 `ws` 替换为 `wss`。 +3. 上述 `secret` 为用户在后台设置的 `服务器连接密钥`,而非 `推送 token`。 + +### 接收消息 +1. 消息编码格式为 JSON。 +2. 具体内容: + ```json + { + "title": "标题", + "description": "描述", + "content": "内容", + "html_content": "转换为 HTML 后的内容", + "url": "链接" + } + ``` + 可能还有多余字段,忽略即可。 + +### 连接保活 +1. 每 `56s` 服务器将发送 `ping` 报文,客户端需要在 `60s` 回复 `pong` 报文,否则服务端将不再维护该连接。 +2. 服务端会主动回复客户端发来的 `ping` 报文。 + +### 实现列表 +当前可用的 WebSocket 客户端实现有: +1. 官方 WebSocket 桌面客户端实现:https://github.com/songquanpeng/personal-assistant +2. 待补充 + +欢迎在此提交你的客户端实现。 \ No newline at end of file