mirror of
https://github.com/nats-io/nats.go.git
synced 2025-12-24 13:08:06 +08:00
[IMPROVED] Use errors.Is for err handling, and use skipped bool (#1500)
Signed-off-by: Sasha Melentyev <sasha@m8.ru>
This commit is contained in:
@@ -280,7 +280,7 @@ func (js *jetStream) KeyValue(ctx context.Context, bucket string) (KeyValue, err
|
||||
streamName := fmt.Sprintf(kvBucketNameTmpl, bucket)
|
||||
stream, err := js.Stream(ctx, streamName)
|
||||
if err != nil {
|
||||
if err == ErrStreamNotFound {
|
||||
if errors.Is(err, ErrStreamNotFound) {
|
||||
err = ErrBucketNotFound
|
||||
}
|
||||
return nil, err
|
||||
@@ -567,7 +567,7 @@ func (kv *kvs) get(ctx context.Context, key string, revision uint64) (KeyValueEn
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
if err == ErrMsgNotFound {
|
||||
if errors.Is(err, ErrMsgNotFound) {
|
||||
err = ErrKeyNotFound
|
||||
}
|
||||
return nil, err
|
||||
@@ -619,7 +619,7 @@ func (e *kve) Operation() KeyValueOp { return e.op }
|
||||
func (kv *kvs) Get(ctx context.Context, key string) (KeyValueEntry, error) {
|
||||
e, err := kv.get(ctx, key, kvLatestRevision)
|
||||
if err != nil {
|
||||
if err == ErrKeyDeleted {
|
||||
if errors.Is(err, ErrKeyDeleted) {
|
||||
return nil, ErrKeyNotFound
|
||||
}
|
||||
return nil, err
|
||||
@@ -632,7 +632,7 @@ func (kv *kvs) Get(ctx context.Context, key string) (KeyValueEntry, error) {
|
||||
func (kv *kvs) GetRevision(ctx context.Context, key string, revision uint64) (KeyValueEntry, error) {
|
||||
e, err := kv.get(ctx, key, revision)
|
||||
if err != nil {
|
||||
if err == ErrKeyDeleted {
|
||||
if errors.Is(err, ErrKeyDeleted) {
|
||||
return nil, ErrKeyNotFound
|
||||
}
|
||||
return nil, err
|
||||
@@ -677,7 +677,7 @@ func (kv *kvs) Create(ctx context.Context, key string, value []byte) (revision u
|
||||
return v, nil
|
||||
}
|
||||
|
||||
if e, err := kv.get(ctx, key, kvLatestRevision); err == ErrKeyDeleted {
|
||||
if e, err := kv.get(ctx, key, kvLatestRevision); errors.Is(err, ErrKeyDeleted) {
|
||||
return kv.Update(ctx, key, value, e.Revision())
|
||||
}
|
||||
|
||||
|
||||
20
js.go
20
js.go
@@ -545,7 +545,7 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) {
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
for r, ttl := 0, o.ttl; err == ErrNoResponders && (r < o.rnum || o.rnum < 0); r++ {
|
||||
for r, ttl := 0, o.ttl; errors.Is(err, ErrNoResponders) && (r < o.rnum || o.rnum < 0); r++ {
|
||||
// To protect against small blips in leadership changes etc, if we get a no responders here retry.
|
||||
if o.ctx != nil {
|
||||
select {
|
||||
@@ -567,7 +567,7 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) {
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
if err == ErrNoResponders {
|
||||
if errors.Is(err, ErrNoResponders) {
|
||||
err = ErrNoStreamResponse
|
||||
}
|
||||
return nil, err
|
||||
@@ -1601,7 +1601,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
|
||||
if consumer != _EMPTY_ && !o.skipCInfo {
|
||||
info, err = js.ConsumerInfo(stream, consumer)
|
||||
notFoundErr = errors.Is(err, ErrConsumerNotFound)
|
||||
lookupErr = err == ErrJetStreamNotEnabled || err == ErrTimeout || err == context.DeadlineExceeded
|
||||
lookupErr = err == ErrJetStreamNotEnabled || errors.Is(err, ErrTimeout) || errors.Is(err, context.DeadlineExceeded)
|
||||
}
|
||||
|
||||
switch {
|
||||
@@ -2831,7 +2831,7 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
|
||||
// are no messages.
|
||||
msg, err = sub.nextMsgWithContext(ctx, true, false)
|
||||
if err != nil {
|
||||
if err == errNoMessages {
|
||||
if errors.Is(err, errNoMessages) {
|
||||
err = nil
|
||||
}
|
||||
break
|
||||
@@ -2911,13 +2911,13 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
|
||||
usrMsg, err = checkMsg(msg, true, noWait)
|
||||
if err == nil && usrMsg {
|
||||
msgs = append(msgs, msg)
|
||||
} else if noWait && (err == errNoMessages || err == errRequestsPending) && len(msgs) == 0 {
|
||||
} else if noWait && (errors.Is(err, errNoMessages) || errors.Is(err, errRequestsPending)) && len(msgs) == 0 {
|
||||
// If we have a 404/408 for our "no_wait" request and have
|
||||
// not collected any message, then resend request to
|
||||
// wait this time.
|
||||
noWait = false
|
||||
err = sendReq()
|
||||
} else if err == ErrTimeout && len(msgs) == 0 {
|
||||
} else if errors.Is(err, ErrTimeout) && len(msgs) == 0 {
|
||||
// If we get a 408, we will bail if we already collected some
|
||||
// messages, otherwise ignore and go back calling nextMsg.
|
||||
err = nil
|
||||
@@ -3100,7 +3100,7 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e
|
||||
// are no messages.
|
||||
msg, err := sub.nextMsgWithContext(ctx, true, false)
|
||||
if err != nil {
|
||||
if err == errNoMessages {
|
||||
if errors.Is(err, errNoMessages) {
|
||||
err = nil
|
||||
}
|
||||
result.err = err
|
||||
@@ -3177,7 +3177,7 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e
|
||||
|
||||
usrMsg, err = checkMsg(msg, true, false)
|
||||
if err != nil {
|
||||
if err == ErrTimeout {
|
||||
if errors.Is(err, ErrTimeout) {
|
||||
if reqID != "" && !subjectMatchesReqID(msg.Subject, reqID) {
|
||||
// ignore timeout message from server if it comes from a different pull request
|
||||
continue
|
||||
@@ -3206,7 +3206,7 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e
|
||||
|
||||
// checkCtxErr is used to determine whether ErrTimeout should be returned in case of context timeout
|
||||
func (o *pullOpts) checkCtxErr(err error) error {
|
||||
if o.ctx == nil && err == context.DeadlineExceeded {
|
||||
if o.ctx == nil && errors.Is(err, context.DeadlineExceeded) {
|
||||
return ErrTimeout
|
||||
}
|
||||
return err
|
||||
@@ -3222,7 +3222,7 @@ func (js *js) getConsumerInfoContext(ctx context.Context, stream, consumer strin
|
||||
ccInfoSubj := fmt.Sprintf(apiConsumerInfoT, stream, consumer)
|
||||
resp, err := js.apiRequestWithContext(ctx, js.apiSubj(ccInfoSubj), nil)
|
||||
if err != nil {
|
||||
if err == ErrNoResponders {
|
||||
if errors.Is(err, ErrNoResponders) {
|
||||
err = ErrJetStreamNotEnabled
|
||||
}
|
||||
return nil, err
|
||||
|
||||
6
jsm.go
6
jsm.go
@@ -297,7 +297,7 @@ func (js *js) AccountInfo(opts ...JSOpt) (*AccountInfo, error) {
|
||||
resp, err := js.apiRequestWithContext(o.ctx, js.apiSubj(apiAccountInfo), nil)
|
||||
if err != nil {
|
||||
// todo maybe nats server should never have no responder on this subject and always respond if they know there is no js to be had
|
||||
if err == ErrNoResponders {
|
||||
if errors.Is(err, ErrNoResponders) {
|
||||
err = ErrJetStreamNotEnabled
|
||||
}
|
||||
return nil, err
|
||||
@@ -415,7 +415,7 @@ func (js *js) upsertConsumer(stream, consumerName string, cfg *ConsumerConfig, o
|
||||
|
||||
resp, err := js.apiRequestWithContext(o.ctx, js.apiSubj(ccSubj), req)
|
||||
if err != nil {
|
||||
if err == ErrNoResponders {
|
||||
if errors.Is(err, ErrNoResponders) {
|
||||
err = ErrJetStreamNotEnabled
|
||||
}
|
||||
return nil, err
|
||||
@@ -1623,7 +1623,7 @@ func (jsc *js) StreamNameBySubject(subj string, opts ...JSOpt) (string, error) {
|
||||
|
||||
resp, err := jsc.apiRequestWithContext(o.ctx, jsc.apiSubj(apiStreams), j)
|
||||
if err != nil {
|
||||
if err == ErrNoResponders {
|
||||
if errors.Is(err, ErrNoResponders) {
|
||||
err = ErrJetStreamNotEnabled
|
||||
}
|
||||
return _EMPTY_, err
|
||||
|
||||
12
kv.go
12
kv.go
@@ -359,7 +359,7 @@ func (js *js) KeyValue(bucket string) (KeyValue, error) {
|
||||
stream := fmt.Sprintf(kvBucketNameTmpl, bucket)
|
||||
si, err := js.StreamInfo(stream)
|
||||
if err != nil {
|
||||
if err == ErrStreamNotFound {
|
||||
if errors.Is(err, ErrStreamNotFound) {
|
||||
err = ErrBucketNotFound
|
||||
}
|
||||
return nil, err
|
||||
@@ -486,7 +486,7 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {
|
||||
// the stream.
|
||||
// The same logic applies for KVs created pre 2.9.x and
|
||||
// the AllowDirect setting.
|
||||
if err == ErrStreamNameAlreadyInUse {
|
||||
if errors.Is(err, ErrStreamNameAlreadyInUse) {
|
||||
if si, _ = js.StreamInfo(scfg.Name); si != nil {
|
||||
// To compare, make the server's stream info discard
|
||||
// policy same than ours.
|
||||
@@ -558,7 +558,7 @@ func keyValid(key string) bool {
|
||||
func (kv *kvs) Get(key string) (KeyValueEntry, error) {
|
||||
e, err := kv.get(key, kvLatestRevision)
|
||||
if err != nil {
|
||||
if err == ErrKeyDeleted {
|
||||
if errors.Is(err, ErrKeyDeleted) {
|
||||
return nil, ErrKeyNotFound
|
||||
}
|
||||
return nil, err
|
||||
@@ -571,7 +571,7 @@ func (kv *kvs) Get(key string) (KeyValueEntry, error) {
|
||||
func (kv *kvs) GetRevision(key string, revision uint64) (KeyValueEntry, error) {
|
||||
e, err := kv.get(key, revision)
|
||||
if err != nil {
|
||||
if err == ErrKeyDeleted {
|
||||
if errors.Is(err, ErrKeyDeleted) {
|
||||
return nil, ErrKeyNotFound
|
||||
}
|
||||
return nil, err
|
||||
@@ -608,7 +608,7 @@ func (kv *kvs) get(key string, revision uint64) (KeyValueEntry, error) {
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
if err == ErrMsgNotFound {
|
||||
if errors.Is(err, ErrMsgNotFound) {
|
||||
err = ErrKeyNotFound
|
||||
}
|
||||
return nil, err
|
||||
@@ -675,7 +675,7 @@ func (kv *kvs) Create(key string, value []byte) (revision uint64, err error) {
|
||||
|
||||
// TODO(dlc) - Since we have tombstones for DEL ops for watchers, this could be from that
|
||||
// so we need to double check.
|
||||
if e, err := kv.get(key, kvLatestRevision); err == ErrKeyDeleted {
|
||||
if e, err := kv.get(key, kvLatestRevision); errors.Is(err, ErrKeyDeleted) {
|
||||
return kv.Update(key, value, e.Revision())
|
||||
}
|
||||
|
||||
|
||||
@@ -645,7 +645,7 @@ func (obs *obs) Get(name string, opts ...GetObjectOpt) (ObjectResult, error) {
|
||||
if ctx != nil {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
if ctx.Err() == context.Canceled {
|
||||
if errors.Is(ctx.Err(), context.Canceled) {
|
||||
err = ctx.Err()
|
||||
} else {
|
||||
err = ErrTimeout
|
||||
@@ -945,7 +945,7 @@ func (obs *obs) GetInfo(name string, opts ...GetObjectInfoOpt) (*ObjectInfo, err
|
||||
|
||||
m, err := obs.js.GetLastMsg(stream, metaSubj)
|
||||
if err != nil {
|
||||
if err == ErrMsgNotFound {
|
||||
if errors.Is(err, ErrMsgNotFound) {
|
||||
err = ErrObjectNotFound
|
||||
}
|
||||
return nil, err
|
||||
|
||||
2
timer.go
2
timer.go
@@ -29,7 +29,7 @@ type timerPool struct {
|
||||
|
||||
// Get returns a timer that completes after the given duration.
|
||||
func (tp *timerPool) Get(d time.Duration) *time.Timer {
|
||||
if t, _ := tp.p.Get().(*time.Timer); t != nil {
|
||||
if t, ok := tp.p.Get().(*time.Timer); ok && t != nil {
|
||||
t.Reset(d)
|
||||
return t
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user