js: PubAckFuture as interface

Signed-off-by: Waldemar Quevedo <wally@synadia.com>
This commit is contained in:
Waldemar Quevedo
2021-03-31 11:35:42 -07:00
parent 1114ca8c9a
commit f42e6f27d9

37
js.go
View File

@@ -97,11 +97,11 @@ type JetStream interface {
// PublishAsync publishes a message to JetStream and returns a PubAckFuture. // PublishAsync publishes a message to JetStream and returns a PubAckFuture.
// The data should not be changed until the PubAckFuture has been processed. // The data should not be changed until the PubAckFuture has been processed.
PublishAsync(subj string, data []byte, opts ...PubOpt) (*PubAckFuture, error) PublishAsync(subj string, data []byte, opts ...PubOpt) (PubAckFuture, error)
// PublishMsgAsync publishes a Msg to JetStream and returms a PubAckFuture. // PublishMsgAsync publishes a Msg to JetStream and returms a PubAckFuture.
// The message should not be changed until the PubAckFuture has been processed. // The message should not be changed until the PubAckFuture has been processed.
PublishMsgAsync(m *Msg, opts ...PubOpt) (*PubAckFuture, error) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error)
// PublishAsyncPending returns the number of async publishes outstanding for this context. // PublishAsyncPending returns the number of async publishes outstanding for this context.
PublishAsyncPending() int PublishAsyncPending() int
@@ -143,7 +143,7 @@ type js struct {
mu sync.RWMutex mu sync.RWMutex
rpre string rpre string
rsub *Subscription rsub *Subscription
pafs map[string]*PubAckFuture pafs map[string]*pubAckFuture
stc chan struct{} stc chan struct{}
dch chan struct{} dch chan struct{}
} }
@@ -346,7 +346,18 @@ func (js *js) Publish(subj string, data []byte, opts ...PubOpt) (*PubAck, error)
} }
// PubAckFuture is a future for a PubAck. // PubAckFuture is a future for a PubAck.
type PubAckFuture struct { type PubAckFuture interface {
// Ok returns a receive only channel that can be used to get a PubAck.
Ok() <-chan *PubAck
// Err returns a receive only channel that can be used to get the error from an async publish.
Err() <-chan error
// Msg returns the message that was sent to the server.
Msg() *Msg
}
type pubAckFuture struct {
js *js js *js
msg *Msg msg *Msg
pa *PubAck pa *PubAck
@@ -356,7 +367,7 @@ type PubAckFuture struct {
doneCh chan *PubAck doneCh chan *PubAck
} }
func (paf *PubAckFuture) Ok() <-chan *PubAck { func (paf *pubAckFuture) Ok() <-chan *PubAck {
paf.js.mu.Lock() paf.js.mu.Lock()
defer paf.js.mu.Unlock() defer paf.js.mu.Unlock()
@@ -370,7 +381,7 @@ func (paf *PubAckFuture) Ok() <-chan *PubAck {
return paf.doneCh return paf.doneCh
} }
func (paf *PubAckFuture) Err() <-chan error { func (paf *pubAckFuture) Err() <-chan error {
paf.js.mu.Lock() paf.js.mu.Lock()
defer paf.js.mu.Unlock() defer paf.js.mu.Unlock()
@@ -384,7 +395,7 @@ func (paf *PubAckFuture) Err() <-chan error {
return paf.errCh return paf.errCh
} }
func (paf *PubAckFuture) Msg() *Msg { func (paf *pubAckFuture) Msg() *Msg {
paf.js.mu.RLock() paf.js.mu.RLock()
defer paf.js.mu.RUnlock() defer paf.js.mu.RUnlock()
return paf.msg return paf.msg
@@ -426,10 +437,10 @@ func (js *js) newAsyncReply() string {
} }
// registerPAF will register for a PubAckFuture. // registerPAF will register for a PubAckFuture.
func (js *js) registerPAF(id string, paf *PubAckFuture) (int, int) { func (js *js) registerPAF(id string, paf *pubAckFuture) (int, int) {
js.mu.Lock() js.mu.Lock()
if js.pafs == nil { if js.pafs == nil {
js.pafs = make(map[string]*PubAckFuture) js.pafs = make(map[string]*pubAckFuture)
} }
paf.js = js paf.js = js
js.pafs[id] = paf js.pafs[id] = paf
@@ -440,7 +451,7 @@ func (js *js) registerPAF(id string, paf *PubAckFuture) (int, int) {
} }
// Lock should be held. // Lock should be held.
func (js *js) getPAF(id string) *PubAckFuture { func (js *js) getPAF(id string) *pubAckFuture {
if js.pafs == nil { if js.pafs == nil {
return nil return nil
} }
@@ -565,11 +576,11 @@ func PublishAsyncMaxPending(max int) JSOpt {
} }
// PublishAsync publishes a message to JetStream and returns a PubAckFuture // PublishAsync publishes a message to JetStream and returns a PubAckFuture
func (js *js) PublishAsync(subj string, data []byte, opts ...PubOpt) (*PubAckFuture, error) { func (js *js) PublishAsync(subj string, data []byte, opts ...PubOpt) (PubAckFuture, error) {
return js.PublishMsgAsync(&Msg{Subject: subj, Data: data}, opts...) return js.PublishMsgAsync(&Msg{Subject: subj, Data: data}, opts...)
} }
func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (*PubAckFuture, error) { func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) {
var o pubOpts var o pubOpts
if len(opts) > 0 { if len(opts) > 0 {
if m.Header == nil { if m.Header == nil {
@@ -610,7 +621,7 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (*PubAckFuture, error) {
return nil, errors.New("nats: error creating async reply handler") return nil, errors.New("nats: error creating async reply handler")
} }
id := m.Reply[aReplyPreLen:] id := m.Reply[aReplyPreLen:]
paf := &PubAckFuture{msg: m, st: time.Now()} paf := &pubAckFuture{msg: m, st: time.Now()}
numPending, maxPending := js.registerPAF(id, paf) numPending, maxPending := js.registerPAF(id, paf)
if maxPending > 0 && numPending >= maxPending { if maxPending > 0 && numPending >= maxPending {