fix: common subscriptions issued by different clients at the same time may be lost (#186)

This commit is contained in:
Hubertus Hohl
2023-03-12 00:17:10 +01:00
committed by GitHub
parent 655bf9fdb1
commit 7bd7bd5087

View File

@@ -301,6 +301,9 @@ func NewTopicsIndex() *TopicsIndex {
// Subscribe adds a new subscription for a client to a topic filter, returning // Subscribe adds a new subscription for a client to a topic filter, returning
// true if the subscription was new. // true if the subscription was new.
func (x *TopicsIndex) Subscribe(client string, subscription packets.Subscription) bool { func (x *TopicsIndex) Subscribe(client string, subscription packets.Subscription) bool {
x.root.Lock()
defer x.root.Unlock()
var existed bool var existed bool
prefix, _ := isolateParticle(subscription.Filter, 0) prefix, _ := isolateParticle(subscription.Filter, 0)
if strings.EqualFold(prefix, SharePrefix) { if strings.EqualFold(prefix, SharePrefix) {
@@ -320,6 +323,9 @@ func (x *TopicsIndex) Subscribe(client string, subscription packets.Subscription
// Unsubscribe removes a subscription filter for a client, returning true if the // Unsubscribe removes a subscription filter for a client, returning true if the
// subscription existed. // subscription existed.
func (x *TopicsIndex) Unsubscribe(filter, client string) bool { func (x *TopicsIndex) Unsubscribe(filter, client string) bool {
x.root.Lock()
defer x.root.Unlock()
var d int var d int
if strings.HasPrefix(filter, SharePrefix) { if strings.HasPrefix(filter, SharePrefix) {
d = 2 d = 2
@@ -346,6 +352,9 @@ func (x *TopicsIndex) Unsubscribe(filter, client string) bool {
// 1 if a retained message was added, and -1 if the retained message was removed. // 1 if a retained message was added, and -1 if the retained message was removed.
// 0 is returned if sequential empty payloads are received. // 0 is returned if sequential empty payloads are received.
func (x *TopicsIndex) RetainMessage(pk packets.Packet) int64 { func (x *TopicsIndex) RetainMessage(pk packets.Packet) int64 {
x.root.Lock()
defer x.root.Unlock()
n := x.set(pk.TopicName, 0) n := x.set(pk.TopicName, 0)
n.Lock() n.Lock()
defer n.Unlock() defer n.Unlock()