feat: now server part supports multiple channels for the same type (#50)

This commit is contained in:
JustSong
2023-05-05 10:43:56 +08:00
parent b291ed43ca
commit 3a8d625201
21 changed files with 601 additions and 298 deletions

View File

@@ -20,11 +20,8 @@ type barkMessageResponse struct {
Message string `json:"message"`
}
func SendBarkMessage(message *model.Message, user *model.User) error {
if user.BarkServer == "" || user.BarkSecret == "" {
return errors.New("未配置 Bark 消息推送方式")
}
url := fmt.Sprintf("%s/%s", user.BarkServer, user.BarkSecret)
func SendBarkMessage(message *model.Message, user *model.User, channel_ *model.Channel) error {
url := fmt.Sprintf("%s/%s", channel_.URL, channel_.Secret)
req := barkMessageRequest{
Title: message.Title,
Body: message.Content,

View File

@@ -2,6 +2,7 @@ package channel
import (
"errors"
"fmt"
"github.com/gorilla/websocket"
"message-pusher/common"
"message-pusher/model"
@@ -17,7 +18,7 @@ const (
)
type webSocketClient struct {
userId int
key string
conn *websocket.Conn
message chan *model.Message
pong chan bool
@@ -55,10 +56,10 @@ func (c *webSocketClient) handleDataWriting() {
defer func() {
pingTicker.Stop()
clientConnMapMutex.Lock()
client, ok := clientMap[c.userId]
client, ok := clientMap[c.key]
// otherwise we may delete the new added client!
if ok && client.timestamp == c.timestamp {
delete(clientMap, c.userId)
delete(clientMap, c.key)
}
clientConnMapMutex.Unlock()
err := c.conn.Close()
@@ -108,18 +109,19 @@ func (c *webSocketClient) close() {
// the defer function in handleDataWriting will do the cleanup
}
var clientMap map[int]*webSocketClient
var clientMap map[string]*webSocketClient
var clientConnMapMutex sync.Mutex
func init() {
clientConnMapMutex.Lock()
clientMap = make(map[int]*webSocketClient)
clientMap = make(map[string]*webSocketClient)
clientConnMapMutex.Unlock()
}
func RegisterClient(userId int, conn *websocket.Conn) {
func RegisterClient(channelName string, userId int, conn *websocket.Conn) {
key := fmt.Sprintf("%s:%d", channelName, userId)
clientConnMapMutex.Lock()
oldClient, existed := clientMap[userId]
oldClient, existed := clientMap[key]
clientConnMapMutex.Unlock()
if existed {
byeMessage := &model.Message{
@@ -134,7 +136,7 @@ func RegisterClient(userId int, conn *websocket.Conn) {
Description: "客户端连接成功!",
}
newClient := &webSocketClient{
userId: userId,
key: key,
conn: conn,
message: make(chan *model.Message),
pong: make(chan bool),
@@ -145,16 +147,14 @@ func RegisterClient(userId int, conn *websocket.Conn) {
go newClient.handleDataReading()
defer newClient.sendMessage(helloMessage)
clientConnMapMutex.Lock()
clientMap[userId] = newClient
clientMap[key] = newClient
clientConnMapMutex.Unlock()
}
func SendClientMessage(message *model.Message, user *model.User) error {
if user.ClientSecret == "" {
return errors.New("未配置 WebSocket 客户端消息推送方式")
}
func SendClientMessage(message *model.Message, user *model.User, channel_ *model.Channel) error {
key := fmt.Sprintf("%s:%d", channel_.Name, user.Id)
clientConnMapMutex.Lock()
client, existed := clientMap[user.Id]
client, existed := clientMap[key]
clientConnMapMutex.Unlock()
if !existed {
return errors.New("客户端未连接")

View File

@@ -26,11 +26,8 @@ type corpMessageResponse struct {
Message string `json:"errmsg"`
}
func SendCorpMessage(message *model.Message, user *model.User) error {
func SendCorpMessage(message *model.Message, user *model.User, channel_ *model.Channel) error {
// https://developer.work.weixin.qq.com/document/path/91770
if user.CorpWebhookURL == "" {
return errors.New("未配置企业微信群机器人消息推送方式")
}
messageRequest := corpMessageRequest{
MessageType: "text",
}
@@ -48,7 +45,7 @@ func SendCorpMessage(message *model.Message, user *model.User) error {
if err != nil {
return err
}
resp, err := http.Post(fmt.Sprintf("%s", user.CorpWebhookURL), "application/json",
resp, err := http.Post(fmt.Sprintf("%s", channel_.URL), "application/json",
bytes.NewBuffer(jsonData))
if err != nil {
return err

View File

@@ -35,11 +35,8 @@ type dingMessageResponse struct {
Message string `json:"errmsg"`
}
func SendDingMessage(message *model.Message, user *model.User) error {
func SendDingMessage(message *model.Message, user *model.User, channel_ *model.Channel) error {
// https://open.dingtalk.com/document/robots/custom-robot-access#title-72m-8ag-pqw
if user.DingWebhookURL == "" {
return errors.New("未配置钉钉群机器人消息推送方式")
}
messageRequest := dingMessageRequest{
MessageType: "text",
}
@@ -60,7 +57,7 @@ func SendDingMessage(message *model.Message, user *model.User) error {
}
timestamp := time.Now().UnixMilli()
sign, err := dingSign(user.DingWebhookSecret, timestamp)
sign, err := dingSign(channel_.Secret, timestamp)
if err != nil {
return err
}
@@ -68,7 +65,7 @@ func SendDingMessage(message *model.Message, user *model.User) error {
if err != nil {
return err
}
resp, err := http.Post(fmt.Sprintf("%s&timestamp=%d&sign=%s", user.DingWebhookURL, timestamp, sign), "application/json",
resp, err := http.Post(fmt.Sprintf("%s&timestamp=%d&sign=%s", channel_.URL, timestamp, sign), "application/json",
bytes.NewBuffer(jsonData))
if err != nil {
return err

View File

@@ -18,10 +18,7 @@ type discordMessageResponse struct {
Message string `json:"message"`
}
func SendDiscordMessage(message *model.Message, user *model.User) error {
if user.DiscordWebhookURL == "" {
return errors.New("未配置 Discord 群机器人消息推送方式")
}
func SendDiscordMessage(message *model.Message, user *model.User, channel_ *model.Channel) error {
if message.Content == "" {
message.Content = message.Description
}
@@ -42,7 +39,7 @@ func SendDiscordMessage(message *model.Message, user *model.User) error {
if err != nil {
return err
}
resp, err := http.Post(user.DiscordWebhookURL, "application/json", bytes.NewBuffer(jsonData))
resp, err := http.Post(channel_.URL, "application/json", bytes.NewBuffer(jsonData))
if err != nil {
return err
}

View File

@@ -6,7 +6,7 @@ import (
"message-pusher/model"
)
func SendEmailMessage(message *model.Message, user *model.User) error {
func SendEmailMessage(message *model.Message, user *model.User, channel_ *model.Channel) error {
if message.To != "" {
if user.SendEmailToOthers != common.SendEmailToOthersAllowed && user.Role < common.RoleAdminUser {
return errors.New("没有权限发送邮件给其他人,请联系管理员为你添加该权限")

View File

@@ -46,11 +46,8 @@ type larkMessageResponse struct {
Message string `json:"msg"`
}
func SendLarkMessage(message *model.Message, user *model.User) error {
func SendLarkMessage(message *model.Message, user *model.User, channel_ *model.Channel) error {
// https://open.feishu.cn/document/ukTMukTMukTM/ucTM5YjL3ETO24yNxkjN#e1cdee9f
if user.LarkWebhookURL == "" {
return errors.New("未配置飞书群机器人消息推送方式")
}
messageRequest := larkMessageRequest{
MessageType: "text",
}
@@ -83,7 +80,7 @@ func SendLarkMessage(message *model.Message, user *model.User) error {
now := time.Now()
timestamp := now.Unix()
sign, err := larkSign(user.LarkWebhookSecret, timestamp)
sign, err := larkSign(channel_.Secret, timestamp)
if err != nil {
return err
}
@@ -93,7 +90,7 @@ func SendLarkMessage(message *model.Message, user *model.User) error {
if err != nil {
return err
}
resp, err := http.Post(user.LarkWebhookURL, "application/json",
resp, err := http.Post(channel_.URL, "application/json",
bytes.NewBuffer(jsonData))
if err != nil {
return err

View File

@@ -5,45 +5,31 @@ import (
"message-pusher/model"
)
const (
TypeEmail = "email"
TypeWeChatTestAccount = "test"
TypeWeChatCorpAccount = "corp_app"
TypeCorp = "corp"
TypeLark = "lark"
TypeDing = "ding"
TypeTelegram = "telegram"
TypeDiscord = "discord"
TypeBark = "bark"
TypeClient = "client"
TypeNone = "none"
)
func SendMessage(message *model.Message, user *model.User) error {
switch message.Channel {
case TypeEmail:
return SendEmailMessage(message, user)
case TypeWeChatTestAccount:
return SendWeChatTestMessage(message, user)
case TypeWeChatCorpAccount:
return SendWeChatCorpMessage(message, user)
case TypeCorp:
return SendCorpMessage(message, user)
case TypeLark:
return SendLarkMessage(message, user)
case TypeDing:
return SendDingMessage(message, user)
case TypeBark:
return SendBarkMessage(message, user)
case TypeClient:
return SendClientMessage(message, user)
case TypeTelegram:
return SendTelegramMessage(message, user)
case TypeDiscord:
return SendDiscordMessage(message, user)
case TypeNone:
func SendMessage(message *model.Message, user *model.User, channel_ *model.Channel) error {
switch channel_.Type {
case model.TypeEmail:
return SendEmailMessage(message, user, channel_)
case model.TypeWeChatTestAccount:
return SendWeChatTestMessage(message, user, channel_)
case model.TypeWeChatCorpAccount:
return SendWeChatCorpMessage(message, user, channel_)
case model.TypeCorp:
return SendCorpMessage(message, user, channel_)
case model.TypeLark:
return SendLarkMessage(message, user, channel_)
case model.TypeDing:
return SendDingMessage(message, user, channel_)
case model.TypeBark:
return SendBarkMessage(message, user, channel_)
case model.TypeClient:
return SendClientMessage(message, user, channel_)
case model.TypeTelegram:
return SendTelegramMessage(message, user, channel_)
case model.TypeDiscord:
return SendDiscordMessage(message, user, channel_)
case model.TypeNone:
return nil
default:
return errors.New("不支持的消息通道:" + message.Channel)
return errors.New("不支持的消息通道:" + channel_.Type)
}
}

View File

@@ -20,13 +20,10 @@ type telegramMessageResponse struct {
Description string `json:"description"`
}
func SendTelegramMessage(message *model.Message, user *model.User) error {
func SendTelegramMessage(message *model.Message, user *model.User, channel_ *model.Channel) error {
// https://core.telegram.org/bots/api#sendmessage
if user.TelegramBotToken == "" || user.TelegramChatId == "" {
return errors.New("未配置 Telegram 机器人消息推送方式")
}
messageRequest := telegramMessageRequest{
ChatId: user.TelegramChatId,
ChatId: channel_.AppId,
Text: message.Content,
ParseMode: "markdown",
}
@@ -37,7 +34,7 @@ func SendTelegramMessage(message *model.Message, user *model.User) error {
if err != nil {
return err
}
resp, err := http.Post(fmt.Sprintf("https://api.telegram.org/bot%s/sendMessage", user.TelegramBotToken), "application/json",
resp, err := http.Post(fmt.Sprintf("https://api.telegram.org/bot%s/sendMessage", channel_.Secret), "application/json",
bytes.NewBuffer(jsonData))
if err != nil {
return err

View File

@@ -12,6 +12,7 @@ type TokenStoreItem interface {
Token() string
Refresh()
IsFilled() bool
IsShared() bool
}
type tokenStore struct {
@@ -22,34 +23,51 @@ type tokenStore struct {
var s tokenStore
func channel2item(channel_ *model.Channel) TokenStoreItem {
if channel_.Type == model.TypeWeChatTestAccount {
item := &WeChatTestAccountTokenStoreItem{
AppID: channel_.AppId,
AppSecret: channel_.Secret,
}
return item
} else if channel_.Type == model.TypeWeChatCorpAccount {
corpId, agentId, err := parseWechatCorpAccountAppId(channel_.AppId)
if err != nil {
common.SysError(err.Error())
return nil
}
item := &WeChatCorpAccountTokenStoreItem{
CorpId: corpId,
AgentSecret: channel_.Secret,
AgentId: agentId,
}
return item
}
return nil
}
func channels2items(channels []*model.Channel) []TokenStoreItem {
var items []TokenStoreItem
for _, channel_ := range channels {
item := channel2item(channel_)
if item != nil {
items = append(items, item)
}
}
return items
}
func TokenStoreInit() {
s.Map = make(map[string]*TokenStoreItem)
// https://developers.weixin.qq.com/doc/offiaccount/Basic_Information/Get_access_token.html
// https://developer.work.weixin.qq.com/document/path/91039
s.ExpirationSeconds = 2 * 55 * 60 // 2 hours - 5 minutes
go func() {
users, err := model.GetAllUsersWithSecrets()
channels, err := model.GetTokenStoreChannels()
if err != nil {
common.FatalLog(err.Error())
}
var items []TokenStoreItem
for _, user := range users {
if user.WeChatTestAccountId != "" {
item := &WeChatTestAccountTokenStoreItem{
AppID: user.WeChatTestAccountId,
AppSecret: user.WeChatTestAccountSecret,
}
items = append(items, item)
}
if user.WeChatCorpAccountId != "" {
item := &WeChatCorpAccountTokenStoreItem{
CorpId: user.WeChatCorpAccountId,
AgentSecret: user.WeChatCorpAccountAgentSecret,
AgentId: user.WeChatCorpAccountAgentId,
}
items = append(items, item)
}
}
items := channels2items(channels)
s.Mutex.RLock()
for i := range items {
// s.Map[item.Key()] = &item // This is wrong, you are getting the address of a local variable!
@@ -99,78 +117,14 @@ func TokenStoreRemoveItem(item TokenStoreItem) {
}
func TokenStoreAddUser(user *model.User) {
testItem := WeChatTestAccountTokenStoreItem{
AppID: user.WeChatTestAccountId,
AppSecret: user.WeChatTestAccountSecret,
channels, err := model.GetTokenStoreChannelsByUserId(user.Id)
if err != nil {
common.SysError(err.Error())
return
}
TokenStoreAddItem(&testItem)
corpItem := WeChatCorpAccountTokenStoreItem{
CorpId: user.WeChatCorpAccountId,
AgentSecret: user.WeChatCorpAccountAgentSecret,
AgentId: user.WeChatCorpAccountAgentId,
}
TokenStoreAddItem(&corpItem)
}
func TokenStoreUpdateUser(cleanUser *model.User, originUser *model.User) {
// WeChat Test Account
// The fields of cleanUser may be incomplete!
if cleanUser.WeChatTestAccountId == originUser.WeChatTestAccountId {
cleanUser.WeChatTestAccountId = ""
}
if cleanUser.WeChatTestAccountSecret == originUser.WeChatTestAccountSecret {
cleanUser.WeChatTestAccountSecret = ""
}
// This means the user updated those fields.
if cleanUser.WeChatTestAccountId != "" || cleanUser.WeChatTestAccountSecret != "" {
oldWeChatTestAccountTokenStoreItem := WeChatTestAccountTokenStoreItem{
AppID: originUser.WeChatTestAccountId,
AppSecret: originUser.WeChatTestAccountSecret,
}
// Yeah, it's a deep copy.
newWeChatTestAccountTokenStoreItem := oldWeChatTestAccountTokenStoreItem
if cleanUser.WeChatTestAccountId != "" {
newWeChatTestAccountTokenStoreItem.AppID = cleanUser.WeChatTestAccountId
}
if cleanUser.WeChatTestAccountSecret != "" {
newWeChatTestAccountTokenStoreItem.AppSecret = cleanUser.WeChatTestAccountSecret
}
if !oldWeChatTestAccountTokenStoreItem.IsShared() {
TokenStoreRemoveItem(&oldWeChatTestAccountTokenStoreItem)
}
TokenStoreAddItem(&newWeChatTestAccountTokenStoreItem)
}
// WeChat Corp Account
if cleanUser.WeChatCorpAccountId == originUser.WeChatCorpAccountId {
cleanUser.WeChatCorpAccountId = ""
}
if cleanUser.WeChatCorpAccountAgentId == originUser.WeChatCorpAccountAgentId {
cleanUser.WeChatCorpAccountAgentId = ""
}
if cleanUser.WeChatCorpAccountAgentSecret == originUser.WeChatCorpAccountAgentSecret {
cleanUser.WeChatCorpAccountAgentSecret = ""
}
if cleanUser.WeChatCorpAccountId != "" || cleanUser.WeChatCorpAccountAgentId != "" || cleanUser.WeChatCorpAccountAgentSecret != "" {
oldWeChatCorpAccountTokenStoreItem := WeChatCorpAccountTokenStoreItem{
CorpId: originUser.WeChatCorpAccountId,
AgentSecret: originUser.WeChatCorpAccountAgentSecret,
AgentId: originUser.WeChatCorpAccountAgentId,
}
newWeChatCorpAccountTokenStoreItem := oldWeChatCorpAccountTokenStoreItem
if cleanUser.WeChatCorpAccountId != "" {
newWeChatCorpAccountTokenStoreItem.CorpId = cleanUser.WeChatCorpAccountId
}
if cleanUser.WeChatCorpAccountAgentSecret != "" {
newWeChatCorpAccountTokenStoreItem.AgentSecret = cleanUser.WeChatCorpAccountAgentSecret
}
if cleanUser.WeChatCorpAccountAgentId != "" {
newWeChatCorpAccountTokenStoreItem.AgentId = cleanUser.WeChatCorpAccountAgentId
}
if !oldWeChatCorpAccountTokenStoreItem.IsShared() {
TokenStoreRemoveItem(&oldWeChatCorpAccountTokenStoreItem)
}
TokenStoreAddItem(&newWeChatCorpAccountTokenStoreItem)
items := channels2items(channels)
for i := range items {
TokenStoreAddItem(items[i])
}
}
@@ -178,20 +132,109 @@ func TokenStoreUpdateUser(cleanUser *model.User, originUser *model.User) {
// user must be filled.
// It's okay to delete a user that don't have an item here.
func TokenStoreRemoveUser(user *model.User) {
testAccountTokenStoreItem := WeChatTestAccountTokenStoreItem{
AppID: user.WeChatTestAccountId,
AppSecret: user.WeChatTestAccountSecret,
channels, err := model.GetTokenStoreChannelsByUserId(user.Id)
if err != nil {
common.SysError(err.Error())
return
}
if !testAccountTokenStoreItem.IsShared() {
TokenStoreRemoveItem(&testAccountTokenStoreItem)
items := channels2items(channels)
for i := range items {
if items[i].IsShared() {
continue
}
TokenStoreRemoveItem(items[i])
}
corpAccountTokenStoreItem := WeChatCorpAccountTokenStoreItem{
CorpId: user.WeChatCorpAccountId,
AgentSecret: user.WeChatCorpAccountAgentSecret,
AgentId: user.WeChatCorpAccountAgentId,
}
func TokenStoreAddChannel(channel *model.Channel) {
if channel.Type != model.TypeWeChatTestAccount && channel.Type != model.TypeWeChatCorpAccount {
return
}
if !corpAccountTokenStoreItem.IsShared() {
TokenStoreRemoveItem(&corpAccountTokenStoreItem)
item := channel2item(channel)
if item != nil {
TokenStoreAddItem(item)
}
}
func TokenStoreRemoveChannel(channel *model.Channel) {
if channel.Type != model.TypeWeChatTestAccount && channel.Type != model.TypeWeChatCorpAccount {
return
}
item := channel2item(channel)
if item != nil {
TokenStoreRemoveItem(item)
}
}
func TokenStoreUpdateChannel(newChannel *model.Channel, oldChannel *model.Channel) {
if oldChannel.Type != model.TypeWeChatTestAccount && oldChannel.Type != model.TypeWeChatCorpAccount {
return
}
if oldChannel.Type == model.TypeWeChatTestAccount {
// Only keep changed parts
if newChannel.AppId == oldChannel.AppId {
newChannel.AppId = ""
}
if newChannel.Secret == oldChannel.Secret {
newChannel.Secret = ""
}
oldItem := WeChatTestAccountTokenStoreItem{
AppID: oldChannel.AppId,
AppSecret: oldChannel.Secret,
}
// Yeah, it's a deep copy.
newItem := oldItem
// This means the user updated those fields.
if newChannel.AppId != "" {
newItem.AppID = newChannel.AppId
}
if newChannel.Secret != "" {
newItem.AppSecret = newChannel.Secret
}
if !oldItem.IsShared() {
TokenStoreRemoveItem(&oldItem)
}
TokenStoreAddItem(&newItem)
return
}
if oldChannel.Type == model.TypeWeChatCorpAccount {
// Only keep changed parts
if newChannel.AppId == oldChannel.AppId {
newChannel.AppId = ""
}
if newChannel.Secret == oldChannel.Secret {
newChannel.Secret = ""
}
corpId, agentId, err := parseWechatCorpAccountAppId(oldChannel.AppId)
if err != nil {
common.SysError(err.Error())
return
}
oldItem := WeChatCorpAccountTokenStoreItem{
CorpId: corpId,
AgentSecret: oldChannel.Secret,
AgentId: agentId,
}
// Yeah, it's a deep copy.
newItem := oldItem
// This means the user updated those fields.
if newChannel.AppId != "" {
corpId, agentId, err := parseWechatCorpAccountAppId(oldChannel.AppId)
if err != nil {
common.SysError(err.Error())
return
}
newItem.CorpId = corpId
newItem.AgentId = agentId
}
if newChannel.Secret != "" {
newItem.AgentSecret = newChannel.Secret
}
if !oldItem.IsShared() {
TokenStoreRemoveItem(&oldItem)
}
TokenStoreAddItem(&newItem)
return
}
}

View File

@@ -8,6 +8,7 @@ import (
"message-pusher/common"
"message-pusher/model"
"net/http"
"strings"
"time"
)
@@ -30,8 +31,9 @@ func (i *WeChatCorpAccountTokenStoreItem) Key() string {
}
func (i *WeChatCorpAccountTokenStoreItem) IsShared() bool {
return model.DB.Where("wechat_corp_account_id = ? and wechat_corp_account_agent_secret = ? and wechat_corp_account_agent_id = ?",
i.CorpId, i.AgentSecret, i.AgentId).Find(&model.User{}).RowsAffected != 1
appId := fmt.Sprintf("%s|%s", i.CorpId, i.AgentId)
return model.DB.Where("type = ? and app_id = ? and secret = ?",
model.TypeWeChatCorpAccount, appId, i.AgentSecret).Find(&model.Channel{}).RowsAffected != 1
}
func (i *WeChatCorpAccountTokenStoreItem) IsFilled() bool {
@@ -95,14 +97,26 @@ type wechatCorpMessageResponse struct {
ErrorMessage string `json:"errmsg"`
}
func SendWeChatCorpMessage(message *model.Message, user *model.User) error {
if user.WeChatCorpAccountId == "" {
return errors.New("未配置微信企业号消息推送方式")
func parseWechatCorpAccountAppId(appId string) (string, string, error) {
parts := strings.Split(appId, "|")
if len(parts) != 2 {
return "", "", errors.New("无效的微信企业号配置")
}
return parts[0], parts[1], nil
}
func SendWeChatCorpMessage(message *model.Message, user *model.User, channel_ *model.Channel) error {
// https://developer.work.weixin.qq.com/document/path/90236
corpId, agentId, err := parseWechatCorpAccountAppId(channel_.AppId)
if err != nil {
return err
}
userId := channel_.AccountId
clientType := channel_.Other
agentSecret := channel_.Secret
messageRequest := wechatCorpMessageRequest{
ToUser: user.WeChatCorpAccountUserId,
AgentId: user.WeChatCorpAccountAgentId,
ToUser: userId,
AgentId: agentId,
}
if message.To != "" {
messageRequest.ToUser = message.To
@@ -118,7 +132,7 @@ func SendWeChatCorpMessage(message *model.Message, user *model.User) error {
messageRequest.TextCard.URL = common.ServerAddress
}
} else {
if user.WeChatCorpAccountClientType == "plugin" {
if clientType == "plugin" {
messageRequest.MessageType = "textcard"
messageRequest.TextCard.Title = message.Title
messageRequest.TextCard.Description = message.Description
@@ -132,7 +146,7 @@ func SendWeChatCorpMessage(message *model.Message, user *model.User) error {
if err != nil {
return err
}
key := fmt.Sprintf("%s%s%s", user.WeChatCorpAccountId, user.WeChatCorpAccountAgentId, user.WeChatCorpAccountAgentSecret)
key := fmt.Sprintf("%s%s%s", corpId, agentId, agentSecret)
accessToken := TokenStoreGetToken(key)
resp, err := http.Post(fmt.Sprintf("https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=%s", accessToken), "application/json",
bytes.NewBuffer(jsonData))

View File

@@ -29,8 +29,8 @@ func (i *WeChatTestAccountTokenStoreItem) Key() string {
}
func (i *WeChatTestAccountTokenStoreItem) IsShared() bool {
return model.DB.Where("wechat_test_account_id = ? and wechat_test_account_secret = ?",
i.AppID, i.AppSecret).Find(&model.User{}).RowsAffected != 1
return model.DB.Where("type = ? and app_id = ? and secret = ?",
model.TypeWeChatTestAccount, i.AppID, i.AppSecret).Find(&model.Channel{}).RowsAffected != 1
}
func (i *WeChatTestAccountTokenStoreItem) IsFilled() bool {
@@ -88,13 +88,10 @@ type wechatTestMessageResponse struct {
ErrorMessage string `json:"errmsg"`
}
func SendWeChatTestMessage(message *model.Message, user *model.User) error {
if user.WeChatTestAccountId == "" {
return errors.New("未配置微信测试号消息推送方式")
}
func SendWeChatTestMessage(message *model.Message, user *model.User, channel_ *model.Channel) error {
values := wechatTestMessageRequest{
ToUser: user.WeChatTestAccountOpenId,
TemplateId: user.WeChatTestAccountTemplateId,
ToUser: channel_.AccountId,
TemplateId: channel_.Other,
URL: "",
}
values.Data.Text.Value = message.Description
@@ -103,7 +100,7 @@ func SendWeChatTestMessage(message *model.Message, user *model.User) error {
if err != nil {
return err
}
key := fmt.Sprintf("%s%s", user.WeChatTestAccountId, user.WeChatTestAccountSecret)
key := fmt.Sprintf("%s%s", channel_.AppId, channel_.Secret)
accessToken := TokenStoreGetToken(key)
resp, err := http.Post(fmt.Sprintf("https://api.weixin.qq.com/cgi-bin/message/template/send?access_token=%s", accessToken), "application/json",
bytes.NewBuffer(jsonData))

View File

@@ -106,3 +106,9 @@ const (
MessageSendStatusSent = 2
MessageSendStatusFailed = 3
)
const (
ChannelStatusUnknown = 0
ChannelStatusEnabled = 1
ChannelStatusDisabled = 2
)

183
controller/channel.go Normal file
View File

@@ -0,0 +1,183 @@
package controller
import (
"github.com/gin-gonic/gin"
"message-pusher/channel"
"message-pusher/common"
"message-pusher/model"
"net/http"
"strconv"
)
func GetAllChannels(c *gin.Context) {
userId := c.GetInt("id")
p, _ := strconv.Atoi(c.Query("p"))
if p < 0 {
p = 0
}
channels, err := model.GetChannelsByUserId(userId, p*common.ItemsPerPage, common.ItemsPerPage)
if err != nil {
c.JSON(http.StatusOK, gin.H{
"success": false,
"message": err.Error(),
})
return
}
c.JSON(http.StatusOK, gin.H{
"success": true,
"message": "",
"data": channels,
})
return
}
func SearchChannels(c *gin.Context) {
userId := c.GetInt("id")
keyword := c.Query("keyword")
channels, err := model.SearchChannels(userId, keyword)
if err != nil {
c.JSON(http.StatusOK, gin.H{
"success": false,
"message": err.Error(),
})
return
}
c.JSON(http.StatusOK, gin.H{
"success": true,
"message": "",
"data": channels,
})
return
}
func GetChannel(c *gin.Context) {
id, err := strconv.Atoi(c.Param("id"))
userId := c.GetInt("id")
if err != nil {
c.JSON(http.StatusOK, gin.H{
"success": false,
"message": err.Error(),
})
return
}
channel_, err := model.GetChannelById(id, userId)
if err != nil {
c.JSON(http.StatusOK, gin.H{
"success": false,
"message": err.Error(),
})
return
}
c.JSON(http.StatusOK, gin.H{
"success": true,
"message": "",
"data": channel_,
})
return
}
func AddChannel(c *gin.Context) {
channel_ := model.Channel{}
err := c.ShouldBindJSON(&channel_)
if err != nil {
c.JSON(http.StatusOK, gin.H{
"success": false,
"message": err.Error(),
})
return
}
if len(channel_.Name) == 0 || len(channel_.Name) > 20 {
c.JSON(http.StatusOK, gin.H{
"success": false,
"message": "通道名称长度必须在1-20之间",
})
return
}
cleanChannel := model.Channel{
UserId: c.GetInt("id"),
Name: channel_.Name,
}
err = cleanChannel.Insert()
if err != nil {
c.JSON(http.StatusOK, gin.H{
"success": false,
"message": err.Error(),
})
return
}
channel.TokenStoreAddChannel(&cleanChannel)
c.JSON(http.StatusOK, gin.H{
"success": true,
"message": "",
})
return
}
func DeleteChannel(c *gin.Context) {
id, _ := strconv.Atoi(c.Param("id"))
userId := c.GetInt("id")
channel_, err := model.DeleteChannelById(id, userId)
if err != nil {
c.JSON(http.StatusOK, gin.H{
"success": false,
"message": err.Error(),
})
return
}
channel.TokenStoreRemoveChannel(channel_)
c.JSON(http.StatusOK, gin.H{
"success": true,
"message": "",
})
return
}
func UpdateChannel(c *gin.Context) {
userId := c.GetInt("id")
statusOnly := c.Query("status_only")
channel_ := model.Channel{}
err := c.ShouldBindJSON(&channel_)
if err != nil {
c.JSON(http.StatusOK, gin.H{
"success": false,
"message": err.Error(),
})
return
}
oldChannel, err := model.GetChannelById(channel_.Id, userId)
if err != nil {
c.JSON(http.StatusOK, gin.H{
"success": false,
"message": err.Error(),
})
return
}
cleanChannel := oldChannel
if statusOnly != "" {
cleanChannel.Status = channel_.Status
} else {
// If you add more fields, please also update channel_.Update()
cleanChannel.Type = channel_.Type
cleanChannel.Name = channel_.Name
cleanChannel.Secret = channel_.Secret
cleanChannel.AppId = channel_.AppId
cleanChannel.AccountId = channel_.AccountId
cleanChannel.URL = channel_.URL
cleanChannel.Other = channel_.Other
}
err = cleanChannel.Update()
if err != nil {
c.JSON(http.StatusOK, gin.H{
"success": false,
"message": err.Error(),
})
return
}
channel.TokenStoreUpdateChannel(cleanChannel, oldChannel)
c.JSON(http.StatusOK, gin.H{
"success": true,
"message": "",
"data": cleanChannel,
})
return
}

View File

@@ -65,21 +65,21 @@ func pushMessageHelper(c *gin.Context, message *model.Message) {
user := model.User{Username: c.Param("username")}
err := user.FillUserByUsername()
if err != nil {
c.JSON(http.StatusForbidden, gin.H{
c.JSON(http.StatusOK, gin.H{
"success": false,
"message": err.Error(),
})
return
}
if user.Status == common.UserStatusNonExisted {
c.JSON(http.StatusForbidden, gin.H{
c.JSON(http.StatusOK, gin.H{
"success": false,
"message": "用户不存在",
})
return
}
if user.Status == common.UserStatusDisabled {
c.JSON(http.StatusForbidden, gin.H{
c.JSON(http.StatusOK, gin.H{
"success": false,
"message": "用户已被封禁",
})
@@ -89,7 +89,7 @@ func pushMessageHelper(c *gin.Context, message *model.Message) {
if message.Token == "" {
message.Token = c.Request.Header.Get("Authorization")
if message.Token == "" {
c.JSON(http.StatusForbidden, gin.H{
c.JSON(http.StatusOK, gin.H{
"success": false,
"message": "token 为空",
})
@@ -97,7 +97,7 @@ func pushMessageHelper(c *gin.Context, message *model.Message) {
}
}
if user.Token != message.Token {
c.JSON(http.StatusForbidden, gin.H{
c.JSON(http.StatusOK, gin.H{
"success": false,
"message": "无效的 token",
})
@@ -110,10 +110,18 @@ func pushMessageHelper(c *gin.Context, message *model.Message) {
if message.Channel == "" {
message.Channel = user.Channel
if message.Channel == "" {
message.Channel = channel.TypeEmail
message.Channel = model.TypeEmail
}
}
err = saveAndSendMessage(&user, message)
channel_, err := model.GetChannelByName(message.Channel, user.Id)
if err != nil {
c.JSON(http.StatusOK, gin.H{
"success": false,
"message": "无效的渠道的名称",
})
return
}
err = saveAndSendMessage(&user, message, channel_)
if err != nil {
c.JSON(http.StatusOK, gin.H{
"success": false,
@@ -127,7 +135,7 @@ func pushMessageHelper(c *gin.Context, message *model.Message) {
})
}
func saveAndSendMessage(user *model.User, message *model.Message) error {
func saveAndSendMessage(user *model.User, message *model.Message, channel_ *model.Channel) error {
message.Link = common.GetUUID()
if message.URL == "" {
message.URL = fmt.Sprintf("%s/message/%s", common.ServerAddress, message.Link)
@@ -152,7 +160,7 @@ func saveAndSendMessage(user *model.User, message *model.Message) error {
} else {
message.Link = "unsaved" // This is for user to identify whether the message is saved
}
err := channel.SendMessage(message, user)
err := channel.SendMessage(message, user, channel_)
common.MessageCount += 1 // We don't need to use atomic here because it's not a critical value
if err != nil {
return err
@@ -284,7 +292,11 @@ func ResendMessage(c *gin.Context) {
if err != nil {
return err
}
err = saveAndSendMessage(user, message)
channel_, err := model.GetChannelByName(message.Channel, user.Id)
if err != nil {
return err
}
err = saveAndSendMessage(user, message, channel_)
if err != nil {
return err
}

View File

@@ -389,45 +389,15 @@ func UpdateSelf(c *gin.Context) {
})
return
}
originUser, err := model.GetUserById(c.GetInt("id"), true)
if err != nil {
c.JSON(http.StatusOK, gin.H{
"success": false,
"message": err.Error(),
})
return
}
// White list mode. For safe :)
cleanUser := model.User{
Id: c.GetInt("id"),
Username: user.Username,
Password: user.Password,
DisplayName: user.DisplayName,
Token: user.Token,
Channel: user.Channel,
WeChatTestAccountId: user.WeChatTestAccountId,
WeChatTestAccountSecret: user.WeChatTestAccountSecret,
WeChatTestAccountTemplateId: user.WeChatTestAccountTemplateId,
WeChatTestAccountOpenId: user.WeChatTestAccountOpenId,
WeChatTestAccountVerificationToken: user.WeChatTestAccountVerificationToken,
WeChatCorpAccountId: user.WeChatCorpAccountId,
WeChatCorpAccountAgentSecret: user.WeChatCorpAccountAgentSecret,
WeChatCorpAccountAgentId: user.WeChatCorpAccountAgentId,
WeChatCorpAccountUserId: user.WeChatCorpAccountUserId,
WeChatCorpAccountClientType: user.WeChatCorpAccountClientType,
CorpWebhookURL: user.CorpWebhookURL,
LarkWebhookURL: user.LarkWebhookURL,
LarkWebhookSecret: user.LarkWebhookSecret,
DingWebhookURL: user.DingWebhookURL,
DingWebhookSecret: user.DingWebhookSecret,
BarkServer: user.BarkServer,
BarkSecret: user.BarkSecret,
ClientSecret: user.ClientSecret,
TelegramBotToken: user.TelegramBotToken,
TelegramChatId: user.TelegramChatId,
DiscordWebhookURL: user.DiscordWebhookURL,
Id: c.GetInt("id"),
Username: user.Username,
Password: user.Password,
DisplayName: user.DisplayName,
Token: user.Token,
Channel: user.Channel,
}
channel.TokenStoreUpdateUser(&cleanUser, originUser)
if user.Password == "$I_LOVE_U" {
user.Password = "" // rollback to what it should be

View File

@@ -27,10 +27,29 @@ func RegisterClient(c *gin.Context) {
}
user := model.User{Username: c.Param("username")}
err := user.FillUserByUsername()
if secret != user.ClientSecret {
if err != nil {
c.JSON(http.StatusOK, gin.H{
"success": false,
"message": "用户名与密钥不匹配",
"message": "无效的用户名",
})
return
}
channelName := c.Query("channel")
if channelName == "" {
channelName = "client"
}
channel_, err := model.GetChannelByName(channelName, user.Id)
if err != nil {
c.JSON(http.StatusOK, gin.H{
"success": false,
"message": "无效的通道名称",
})
return
}
if secret != channel_.Secret {
c.JSON(http.StatusOK, gin.H{
"success": false,
"message": "通道名称与密钥不匹配",
})
return
}
@@ -42,6 +61,6 @@ func RegisterClient(c *gin.Context) {
})
return
}
channel.RegisterClient(user.Id, conn)
channel.RegisterClient(channelName, user.Id, conn)
return
}

106
model/channel.go Normal file
View File

@@ -0,0 +1,106 @@
package model
import (
"errors"
)
const (
TypeEmail = "email"
TypeWeChatTestAccount = "test"
TypeWeChatCorpAccount = "corp_app"
TypeCorp = "corp"
TypeLark = "lark"
TypeDing = "ding"
TypeTelegram = "telegram"
TypeDiscord = "discord"
TypeBark = "bark"
TypeClient = "client"
TypeNone = "none"
)
type Channel struct {
Id int `json:"id"`
Type string `json:"type" gorm:"type:varchar(32)"`
UserId int `json:"user_id" gorm:"uniqueIndex:name_user_id"`
Name string `json:"name" gorm:"type:varchar(32);uniqueIndex:name_user_id"`
Status int `json:"status" gorm:"default:1"` // enabled, disabled
Secret string `json:"secret"`
AppId string `json:"app_id"`
AccountId string `json:"account_id"`
URL string `json:"url" gorm:"column:url"`
Other string `json:"other"`
}
func GetChannelById(id int, userId int) (*Channel, error) {
if id == 0 || userId == 0 {
return nil, errors.New("id 或 userId 为空!")
}
c := Channel{Id: id, UserId: userId}
err := DB.Where(c).First(&c).Error
return &c, err
}
func GetChannelByName(name string, userId int) (*Channel, error) {
if name == "" || userId == 0 {
return nil, errors.New("name 或 userId 为空!")
}
c := Channel{Name: name, UserId: userId}
err := DB.Where(c).First(&c).Error
return &c, err
}
func GetTokenStoreChannels() (channels []*Channel, err error) {
err = DB.Where("type = ? or type = ?", TypeWeChatCorpAccount, TypeWeChatTestAccount).Find(&channels).Error
return channels, err
}
func GetTokenStoreChannelsByUserId(userId int) (channels []*Channel, err error) {
err = DB.Where("user_id = ?", userId).Where("type = ? or type = ?", TypeWeChatCorpAccount, TypeWeChatTestAccount).Find(&channels).Error
return channels, err
}
func GetChannelsByUserId(userId int, startIdx int, num int) (channels []*Channel, err error) {
err = DB.Where("user_id = ?", userId).Order("id desc").Limit(num).Offset(startIdx).Find(&channels).Error
return channels, err
}
func SearchChannels(userId int, keyword string) (channels []*Channel, err error) {
err = DB.Where("user_id = ?", userId).Select([]string{"id", "name"}).Where("id = ? or name LIKE", keyword, keyword+"%").Find(&channels).Error
return channels, err
}
func DeleteChannelById(id int, userId int) (c *Channel, err error) {
// Why we need userId here? In case user want to delete other's c.
if id == 0 || userId == 0 {
return nil, errors.New("id 或 userId 为空!")
}
c = &Channel{Id: id, UserId: userId}
err = DB.Where(c).First(&c).Error
if err != nil {
return nil, err
}
return c, c.Delete()
}
func (channel *Channel) Insert() error {
var err error
err = DB.Create(channel).Error
return err
}
func (channel *Channel) UpdateStatus(status int) error {
err := DB.Model(channel).Update("status", status).Error
return err
}
// Update Make sure your token's fields is completed, because this will update non-zero values
func (channel *Channel) Update() error {
var err error
err = DB.Model(channel).Select("type", "name", "secret", "app_id", "account_id", "url", "other").Updates(channel).Error
return err
}
func (channel *Channel) Delete() error {
err := DB.Delete(channel).Error
return err
}

View File

@@ -64,6 +64,10 @@ func InitDB() (err error) {
if err != nil {
return err
}
err = db.AutoMigrate(&Channel{})
if err != nil {
return err
}
err = createRootAccountIfNeed()
return err
} else {

View File

@@ -9,41 +9,20 @@ import (
// User if you add sensitive fields, don't forget to clean them in setupLogin function.
// Otherwise, the sensitive information will be saved on local storage in plain text!
type User struct {
Id int `json:"id"`
Username string `json:"username" gorm:"unique;index" validate:"max=12"`
Password string `json:"password" gorm:"not null;" validate:"min=8,max=20"`
DisplayName string `json:"display_name" gorm:"index" validate:"max=20"`
Role int `json:"role" gorm:"type:int;default:1"` // admin, common
Status int `json:"status" gorm:"type:int;default:1"` // enabled, disabled
Token string `json:"token"`
Email string `json:"email" gorm:"index" validate:"max=50"`
GitHubId string `json:"github_id" gorm:"column:github_id;index"`
WeChatId string `json:"wechat_id" gorm:"column:wechat_id;index"`
VerificationCode string `json:"verification_code" gorm:"-:all"` // this field is only for Email verification, don't save it to database!
Channel string `json:"channel"`
WeChatTestAccountId string `json:"wechat_test_account_id" gorm:"column:wechat_test_account_id"`
WeChatTestAccountSecret string `json:"wechat_test_account_secret" gorm:"column:wechat_test_account_secret"`
WeChatTestAccountTemplateId string `json:"wechat_test_account_template_id" gorm:"column:wechat_test_account_template_id"`
WeChatTestAccountOpenId string `json:"wechat_test_account_open_id" gorm:"column:wechat_test_account_open_id"`
WeChatTestAccountVerificationToken string `json:"wechat_test_account_verification_token" gorm:"column:wechat_test_account_verification_token"`
WeChatCorpAccountId string `json:"wechat_corp_account_id" gorm:"column:wechat_corp_account_id"`
WeChatCorpAccountAgentSecret string `json:"wechat_corp_account_agent_secret" gorm:"column:wechat_corp_account_agent_secret"`
WeChatCorpAccountAgentId string `json:"wechat_corp_account_agent_id" gorm:"column:wechat_corp_account_agent_id"`
WeChatCorpAccountUserId string `json:"wechat_corp_account_user_id" gorm:"column:wechat_corp_account_user_id"`
WeChatCorpAccountClientType string `json:"wechat_corp_account_client_type" gorm:"column:wechat_corp_account_client_type;default=plugin"`
CorpWebhookURL string `json:"corp_webhook_url" gorm:"corp_webhook_url"`
LarkWebhookURL string `json:"lark_webhook_url"`
LarkWebhookSecret string `json:"lark_webhook_secret"`
DingWebhookURL string `json:"ding_webhook_url"`
DingWebhookSecret string `json:"ding_webhook_secret"`
BarkServer string `json:"bark_server"`
BarkSecret string `json:"bark_secret"`
ClientSecret string `json:"client_secret"`
TelegramBotToken string `json:"telegram_bot_token"`
TelegramChatId string `json:"telegram_chat_id"`
DiscordWebhookURL string `json:"discord_webhook_url"`
SendEmailToOthers int `json:"send_email_to_others" gorm:"type:int;default:0"`
SaveMessageToDatabase int `json:"save_message_to_database" gorm:"type:int;default:0"`
Id int `json:"id"`
Username string `json:"username" gorm:"unique;index" validate:"max=12"`
Password string `json:"password" gorm:"not null;" validate:"min=8,max=20"`
DisplayName string `json:"display_name" gorm:"index" validate:"max=20"`
Role int `json:"role" gorm:"type:int;default:1"` // admin, common
Status int `json:"status" gorm:"type:int;default:1"` // enabled, disabled
Token string `json:"token"`
Email string `json:"email" gorm:"index" validate:"max=50"`
GitHubId string `json:"github_id" gorm:"column:github_id;index"`
WeChatId string `json:"wechat_id" gorm:"column:wechat_id;index"`
VerificationCode string `json:"verification_code" gorm:"-:all"` // this field is only for Email verification, don't save it to database!
Channel string `json:"channel"`
SendEmailToOthers int `json:"send_email_to_others" gorm:"type:int;default:0"`
SaveMessageToDatabase int `json:"save_message_to_database" gorm:"type:int;default:0"`
}
func GetMaxUserId() int {
@@ -57,11 +36,6 @@ func GetAllUsers(startIdx int, num int) (users []*User, err error) {
return users, err
}
func GetAllUsersWithSecrets() (users []*User, err error) {
err = DB.Where("status = ?", common.UserStatusEnabled).Where("wechat_test_account_id != '' or wechat_corp_account_id != ''").Find(&users).Error
return users, err
}
func SearchUsers(keyword string) (users []*User, err error) {
err = DB.Select([]string{"id", "username", "display_name", "role", "status", "email"}).Where("id = ? or username LIKE ? or email LIKE ? or display_name LIKE ?", keyword, keyword+"%", keyword+"%", keyword+"%").Find(&users).Error
return users, err
@@ -77,10 +51,7 @@ func GetUserById(id int, selectAll bool) (*User, error) {
err = DB.First(&user, "id = ?", id).Error
} else {
err = DB.Select([]string{"id", "username", "display_name", "role", "status", "email", "wechat_id", "github_id",
"channel", "token",
"wechat_test_account_id", "wechat_test_account_template_id", "wechat_test_account_open_id",
"wechat_corp_account_id", "wechat_corp_account_agent_id", "wechat_corp_account_user_id", "wechat_corp_account_client_type",
"bark_server", "telegram_chat_id", "save_message_to_database",
"channel", "token", "save_message_to_database",
}).First(&user, "id = ?", id).Error
}
return &user, err

View File

@@ -64,6 +64,16 @@ func SetApiRouter(router *gin.Engine) {
messageRoute.DELETE("/", middleware.RootAuth(), controller.DeleteAllMessages)
messageRoute.DELETE("/:id", middleware.UserAuth(), controller.DeleteMessage)
}
channelRoute := apiRouter.Group("/token")
channelRoute.Use(middleware.UserAuth())
{
channelRoute.GET("/", controller.GetAllChannels)
channelRoute.GET("/search", controller.SearchChannels)
channelRoute.GET("/:id", controller.GetChannel)
channelRoute.POST("/", controller.AddChannel)
channelRoute.PUT("/", controller.UpdateChannel)
channelRoute.DELETE("/:id", controller.DeleteChannel)
}
}
pushRouter := router.Group("/push")
pushRouter.Use(middleware.GlobalAPIRateLimit())