mirror of
https://github.com/wonli/aqi.git
synced 2025-12-24 10:40:58 +08:00
refactor(ws/pubsub): simplify topic message processing loop
Remove redundant channel check and flatten select statement into a range loop for cleaner message handling
This commit is contained in:
32
ws/pubsub.go
32
ws/pubsub.go
@@ -39,7 +39,7 @@ func (a *PubSub) initTopic(topicId string) *Topic {
|
||||
// Pub 发布主题
|
||||
func (a *PubSub) Pub(topicId string, data any) {
|
||||
msg := Action{
|
||||
Action: "subscriber",
|
||||
Action: topicId,
|
||||
Data: H{
|
||||
"topicId": topicId,
|
||||
"message": data,
|
||||
@@ -66,25 +66,17 @@ func (a *PubSub) SubFunc(topicId string, f func(msg *TopicMsg)) {
|
||||
}
|
||||
|
||||
func (a *PubSub) Start() {
|
||||
for {
|
||||
select {
|
||||
case msg, ok := <-a.TopicMsgQueue:
|
||||
if !ok {
|
||||
logger.SugarLog.Info("从订阅主题队列取数据失败")
|
||||
continue
|
||||
}
|
||||
|
||||
t, hasTopic := a.Topics.Load(msg.TopicId)
|
||||
if !hasTopic {
|
||||
logger.SugarLog.Info("未发布订阅主题收到消息")
|
||||
continue
|
||||
}
|
||||
|
||||
//订阅消息的函数处理
|
||||
t.(*Topic).ApplyFunc(msg)
|
||||
|
||||
//订阅消息的用户处理
|
||||
t.(*Topic).SendToSubUser(msg.Msg)
|
||||
for msg := range a.TopicMsgQueue {
|
||||
t, hasTopic := a.Topics.Load(msg.TopicId)
|
||||
if !hasTopic {
|
||||
logger.SugarLog.Info("未发布订阅主题收到消息")
|
||||
continue
|
||||
}
|
||||
|
||||
//订阅消息的函数处理
|
||||
t.(*Topic).ApplyFunc(msg)
|
||||
|
||||
//订阅消息的用户处理
|
||||
t.(*Topic).SendToSubUser(msg.Msg)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user