Merge branch 'main' of https://github.com/nats-io/nats.go into jsv2

Improve readme

Signed-off-by: Piotr Piotrowski <piotr@synadia.com>

Fix msg size calculation

Signed-off-by: Piotr Piotrowski <piotr@synadia.com>

Add connection event listeners

Signed-off-by: Piotr Piotrowski <piotr@synadia.com>

Fix leaky goroutines in tests

Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
This commit is contained in:
Piotr Piotrowski
2023-05-16 12:50:12 +02:00
12 changed files with 174 additions and 65 deletions

60
js.go
View File

@@ -1660,8 +1660,14 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
}
// If we are creating or updating let's process that request.
consName := o.cfg.Name
if shouldCreate {
info, err := js.upsertConsumer(stream, cfg.Durable, ccreq.Config)
if cfg.Durable != "" {
consName = cfg.Durable
} else if consName == "" {
consName = getHash(nuid.Next())
}
info, err := js.upsertConsumer(stream, consName, ccreq.Config)
if err != nil {
var apiErr *APIError
if ok := errors.As(err, &apiErr); !ok {
@@ -1964,40 +1970,22 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) {
cfg.DeliverPolicy = DeliverByStartSequencePolicy
cfg.OptStartSeq = sseq
ccSubj := fmt.Sprintf(apiLegacyConsumerCreateT, jsi.stream)
j, err := json.Marshal(jsi.ccreq)
js := jsi.js
sub.mu.Unlock()
consName := nuid.Next()
cinfo, err := js.upsertConsumer(jsi.stream, consName, cfg)
if err != nil {
pushErr(err)
return
}
resp, err := nc.Request(js.apiSubj(ccSubj), j, js.opts.wait)
if err != nil {
if errors.Is(err, ErrNoResponders) || errors.Is(err, ErrTimeout) {
var apiErr *APIError
if errors.Is(err, ErrJetStreamNotEnabled) || errors.Is(err, ErrTimeout) {
// if creating consumer failed, retry
return
}
pushErr(err)
return
}
var cinfo consumerResponse
err = json.Unmarshal(resp.Data, &cinfo)
if err != nil {
pushErr(err)
return
}
if cinfo.Error != nil {
if cinfo.Error.ErrorCode == JSErrCodeInsufficientResourcesErr {
} else if errors.As(err, &apiErr) && apiErr.ErrorCode == JSErrCodeInsufficientResourcesErr {
// retry for insufficient resources, as it may mean that client is connected to a running
// server in cluster while the server hosting R1 JetStream resources is restarting
return
}
pushErr(cinfo.Error)
pushErr(err)
return
}
@@ -2489,6 +2477,14 @@ func ConsumerMemoryStorage() SubOpt {
})
}
// ConsumerName sets the name for a consumer.
func ConsumerName(name string) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.Name = name
return nil
})
}
func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) {
sub.mu.Lock()
// TODO(dlc) - Better way to mark especially if we attach.
@@ -3586,3 +3582,17 @@ func (st *StorageType) UnmarshalJSON(data []byte) error {
}
return nil
}
// Length of our hash used for named consumers.
const nameHashLen = 8
// Computes a hash for the given `name`.
func getHash(name string) string {
sha := sha256.New()
sha.Write([]byte(name))
b := sha.Sum(nil)
for i := 0; i < nameHashLen; i++ {
b[i] = rdigits[int(b[i]%base)]
}
return string(b[:nameHashLen])
}