[FIXED] Race condition in Fetch and FetchBatch when using heartbeats (#1601)

Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
This commit is contained in:
Piotr Piotrowski
2024-04-02 17:24:17 +02:00
committed by GitHub
parent 33316cdf88
commit 42076581a5

10
js.go
View File

@@ -2867,6 +2867,7 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
if subClosed { if subClosed {
err = errors.Join(ErrBadSubscription, ErrSubscriptionClosed) err = errors.Join(ErrBadSubscription, ErrSubscriptionClosed)
} }
hbLock := sync.Mutex{}
if err == nil && len(msgs) < batch && !subClosed { if err == nil && len(msgs) < batch && !subClosed {
// For batch real size of 1, it does not make sense to set no_wait in // For batch real size of 1, it does not make sense to set no_wait in
// the request. // the request.
@@ -2909,7 +2910,9 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
if o.hb > 0 { if o.hb > 0 {
if hbTimer == nil { if hbTimer == nil {
hbTimer = time.AfterFunc(2*o.hb, func() { hbTimer = time.AfterFunc(2*o.hb, func() {
hbLock.Lock()
hbErr = ErrNoHeartbeat hbErr = ErrNoHeartbeat
hbLock.Unlock()
cancel() cancel()
}) })
} else { } else {
@@ -2951,6 +2954,8 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
} }
// If there is at least a message added to msgs, then need to return OK and no error // If there is at least a message added to msgs, then need to return OK and no error
if err != nil && len(msgs) == 0 { if err != nil && len(msgs) == 0 {
hbLock.Lock()
defer hbLock.Unlock()
if hbErr != nil { if hbErr != nil {
return nil, hbErr return nil, hbErr
} }
@@ -3181,9 +3186,12 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e
} }
var hbTimer *time.Timer var hbTimer *time.Timer
var hbErr error var hbErr error
hbLock := sync.Mutex{}
if o.hb > 0 { if o.hb > 0 {
hbTimer = time.AfterFunc(2*o.hb, func() { hbTimer = time.AfterFunc(2*o.hb, func() {
hbLock.Lock()
hbErr = ErrNoHeartbeat hbErr = ErrNoHeartbeat
hbLock.Unlock()
cancel() cancel()
}) })
} }
@@ -3219,11 +3227,13 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e
} }
} }
if err != nil { if err != nil {
hbLock.Lock()
if hbErr != nil { if hbErr != nil {
result.err = hbErr result.err = hbErr
} else { } else {
result.err = o.checkCtxErr(err) result.err = o.checkCtxErr(err)
} }
hbLock.Unlock()
} }
close(result.msgs) close(result.msgs)
result.done <- struct{}{} result.done <- struct{}{}