js: Replace JetStreamManager listers with iterators

This commit is contained in:
Jaime Piña
2021-03-09 13:23:59 -08:00
parent e61aa9efa2
commit 994c933b84
3 changed files with 140 additions and 91 deletions

6
js.go
View File

@@ -115,6 +115,7 @@ type JetStreamContext interface {
// js is an internal struct from a JetStreamContext.
type js struct {
ctx context.Context
nc *Conn
// For importing JetStream from other accounts.
pre string
@@ -363,6 +364,11 @@ func (ctx ContextOpt) configurePublish(opts *pubOpts) error {
return nil
}
func (ctx ContextOpt) configureJSContext(opts *js) error {
opts.ctx = ctx
return nil
}
// Context returns an option that can be used to configure a context.
func Context(ctx context.Context) ContextOpt {
return ContextOpt{ctx}

121
jsm.go
View File

@@ -37,11 +37,11 @@ type JetStreamManager interface {
// StreamInfo retrieves information from a stream.
StreamInfo(stream string) (*StreamInfo, error)
// Purge stream messages.
// PurgeStream purges a stream messages.
PurgeStream(name string) error
// NewStreamLister is used to return pages of StreamInfo objects.
NewStreamLister() *StreamLister
// StreamsInfo can be used to retrieve a list of StreamInfo objects.
StreamsInfo(opts ...JSOpt) <-chan *StreamInfo
// StreamNames is used to retrieve a list of Stream names.
StreamNames(ctx context.Context) <-chan string
@@ -58,11 +58,11 @@ type JetStreamManager interface {
// DeleteConsumer deletes a consumer.
DeleteConsumer(stream, consumer string) error
// ConsumerInfo retrieves consumer information.
// ConsumerInfo retrieves information of a consumer from a stream.
ConsumerInfo(stream, name string) (*ConsumerInfo, error)
// NewConsumerLister is used to return pages of ConsumerInfo objects.
NewConsumerLister(stream string) *ConsumerLister
// ConsumersInfo is used to retrieve a list of ConsumerInfo objects.
ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo
// ConsumerNames is used to retrieve a list of Consumer names.
ConsumerNames(ctx context.Context, stream string) <-chan string
@@ -273,9 +273,9 @@ func (js *js) ConsumerInfo(stream, consumer string) (*ConsumerInfo, error) {
return js.getConsumerInfo(stream, consumer)
}
// ConsumerLister fetches pages of ConsumerInfo objects. This object is not
// consumerLister fetches pages of ConsumerInfo objects. This object is not
// safe to use for multiple threads.
type ConsumerLister struct {
type consumerLister struct {
stream string
js *js
@@ -298,7 +298,7 @@ type consumerListResponse struct {
}
// Next fetches the next ConsumerInfo page.
func (c *ConsumerLister) Next() bool {
func (c *consumerLister) Next() bool {
if c.err != nil {
return false
}
@@ -340,18 +340,43 @@ func (c *ConsumerLister) Next() bool {
}
// Page returns the current ConsumerInfo page.
func (c *ConsumerLister) Page() []*ConsumerInfo {
func (c *consumerLister) Page() []*ConsumerInfo {
return c.page
}
// Err returns any errors found while fetching pages.
func (c *ConsumerLister) Err() error {
func (c *consumerLister) Err() error {
return c.err
}
// NewConsumerLister is used to return pages of ConsumerInfo objects.
func (js *js) NewConsumerLister(stream string) *ConsumerLister {
return &ConsumerLister{stream: stream, js: js}
// ConsumersInfo is used to retrieve a list of ConsumerInfo objects.
func (jsc *js) ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo {
o, cancel, err := getJSContextOpts(jsc, opts...)
if err != nil {
return nil
}
ch := make(chan *ConsumerInfo)
l := &consumerLister{js: o, stream: stream}
go func() {
defer func() {
if cancel != nil {
cancel()
}
}()
defer close(ch)
for l.Next() {
for _, info := range l.Page() {
select {
case ch <- info:
case <-o.ctx.Done():
return
}
}
}
}()
return ch
}
type consumerNamesLister struct {
@@ -726,9 +751,9 @@ func (js *js) PurgeStream(name string) error {
return nil
}
// StreamLister fetches pages of StreamInfo objects. This object is not safe
// streamLister fetches pages of StreamInfo objects. This object is not safe
// to use for multiple threads.
type StreamLister struct {
type streamLister struct {
js *js
page []*StreamInfo
err error
@@ -753,7 +778,7 @@ type streamNamesRequest struct {
}
// Next fetches the next StreamInfo page.
func (s *StreamLister) Next() bool {
func (s *streamLister) Next() bool {
if s.err != nil {
return false
}
@@ -792,18 +817,43 @@ func (s *StreamLister) Next() bool {
}
// Page returns the current StreamInfo page.
func (s *StreamLister) Page() []*StreamInfo {
func (s *streamLister) Page() []*StreamInfo {
return s.page
}
// Err returns any errors found while fetching pages.
func (s *StreamLister) Err() error {
func (s *streamLister) Err() error {
return s.err
}
// NewStreamLister is used to return pages of StreamInfo objects.
func (js *js) NewStreamLister() *StreamLister {
return &StreamLister{js: js}
// StreamsInfo can be used to retrieve a list of StreamInfo objects.
func (jsc *js) StreamsInfo(opts ...JSOpt) <-chan *StreamInfo {
o, cancel, err := getJSContextOpts(jsc, opts...)
if err != nil {
return nil
}
ch := make(chan *StreamInfo)
l := &streamLister{js: o}
go func() {
defer func() {
if cancel != nil {
cancel()
}
}()
defer close(ch)
for l.Next() {
for _, info := range l.Page() {
select {
case ch <- info:
case <-o.ctx.Done():
return
}
}
}
}()
return ch
}
type streamNamesLister struct {
@@ -884,3 +934,30 @@ func (js *js) StreamNames(ctx context.Context) <-chan string {
return ch
}
func getJSContextOpts(defs *js, opts ...JSOpt) (*js, context.CancelFunc, error) {
var o js
for _, opt := range opts {
if err := opt.configureJSContext(&o); err != nil {
return nil, nil, err
}
}
// Check for option collisions. Right now just timeout and context.
if o.ctx != nil && o.wait != 0 {
return nil, nil, ErrContextAndTimeout
}
if o.wait == 0 && o.ctx == nil {
o.wait = defs.wait
}
var cancel context.CancelFunc
if o.ctx == nil && o.wait > 0 {
o.ctx, cancel = context.WithTimeout(context.Background(), o.wait)
}
if o.pre == "" {
o.pre = defs.pre
}
o.nc = defs.nc
return &o, cancel, nil
}

View File

@@ -253,22 +253,15 @@ func TestJetStreamSubscribe(t *testing.T) {
expectConsumers := func(t *testing.T, expected int) []*nats.ConsumerInfo {
t.Helper()
cl := js.NewConsumerLister("TEST")
if !cl.Next() {
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
var infos []*nats.ConsumerInfo
for info := range js.ConsumersInfo("TEST") {
infos = append(infos, info)
}
t.Fatalf("Unexpected consumer lister next")
}
p := cl.Page()
if len(p) != expected {
t.Fatalf("Expected %d consumers, got: %d", expected, len(p))
}
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
if len(infos) != expected {
t.Fatalf("Expected %d consumers, got: %d", expected, len(infos))
}
return p
return infos
}
// Create the stream using our client API.
@@ -1106,43 +1099,30 @@ func TestJetStreamManagement(t *testing.T) {
})
t.Run("list streams", func(t *testing.T) {
sl := js.NewStreamLister()
if !sl.Next() {
if err := sl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
var infos []*nats.StreamInfo
for info := range js.StreamsInfo() {
infos = append(infos, info)
}
t.Fatalf("Unexpected stream lister next")
}
if p := sl.Page(); len(p) != 1 || p[0].Config.Name != "foo" {
t.Fatalf("StreamInfo is not correct %+v", p)
}
if err := sl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
if len(infos) != 1 || infos[0].Config.Name != "foo" {
t.Fatalf("StreamInfo is not correct %+v", infos)
}
})
t.Run("list consumers", func(t *testing.T) {
if cl := js.NewConsumerLister(""); cl.Next() {
t.Fatalf("Unexpected next ok")
} else if err := cl.Err(); err == nil {
if cl.Next() {
t.Fatalf("Unexpected next ok")
var infos []*nats.ConsumerInfo
for info := range js.ConsumersInfo("") {
infos = append(infos, info)
}
t.Fatalf("Unexpected nil error")
if len(infos) != 0 {
t.Fatalf("ConsumerInfo is not correct %+v", infos)
}
cl := js.NewConsumerLister("foo")
if !cl.Next() {
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
infos = infos[:0]
for info := range js.ConsumersInfo("foo") {
infos = append(infos, info)
}
t.Fatalf("Unexpected consumer lister next")
}
if p := cl.Page(); len(p) != 1 || p[0].Stream != "foo" || p[0].Config.Durable != "dlc" {
t.Fatalf("ConsumerInfo is not correct %+v", p)
}
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
if len(infos) != 1 || infos[0].Stream != "foo" || infos[0].Config.Durable != "dlc" {
t.Fatalf("ConsumerInfo is not correct %+v", infos)
}
})
@@ -2570,22 +2550,15 @@ func TestJetStream_Unsubscribe(t *testing.T) {
fetchConsumers := func(t *testing.T, expected int) []*nats.ConsumerInfo {
t.Helper()
cl := js.NewConsumerLister("foo")
if !cl.Next() {
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
var infos []*nats.ConsumerInfo
for info := range js.ConsumersInfo("foo") {
infos = append(infos, info)
}
t.Fatalf("Unexpected consumer lister next")
}
p := cl.Page()
if len(p) != expected {
t.Fatalf("Expected %d consumers, got: %d", expected, len(p))
}
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
if len(infos) != expected {
t.Fatalf("Expected %d consumers, got: %d", expected, len(infos))
}
return p
return infos
}
js.Publish("foo.A", []byte("A"))
@@ -2708,22 +2681,15 @@ func TestJetStream_UnsubscribeCloseDrain(t *testing.T) {
fetchConsumers := func(t *testing.T, expected int) []*nats.ConsumerInfo {
t.Helper()
cl := jsm.NewConsumerLister("foo")
if !cl.Next() {
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
var infos []*nats.ConsumerInfo
for info := range jsm.ConsumersInfo("foo") {
infos = append(infos, info)
}
t.Fatalf("Unexpected consumer lister next")
}
p := cl.Page()
if len(p) != expected {
t.Fatalf("Expected %d consumers, got: %d", expected, len(p))
}
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
if len(infos) != expected {
t.Fatalf("Expected %d consumers, got: %d", expected, len(infos))
}
return p
return infos
}
t.Run("conn drain deletes ephemeral consumers", func(t *testing.T) {