From ac8b51a3abd8a60b50efbe8dd6e05e696016cb83 Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Tue, 23 Feb 2021 14:25:58 -0800 Subject: [PATCH] js: Add js.PullSubscribe and sub.Fetch APIs for pull consumers Signed-off-by: Waldemar Quevedo --- go.sum | 10 - js.go | 396 ++++++++++++++++++++++++----- nats.go | 15 +- test/js_test.go | 649 ++++++++++++++++++++++++++++-------------------- 4 files changed, 717 insertions(+), 353 deletions(-) diff --git a/go.sum b/go.sum index bb67107..84350ed 100644 --- a/go.sum +++ b/go.sum @@ -9,25 +9,20 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/klauspost/compress v1.11.7 h1:0hzRabrMN4tSTvMfnL3SCv1ZGeAP23ynzodBgaHeMeg= github.com/klauspost/compress v1.11.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.11.12 h1:famVnQVu7QwryBN4jNseQdUKES71ZAOnB6UQQJPZvqk= github.com/klauspost/compress v1.11.12/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= -github.com/minio/highwayhash v1.0.0 h1:iMSDhgUILCr0TNm8LWlSjF8N0ZIj2qbO8WHp6Q/J2BA= github.com/minio/highwayhash v1.0.0/go.mod h1:xQboMTeM9nY9v/LlAOxFctujiv5+Aq2hR5dxBpaMbdc= github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0= github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU= github.com/nats-io/jwt v0.3.3-0.20200519195258-f2bf5ce574c7/go.mod h1:n3cvmLfBfnpV4JJRN7lRYCyZnw48ksGsbThGXEk4w9M= -github.com/nats-io/jwt v1.1.0 h1:+vOlgtM0ZsF46GbmUoadq0/2rChNS45gtxHEa3H1gqM= github.com/nats-io/jwt v1.1.0/go.mod h1:n3cvmLfBfnpV4JJRN7lRYCyZnw48ksGsbThGXEk4w9M= github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU= github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q= github.com/nats-io/jwt/v2 v2.0.0-20200916203241-1f8ce17dff02/go.mod h1:vs+ZEjP+XKy8szkBmQwCB7RjYdIlMaPsFPs4VdS4bTQ= github.com/nats-io/jwt/v2 v2.0.0-20201015190852-e11ce317263c/go.mod h1:vs+ZEjP+XKy8szkBmQwCB7RjYdIlMaPsFPs4VdS4bTQ= -github.com/nats-io/jwt/v2 v2.0.0-20210125223648-1c24d462becc h1:pu+s4XC+bYnI0iD2vDtOl83zjCYUau/q6c83pEvsGZc= github.com/nats-io/jwt/v2 v2.0.0-20210125223648-1c24d462becc/go.mod h1:PuO5FToRL31ecdFqVjc794vK0Bj0CwzveQEDvkb7MoQ= -github.com/nats-io/jwt/v2 v2.0.0-20210208203759-ff814ca5f813 h1:km4lLzT86NyJRhO++VqfP/vn5cbfm+E05i2bGdqDbrY= github.com/nats-io/jwt/v2 v2.0.0-20210208203759-ff814ca5f813/go.mod h1:PuO5FToRL31ecdFqVjc794vK0Bj0CwzveQEDvkb7MoQ= github.com/nats-io/jwt/v2 v2.0.1 h1:SycklijeduR742i/1Y3nRhURYM7imDzZZ3+tuAQqhQA= github.com/nats-io/jwt/v2 v2.0.1/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY= @@ -35,9 +30,7 @@ github.com/nats-io/nats-server/v2 v2.1.8-0.20200524125952-51ebd92a9093/go.mod h1 github.com/nats-io/nats-server/v2 v2.1.8-0.20200601203034-f8d6dd992b71/go.mod h1:Nan/1L5Sa1JRW+Thm4HNYcIDcVRFc5zK9OpSZeI2kk4= github.com/nats-io/nats-server/v2 v2.1.8-0.20200929001935-7f44d075f7ad/go.mod h1:TkHpUIDETmTI7mrHN40D1pzxfzHZuGmtMbtb83TGVQw= github.com/nats-io/nats-server/v2 v2.1.8-0.20201129161730-ebe63db3e3ed/go.mod h1:XD0zHR/jTXdZvWaQfS5mQgsXj6x12kMjKLyAk/cOGgY= -github.com/nats-io/nats-server/v2 v2.1.8-0.20210205154825-f7ab27f7dad4 h1:GStuc0W1rK45FSlpt3+7UTLzmRys2/6WSDuJFyzZ6Xg= github.com/nats-io/nats-server/v2 v2.1.8-0.20210205154825-f7ab27f7dad4/go.mod h1:kauGd7hB5517KeSqspW2U1Mz/jhPbTrE8eOXzUPk1m0= -github.com/nats-io/nats-server/v2 v2.1.8-0.20210227190344-51550e242af8 h1:jPZZofsCevE2oJl3YexVw3drWOFdo8H4AWMb/1WcVoc= github.com/nats-io/nats-server/v2 v2.1.8-0.20210227190344-51550e242af8/go.mod h1:/QQ/dpqFavkNhVnjvMILSQ3cj5hlmhB66adlgNbjuoA= github.com/nats-io/nats-server/v2 v2.2.1-0.20210317155042-716c6d60b593 h1:ui2nvqPBr0GNQMRYyndCUJy/zHSe8yXvhsmk+Ky0tuY= github.com/nats-io/nats-server/v2 v2.2.1-0.20210317155042-716c6d60b593/go.mod h1:eKlAaGmSQHZMFQA6x56AaP5/Bl9N3mWF4awyT2TTpzc= @@ -50,7 +43,6 @@ github.com/nats-io/nats.go v1.10.1-0.20210211000709-75ded9c77585/go.mod h1:uBWnC github.com/nats-io/nats.go v1.10.1-0.20210228004050-ed743748acac/go.mod h1:hxFvLNbNmT6UppX5B5Tr/r3g+XSwGjJzFn6mxPNJEHc= github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= -github.com/nats-io/nkeys v0.2.0 h1:WXKF7diOaPU9cJdLD7nuzwasQy9vT1tBqzXZZf3AMJM= github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= @@ -59,7 +51,6 @@ github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OS golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 h1:pLI5jrR7OSLijeIDcmRxNmw2api+jEfxLoykJVice/E= golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b h1:wSOdpTq0/eI46Ez/LkDwIsAKA71YP2SRKBODiRWM0as= golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= @@ -69,7 +60,6 @@ golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68 h1:nxC68pudNYkKU6jWhgrqdreuFiOQWj1Fs7T3VrH4Pjw= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/js.go b/js.go index 21faa29..6a2dc2c 100644 --- a/js.go +++ b/js.go @@ -1,4 +1,4 @@ -// Copyright 2020 The NATS Authors +// Copyright 2020-2021 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -105,6 +105,9 @@ type JetStream interface { // QueueSubscribeSync creates a Subscription with a queue group that can be used to process messages synchronously. QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error) + + // PullSubscribe creates a Subscription that can fetch messages. + PullSubscribe(subj string, opts ...SubOpt) (*Subscription, error) } // JetStreamContext is the public interface for JetStream. @@ -351,6 +354,11 @@ func (ttl MaxWait) configureJSContext(js *jsOpts) error { return nil } +func (ttl MaxWait) configurePull(opts *pullOpts) error { + opts.ttl = time.Duration(ttl) + return nil +} + // AckWait sets the maximum amount of time we will wait for an ack. type AckWait time.Duration @@ -369,12 +377,17 @@ type ContextOpt struct { context.Context } +func (ctx ContextOpt) configureJSContext(opts *jsOpts) error { + opts.ctx = ctx + return nil +} + func (ctx ContextOpt) configurePublish(opts *pubOpts) error { opts.ctx = ctx return nil } -func (ctx ContextOpt) configureJSContext(opts *jsOpts) error { +func (ctx ContextOpt) configurePull(opts *pullOpts) error { opts.ctx = ctx return nil } @@ -427,9 +440,9 @@ type SequencePair struct { // nextRequest is for getting next messages for pull based consumers from JetStream. type nextRequest struct { - Expires *time.Time `json:"expires,omitempty"` - Batch int `json:"batch,omitempty"` - NoWait bool `json:"no_wait,omitempty"` + Expires time.Duration `json:"expires,omitempty"` + Batch int `json:"batch,omitempty"` + NoWait bool `json:"no_wait,omitempty"` } // jsSub includes JetStream subscription info. @@ -438,7 +451,7 @@ type jsSub struct { consumer string stream string deliver string - pull int + pull bool durable bool attached bool } @@ -473,6 +486,9 @@ func (opt subOptFn) configureSubscribe(opts *subOpts) error { // Subscribe will create a subscription to the appropriate stream and consumer. func (js *js) Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) { + if cb == nil { + return nil, ErrBadSubscription + } return js.subscribe(subj, _EMPTY_, cb, nil, opts) } @@ -484,6 +500,9 @@ func (js *js) SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error) // QueueSubscribe will create a subscription to the appropriate stream and consumer with queue semantics. func (js *js) QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) { + if cb == nil { + return nil, ErrBadSubscription + } return js.subscribe(subj, queue, cb, nil, opts) } @@ -498,6 +517,11 @@ func (js *js) ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscri return js.subscribe(subj, _EMPTY_, nil, ch, opts) } +// PullSubscribe creates a pull subscriber. +func (js *js) PullSubscribe(subj string, opts ...SubOpt) (*Subscription, error) { + return js.subscribe(subj, _EMPTY_, nil, nil, opts) +} + func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts []SubOpt) (*Subscription, error) { cfg := ConsumerConfig{AckPolicy: ackPolicyNotSet} o := subOpts{cfg: &cfg} @@ -509,11 +533,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts [] } } - isPullMode := o.pull > 0 - if cb != nil && isPullMode { - return nil, ErrPullModeNotAllowed - } - + isPullMode := ch == nil && cb == nil badPullAck := o.cfg.AckPolicy == AckNonePolicy || o.cfg.AckPolicy == AckAllPolicy if isPullMode && badPullAck { return nil, fmt.Errorf("invalid ack mode for pull consumers: %s", o.cfg.AckPolicy) @@ -592,9 +612,14 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts [] ocb := cb cb = func(m *Msg) { ocb(m); m.Ack() } } - sub, err = js.nc.subscribe(deliver, queue, cb, ch, cb == nil, &jsSub{js: js}) - if err != nil { - return nil, err + + if isPullMode { + sub = &Subscription{Subject: subj, conn: js.nc, typ: PullSubscription, jsi: &jsSub{js: js, pull: true}} + } else { + sub, err = js.nc.subscribe(deliver, queue, cb, ch, cb == nil, &jsSub{js: js}) + if err != nil { + return nil, err + } } // If we are creating or updating let's process that request. @@ -668,7 +693,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts [] // If we are pull based go ahead and fire off the first request to populate. if isPullMode { sub.jsi.pull = o.pull - sub.Poll() } return sub, nil @@ -710,38 +734,14 @@ func (js *js) lookupStreamBySubject(subj string) (string, error) { type subOpts struct { // For attaching. stream, consumer string - // For pull based consumers, batch size for pull - pull int + // For pull based consumers. + pull bool // For manual ack mack bool // For creating or updating. cfg *ConsumerConfig } -// Pull defines the batch size of messages that will be received -// when using pull based JetStream consumers. -func Pull(batchSize int) SubOpt { - return subOptFn(func(opts *subOpts) error { - if batchSize == 0 { - return errors.New("nats: batch size of 0 not valid") - } - opts.pull = batchSize - return nil - }) -} - -func PullDirect(stream, consumer string, batchSize int) SubOpt { - return subOptFn(func(opts *subOpts) error { - if batchSize == 0 { - return errors.New("nats: batch size of 0 not valid") - } - opts.stream = stream - opts.consumer = consumer - opts.pull = batchSize - return nil - }) -} - // ManualAck disables auto ack functionality for async subscriptions. func ManualAck() SubOpt { return subOptFn(func(opts *subOpts) error { @@ -881,21 +881,296 @@ func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) { return js.getConsumerInfo(stream, consumer) } -func (sub *Subscription) Poll() error { - sub.mu.Lock() - if sub.jsi == nil || sub.jsi.deliver != _EMPTY_ || sub.jsi.pull == 0 { - sub.mu.Unlock() - return ErrTypeSubscription +type pullOpts struct { + ttl time.Duration + ctx context.Context +} + +type PullOpt interface { + configurePull(opts *pullOpts) error +} + +// PullMaxWaiting defines the max inflight pull requests to be delivered more messages. +func PullMaxWaiting(n int) SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.cfg.MaxWaiting = n + return nil + }) +} + +var errNoMessages = errors.New("nats: no messages") + +// Fetch pulls a batch of messages from a stream for a pull consumer. +func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) { + if sub == nil { + return nil, ErrBadSubscription } - batch := sub.jsi.pull - nc, reply := sub.conn, sub.Subject + + var o pullOpts + for _, opt := range opts { + if err := opt.configurePull(&o); err != nil { + return nil, err + } + } + if o.ctx != nil && o.ttl != 0 { + return nil, ErrContextAndTimeout + } + + sub.mu.Lock() + if sub.jsi == nil || sub.typ != PullSubscription { + sub.mu.Unlock() + return nil, ErrTypeSubscription + } + + nc, _ := sub.conn, sub.Subject stream, consumer := sub.jsi.stream, sub.jsi.consumer js := sub.jsi.js + + ttl := o.ttl + if ttl == 0 { + ttl = js.opts.wait + } sub.mu.Unlock() - req, _ := json.Marshal(&nextRequest{Batch: batch}) - reqNext := js.apiSubj(fmt.Sprintf(apiRequestNextT, stream, consumer)) - return nc.PublishRequest(reqNext, reply, req) + // Use the given context or setup a default one for the span + // of the pull batch request. + var ( + ctx = o.ctx + err error + cancel context.CancelFunc + ) + if o.ctx == nil { + ctx, cancel = context.WithTimeout(context.Background(), ttl) + defer cancel() + } + + // Check if context not done already before making the request. + select { + case <-ctx.Done(): + if ctx.Err() == context.Canceled { + err = ctx.Err() + } else { + err = ErrTimeout + } + default: + } + if err != nil { + return nil, err + } + + // Check for empty payload message and process synchronously + // any status messages. + checkMsg := func(msg *Msg) error { + if len(msg.Data) == 0 { + switch msg.Header.Get(statusHdr) { + case noResponders: + return ErrNoResponders + case noMessages: + return errNoMessages + case "400", "408", "409": + return errors.New(msg.Header.Get(descrHdr)) + } + } + return nil + } + + checkCtxErr := func(err error) error { + if o.ctx == nil && err == context.DeadlineExceeded { + return ErrTimeout + } + return err + } + + var ( + gotNoMessages bool + nr = &nextRequest{Batch: batch, NoWait: true} + req, _ = json.Marshal(nr) + reqNext = js.apiSubj(fmt.Sprintf(apiRequestNextT, stream, consumer)) + expires = ttl - 10*time.Millisecond + msgs = make([]*Msg, 0) + ) + + // In case of only one message, then can already handle with built-in request functions. + if batch == 1 { + resp, err := nc.RequestWithContext(ctx, reqNext, req) + if err != nil { + return nil, checkCtxErr(err) + } + + // In case of a no messages instant error, then fallback + // into longer version of pull batch request. + err = checkMsg(resp) + if err != nil { + if err == errNoMessages { + // Use old request style for the retry of the pull request + // in order to use auto UNSUB 1 to prevent the server + // from delivering a message when there is no more interest. + nr.NoWait = false + nr.Expires = expires + req, _ = json.Marshal(nr) + resp, err = nc.oldRequestWithContext(ctx, reqNext, nil, req) + if err != nil { + return nil, checkCtxErr(err) + } + + // This next message, could also be an error + // (e.g. 408 due to request timeout). + err = checkMsg(resp) + if err != nil { + return nil, err + } + return []*Msg{resp}, nil + } else { + // Hard error + return nil, checkCtxErr(err) + } + } + return []*Msg{resp}, nil + } + + // Setup a request where we will wait for the first response + // in case of errors, then dispatch the rest of the replies + // to the channel. + inbox := NewInbox() + + mch := make(chan *Msg, batch) + s, err := nc.subscribe(inbox, _EMPTY_, nil, mch, true, nil) + if err != nil { + return nil, err + } + + // Remove interest in the subscription at the end so that the + // this inbox does not get delivered the results intended + // for another request. + defer s.Unsubscribe() + + // Make a publish request to get results of the pull. + err = nc.publish(reqNext, inbox, nil, req) + if err != nil { + s.Unsubscribe() + return nil, err + } + + // Try to get the first message or error with NoWait. + var ( + firstMsg *Msg + ok bool + ) + select { + case firstMsg, ok = <-mch: + if !ok { + err = s.getNextMsgErr() + } else { + err = s.processNextMsgDelivered(firstMsg) + if err == nil { + err = checkMsg(firstMsg) + } + } + case <-ctx.Done(): + err = checkCtxErr(ctx.Err()) + } + + // If the first error is 'no more messages', then switch into + // longer form version of the request that waits for messages. + if err == errNoMessages { + gotNoMessages = true + } else if err != nil { + // We should be getting the response from the server + // in case we got a poll error, so stop and cleanup. + s.Unsubscribe() + return nil, err + } + + if gotNoMessages { + // We started with a 404 response right away, so fallback into + // second request that waits longer for messages to delivered. + nr.NoWait = false + nr.Expires = expires + req, _ = json.Marshal(nr) + + // Since first message was an error we UNSUB (batch+1) + // since we are counting it as the first message. + err = s.AutoUnsubscribe(batch + 1) + if err != nil { + return nil, err + } + + // Make another request and wait for the messages... + err = nc.publish(reqNext, inbox, nil, req) + if err != nil { + s.Unsubscribe() + return nil, err + } + + // Try to get the first result again or return the error. + select { + case firstMsg, ok = <-mch: + if !ok { + err = s.getNextMsgErr() + } else { + err = s.processNextMsgDelivered(firstMsg) + if err == nil { + err = checkMsg(firstMsg) + } + } + case <-ctx.Done(): + err = checkCtxErr(ctx.Err()) + } + if err != nil { + s.Unsubscribe() + return nil, err + } + // Check again if the delivered next message is a status error. + err = checkMsg(firstMsg) + if err != nil { + s.Unsubscribe() + return nil, err + } + } else { + // We are receiving messages at this point. Send UNSUB to let + // the server clear interest once enough replies are delivered. + err = s.AutoUnsubscribe(batch) + if err != nil { + return nil, err + } + } + + msgs = append(msgs, firstMsg) + for { + var ( + msg *Msg + ok bool + ) + select { + case msg, ok = <-mch: + if !ok { + err = s.getNextMsgErr() + } else { + err = s.processNextMsgDelivered(msg) + if err == nil { + err = checkMsg(msg) + } + } + case <-ctx.Done(): + return msgs, checkCtxErr(err) + } + if err != nil { + // Discard the error which may have been a timeout + // or 408 request timeout status from the server, + // and just the return delivered messages. + break + } + if msg != nil { + msgs = append(msgs, msg) + } + + if len(msgs) == batch { + // Done! + break + } + } + + return msgs, nil } func (js *js) getConsumerInfo(stream, consumer string) (*ConsumerInfo, error) { @@ -940,7 +1215,7 @@ func (m *Msg) checkReply() (*js, bool, error) { return nil, false, nil } js := sub.jsi.js - isPullMode := sub.jsi.pull > 0 + isPullMode := sub.jsi.pull sub.mu.Unlock() return js, isPullMode, nil @@ -956,7 +1231,7 @@ func (m *Msg) ackReply(ackType []byte, sync bool, opts ...PubOpt) error { return err } } - js, isPullMode, err := m.checkReply() + js, _, err := m.checkReply() if err != nil { return err } @@ -978,20 +1253,7 @@ func (m *Msg) ackReply(ackType []byte, sync bool, opts ...PubOpt) error { wait = js.opts.wait } - if isPullMode { - if bytes.Equal(ackType, AckAck) { - err = nc.PublishRequest(m.Reply, m.Sub.Subject, AckNext) - } else if bytes.Equal(ackType, AckNak) || bytes.Equal(ackType, AckTerm) { - err = nc.PublishRequest(m.Reply, m.Sub.Subject, []byte("+NXT {\"batch\":1}")) - } - if sync && err == nil { - if ctx != nil { - _, err = nc.RequestWithContext(ctx, m.Reply, nil) - } else { - _, err = nc.Request(m.Reply, nil, wait) - } - } - } else if sync { + if sync { if ctx != nil { _, err = nc.RequestWithContext(ctx, m.Reply, ackType) } else { diff --git a/nats.go b/nats.go index b19021a..46a8d06 100644 --- a/nats.go +++ b/nats.go @@ -2823,6 +2823,7 @@ const ( statusHdr = "Status" descrHdr = "Description" noResponders = "503" + noMessages = "404" statusLen = 3 // e.g. 20x, 40x, 50x ) @@ -3284,8 +3285,7 @@ func (nc *Conn) SubscribeSync(subj string) (*Subscription, error) { return nil, ErrInvalidConnection } mch := make(chan *Msg, nc.Opts.SubChanLen) - s, e := nc.subscribe(subj, _EMPTY_, nil, mch, true, nil) - return s, e + return nc.subscribe(subj, _EMPTY_, nil, mch, true, nil) } // QueueSubscribe creates an asynchronous queue subscriber on the given subject. @@ -3302,8 +3302,7 @@ func (nc *Conn) QueueSubscribe(subj, queue string, cb MsgHandler) (*Subscription // given message synchronously using Subscription.NextMsg(). func (nc *Conn) QueueSubscribeSync(subj, queue string) (*Subscription, error) { mch := make(chan *Msg, nc.Opts.SubChanLen) - s, e := nc.subscribe(subj, queue, nil, mch, true, nil) - return s, e + return nc.subscribe(subj, queue, nil, mch, true, nil) } // QueueSubscribeSyncWithChan will express interest in the given subject. @@ -3447,6 +3446,7 @@ const ( SyncSubscription ChanSubscription NilSubscription + PullSubscription ) // Type returns the type of Subscription. @@ -3715,7 +3715,6 @@ func (s *Subscription) processNextMsgDelivered(msg *Msg) error { s.mu.Lock() nc := s.conn max := s.max - jsi := s.jsi // Update some stats. s.delivered++ @@ -3738,12 +3737,6 @@ func (s *Subscription) processNextMsgDelivered(msg *Msg) error { } } - // In case this is a JetStream message and in pull mode - // then check whether it is an JS API error. - if jsi != nil && jsi.pull > 0 && len(msg.Data) == 0 && msg.Header.Get(statusHdr) == noResponders { - return ErrNoResponders - } - return nil } diff --git a/test/js_test.go b/test/js_test.go index 7326619..e2b5da1 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -457,18 +457,30 @@ func TestJetStreamSubscribe(t *testing.T) { // Now try pull based subscribers. // Check some error conditions first. - if _, err := js.Subscribe("bar", func(m *nats.Msg) {}, nats.Pull(1)); err != nats.ErrPullModeNotAllowed { - t.Fatalf("Expected an error trying to do PullMode on callback based subscriber, got %v", err) + if _, err := js.Subscribe("bar", nil); err != nats.ErrBadSubscription { + t.Fatalf("Expected an error trying to create subscriber with nil callback, got %v", err) + } + + // Durable name is required for now. + sub, err = js.PullSubscribe("bar") + if err == nil { + t.Fatalf("Unexpected success") + } + if err != nil && err.Error() != `consumer in pull mode requires a durable name` { + t.Errorf("Expected consumer in pull mode error, got %v", err) } batch := 5 - sub, err = js.SubscribeSync("bar", nats.Durable("rip"), nats.Pull(batch)) + sub, err = js.PullSubscribe("bar", nats.Durable("rip")) if err != nil { t.Fatalf("Unexpected error: %v", err) } // The first batch if available should be delivered and queued up. - waitForPending(t, batch) + bmsgs, err := sub.Fetch(batch) + if err != nil { + t.Fatal(err) + } if info, _ := sub.ConsumerInfo(); info.NumAckPending != batch || info.NumPending != uint64(batch) { t.Fatalf("Expected %d pending ack, and %d still waiting to be delivered, got %d and %d", batch, batch, info.NumAckPending, info.NumPending) @@ -476,27 +488,39 @@ func TestJetStreamSubscribe(t *testing.T) { // Now go ahead and consume these and ack, but not ack+next. for i := 0; i < batch; i++ { - m, err := sub.NextMsg(10 * time.Millisecond) + m := bmsgs[i] + err = m.Ack() if err != nil { - t.Fatalf("Unexpected error: %v", err) + t.Fatal(err) } - m.Respond(nats.AckAck) } if info, _ := sub.ConsumerInfo(); info.AckFloor.Consumer != uint64(batch) { t.Fatalf("Expected ack floor to be %d, got %d", batch, info.AckFloor.Consumer) } - - // Now we are stuck so to speak. So we can unstick the sub by calling poll. waitForPending(t, 0) - sub.Poll() - waitForPending(t, batch) + + // Make a request for 10 but should only receive a few. + bmsgs, err = sub.Fetch(10, nats.MaxWait(2*time.Second)) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + got := len(bmsgs) + expected := 5 + if got != expected { + t.Errorf("Expected: %v, got: %v", expected, got) + } + + for _, msg := range bmsgs { + msg.Ack() + } + sub.Drain() // Now test attaching to a pull based durable. // Test that if we are attaching that the subjects will match up. rip from // above was created with a filtered subject of bar, so this should fail. - _, err = js.SubscribeSync("baz", nats.Durable("rip"), nats.Pull(batch)) + _, err = js.PullSubscribe("baz", nats.Durable("rip")) if err != nats.ErrSubjectMismatch { t.Fatalf("Expected a %q error but got %q", nats.ErrSubjectMismatch, err) } @@ -506,49 +530,30 @@ func TestJetStreamSubscribe(t *testing.T) { js.Publish("bar", msg) } - sub, err = js.SubscribeSync("bar", nats.Durable("rip"), nats.Pull(batch)) + sub, err = js.PullSubscribe("bar", nats.Durable("rip")) if err != nil { t.Fatalf("Unexpected error: %v", err) } defer sub.Unsubscribe() - waitForPending(t, batch) + // Fetch messages a couple of times. + expected = 5 + bmsgs, err = sub.Fetch(expected, nats.MaxWait(2*time.Second)) + if err != nil { + t.Fatal(err) + } + + got = len(bmsgs) + if got != expected { + t.Errorf("Expected: %v, got: %v", expected, got) + } info, err = sub.ConsumerInfo() if err != nil { t.Fatalf("Unexpected error: %v", err) } - if info.NumAckPending != batch*2 || info.NumPending != uint64(toSend-batch) { - t.Fatalf("Expected ack pending of %d and pending to be %d, got %d %d", batch*2, toSend-batch, info.NumAckPending, info.NumPending) - } - - // Create a new pull based consumer. - batch = 1 - msgs := make(chan *nats.Msg, 100) - sub, err = js.ChanSubscribe("baz", msgs, nats.Durable("dlc"), nats.Pull(batch)) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - // Since this sub is on 'baz' no messages are waiting for us to start. - waitForPending(t, 0) - - // Now send in 10 messages to baz. - for i := 0; i < toSend; i++ { - js.Publish("baz", msg) - } - // We should get 1 queued up. - waitForPending(t, batch) - - for received := 0; received < toSend; { - select { - case m := <-msgs: - received++ - // This will do the AckNext version since it knows we are pull based. - m.Ack() - case <-time.After(time.Second): - t.Fatalf("Timeout waiting for messages") - } + if info.NumAckPending != batch || info.NumPending != uint64(toSend-batch) { + t.Fatalf("Expected ack pending of %d and pending to be %d, got %d %d", batch, toSend-batch, info.NumAckPending, info.NumPending) } // Prevent invalid durable names @@ -612,12 +617,13 @@ func TestJetStreamAckPending_Pull(t *testing.T) { } } - sub, err := js.SubscribeSync("foo", + ackPendingLimit := 3 + + sub, err := js.PullSubscribe("foo", nats.Durable("dname-pull-ack-wait"), nats.AckWait(100*time.Millisecond), nats.MaxDeliver(5), - nats.MaxAckPending(3), - nats.Pull(15), + nats.MaxAckPending(ackPendingLimit), ) if err != nil { t.Fatal(err) @@ -626,15 +632,29 @@ func TestJetStreamAckPending_Pull(t *testing.T) { // 3 messages delivered 5 times. expected := 15 - timeout := time.Now().Add(2 * time.Second) + + // Fetching more than ack pending is an error. + _, err = sub.Fetch(expected) + if err == nil { + t.Fatalf("Unexpected success fetching more messages than ack pending limit") + } + pending := 0 + msgs := make([]*nats.Msg, 0) + timeout := time.Now().Add(2 * time.Second) for time.Now().Before(timeout) { - if pending, _, _ = sub.Pending(); pending >= expected { + ms, err := sub.Fetch(ackPendingLimit) + if err != nil || (ms != nil && len(ms) == 0) { + continue + } + + msgs = append(msgs, ms...) + if len(msgs) >= expected { break } time.Sleep(10 * time.Millisecond) } - if pending < expected { + if len(msgs) < expected { t.Errorf("Expected %v, got %v", expected, pending) } @@ -680,10 +700,8 @@ func TestJetStreamAckPending_Pull(t *testing.T) { } acks := map[int]int{} - ackPending := 3 - timeout = time.Now().Add(2 * time.Second) - for time.Now().Before(timeout) { + for _, m := range msgs { info, err := sub.ConsumerInfo() if err != nil { t.Fatal(err) @@ -692,17 +710,6 @@ func TestJetStreamAckPending_Pull(t *testing.T) { t.Fatalf("unexpected num ack pending: got=%d, want=%d", got, want) } - // Continue to ack all messages until no more pending. - pending, _, _ = sub.Pending() - if pending == 0 { - break - } - - m, err := sub.NextMsg(100 * time.Millisecond) - if err != nil { - t.Fatalf("Error getting next message: %v", err) - } - if err := m.AckSync(); err != nil { t.Fatalf("Error on ack message: %v", err) } @@ -734,7 +741,7 @@ func TestJetStreamAckPending_Pull(t *testing.T) { } } - _, err = sub.NextMsg(100 * time.Millisecond) + _, err = sub.Fetch(1, nats.MaxWait(100*time.Millisecond)) if err != nats.ErrTimeout { t.Errorf("Expected timeout, got: %v", err) } @@ -1695,26 +1702,6 @@ func TestJetStreamImportDirectOnly(t *testing.T) { t.Errorf("Unexpected error: %v", err) } } - - // Now pull based consumer. - batch := 10 - sub, err = js.SubscribeSync("ORDERS", nats.PullDirect("ORDERS", "d1", batch)) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - waitForPending(t, batch) - - for i := 0; i < toSend; i++ { - m, err := sub.NextMsg(100 * time.Millisecond) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - // Tests that acks flow since we need these to do AckNext for this to work. - err = m.Ack() - if err != nil { - t.Errorf("Unexpected error: %v", err) - } - } } func TestJetStreamCrossAccountMirrorsAndSources(t *testing.T) { @@ -2010,28 +1997,6 @@ func TestJetStreamPullBasedStall(t *testing.T) { nc.Publish("STALL", msg) } nc.Flush() - - batch := 100 - msgs := make(chan *nats.Msg, batch-2) - sub, err := js.ChanSubscribe("STALL", msgs, nats.Durable("dlc"), nats.Pull(batch)) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - defer sub.Unsubscribe() - - for received := 0; received < toSend; { - select { - case m := <-msgs: - received++ - meta, _ := m.MetaData() - if meta.Consumer != uint64(received) { - t.Fatalf("Missed something, wanted %d but got %d", received, meta.Consumer) - } - m.Ack() - case <-time.After(time.Second): - t.Fatalf("Timeout waiting for messages, last received was %d", received) - } - } } func TestJetStreamSubscribe_DeliverPolicy(t *testing.T) { @@ -2589,16 +2554,17 @@ func TestJetStream_Unsubscribe(t *testing.T) { }); err != nil { t.Fatalf("Unexpected error: %v", err) } - subC, err := js.SubscribeSync("foo.C", nats.Durable("wq"), nats.Pull(1)) + subC, err := js.PullSubscribe("foo.C", nats.Durable("wq")) if err != nil { t.Fatalf("Unexpected error: %v", err) } fetchConsumers(t, 1) - msg, err := subC.NextMsg(2 * time.Second) + msgs, err := subC.Fetch(1, nats.MaxWait(2*time.Second)) if err != nil { t.Errorf("Unexpected error getting message: %v", err) } + msg := msgs[0] got := string(msg.Data) expected := "C" if got != expected { @@ -3563,18 +3529,6 @@ func TestJetStream_ClusterReconnect(t *testing.T) { replicas := []int{1, 3} t.Run("pull sub", func(t *testing.T) { - for _, r := range replicas { - t.Run(fmt.Sprintf("n=%d r=%d", n, r), func(t *testing.T) { - stream := &nats.StreamConfig{ - Name: fmt.Sprintf("foo-r%d", r), - Replicas: r, - } - withJSClusterAndStream(t, fmt.Sprintf("PULLR%d", r), n, stream, testJetStream_ClusterReconnectPullSubscriber) - }) - } - }) - - t.Run("pull qsub", func(t *testing.T) { for _, r := range replicas { t.Run(fmt.Sprintf("n=%d r=%d", n, r), func(t *testing.T) { stream := &nats.StreamConfig{ @@ -3599,142 +3553,16 @@ func TestJetStream_ClusterReconnect(t *testing.T) { }) t.Run("qsub durable", func(t *testing.T) { - r := 3 - t.Run(fmt.Sprintf("n=%d r=%d", n, r), func(t *testing.T) { - stream := &nats.StreamConfig{ - Name: fmt.Sprintf("bar-r%d", r), - Replicas: r, - } - withJSClusterAndStream(t, fmt.Sprintf("QSUBR%d", r), n, stream, testJetStream_ClusterReconnectDurableQueueSubscriber) - }) - }) -} - -func testJetStream_ClusterReconnectPullSubscriber(t *testing.T, subject string, srvs ...*jsServer) { - var ( - recvd int - srvA = srvs[0] - totalMsgs = 20 - durable = nats.Durable("d1") - reconnected = make(chan struct{}, 2) - reconnectDone bool - ) - nc, err := nats.Connect(srvA.ClientURL(), - nats.ReconnectHandler(func(nc *nats.Conn) { - reconnected <- struct{}{} - - // Bring back the server after the reconnect event. - if !reconnectDone { - reconnectDone = true - srvA.Restart() - } - }), - ) - if err != nil { - t.Error(err) - } - defer nc.Close() - - js, err := nc.JetStream() - if err != nil { - t.Error(err) - } - - for i := 0; i < 10; i++ { - payload := fmt.Sprintf("i:%d", i) - _, err := js.Publish(subject, []byte(payload)) - if err != nil { - t.Errorf("Unexpected error: %v", err) - } - } - - sub, err := js.SubscribeSync(subject, durable, nats.Pull(1)) - if err != nil { - t.Error(err) - } - - for i := 10; i < totalMsgs; i++ { - payload := fmt.Sprintf("i:%d", i) - _, err := js.Publish(subject, []byte(payload)) - if err != nil { - t.Errorf("Unexpected error: %v", err) - } - } - - ctx, done := context.WithTimeout(context.Background(), 10*time.Second) - defer done() - -NextMsg: - for recvd < totalMsgs { - select { - case <-ctx.Done(): - t.Fatalf("Timeout waiting for messages, expected: %d, got: %d", totalMsgs, recvd) - default: - } - - pending, _, _ := sub.Pending() - if pending == 0 { - err = sub.Poll() - if err != nil { - t.Errorf("Unexpected error: %v", err) - } - } - - // Server will shutdown after a couple of messages which will result - // in empty messages with an status unavailable error. - msg, err := sub.NextMsg(2 * time.Second) - if err == nats.ErrNoResponders || err == nats.ErrTimeout { - // Backoff before asking for more messages. - time.Sleep(100 * time.Millisecond) - continue NextMsg - } else if err != nil { - t.Errorf("Unexpected error: %v", err) - continue NextMsg - } - - if len(msg.Data) == 0 && msg.Header.Get("Status") == "503" { - t.Fatal("Got 503 JetStream API message!") - } - - got := string(msg.Data) - expected := fmt.Sprintf("i:%d", recvd) - if got != expected { - // Missed a message, but continue checking for the rest. - recvd++ - t.Logf("WARN: Expected %v, got: %v", expected, got) - } - - // Add a few retries since there can be errors during the reconnect. - timeout := time.Now().Add(5 * time.Second) - RetryAck: - for time.Now().Before(timeout) { - err = msg.AckSync() - if err != nil { - // During the reconnection, both of these errors can occur. - if err == nats.ErrNoResponders || err == nats.ErrTimeout { - // Wait for reconnection event to occur to continue. - select { - case <-reconnected: - continue RetryAck - case <-time.After(100 * time.Millisecond): - continue RetryAck - case <-ctx.Done(): - t.Fatal("Timed out waiting for reconnect") - } + for _, r := range replicas { + t.Run(fmt.Sprintf("n=%d r=%d", n, r), func(t *testing.T) { + stream := &nats.StreamConfig{ + Name: fmt.Sprintf("bar-r%d", r), + Replicas: r, } - - t.Errorf("Unexpected error: %v", err) - continue RetryAck - } - break RetryAck + withJSClusterAndStream(t, fmt.Sprintf("QSUBR%d", r), n, stream, testJetStream_ClusterReconnectDurableQueueSubscriber) + }) } - recvd++ - - // Shutdown the server after a couple of messages. - if recvd == 2 { - srvA.Shutdown() - } - } + }) } func testJetStream_ClusterReconnectDurableQueueSubscriber(t *testing.T, subject string, srvs ...*jsServer) { @@ -4044,7 +3872,7 @@ func testJetStream_ClusterReconnectPullQueueSubscriber(t *testing.T, subject str subs := make([]*nats.Subscription, 0) for i := 0; i < 5; i++ { - sub, err := js.QueueSubscribeSync(subject, "wq", durable, nats.Pull(1)) + sub, err := js.PullSubscribe(subject, durable, nats.PullMaxWaiting(5)) if err != nil { t.Fatal(err) } @@ -4071,16 +3899,9 @@ NextMsg: } for qsub, sub := range subs { - if pending, _, _ := sub.Pending(); pending == 0 { - err = sub.Poll() - if err != nil { - t.Errorf("Unexpected error: %v", err) - } - } - // Server will shutdown after a couple of messages which will result // in empty messages with an status unavailable error. - msg, err := sub.NextMsg(2 * time.Second) + msgs, err := sub.Fetch(1, nats.MaxWait(2*time.Second)) if err == nats.ErrNoResponders || err == nats.ErrTimeout { // Backoff before asking for more messages. time.Sleep(100 * time.Millisecond) @@ -4089,6 +3910,11 @@ NextMsg: t.Errorf("Unexpected error: %v", err) continue NextMsg } + msg := msgs[0] + if len(msg.Data) == 0 && msg.Header.Get("Status") == "503" { + t.Fatal("Got 503 JetStream API message!") + } + recvd[string(msg.Data)]++ recvdQ[qsub] = append(recvdQ[qsub], msg) @@ -4158,3 +3984,296 @@ func checkFor(t *testing.T, totalWait, sleepDur time.Duration, f func() error) { t.Fatal(err.Error()) } } + +func TestJetStreamPullSubscribeOptions(t *testing.T) { + withJSCluster(t, "FOPTS", 3, testJetStreamFetchOptions) +} + +func testJetStreamFetchOptions(t *testing.T, srvs ...*jsServer) { + srv := srvs[0] + nc, err := nats.Connect(srv.ClientURL()) + if err != nil { + t.Error(err) + } + defer nc.Close() + + js, err := nc.JetStream() + if err != nil { + t.Fatal(err) + } + + subject := "WQ" + _, err = js.AddStream(&nats.StreamConfig{ + Name: subject, + Replicas: 1, + }) + if err != nil { + t.Fatal(err) + } + + sendMsgs := func(t *testing.T, totalMsgs int) { + t.Helper() + for i := 0; i < totalMsgs; i++ { + payload := fmt.Sprintf("i:%d", i) + _, err := js.Publish(subject, []byte(payload)) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + } + } + + t.Run("batch size", func(t *testing.T) { + defer js.PurgeStream(subject) + + expected := 10 + sendMsgs(t, expected) + sub, err := js.PullSubscribe(subject, nats.Durable("batch-size")) + if err != nil { + t.Fatal(err) + } + defer sub.Unsubscribe() + + msgs, err := sub.Fetch(expected, nats.MaxWait(2*time.Second)) + if err != nil { + t.Fatal(err) + } + + for _, msg := range msgs { + msg.AckSync() + } + + got := len(msgs) + if got != expected { + t.Fatalf("Got %v messages, expected at least: %v", got, expected) + } + + // Next fetch will timeout since no more messages. + _, err = sub.Fetch(1, nats.MaxWait(250*time.Millisecond)) + if err != nats.ErrTimeout { + t.Errorf("Expected timeout fetching next message, got: %v", err) + } + + expected = 5 + sendMsgs(t, expected) + msgs, err = sub.Fetch(expected, nats.MaxWait(1*time.Second)) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + got = len(msgs) + if got != expected { + t.Fatalf("Got %v messages, expected at least: %v", got, expected) + } + + for _, msg := range msgs { + msg.Ack() + } + }) + + t.Run("sub drain is no op", func(t *testing.T) { + defer js.PurgeStream(subject) + + expected := 10 + sendMsgs(t, expected) + sub, err := js.PullSubscribe(subject, nats.Durable("batch-ctx")) + if err != nil { + t.Fatal(err) + } + defer sub.Unsubscribe() + + msgs, err := sub.Fetch(expected, nats.MaxWait(2*time.Second)) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + got := len(msgs) + if got != expected { + t.Fatalf("Got %v messages, expected at least: %v", got, expected) + } + err = sub.Drain() + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + }) + + t.Run("pull with context", func(t *testing.T) { + defer js.PurgeStream(subject) + + expected := 10 + sendMsgs(t, expected) + sub, err := js.PullSubscribe(subject, nats.Durable("batch-ctx")) + if err != nil { + t.Fatal(err) + } + defer sub.Unsubscribe() + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + // Should fail with expired context. + _, err = sub.Fetch(expected, nats.Context(ctx)) + if err == nil { + t.Fatal("Unexpected success") + } + if err != context.Canceled { + t.Errorf("Expected context deadline exceeded error, got: %v", err) + } + + ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + msgs, err := sub.Fetch(expected, nats.Context(ctx)) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + got := len(msgs) + if got != expected { + t.Fatalf("Got %v messages, expected at least: %v", got, expected) + } + + for _, msg := range msgs { + msg.AckSync() + } + + // Next fetch will timeout since no more messages. + _, err = sub.Fetch(1, nats.MaxWait(250*time.Millisecond)) + if err != nats.ErrTimeout { + t.Errorf("Expected timeout fetching next message, got: %v", err) + } + + expected = 5 + sendMsgs(t, expected) + msgs, err = sub.Fetch(expected, nats.MaxWait(1*time.Second)) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + got = len(msgs) + if got != expected { + t.Fatalf("Got %v messages, expected at least: %v", got, expected) + } + + for _, msg := range msgs { + msg.Ack() + } + }) + + t.Run("fetch after unsubscribe", func(t *testing.T) { + defer js.PurgeStream(subject) + + expected := 10 + sendMsgs(t, expected) + sub, err := js.PullSubscribe(subject, nats.Durable("fetch-unsub")) + if err != nil { + t.Fatal(err) + } + + err = sub.Unsubscribe() + if err != nil { + t.Fatal(err) + } + + _, err = sub.Fetch(1, nats.MaxWait(500*time.Millisecond)) + if err == nil { + t.Fatal("Unexpected success") + } + if err != nil && (err != nats.ErrTimeout && err != nats.ErrNoResponders) { + t.Fatalf("Unexpected error: %v", err) + } + }) + + t.Run("max waiting timeout", func(t *testing.T) { + defer js.PurgeStream(subject) + + expected := 10 + sendMsgs(t, expected) + + sub, err := js.PullSubscribe(subject, nats.Durable("max-waiting")) + if err != nil { + t.Fatal(err) + } + defer sub.Unsubscribe() + + // Poll more than the default max of waiting/inflight pull requests, + // so that We will get only 408 timeout errors. + errCh := make(chan error, 1024) + defer close(errCh) + var wg sync.WaitGroup + for i := 0; i < 1024; i++ { + wg.Add(1) + + go func() { + _, err := sub.Fetch(1, nats.MaxWait(500*time.Millisecond)) + defer wg.Done() + if err != nil { + errCh <- err + } + }() + } + wg.Wait() + + select { + case <-time.After(1 * time.Second): + t.Fatal("Expected RequestTimeout (408) error due to many inflight pulls") + case err := <-errCh: + if err != nil && (err.Error() != `Request Timeout` && err != nats.ErrTimeout) { + t.Errorf("Expected request timeout fetching next message, got: %+v", err) + } + } + }) + + t.Run("no wait", func(t *testing.T) { + defer js.PurgeStream(subject) + + expected := 10 + sendMsgs(t, expected) + sub, err := js.PullSubscribe(subject, nats.Durable("no-wait")) + if err != nil { + t.Fatal(err) + } + defer sub.Unsubscribe() + + ctx, done := context.WithTimeout(context.Background(), 5*time.Second) + defer done() + recvd := make([]*nats.Msg, 0) + + Loop: + for range time.NewTicker(100 * time.Millisecond).C { + select { + case <-ctx.Done(): + break Loop + default: + } + + msgs, err := sub.Fetch(1, nats.MaxWait(250*time.Millisecond)) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + recvd = append(recvd, msgs[0]) + + for _, msg := range msgs { + err = msg.AckSync() + if err != nil { + t.Error(err) + } + } + + if len(recvd) == expected { + done() + break + } + } + + got := len(recvd) + if got != expected { + t.Fatalf("Got %v messages, expected at least: %v", got, expected) + } + + // There should only be timeout errors since no more messages. + msgs, err := sub.Fetch(expected, nats.MaxWait(2*time.Second)) + if err == nil { + t.Fatal("Unexpected success", len(msgs)) + } + if err != nats.ErrTimeout { + t.Fatalf("Expected timeout error, got: %v", err) + } + }) +}