mirror of
https://github.com/wonli/aqi.git
synced 2025-12-24 10:40:58 +08:00
feat(ws): add user cleanup logic for inactive connections
implement UnsubAllTopics method to clean user subscriptions add periodic cleanup of inactive users in Hubc guard loop add RemoveSubUser method to Topic for proper user removal
This commit is contained in:
17
ws/hubc.go
17
ws/hubc.go
@@ -67,12 +67,27 @@ func (h *Hubc) Run() {
|
||||
}
|
||||
|
||||
func (h *Hubc) guard() {
|
||||
cleanupTTL := 5 * time.Minute
|
||||
timer := time.NewTicker(30 * time.Second)
|
||||
for range timer.C {
|
||||
userCount := 0
|
||||
guestCount := len(h.Guests)
|
||||
h.Users.Range(func(key, value any) bool {
|
||||
userCount++
|
||||
user, ok := value.(*User)
|
||||
if !ok || user == nil {
|
||||
return true
|
||||
}
|
||||
|
||||
if len(user.AppClients) == 0 {
|
||||
if time.Since(user.LastHeartbeatTime) >= cleanupTTL {
|
||||
user.UnsubAllTopics()
|
||||
h.Users.Delete(key)
|
||||
h.PubSub.Pub("cleanupUser", H{"suid": user.Suid})
|
||||
}
|
||||
} else {
|
||||
userCount++
|
||||
}
|
||||
|
||||
return true
|
||||
})
|
||||
|
||||
|
||||
@@ -6,10 +6,10 @@ import (
|
||||
)
|
||||
|
||||
type Topic struct {
|
||||
Id string //订阅主题ID
|
||||
PubSub *PubSub //关联PubSub
|
||||
SubUsers sync.Map //SubUsers map[string]*time.Time //订阅用户uniqueId和订阅时间
|
||||
SubHandlers sync.Map //SubHandlers map[string]func(msg *TopicMsg) //内部组件间通知
|
||||
Id string //订阅主题ID
|
||||
PubSub *PubSub //关联PubSub
|
||||
SubUsers sync.Map //SubUsers map[string]*time.Time //订阅用户uniqueId和订阅时间
|
||||
SubHandlers sync.Map //SubHandlers map[string]func(msg *TopicMsg) //内部组件间通知
|
||||
}
|
||||
|
||||
func (a *Topic) AddSubUser(user *User) {
|
||||
@@ -18,7 +18,12 @@ func (a *Topic) AddSubUser(user *User) {
|
||||
}
|
||||
|
||||
func (a *Topic) AddSubHandle(f func(msg *TopicMsg)) {
|
||||
a.SubHandlers.LoadOrStore(a.Id, f)
|
||||
a.SubHandlers.LoadOrStore(a.Id, f)
|
||||
}
|
||||
|
||||
// RemoveSubUser 从主题订阅集合中移除指定用户
|
||||
func (a *Topic) RemoveSubUser(suid string) {
|
||||
a.SubUsers.Delete(suid)
|
||||
}
|
||||
|
||||
func (a *Topic) SendToSubUser(msg []byte) {
|
||||
|
||||
18
ws/user.go
18
ws/user.go
@@ -67,6 +67,24 @@ func (u *User) UnsubTopic(topicId string) int {
|
||||
return len(u.SubTopics)
|
||||
}
|
||||
|
||||
// UnsubAllTopics 取消用户的所有主题订阅(用户侧与主题侧同时清理)
|
||||
func (u *User) UnsubAllTopics() int {
|
||||
u.Lock()
|
||||
defer u.Unlock()
|
||||
|
||||
for topicId, topic := range u.SubTopics {
|
||||
if topic != nil {
|
||||
// 从主题订阅集合中移除该用户
|
||||
topic.RemoveSubUser(u.Suid)
|
||||
}
|
||||
|
||||
// 从用户侧映射移除该主题
|
||||
delete(u.SubTopics, topicId)
|
||||
}
|
||||
|
||||
return len(u.SubTopics)
|
||||
}
|
||||
|
||||
// AppLogin 用户APP客户端登录
|
||||
func (u *User) appLogin(appId string, client *Client) error {
|
||||
var index int
|
||||
|
||||
Reference in New Issue
Block a user