js: Add js.PullSubscribe and sub.Fetch APIs for pull consumers

Signed-off-by: Waldemar Quevedo <wally@synadia.com>
This commit is contained in:
Waldemar Quevedo
2021-02-23 14:25:58 -08:00
parent d70f82c056
commit ac8b51a3ab
4 changed files with 717 additions and 353 deletions

396
js.go
View File

@@ -1,4 +1,4 @@
// Copyright 2020 The NATS Authors
// Copyright 2020-2021 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -105,6 +105,9 @@ type JetStream interface {
// QueueSubscribeSync creates a Subscription with a queue group that can be used to process messages synchronously.
QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error)
// PullSubscribe creates a Subscription that can fetch messages.
PullSubscribe(subj string, opts ...SubOpt) (*Subscription, error)
}
// JetStreamContext is the public interface for JetStream.
@@ -351,6 +354,11 @@ func (ttl MaxWait) configureJSContext(js *jsOpts) error {
return nil
}
func (ttl MaxWait) configurePull(opts *pullOpts) error {
opts.ttl = time.Duration(ttl)
return nil
}
// AckWait sets the maximum amount of time we will wait for an ack.
type AckWait time.Duration
@@ -369,12 +377,17 @@ type ContextOpt struct {
context.Context
}
func (ctx ContextOpt) configureJSContext(opts *jsOpts) error {
opts.ctx = ctx
return nil
}
func (ctx ContextOpt) configurePublish(opts *pubOpts) error {
opts.ctx = ctx
return nil
}
func (ctx ContextOpt) configureJSContext(opts *jsOpts) error {
func (ctx ContextOpt) configurePull(opts *pullOpts) error {
opts.ctx = ctx
return nil
}
@@ -427,9 +440,9 @@ type SequencePair struct {
// nextRequest is for getting next messages for pull based consumers from JetStream.
type nextRequest struct {
Expires *time.Time `json:"expires,omitempty"`
Batch int `json:"batch,omitempty"`
NoWait bool `json:"no_wait,omitempty"`
Expires time.Duration `json:"expires,omitempty"`
Batch int `json:"batch,omitempty"`
NoWait bool `json:"no_wait,omitempty"`
}
// jsSub includes JetStream subscription info.
@@ -438,7 +451,7 @@ type jsSub struct {
consumer string
stream string
deliver string
pull int
pull bool
durable bool
attached bool
}
@@ -473,6 +486,9 @@ func (opt subOptFn) configureSubscribe(opts *subOpts) error {
// Subscribe will create a subscription to the appropriate stream and consumer.
func (js *js) Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) {
if cb == nil {
return nil, ErrBadSubscription
}
return js.subscribe(subj, _EMPTY_, cb, nil, opts)
}
@@ -484,6 +500,9 @@ func (js *js) SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error)
// QueueSubscribe will create a subscription to the appropriate stream and consumer with queue semantics.
func (js *js) QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) {
if cb == nil {
return nil, ErrBadSubscription
}
return js.subscribe(subj, queue, cb, nil, opts)
}
@@ -498,6 +517,11 @@ func (js *js) ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscri
return js.subscribe(subj, _EMPTY_, nil, ch, opts)
}
// PullSubscribe creates a pull subscriber.
func (js *js) PullSubscribe(subj string, opts ...SubOpt) (*Subscription, error) {
return js.subscribe(subj, _EMPTY_, nil, nil, opts)
}
func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts []SubOpt) (*Subscription, error) {
cfg := ConsumerConfig{AckPolicy: ackPolicyNotSet}
o := subOpts{cfg: &cfg}
@@ -509,11 +533,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts []
}
}
isPullMode := o.pull > 0
if cb != nil && isPullMode {
return nil, ErrPullModeNotAllowed
}
isPullMode := ch == nil && cb == nil
badPullAck := o.cfg.AckPolicy == AckNonePolicy || o.cfg.AckPolicy == AckAllPolicy
if isPullMode && badPullAck {
return nil, fmt.Errorf("invalid ack mode for pull consumers: %s", o.cfg.AckPolicy)
@@ -592,9 +612,14 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts []
ocb := cb
cb = func(m *Msg) { ocb(m); m.Ack() }
}
sub, err = js.nc.subscribe(deliver, queue, cb, ch, cb == nil, &jsSub{js: js})
if err != nil {
return nil, err
if isPullMode {
sub = &Subscription{Subject: subj, conn: js.nc, typ: PullSubscription, jsi: &jsSub{js: js, pull: true}}
} else {
sub, err = js.nc.subscribe(deliver, queue, cb, ch, cb == nil, &jsSub{js: js})
if err != nil {
return nil, err
}
}
// If we are creating or updating let's process that request.
@@ -668,7 +693,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts []
// If we are pull based go ahead and fire off the first request to populate.
if isPullMode {
sub.jsi.pull = o.pull
sub.Poll()
}
return sub, nil
@@ -710,38 +734,14 @@ func (js *js) lookupStreamBySubject(subj string) (string, error) {
type subOpts struct {
// For attaching.
stream, consumer string
// For pull based consumers, batch size for pull
pull int
// For pull based consumers.
pull bool
// For manual ack
mack bool
// For creating or updating.
cfg *ConsumerConfig
}
// Pull defines the batch size of messages that will be received
// when using pull based JetStream consumers.
func Pull(batchSize int) SubOpt {
return subOptFn(func(opts *subOpts) error {
if batchSize == 0 {
return errors.New("nats: batch size of 0 not valid")
}
opts.pull = batchSize
return nil
})
}
func PullDirect(stream, consumer string, batchSize int) SubOpt {
return subOptFn(func(opts *subOpts) error {
if batchSize == 0 {
return errors.New("nats: batch size of 0 not valid")
}
opts.stream = stream
opts.consumer = consumer
opts.pull = batchSize
return nil
})
}
// ManualAck disables auto ack functionality for async subscriptions.
func ManualAck() SubOpt {
return subOptFn(func(opts *subOpts) error {
@@ -881,21 +881,296 @@ func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) {
return js.getConsumerInfo(stream, consumer)
}
func (sub *Subscription) Poll() error {
sub.mu.Lock()
if sub.jsi == nil || sub.jsi.deliver != _EMPTY_ || sub.jsi.pull == 0 {
sub.mu.Unlock()
return ErrTypeSubscription
type pullOpts struct {
ttl time.Duration
ctx context.Context
}
type PullOpt interface {
configurePull(opts *pullOpts) error
}
// PullMaxWaiting defines the max inflight pull requests to be delivered more messages.
func PullMaxWaiting(n int) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.MaxWaiting = n
return nil
})
}
var errNoMessages = errors.New("nats: no messages")
// Fetch pulls a batch of messages from a stream for a pull consumer.
func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
if sub == nil {
return nil, ErrBadSubscription
}
batch := sub.jsi.pull
nc, reply := sub.conn, sub.Subject
var o pullOpts
for _, opt := range opts {
if err := opt.configurePull(&o); err != nil {
return nil, err
}
}
if o.ctx != nil && o.ttl != 0 {
return nil, ErrContextAndTimeout
}
sub.mu.Lock()
if sub.jsi == nil || sub.typ != PullSubscription {
sub.mu.Unlock()
return nil, ErrTypeSubscription
}
nc, _ := sub.conn, sub.Subject
stream, consumer := sub.jsi.stream, sub.jsi.consumer
js := sub.jsi.js
ttl := o.ttl
if ttl == 0 {
ttl = js.opts.wait
}
sub.mu.Unlock()
req, _ := json.Marshal(&nextRequest{Batch: batch})
reqNext := js.apiSubj(fmt.Sprintf(apiRequestNextT, stream, consumer))
return nc.PublishRequest(reqNext, reply, req)
// Use the given context or setup a default one for the span
// of the pull batch request.
var (
ctx = o.ctx
err error
cancel context.CancelFunc
)
if o.ctx == nil {
ctx, cancel = context.WithTimeout(context.Background(), ttl)
defer cancel()
}
// Check if context not done already before making the request.
select {
case <-ctx.Done():
if ctx.Err() == context.Canceled {
err = ctx.Err()
} else {
err = ErrTimeout
}
default:
}
if err != nil {
return nil, err
}
// Check for empty payload message and process synchronously
// any status messages.
checkMsg := func(msg *Msg) error {
if len(msg.Data) == 0 {
switch msg.Header.Get(statusHdr) {
case noResponders:
return ErrNoResponders
case noMessages:
return errNoMessages
case "400", "408", "409":
return errors.New(msg.Header.Get(descrHdr))
}
}
return nil
}
checkCtxErr := func(err error) error {
if o.ctx == nil && err == context.DeadlineExceeded {
return ErrTimeout
}
return err
}
var (
gotNoMessages bool
nr = &nextRequest{Batch: batch, NoWait: true}
req, _ = json.Marshal(nr)
reqNext = js.apiSubj(fmt.Sprintf(apiRequestNextT, stream, consumer))
expires = ttl - 10*time.Millisecond
msgs = make([]*Msg, 0)
)
// In case of only one message, then can already handle with built-in request functions.
if batch == 1 {
resp, err := nc.RequestWithContext(ctx, reqNext, req)
if err != nil {
return nil, checkCtxErr(err)
}
// In case of a no messages instant error, then fallback
// into longer version of pull batch request.
err = checkMsg(resp)
if err != nil {
if err == errNoMessages {
// Use old request style for the retry of the pull request
// in order to use auto UNSUB 1 to prevent the server
// from delivering a message when there is no more interest.
nr.NoWait = false
nr.Expires = expires
req, _ = json.Marshal(nr)
resp, err = nc.oldRequestWithContext(ctx, reqNext, nil, req)
if err != nil {
return nil, checkCtxErr(err)
}
// This next message, could also be an error
// (e.g. 408 due to request timeout).
err = checkMsg(resp)
if err != nil {
return nil, err
}
return []*Msg{resp}, nil
} else {
// Hard error
return nil, checkCtxErr(err)
}
}
return []*Msg{resp}, nil
}
// Setup a request where we will wait for the first response
// in case of errors, then dispatch the rest of the replies
// to the channel.
inbox := NewInbox()
mch := make(chan *Msg, batch)
s, err := nc.subscribe(inbox, _EMPTY_, nil, mch, true, nil)
if err != nil {
return nil, err
}
// Remove interest in the subscription at the end so that the
// this inbox does not get delivered the results intended
// for another request.
defer s.Unsubscribe()
// Make a publish request to get results of the pull.
err = nc.publish(reqNext, inbox, nil, req)
if err != nil {
s.Unsubscribe()
return nil, err
}
// Try to get the first message or error with NoWait.
var (
firstMsg *Msg
ok bool
)
select {
case firstMsg, ok = <-mch:
if !ok {
err = s.getNextMsgErr()
} else {
err = s.processNextMsgDelivered(firstMsg)
if err == nil {
err = checkMsg(firstMsg)
}
}
case <-ctx.Done():
err = checkCtxErr(ctx.Err())
}
// If the first error is 'no more messages', then switch into
// longer form version of the request that waits for messages.
if err == errNoMessages {
gotNoMessages = true
} else if err != nil {
// We should be getting the response from the server
// in case we got a poll error, so stop and cleanup.
s.Unsubscribe()
return nil, err
}
if gotNoMessages {
// We started with a 404 response right away, so fallback into
// second request that waits longer for messages to delivered.
nr.NoWait = false
nr.Expires = expires
req, _ = json.Marshal(nr)
// Since first message was an error we UNSUB (batch+1)
// since we are counting it as the first message.
err = s.AutoUnsubscribe(batch + 1)
if err != nil {
return nil, err
}
// Make another request and wait for the messages...
err = nc.publish(reqNext, inbox, nil, req)
if err != nil {
s.Unsubscribe()
return nil, err
}
// Try to get the first result again or return the error.
select {
case firstMsg, ok = <-mch:
if !ok {
err = s.getNextMsgErr()
} else {
err = s.processNextMsgDelivered(firstMsg)
if err == nil {
err = checkMsg(firstMsg)
}
}
case <-ctx.Done():
err = checkCtxErr(ctx.Err())
}
if err != nil {
s.Unsubscribe()
return nil, err
}
// Check again if the delivered next message is a status error.
err = checkMsg(firstMsg)
if err != nil {
s.Unsubscribe()
return nil, err
}
} else {
// We are receiving messages at this point. Send UNSUB to let
// the server clear interest once enough replies are delivered.
err = s.AutoUnsubscribe(batch)
if err != nil {
return nil, err
}
}
msgs = append(msgs, firstMsg)
for {
var (
msg *Msg
ok bool
)
select {
case msg, ok = <-mch:
if !ok {
err = s.getNextMsgErr()
} else {
err = s.processNextMsgDelivered(msg)
if err == nil {
err = checkMsg(msg)
}
}
case <-ctx.Done():
return msgs, checkCtxErr(err)
}
if err != nil {
// Discard the error which may have been a timeout
// or 408 request timeout status from the server,
// and just the return delivered messages.
break
}
if msg != nil {
msgs = append(msgs, msg)
}
if len(msgs) == batch {
// Done!
break
}
}
return msgs, nil
}
func (js *js) getConsumerInfo(stream, consumer string) (*ConsumerInfo, error) {
@@ -940,7 +1215,7 @@ func (m *Msg) checkReply() (*js, bool, error) {
return nil, false, nil
}
js := sub.jsi.js
isPullMode := sub.jsi.pull > 0
isPullMode := sub.jsi.pull
sub.mu.Unlock()
return js, isPullMode, nil
@@ -956,7 +1231,7 @@ func (m *Msg) ackReply(ackType []byte, sync bool, opts ...PubOpt) error {
return err
}
}
js, isPullMode, err := m.checkReply()
js, _, err := m.checkReply()
if err != nil {
return err
}
@@ -978,20 +1253,7 @@ func (m *Msg) ackReply(ackType []byte, sync bool, opts ...PubOpt) error {
wait = js.opts.wait
}
if isPullMode {
if bytes.Equal(ackType, AckAck) {
err = nc.PublishRequest(m.Reply, m.Sub.Subject, AckNext)
} else if bytes.Equal(ackType, AckNak) || bytes.Equal(ackType, AckTerm) {
err = nc.PublishRequest(m.Reply, m.Sub.Subject, []byte("+NXT {\"batch\":1}"))
}
if sync && err == nil {
if ctx != nil {
_, err = nc.RequestWithContext(ctx, m.Reply, nil)
} else {
_, err = nc.Request(m.Reply, nil, wait)
}
}
} else if sync {
if sync {
if ctx != nil {
_, err = nc.RequestWithContext(ctx, m.Reply, ackType)
} else {