[FIXED] Track delivered count and auto-unsubscribe for channel subscriptions (#1913)

Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
This commit is contained in:
Piotr Piotrowski
2025-08-19 17:32:51 +02:00
committed by GitHub
parent 7af9697b58
commit 20accc137f
2 changed files with 90 additions and 35 deletions

17
nats.go
View File

@@ -3447,6 +3447,10 @@ func (nc *Conn) processMsg(data []byte) {
if sub.mch != nil {
select {
case sub.mch <- m:
// For ChanSubscribe, track delivered count here
if sub.typ == ChanSubscription {
sub.delivered++
}
default:
goto slowConsumer
}
@@ -3506,6 +3510,19 @@ func (nc *Conn) processMsg(data []byte) {
nc.checkForSequenceMismatch(m, sub, jsi)
}
// Check if we need to auto-unsubscribe for chan subscriptions
if sub.typ == ChanSubscription && sub.max > 0 && !ctrlMsg {
sub.mu.Lock()
if sub.delivered >= sub.max {
sub.mu.Unlock()
nc.mu.Lock()
nc.removeSub(sub)
nc.mu.Unlock()
} else {
sub.mu.Unlock()
}
}
return
slowConsumer:

View File

@@ -54,7 +54,7 @@ func TestServerAutoUnsub(t *testing.T) {
}
sub.AutoUnsubscribe(int(max))
total := 100
for i := 0; i < total; i++ {
for range total {
nc.Publish("foo", []byte("Hello"))
}
nc.Flush()
@@ -83,7 +83,7 @@ func TestClientSyncAutoUnsub(t *testing.T) {
sub, _ := nc.SubscribeSync("foo")
sub.AutoUnsubscribe(max)
total := 100
for i := 0; i < total; i++ {
for range total {
nc.Publish("foo", []byte("Hello"))
}
nc.Flush()
@@ -108,6 +108,44 @@ func TestClientSyncAutoUnsub(t *testing.T) {
}
}
func TestClientChanAutoUnsub(t *testing.T) {
s := RunDefaultServer()
defer s.Shutdown()
nc := NewDefaultConnection(t)
defer nc.Close()
received := 0
max := 10
ch := make(chan *nats.Msg, max)
sub, _ := nc.ChanSubscribe("foo", ch)
sub.AutoUnsubscribe(max)
total := 100
for range total {
nc.Publish("foo", []byte("Hello"))
}
nc.Flush()
// Drain the channel
for {
select {
case <-ch:
received++
case <-time.After(10 * time.Millisecond):
if received != max {
t.Fatalf("Received %d msgs, wanted only %d", received, max)
}
if sub.IsValid() {
t.Fatal("Expected subscription to be invalid after hitting max")
}
if err := sub.AutoUnsubscribe(10); err == nil {
t.Fatal("Calling AutoUnsubscribe() ob closed subscription should fail")
}
return
}
}
}
func TestClientASyncAutoUnsub(t *testing.T) {
s := RunDefaultServer()
defer s.Shutdown()
@@ -124,7 +162,7 @@ func TestClientASyncAutoUnsub(t *testing.T) {
}
sub.AutoUnsubscribe(int(max))
total := 100
for i := 0; i < total; i++ {
for range total {
nc.Publish("foo", []byte("Hello"))
}
nc.Flush()
@@ -165,7 +203,7 @@ func TestAutoUnsubAndReconnect(t *testing.T) {
// Send less than the max
total := int(max / 2)
for i := 0; i < total; i++ {
for range total {
nc.Publish("foo", []byte("Hello"))
}
nc.Flush()
@@ -182,7 +220,7 @@ func TestAutoUnsubAndReconnect(t *testing.T) {
// Now send more than the total max.
total = int(3 * max)
for i := 0; i < total; i++ {
for range total {
nc.Publish("foo", []byte("Hello"))
}
nc.Flush()
@@ -227,7 +265,7 @@ func TestAutoUnsubWithParallelNextMsgCalls(t *testing.T) {
wg.Add(numRoutines)
for i := 0; i < numRoutines; i++ {
for i := range numRoutines {
go func(s *nats.Subscription, idx int) {
for {
// The first to reach the max delivered will cause the
@@ -245,7 +283,7 @@ func TestAutoUnsubWithParallelNextMsgCalls(t *testing.T) {
}
msg := []byte("Hello")
for i := 0; i < max/2; i++ {
for range max / 2 {
nc.Publish("foo", msg)
}
nc.Flush()
@@ -259,7 +297,7 @@ func TestAutoUnsubWithParallelNextMsgCalls(t *testing.T) {
t.Fatal("Failed to get reconnected cb")
}
for i := 0; i < total; i++ {
for range total {
nc.Publish("foo", msg)
}
nc.Flush()
@@ -396,7 +434,7 @@ func TestIsValidSubscriber(t *testing.T) {
if !sub.IsValid() {
t.Fatalf("Subscription should be valid")
}
for i := 0; i < 10; i++ {
for range 10 {
nc.Publish("foo", []byte("Hello"))
}
nc.Flush()
@@ -424,7 +462,7 @@ func TestSlowSubscriber(t *testing.T) {
sub, _ := nc.SubscribeSync("foo")
sub.SetPendingLimits(100, 1024)
for i := 0; i < 200; i++ {
for range 200 {
nc.Publish("foo", []byte("Hello"))
}
timeout := 5 * time.Second
@@ -455,7 +493,7 @@ func TestSlowChanSubscriber(t *testing.T) {
sub, _ := nc.ChanSubscribe("foo", ch)
sub.SetPendingLimits(100, 1024)
for i := 0; i < 200; i++ {
for range 200 {
nc.Publish("foo", []byte("Hello"))
}
timeout := 5 * time.Second
@@ -509,7 +547,7 @@ func TestSlowAsyncSubscriber(t *testing.T) {
t.Fatalf("Pending limit for number of bytes incorrect, expected %d, got %d\n", pbl, pb)
}
for i := 0; i < (int(pml) + 100); i++ {
for range int(pml) + 100 {
nc.Publish("foo", []byte("Hello"))
}
@@ -586,7 +624,7 @@ func TestAsyncErrHandler(t *testing.T) {
// First one trips the ch wait in subscription callback.
nc.Publish(subj, b)
nc.Flush()
for i := 0; i < toSend; i++ {
for range toSend {
nc.Publish(subj, b)
}
if err := nc.Flush(); err != nil {
@@ -650,7 +688,7 @@ func TestAsyncErrHandlerChanSubscription(t *testing.T) {
})
b := []byte("Hello World!")
for i := 0; i < toSend; i++ {
for range toSend {
nc.Publish(subj, b)
}
nc.Flush()
@@ -722,7 +760,7 @@ func TestAsyncSubscribersOnClose(t *testing.T) {
<-ch
})
for i := 0; i < toSend; i++ {
for range toSend {
nc.Publish("foo", []byte("Hello World!"))
}
nc.Flush()
@@ -820,7 +858,7 @@ func TestChanSubscriber(t *testing.T) {
// Send some messages to ourselves.
total := 100
for i := 0; i < total; i++ {
for range total {
nc.Publish("foo", []byte("Hello"))
}
@@ -861,7 +899,7 @@ func TestChanQueueSubscriber(t *testing.T) {
// Send some messages to ourselves.
total := 100
for i := 0; i < total; i++ {
for range total {
nc.Publish("foo", []byte("Hello"))
}
@@ -940,7 +978,7 @@ func TestChanSubscriberPendingLimits(t *testing.T) {
nc.Flush()
// Send some messages
for i := 0; i < total; i++ {
for range total {
if err := ncp.Publish("foo", []byte("Hello")); err != nil {
t.Fatalf("Unexpected error on publish: %v", err)
}
@@ -990,7 +1028,7 @@ func TestQueueChanQueueSubscriber(t *testing.T) {
// Send some messages to ourselves.
total := 100
for i := 0; i < total; i++ {
for range total {
nc.Publish("foo", []byte("Hello"))
}
@@ -1049,7 +1087,7 @@ func TestUnsubscribeChanOnSubscriber(t *testing.T) {
// Send some messages to ourselves.
total := 100
for i := 0; i < total; i++ {
for range total {
nc.Publish("foo", []byte("Hello"))
}
@@ -1075,7 +1113,7 @@ func TestCloseChanOnSubscriber(t *testing.T) {
// Send some messages to ourselves.
total := 100
for i := 0; i < total; i++ {
for range total {
nc.Publish("foo", []byte("Hello"))
}
@@ -1110,7 +1148,7 @@ func TestAsyncSubscriptionPending(t *testing.T) {
})
defer sub.Unsubscribe()
for i := 0; i < total; i++ {
for range total {
nc.Publish("foo", msg)
}
nc.Flush()
@@ -1189,7 +1227,7 @@ func TestAsyncSubscriptionPendingDrain(t *testing.T) {
sub, _ := nc.Subscribe("foo", func(_ *nats.Msg) {})
defer sub.Unsubscribe()
for i := 0; i < total; i++ {
for range total {
nc.Publish("foo", msg)
}
nc.Flush()
@@ -1229,7 +1267,7 @@ func TestSyncSubscriptionPendingDrain(t *testing.T) {
sub, _ := nc.SubscribeSync("foo")
defer sub.Unsubscribe()
for i := 0; i < total; i++ {
for range total {
nc.Publish("foo", msg)
}
nc.Flush()
@@ -1266,7 +1304,7 @@ func TestSyncSubscriptionPending(t *testing.T) {
// Send some messages to ourselves.
total := 100
msg := []byte("0123456789")
for i := 0; i < total; i++ {
for range total {
nc.Publish("foo", msg)
}
nc.Flush()
@@ -1289,7 +1327,7 @@ func TestSyncSubscriptionPending(t *testing.T) {
}
// Now drain some down and make sure pending is correct
for i := 0; i < total-1; i++ {
for range total - 1 {
sub.NextMsg(10 * time.Millisecond)
}
m, b, _ = sub.Pending()
@@ -1329,7 +1367,7 @@ func TestSetPendingLimits(t *testing.T) {
}
// function to send messages
send := func(subject string, count int) {
for i := 0; i < count; i++ {
for range count {
if err := nc.Publish(subject, payload); err != nil {
t.Fatalf("Unexpected error on publish: %v", err)
}
@@ -1642,7 +1680,7 @@ func TestSubscriptionEvents(t *testing.T) {
// initial status
WaitOnChannel(t, status, nats.SubscriptionActive)
for i := 0; i < 11; i++ {
for range 11 {
nc.Publish("foo", []byte("Hello"))
}
WaitOnChannel(t, status, nats.SubscriptionSlowConsumer)
@@ -1677,7 +1715,7 @@ func TestSubscriptionEvents(t *testing.T) {
sub.SetPendingLimits(10, 1024)
status := sub.StatusChanged(nats.SubscriptionSlowConsumer)
for i := 0; i < 20; i++ {
for range 20 {
nc.Publish("foo", []byte("Hello"))
}
WaitOnChannel(t, status, nats.SubscriptionSlowConsumer)
@@ -1692,7 +1730,7 @@ func TestSubscriptionEvents(t *testing.T) {
sub.SetPendingLimits(10, 1024)
status = sub.StatusChanged(nats.SubscriptionSlowConsumer)
for i := 0; i < 20; i++ {
for range 20 {
nc.Publish("foo", []byte("Hello"))
}
WaitOnChannel(t, status, nats.SubscriptionSlowConsumer)
@@ -1722,14 +1760,14 @@ func TestSubscriptionEvents(t *testing.T) {
WaitOnChannel(t, status, nats.SubscriptionActive)
// chan length is 10, so make sure we switch state more times
for i := 0; i < 20; i++ {
for range 20 {
// subscription will enter slow consumer state
for i := 0; i < 11; i++ {
for range 11 {
nc.Publish("foo", []byte("Hello"))
}
// messages flow normally, status flips to active
for i := 0; i < 10; i++ {
for range 10 {
nc.Publish("foo", []byte("Hello"))
blockChan <- struct{}{}
}
@@ -1757,7 +1795,7 @@ func TestMaxSubscriptionsExceeded(t *testing.T) {
}
defer nc.Close()
for i := 0; i < 6; i++ {
for range 6 {
s, err := nc.Subscribe("foo", func(_ *nats.Msg) {})
if err != nil {
t.Fatalf("Error subscribing: %v", err)
@@ -1870,7 +1908,7 @@ func TestSubscribeSyncPermissionError(t *testing.T) {
defer nc.Close()
subs := make([]*nats.Subscription, 0, 100)
for i := 0; i < 10; i++ {
for i := range 10 {
var subject string
if i%2 == 0 {
subject = "foo"