From 7bd7bd5087c40f96015a61d01ce7ae4e6951c807 Mon Sep 17 00:00:00 2001 From: Hubertus Hohl Date: Sun, 12 Mar 2023 00:17:10 +0100 Subject: [PATCH] fix: common subscriptions issued by different clients at the same time may be lost (#186) --- topics.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/topics.go b/topics.go index f0941e1..ecb3370 100644 --- a/topics.go +++ b/topics.go @@ -301,6 +301,9 @@ func NewTopicsIndex() *TopicsIndex { // Subscribe adds a new subscription for a client to a topic filter, returning // true if the subscription was new. func (x *TopicsIndex) Subscribe(client string, subscription packets.Subscription) bool { + x.root.Lock() + defer x.root.Unlock() + var existed bool prefix, _ := isolateParticle(subscription.Filter, 0) if strings.EqualFold(prefix, SharePrefix) { @@ -320,6 +323,9 @@ func (x *TopicsIndex) Subscribe(client string, subscription packets.Subscription // Unsubscribe removes a subscription filter for a client, returning true if the // subscription existed. func (x *TopicsIndex) Unsubscribe(filter, client string) bool { + x.root.Lock() + defer x.root.Unlock() + var d int if strings.HasPrefix(filter, SharePrefix) { d = 2 @@ -346,6 +352,9 @@ func (x *TopicsIndex) Unsubscribe(filter, client string) bool { // 1 if a retained message was added, and -1 if the retained message was removed. // 0 is returned if sequential empty payloads are received. func (x *TopicsIndex) RetainMessage(pk packets.Packet) int64 { + x.root.Lock() + defer x.root.Unlock() + n := x.set(pk.TopicName, 0) n.Lock() defer n.Unlock()