diff --git a/jetstream/kv.go b/jetstream/kv.go index 8cb7f2d..535da41 100644 --- a/jetstream/kv.go +++ b/jetstream/kv.go @@ -1231,11 +1231,9 @@ func (kv *kvs) WatchFiltered(ctx context.Context, keys []string, opts ...WatchOp // Check if done and initial values. if !w.initDone { w.received++ - // We set this on the first trip through.. - if w.initPending == 0 { - w.initPending = delta - } - if w.received > w.initPending || delta == 0 { + // Use the stable initPending value set at consumer creation. + // We're done if we've received all expected messages OR there are no more pending. + if w.received >= w.initPending || delta == 0 { w.initDone = true w.updates <- nil } @@ -1281,9 +1279,13 @@ func (kv *kvs) WatchFiltered(ctx context.Context, keys []string, opts ...WatchOp // Skip if UpdatesOnly() is set, since there will never be updates initially. if !o.updatesOnly { initialPending, err := sub.InitialConsumerPending() - if err == nil && initialPending == 0 { - w.initDone = true - w.updates <- nil + if err == nil { + if initialPending == 0 { + w.initDone = true + w.updates <- nil + } else { + w.initPending = initialPending + } } } else { // if UpdatesOnly was used, mark initialization as complete diff --git a/jetstream/test/kv_test.go b/jetstream/test/kv_test.go index 4856aad..7fa3ac4 100644 --- a/jetstream/test/kv_test.go +++ b/jetstream/test/kv_test.go @@ -2032,3 +2032,87 @@ func TestKeyValueLimitMarkerTTL(t *testing.T) { } }) } + +func TestKeyValueListKeysDuplicates(t *testing.T) { + listKeysF := func(kv jetstream.KeyValue) ([]string, error) { + t.Helper() + lister, err := kv.ListKeys(context.Background()) + if err != nil { + return nil, fmt.Errorf("error listing keys: %v", err) + } + var keys []string + for key := range lister.Keys() { + keys = append(keys, key) + } + return keys, nil + } + + keysF := func(kv jetstream.KeyValue) ([]string, error) { + t.Helper() + return kv.Keys(context.Background()) + } + + for _, test := range []string{"ListKeys", "Keys"} { + t.Run(test, func(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + ctx := context.Background() + kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "TEST_KV", History: 5}) + if err != nil { + t.Fatalf("Error creating KV: %v", err) + } + + for i := range 10 { + key := fmt.Sprintf("key_%d", i) + if _, err := kv.PutString(ctx, key, "initial"); err != nil { + t.Fatalf("Error putting key %s: %v", key, err) + } + } + + done := make(chan bool) + go func() { + // Continuously update existing keys + for { + select { + case <-done: + return + default: + for i := range 5 { + key := fmt.Sprintf("key_%d", i) + if _, err := kv.PutString(ctx, key, "updated"); err != nil { + t.Logf("Error updating key %s: %v", key, err) + } + } + } + } + }() + + // List keys multiple times while updates are happening + for range 20 { + var keys []string + if test == "Keys" { + keys, err = keysF(kv) + } else { + keys, err = listKeysF(kv) + } + if err != nil { + t.Fatalf("Error getting keys: %v", err) + } + + seen := make(map[string]struct{}) + for _, key := range keys { + if _, exists := seen[key]; exists { + t.Fatalf("Duplicate key found: %s", key) + } + seen[key] = struct{}{} + } + } + + close(done) + }) + } +} diff --git a/kv.go b/kv.go index 332a5cc..4990491 100644 --- a/kv.go +++ b/kv.go @@ -1076,11 +1076,9 @@ func (kv *kvs) WatchFiltered(keys []string, opts ...WatchOpt) (KeyWatcher, error // Skip if UpdatesOnly() is set, since there will never be updates initially. if !w.initDone { w.received++ - // We set this on the first trip through.. - if w.initPending == 0 { - w.initPending = delta - } - if w.received > w.initPending || delta == 0 { + // Use the stable initPending value set at consumer creation. + // We're done if we've received all expected messages OR there are no more pending + if w.received >= w.initPending || delta == 0 { // Avoid possible race setting up timer. if w.initDoneTimer != nil { w.initDoneTimer.Stop() @@ -1128,23 +1126,26 @@ func (kv *kvs) WatchFiltered(keys []string, opts ...WatchOpt) (KeyWatcher, error // of the consumer, send the marker. // Skip if UpdatesOnly() is set, since there will never be updates initially. if !o.updatesOnly { - if sub.jsi != nil && sub.jsi.pending == 0 { - w.initDone = true - w.updates <- nil - } else { - // Set a timer to send the marker if we do not get any messages. - w.initDoneTimer = time.AfterFunc(kv.js.opts.wait, func() { - w.mu.Lock() - defer w.mu.Unlock() - if !w.initDone { - w.initDone = true - select { - case w.errCh <- ErrKeyWatcherTimeout: - default: + if sub.jsi != nil { + if sub.jsi.pending == 0 { + w.initDone = true + w.updates <- nil + } else { + w.initPending = sub.jsi.pending + // Set a timer to send the marker if we do not get any messages. + w.initDoneTimer = time.AfterFunc(kv.js.opts.wait, func() { + w.mu.Lock() + defer w.mu.Unlock() + if !w.initDone { + w.initDone = true + select { + case w.errCh <- ErrKeyWatcherTimeout: + default: + } + w.updates <- nil } - w.updates <- nil - } - }) + }) + } } } else { // if UpdatesOnly was used, mark initialization as complete diff --git a/test/kv_test.go b/test/kv_test.go index 0bcbce8..ce03e3f 100644 --- a/test/kv_test.go +++ b/test/kv_test.go @@ -1853,3 +1853,84 @@ func TestKeyValueWatcherStopTimer(t *testing.T) { } time.Sleep(500 * time.Millisecond) } + +func TestKeyValueListKeysDuplicates(t *testing.T) { + listKeysF := func(kv nats.KeyValue) ([]string, error) { + t.Helper() + lister, err := kv.ListKeys() + if err != nil { + return nil, fmt.Errorf("error listing keys: %v", err) + } + var keys []string + for key := range lister.Keys() { + keys = append(keys, key) + } + return keys, nil + } + + keysF := func(kv nats.KeyValue) ([]string, error) { + t.Helper() + return kv.Keys() + } + + for _, test := range []string{"ListKeys", "Keys"} { + t.Run(test, func(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "TEST_KV", History: 5}) + if err != nil { + t.Fatalf("Error creating KV: %v", err) + } + + for i := range 10 { + key := fmt.Sprintf("key_%d", i) + if _, err := kv.PutString(key, "initial"); err != nil { + t.Fatalf("Error putting key %s: %v", key, err) + } + } + + done := make(chan bool) + go func() { + // Continuously update existing keys + for { + select { + case <-done: + return + default: + for i := range 5 { + key := fmt.Sprintf("key_%d", i) + kv.PutString(key, "updated") + } + } + } + }() + + // List keys multiple times while updates are happening + for range 20 { + var keys []string + if test == "Keys" { + keys, err = keysF(kv) + } else { + keys, err = listKeysF(kv) + } + if err != nil { + t.Fatalf("Error getting keys: %v", err) + } + + seen := make(map[string]struct{}) + for _, key := range keys { + if _, exists := seen[key]; exists { + t.Fatalf("Duplicate key found: %s", key) + } + seen[key] = struct{}{} + } + } + + close(done) + }) + } +}