[ADDED] Error() method to KeyLister and KeyWatcher interfaces (#1889)

* [ADDED] Channel-based Error() method to KeyLister and KeyWatcher interfaces

The new Error() method returns <-chan error, enabling concurrent error
handling with select statements. This allows immediate detection of
timeout errors during key listing/watching operations.

Signed-off-by: Piotr Piotrowski <piotr@synadia.com>

* Returned closed err channel if watcher is nil

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

---------

Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
Piotr Piotrowski
2025-06-30 12:04:20 +02:00
committed by GitHub
parent 3146d564a5
commit b0b229c516
2 changed files with 134 additions and 1 deletions

37
kv.go
View File

@@ -115,12 +115,22 @@ type KeyWatcher interface {
Updates() <-chan KeyValueEntry
// Stop will stop this watcher.
Stop() error
// Error returns a channel that will receive any error that occurs during
// watching. In particular, this will receive an error if the watcher times
// out while expecting more initial keys. The channel is closed when the
// watch operation completes or when Stop() is called.
Error() <-chan error
}
// KeyLister is used to retrieve a list of key value store keys
type KeyLister interface {
Keys() <-chan string
Stop() error
// Error returns a channel that will receive any error that occurs during
// key listing. In particular, this will receive an error if the underlying
// watcher times out while expecting more keys. The channel is closed when
// the listing operation completes or when Stop() is called.
Error() <-chan error
}
type WatchOpt interface {
@@ -331,6 +341,7 @@ var (
ErrKeyDeleted = errors.New("nats: key was deleted")
ErrHistoryToLarge = errors.New("nats: history limited to a max of 64")
ErrNoKeysFound = errors.New("nats: no keys found")
ErrKeyWatcherTimeout = errors.New("nats: key watcher timed out waiting for initial keys")
)
var (
@@ -911,6 +922,10 @@ func (kl *keyLister) Stop() error {
return kl.watcher.Stop()
}
func (kl *keyLister) Error() <-chan error {
return kl.watcher.Error()
}
// History will return all values for the key.
func (kv *kvs) History(key string, opts ...WatchOpt) ([]KeyValueEntry, error) {
opts = append(opts, IncludeHistory())
@@ -943,6 +958,7 @@ type watcher struct {
received uint64
ctx context.Context
initDoneTimer *time.Timer
errCh chan error
}
// Context returns the context for the watcher if set.
@@ -969,6 +985,16 @@ func (w *watcher) Stop() error {
return w.sub.Unsubscribe()
}
// Error returns a channel that will receive any error that occurs during watching.
func (w *watcher) Error() <-chan error {
if w == nil {
closedCh := make(chan error)
close(closedCh)
return closedCh
}
return w.errCh
}
// WatchAll watches all keys.
func (kv *kvs) WatchAll(opts ...WatchOpt) (KeyWatcher, error) {
return kv.Watch(AllKeys, opts...)
@@ -1006,7 +1032,11 @@ func (kv *kvs) WatchFiltered(keys []string, opts ...WatchOpt) (KeyWatcher, error
}
// We will block below on placing items on the chan. That is by design.
w := &watcher{updates: make(chan KeyValueEntry, 256), ctx: o.ctx}
w := &watcher{
updates: make(chan KeyValueEntry, 256),
ctx: o.ctx,
errCh: make(chan error, 1),
}
update := func(m *Msg) {
tokens, err := parser.GetMetadataFields(m.Reply)
@@ -1108,6 +1138,10 @@ func (kv *kvs) WatchFiltered(keys []string, opts ...WatchOpt) (KeyWatcher, error
defer w.mu.Unlock()
if !w.initDone {
w.initDone = true
select {
case w.errCh <- ErrKeyWatcherTimeout:
default:
}
w.updates <- nil
}
})
@@ -1124,6 +1158,7 @@ func (kv *kvs) WatchFiltered(keys []string, opts ...WatchOpt) (KeyWatcher, error
w.initDoneTimer.Stop()
}
close(w.updates)
close(w.errCh)
}
sub.mu.Unlock()

View File

@@ -483,6 +483,54 @@ func TestKeyValueWatch(t *testing.T) {
t.Fatalf("Stop watcher did not return")
}
})
// Test channel-based error API integration with select patterns
t.Run("error channel with select", func(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)
nc, js := jsClient(t, s)
defer nc.Close()
kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "WATCH_ERROR"})
expectOk(t, err)
// Put some initial keys
_, err = kv.Put("test1", []byte("value1"))
expectOk(t, err)
_, err = kv.Put("test2", []byte("value2"))
expectOk(t, err)
watcher, err := kv.WatchAll()
expectOk(t, err)
defer watcher.Stop()
updateCount := 0
var watchCompleted bool
Outer:
for !watchCompleted {
select {
case entry := <-watcher.Updates():
if entry == nil {
break Outer
}
updateCount++
case err := <-watcher.Error():
if err != nil {
t.Fatalf("Unexpected error from watcher error channel: %v", err)
}
case <-time.After(2 * time.Second):
t.Fatalf("Timeout waiting for watcher completion")
}
}
if updateCount < 2 {
t.Fatalf("Expected at least 2 updates, got %d", updateCount)
}
})
}
func TestKeyValueWatchContext(t *testing.T) {
@@ -885,6 +933,56 @@ func TestKeyValueListKeys(t *testing.T) {
if _, ok := kmap["age"]; !ok {
t.Fatalf("Expected %q to be only key present", "age")
}
// Test channel-based error API patterns
t.Run("error channel with select", func(t *testing.T) {
keys, err := kv.ListKeys()
expectOk(t, err)
defer keys.Stop()
var keyList []string
var completed bool
for !completed {
select {
case key, ok := <-keys.Keys():
if !ok {
completed = true
break
}
keyList = append(keyList, key)
case err := <-keys.Error():
if err != nil {
t.Fatalf("Unexpected error from error channel: %v", err)
}
}
}
if len(keyList) != 1 {
t.Fatalf("Expected 1 key using select pattern, got %d", len(keyList))
}
})
t.Run("error check after completion", func(t *testing.T) {
keys, err := kv.ListKeys()
expectOk(t, err)
defer keys.Stop()
var keyList []string
for key := range keys.Keys() {
keyList = append(keyList, key)
}
// Check for errors after completion - should not block and return nil
if err := <-keys.Error(); err != nil {
t.Fatalf("Unexpected error after completion: %v", err)
}
if len(keyList) != 1 {
t.Fatalf("Expected 1 key after completion check, got %d", len(keyList))
}
})
}
func TestKeyValueCrossAccounts(t *testing.T) {