Fix ScanSubscribersTopicInheritanceBug (#243)

* Sub a/b should not receive msg for a/b/c...

* Add TestScanSubscribersTopicInheritanceBug test

* Ensure sharedSubscription are gathered

* Fix Unsubscribe for sharedSub and optimization

* Unsub with lower case in TestUnsubscribeShared

* Add test with # for TestScanSubscribersShared
This commit is contained in:
thedevop
2023-06-19 01:42:16 -07:00
committed by GitHub
parent e60b8ff0c9
commit 4db49a4b9d
2 changed files with 37 additions and 16 deletions

View File

@@ -327,7 +327,9 @@ func (x *TopicsIndex) Unsubscribe(filter, client string) bool {
defer x.root.Unlock()
var d int
if strings.HasPrefix(filter, SharePrefix) {
prefix, _ := isolateParticle(filter, 0)
shareSub := strings.EqualFold(prefix, SharePrefix)
if shareSub {
d = 2
}
@@ -336,8 +338,7 @@ func (x *TopicsIndex) Unsubscribe(filter, client string) bool {
return false
}
prefix, _ := isolateParticle(filter, 0)
if strings.EqualFold(prefix, SharePrefix) {
if shareSub {
group, _ := isolateParticle(filter, 1)
particle.shared.Delete(group, client)
} else {
@@ -500,20 +501,27 @@ func (x *TopicsIndex) scanSubscribers(topic string, d int, n *particle, subs *Su
}
key, hasNext := isolateParticle(topic, d)
for _, partKey := range []string{key, "+", "#"} {
for _, partKey := range []string{key, "+"} {
if particle := n.particles.get(partKey); particle != nil { // [MQTT-3.3.2-3]
x.gatherSubscriptions(topic, particle, subs)
x.gatherSharedSubscriptions(particle, subs)
if wild := particle.particles.get("#"); wild != nil && partKey != "#" && partKey != "+" {
x.gatherSubscriptions(topic, wild, subs) // also match any subs where filter/# is filter as per 4.7.1.2
}
if hasNext {
x.scanSubscribers(topic, d+1, particle, subs)
} else {
x.gatherSubscriptions(topic, particle, subs)
x.gatherSharedSubscriptions(particle, subs)
if wild := particle.particles.get("#"); wild != nil && partKey != "+" {
x.gatherSubscriptions(topic, wild, subs) // also match any subs where filter/# is filter as per 4.7.1.2
x.gatherSharedSubscriptions(wild, subs)
}
}
}
}
if particle := n.particles.get("#"); particle != nil {
x.gatherSubscriptions(topic, particle, subs)
x.gatherSharedSubscriptions(particle, subs)
}
return subs
}

View File

@@ -319,7 +319,7 @@ func TestUnsubscribeShared(t *testing.T) {
require.True(t, exists)
require.Equal(t, byte(2), client.Qos)
require.True(t, index.Unsubscribe("$SHARE/tmp/a/b/c", "cl1"))
require.True(t, index.Unsubscribe("$share/tmp/a/b/c", "cl1"))
_, exists = final.shared.Get("tmp", "cl1")
require.False(t, exists)
}
@@ -501,28 +501,40 @@ func TestScanSubscribers(t *testing.T) {
index.Subscribe("cl2", packets.Subscription{Qos: 0, Filter: "$SYS/test", Identifier: 2})
subs := index.scanSubscribers("a/b/c", 0, nil, new(Subscribers))
require.Equal(t, 4, len(subs.Subscriptions))
require.Equal(t, 3, len(subs.Subscriptions))
require.Contains(t, subs.Subscriptions, "cl1")
require.Contains(t, subs.Subscriptions, "cl2")
require.Contains(t, subs.Subscriptions, "cl3")
require.Contains(t, subs.Subscriptions, "cl4")
require.Equal(t, byte(1), subs.Subscriptions["cl1"].Qos)
require.Equal(t, byte(2), subs.Subscriptions["cl2"].Qos)
require.Equal(t, byte(1), subs.Subscriptions["cl3"].Qos)
require.Equal(t, byte(0), subs.Subscriptions["cl4"].Qos)
require.Equal(t, 22, subs.Subscriptions["cl1"].Identifiers["a/b/c"])
require.Equal(t, 0, subs.Subscriptions["cl2"].Identifiers["a/#"])
require.Equal(t, 77, subs.Subscriptions["cl2"].Identifiers["a/b/+"])
require.Equal(t, 0, subs.Subscriptions["cl2"].Identifiers["a/b/c"])
require.Equal(t, 234, subs.Subscriptions["cl3"].Identifiers["+/b"])
require.Equal(t, 5, subs.Subscriptions["cl4"].Identifiers["#"])
subs = index.scanSubscribers("d/e/f/g", 0, nil, new(Subscribers))
require.Equal(t, 1, len(subs.Subscriptions))
require.Contains(t, subs.Subscriptions, "cl4")
require.Equal(t, byte(0), subs.Subscriptions["cl4"].Qos)
require.Equal(t, 5, subs.Subscriptions["cl4"].Identifiers["#"])
subs = index.scanSubscribers("", 0, nil, new(Subscribers))
require.Equal(t, 0, len(subs.Subscriptions))
}
func TestScanSubscribersTopicInheritanceBug(t *testing.T) {
index := NewTopicsIndex()
index.Subscribe("cl1", packets.Subscription{Qos: 0, Filter: "a/b/c"})
index.Subscribe("cl2", packets.Subscription{Qos: 0, Filter: "a/b"})
subs := index.scanSubscribers("a/b/c", 0, nil, new(Subscribers))
require.Equal(t, 1, len(subs.Subscriptions))
}
func TestScanSubscribersShared(t *testing.T) {
index := NewTopicsIndex()
index.Subscribe("cl1", packets.Subscription{Qos: 1, Filter: SharePrefix + "/tmp/a/b/c", Identifier: 111})
@@ -531,8 +543,9 @@ func TestScanSubscribersShared(t *testing.T) {
index.Subscribe("cl2", packets.Subscription{Qos: 0, Filter: SharePrefix + "/tmp/a/b/+", Identifier: 10})
index.Subscribe("cl3", packets.Subscription{Qos: 1, Filter: SharePrefix + "/tmp/a/b/+", Identifier: 200})
index.Subscribe("cl4", packets.Subscription{Qos: 0, Filter: SharePrefix + "/tmp/a/b/+", Identifier: 201})
index.Subscribe("cl5", packets.Subscription{Qos: 0, Filter: SharePrefix + "/tmp/a/b/c/#"})
subs := index.scanSubscribers("a/b/c", 0, nil, new(Subscribers))
require.Equal(t, 3, len(subs.Shared))
require.Equal(t, 4, len(subs.Shared))
}
func TestSelectSharedSubscriber(t *testing.T) {