mirror of
https://github.com/nats-io/nats.go.git
synced 2025-09-26 20:41:41 +08:00
[FIXED] Clear status listeners map on SubscriptionClosed event
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
This commit is contained in:
20
nats.go
20
nats.go
@@ -33,6 +33,7 @@ import (
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"runtime"
|
||||
"slices"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
@@ -4686,7 +4687,7 @@ func (s *Subscription) registerStatusChangeListener(status SubStatus, ch chan Su
|
||||
// will not block. Lock should be held entering.
|
||||
func (s *Subscription) sendStatusEvent(status SubStatus) {
|
||||
for ch, statuses := range s.statListeners {
|
||||
if !containsStatus(statuses, status) {
|
||||
if !slices.Contains(statuses, status) {
|
||||
continue
|
||||
}
|
||||
// only send event if someone's listening
|
||||
@@ -4694,21 +4695,18 @@ func (s *Subscription) sendStatusEvent(status SubStatus) {
|
||||
case ch <- status:
|
||||
default:
|
||||
}
|
||||
if status == SubscriptionClosed {
|
||||
}
|
||||
// After sending SubscriptionClosed status to all listeners,
|
||||
// close all channels and clear the map to prevent future
|
||||
// sends to closed channels that could cause panics
|
||||
if status == SubscriptionClosed {
|
||||
for ch := range s.statListeners {
|
||||
close(ch)
|
||||
}
|
||||
s.statListeners = nil
|
||||
}
|
||||
}
|
||||
|
||||
func containsStatus(statuses []SubStatus, status SubStatus) bool {
|
||||
for _, s := range statuses {
|
||||
if s == status {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// changeSubStatus changes subscription status and sends events
|
||||
// to all listeners. Lock should be held entering.
|
||||
func (s *Subscription) changeSubStatus(status SubStatus) {
|
||||
|
Reference in New Issue
Block a user