From 9d1be0347ae40120e498af51ced9a2c602747a03 Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Thu, 3 Apr 2025 14:02:33 +0200 Subject: [PATCH] [FIXED] Ensure object watcher stop closes the updates channel (#1844) Signed-off-by: Piotr Piotrowski --- jetstream/object.go | 3 +++ jetstream/test/kv_test.go | 31 +++++++++++++++++++++++++++++++ jetstream/test/object_test.go | 30 ++++++++++++++++++++++++++++++ object.go | 4 ++++ test/kv_test.go | 29 +++++++++++++++++++++++++++++ test/object_test.go | 29 +++++++++++++++++++++++++++++ 6 files changed, 126 insertions(+) diff --git a/jetstream/object.go b/jetstream/object.go index d8440d4..eafee78 100644 --- a/jetstream/object.go +++ b/jetstream/object.go @@ -1330,6 +1330,9 @@ func (obs *obs) Watch(ctx context.Context, opts ...WatchOpt) (ObjectWatcher, err if err != nil { return nil, err } + sub.SetClosedHandler(func(_ string) { + close(w.updates) + }) w.sub = sub return w, nil } diff --git a/jetstream/test/kv_test.go b/jetstream/test/kv_test.go index 186b3b0..08e654c 100644 --- a/jetstream/test/kv_test.go +++ b/jetstream/test/kv_test.go @@ -614,6 +614,37 @@ func TestKeyValueWatch(t *testing.T) { expectOk(t, kv.Delete(ctx, "age")) expectDelete("age", 6) }) + + t.Run("stop watcher should not block", func(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "WATCH"}) + expectOk(t, err) + + watcher, err := kv.WatchAll(ctx) + expectOk(t, err) + + expectInitDone := expectInitDoneF(t, watcher) + expectInitDone() + + err = watcher.Stop() + expectOk(t, err) + + select { + case _, ok := <-watcher.Updates(): + if ok { + t.Fatalf("Expected channel to be closed") + } + case <-time.After(100 * time.Millisecond): + break + } + }) } func TestKeyValueWatchContext(t *testing.T) { diff --git a/jetstream/test/object_test.go b/jetstream/test/object_test.go index 2d335f6..d7d3100 100644 --- a/jetstream/test/object_test.go +++ b/jetstream/test/object_test.go @@ -695,6 +695,36 @@ func TestObjectWatch(t *testing.T) { expectOk(t, err) expectUpdate("C") }) + + t.Run("stop watcher should close the channel", func(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + ctx := context.Background() + + obs, err := js.CreateObjectStore(ctx, jetstream.ObjectStoreConfig{Bucket: "WATCH-TEST"}) + expectOk(t, err) + + watcher, err := obs.Watch(ctx) + expectOk(t, err) + + expectInitDone := expectInitDoneF(t, watcher) + expectInitDone() + + err = watcher.Stop() + expectOk(t, err) + + select { + case _, ok := <-watcher.Updates(): + if ok { + t.Fatalf("Expected channel to be closed") + } + case <-time.After(100 * time.Millisecond): + return + } + }) } func TestObjectLinks(t *testing.T) { diff --git a/object.go b/object.go index 75ceaa8..a5c1515 100644 --- a/object.go +++ b/object.go @@ -1127,6 +1127,10 @@ func (obs *obs) Watch(opts ...WatchOpt) (ObjectWatcher, error) { if err != nil { return nil, err } + // Set us up to close when the waitForMessages func returns. + sub.pDone = func(_ string) { + close(w.updates) + } w.sub = sub return w, nil } diff --git a/test/kv_test.go b/test/kv_test.go index cbe0fcf..13c09ff 100644 --- a/test/kv_test.go +++ b/test/kv_test.go @@ -454,6 +454,35 @@ func TestKeyValueWatch(t *testing.T) { expectOk(t, kv.Delete("age")) expectDelete("age", 6) }) + + t.Run("stop watcher should not block", func(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "WATCH"}) + expectOk(t, err) + + watcher, err := kv.WatchAll() + expectOk(t, err) + + expectInitDone := expectInitDoneF(t, watcher) + expectInitDone() + + err = watcher.Stop() + expectOk(t, err) + + select { + case _, ok := <-watcher.Updates(): + if ok { + t.Fatalf("Expected channel to be closed") + } + case <-time.After(100 * time.Millisecond): + t.Fatalf("Stop watcher did not return") + } + }) } func TestKeyValueWatchContext(t *testing.T) { diff --git a/test/object_test.go b/test/object_test.go index b4df4b2..3d68636 100644 --- a/test/object_test.go +++ b/test/object_test.go @@ -625,6 +625,35 @@ func TestObjectWatch(t *testing.T) { expectOk(t, err) expectUpdate("C") }) + + t.Run("stop watcher should not block", func(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: "WATCH-TEST"}) + expectOk(t, err) + + watcher, err := obs.Watch() + expectOk(t, err) + + expectInitDone := expectInitDoneF(t, watcher) + expectInitDone() + + err = watcher.Stop() + expectOk(t, err) + + select { + case _, ok := <-watcher.Updates(): + if ok { + t.Fatal("Expected channel to be closed") + } + case <-time.After(100 * time.Millisecond): + return + } + }) } func TestObjectLinks(t *testing.T) {