Installed go-retry for retrying various actions on the server. Created RetryBackoff function to create the retry policy on demand.

This commit is contained in:
Kelvin Clement Mwinuka
2023-07-28 14:47:52 +08:00
parent 044edff4e8
commit 5f3a0ae56e
15 changed files with 914 additions and 12 deletions

5
go.mod
View File

@@ -3,8 +3,9 @@ module github.com/kelvinmwinuka/memstore
go 1.20
require (
github.com/hashicorp/go-hclog v1.5.0
github.com/hashicorp/memberlist v0.5.0
github.com/hashicorp/raft v1.5.0
github.com/sethvargo/go-retry v0.2.4
github.com/tidwall/resp v0.1.1
gopkg.in/yaml.v3 v3.0.1
)
@@ -14,12 +15,12 @@ require (
github.com/fatih/color v1.13.0 // indirect
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-hclog v1.5.0 // indirect
github.com/hashicorp/go-immutable-radix v1.0.0 // indirect
github.com/hashicorp/go-msgpack v0.5.5 // indirect
github.com/hashicorp/go-multierror v1.0.0 // indirect
github.com/hashicorp/go-sockaddr v1.0.0 // indirect
github.com/hashicorp/golang-lru v0.5.0 // indirect
github.com/hashicorp/memberlist v0.5.0 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/miekg/dns v1.1.26 // indirect

3
go.sum
View File

@@ -98,6 +98,8 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/sethvargo/go-retry v0.2.4 h1:T+jHEQy/zKJf5s95UkguisicE0zuF9y7+/vgz08Ocec=
github.com/sethvargo/go-retry v0.2.4/go.mod h1:1afjQuvh7s4gflMObvjLPaWgluLLyhA1wmVZ6KLpICw=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
@@ -128,6 +130,7 @@ golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=

View File

@@ -28,6 +28,7 @@ type Server struct {
commands []Command
raft *raft.Raft
memberList *memberlist.Memberlist
broadcastQueue *memberlist.TransmitLimitedQueue
cancelCh *chan (os.Signal)
}
@@ -165,7 +166,7 @@ func (server *Server) Start() {
}
server.MemberListInit()
server.RaftInit()
// server.RaftInit()
if conf.HTTP {
server.StartHTTP()

View File

@@ -1,17 +1,57 @@
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/hashicorp/memberlist"
retry "github.com/sethvargo/go-retry"
)
type BroadcastMessage struct {
Action string `json:"Action"`
ServerID string `json:"ServerID"`
ServerAddr string `json:"ServerAddr"`
}
func (broadcastMessage *BroadcastMessage) Invalidates(other memberlist.Broadcast) bool {
mb, ok := other.(*BroadcastMessage)
if !ok {
return false
}
if mb.ServerID == broadcastMessage.ServerID && mb.Action == "RaftJoin" {
return true
}
return false
}
func (broadcastMessage *BroadcastMessage) Message() []byte {
msg, err := json.Marshal(broadcastMessage)
if err != nil {
fmt.Println(err)
return []byte{}
}
return msg
}
func (broadcastMessage *BroadcastMessage) Finished() {
// No-Op
}
func (server *Server) MemberListInit() {
// Triggered before RaftInit
cfg := memberlist.DefaultLocalConfig()
cfg.BindAddr = server.config.BindAddr
cfg.BindPort = int(server.config.MemberListBindPort)
cfg.Delegate = server
list, err := memberlist.Create(cfg)
server.memberList = list
@@ -21,16 +61,42 @@ func (server *Server) MemberListInit() {
}
if server.config.JoinAddr != "" {
n, err := server.memberList.Join([]string{server.config.JoinAddr})
ctx := context.Background()
backoffPolicy := RetryBackoff(retry.NewFibonacci(1*time.Second), 0, 0, 0, 0)
err := retry.Do(ctx, backoffPolicy, func(ctx context.Context) error {
fmt.Printf("%s trying to joing the cluster...\n", server.config.ServerID)
return retry.RetryableError(fmt.Errorf("there's a retryable error"))
})
if err != nil {
log.Fatal(err)
}
fmt.Printf("Joined cluster. Contacted %d nodes.\n", n)
}
}
func (server *Server) NodeMeta(limit int) []byte {
return []byte{}
}
func (server *Server) NotifyMsg(msg []byte) {
fmt.Println(string(msg))
}
func (server *Server) GetBroadcasts(overhead, limit int) [][]byte {
return server.broadcastQueue.GetBroadcasts(overhead, limit)
}
func (server *Server) LocalState(join bool) []byte {
// No-Op
return []byte{}
}
func (server *Server) MergeRemoteState(buf []byte, join bool) {
// No-Op
}
func (server *Server) MemberListShutdown() {
// Triggered after RaftShutdown
// Gracefully leave memberlist cluster

View File

@@ -10,7 +10,9 @@ import (
"math/big"
"reflect"
"strings"
"time"
"github.com/sethvargo/go-retry"
"github.com/tidwall/resp"
)
@@ -164,3 +166,25 @@ func ReadMessage(r *bufio.ReadWriter) (message string, err error) {
return fmt.Sprintf("%s\r\n", string(bytes.Join(line, []byte("\r\n")))), nil
}
func RetryBackoff(b retry.Backoff, maxRetries uint64, jitter, cappedDuration, maxDuration time.Duration) retry.Backoff {
backoff := b
if maxRetries > 0 {
backoff = retry.WithMaxRetries(maxRetries, backoff)
}
if jitter > 0 {
backoff = retry.WithJitter(jitter, backoff)
}
if cappedDuration > 0 {
backoff = retry.WithCappedDuration(cappedDuration, backoff)
}
if maxDuration > 0 {
backoff = retry.WithMaxDuration(maxDuration, backoff)
}
return backoff
}

202
vendor/github.com/sethvargo/go-retry/LICENSE generated vendored Normal file
View File

@@ -0,0 +1,202 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

8
vendor/github.com/sethvargo/go-retry/Makefile generated vendored Normal file
View File

@@ -0,0 +1,8 @@
test:
@go test \
-count=1 \
-race \
-short \
-timeout=5m \
./...
.PHONY: test

186
vendor/github.com/sethvargo/go-retry/README.md generated vendored Normal file
View File

@@ -0,0 +1,186 @@
# Retry
[![GoDoc](https://img.shields.io/badge/go-documentation-blue.svg?style=flat-square)](https://pkg.go.dev/mod/github.com/sethvargo/go-retry)
[![GitHub Actions](https://img.shields.io/github/workflow/status/sethvargo/go-retry/Test?style=flat-square)](https://github.com/sethvargo/go-retry/actions?query=workflow%3ATest)
Retry is a Go library for facilitating retry logic and backoff. It's highly
extensible with full control over how and when retries occur. You can also write
your own custom backoff functions by implementing the Backoff interface.
## Features
- **Extensible** - Inspired by Go's built-in HTTP package, this Go backoff and
retry library is extensible via middleware. You can write custom backoff
functions or use a provided filter.
- **Independent** - No external dependencies besides the Go standard library,
meaning it won't bloat your project.
- **Concurrent** - Unless otherwise specified, everything is safe for concurrent
use.
- **Context-aware** - Use native Go contexts to control cancellation.
## Usage
Here is an example use for connecting to a database using Go's `database/sql`
package:
```golang
package main
import (
"context"
"database/sql"
"log"
"time"
"github.com/sethvargo/go-retry"
)
func main() {
db, err := sql.Open("mysql", "...")
if err != nil {
log.Fatal(err)
}
ctx := context.Background()
if err := retry.Fibonacci(ctx, 1*time.Second, func(ctx context.Context) error {
if err := db.PingContext(ctx); err != nil {
// This marks the error as retryable
return retry.RetryableError(err)
}
return nil
}); err != nil {
log.Fatal(err)
}
}
```
## Backoffs
In addition to your own custom algorithms, there are built-in algorithms for
backoff in the library.
### Constant
A very rudimentary backoff, just returns a constant value. Here is an example:
```text
1s -> 1s -> 1s -> 1s -> 1s -> 1s
```
Usage:
```golang
NewConstant(1 * time.Second)
```
### Exponential
Arguably the most common backoff, the next value is double the previous value.
Here is an example:
```text
1s -> 2s -> 4s -> 8s -> 16s -> 32s -> 64s
```
Usage:
```golang
NewExponential(1 * time.Second)
```
### Fibonacci
The Fibonacci backoff uses the Fibonacci sequence to calculate the backoff. The
next value is the sum of the current value and the previous value. This means
retires happen quickly at first, but then gradually take slower, ideal for
network-type issues. Here is an example:
```text
1s -> 1s -> 2s -> 3s -> 5s -> 8s -> 13s
```
Usage:
```golang
NewFibonacci(1 * time.Second)
```
## Modifiers (Middleware)
The built-in backoff algorithms never terminate and have no caps or limits - you
control their behavior with middleware. There's built-in middleware, but you can
also write custom middleware.
### Jitter
To reduce the changes of a thundering herd, add random jitter to the returned
value.
```golang
b := NewFibonacci(1 * time.Second)
// Return the next value, +/- 500ms
b = WithJitter(500*time.Millisecond, b)
// Return the next value, +/- 5% of the result
b = WithJitterPercent(5, b)
```
### MaxRetries
To terminate a retry, specify the maximum number of _retries_. Note this
is _retries_, not _attempts_. Attempts is retries + 1.
```golang
b := NewFibonacci(1 * time.Second)
// Stop after 4 retries, when the 5th attempt has failed. In this example, the worst case elapsed
// time would be 1s + 1s + 2s + 3s = 7s.
b = WithMaxRetries(4, b)
```
### CappedDuration
To ensure an individual calculated duration never exceeds a value, use a cap:
```golang
b := NewFibonacci(1 * time.Second)
// Ensure the maximum value is 2s. In this example, the sleep values would be
// 1s, 1s, 2s, 2s, 2s, 2s...
b = WithCappedDuration(2 * time.Second, b)
```
### WithMaxDuration
For a best-effort limit on the total execution time, specify a max duration:
```golang
b := NewFibonacci(1 * time.Second)
// Ensure the maximum total retry time is 5s.
b = WithMaxDuration(5 * time.Second, b)
```
## Benchmarks
Here are benchmarks against some other popular Go backoff and retry libraries.
You can run these benchmarks yourself via the `benchmark/` folder. Commas and
spacing fixed for clarity.
```text
Benchmark/cenkalti-7 13,052,668 87.3 ns/op
Benchmark/lestrrat-7 902,044 1,355 ns/op
Benchmark/sethvargo-7 203,914,245 5.73 ns/op
```
## Notes and Caveats
- Randomization uses `math/rand` seeded with the Unix timestamp instead of
`crypto/rand`.
- Ordering of addition of multiple modifiers will make a difference.
For example; ensure you add `CappedDuration` before `WithMaxDuration`, otherwise it may early out too early.
Another example is you could add `Jitter` before or after capping depending on your desired outcome.

134
vendor/github.com/sethvargo/go-retry/backoff.go generated vendored Normal file
View File

@@ -0,0 +1,134 @@
package retry
import (
"sync"
"time"
)
// Backoff is an interface that backs off.
type Backoff interface {
// Next returns the time duration to wait and whether to stop.
Next() (next time.Duration, stop bool)
}
var _ Backoff = (BackoffFunc)(nil)
// BackoffFunc is a backoff expressed as a function.
type BackoffFunc func() (time.Duration, bool)
// Next implements Backoff.
func (b BackoffFunc) Next() (time.Duration, bool) {
return b()
}
// WithJitter wraps a backoff function and adds the specified jitter. j can be
// interpreted as "+/- j". For example, if j were 5 seconds and the backoff
// returned 20s, the value could be between 15 and 25 seconds. The value can
// never be less than 0.
func WithJitter(j time.Duration, next Backoff) Backoff {
r := newLockedRandom(time.Now().UnixNano())
return BackoffFunc(func() (time.Duration, bool) {
val, stop := next.Next()
if stop {
return 0, true
}
diff := time.Duration(r.Int63n(int64(j)*2) - int64(j))
val = val + diff
if val < 0 {
val = 0
}
return val, false
})
}
// WithJitterPercent wraps a backoff function and adds the specified jitter
// percentage. j can be interpreted as "+/- j%". For example, if j were 5 and
// the backoff returned 20s, the value could be between 19 and 21 seconds. The
// value can never be less than 0 or greater than 100.
func WithJitterPercent(j uint64, next Backoff) Backoff {
r := newLockedRandom(time.Now().UnixNano())
return BackoffFunc(func() (time.Duration, bool) {
val, stop := next.Next()
if stop {
return 0, true
}
// Get a value between -j and j, the convert to a percentage
top := r.Int63n(int64(j)*2) - int64(j)
pct := 1 - float64(top)/100.0
val = time.Duration(float64(val) * pct)
if val < 0 {
val = 0
}
return val, false
})
}
// WithMaxRetries executes the backoff function up until the maximum attempts.
func WithMaxRetries(max uint64, next Backoff) Backoff {
var l sync.Mutex
var attempt uint64
return BackoffFunc(func() (time.Duration, bool) {
l.Lock()
defer l.Unlock()
if attempt >= max {
return 0, true
}
attempt++
val, stop := next.Next()
if stop {
return 0, true
}
return val, false
})
}
// WithCappedDuration sets a maximum on the duration returned from the next
// backoff. This is NOT a total backoff time, but rather a cap on the maximum
// value a backoff can return. Without another middleware, the backoff will
// continue infinitely.
func WithCappedDuration(cap time.Duration, next Backoff) Backoff {
return BackoffFunc(func() (time.Duration, bool) {
val, stop := next.Next()
if stop {
return 0, true
}
if val <= 0 || val > cap {
val = cap
}
return val, false
})
}
// WithMaxDuration sets a maximum on the total amount of time a backoff should
// execute. It's best-effort, and should not be used to guarantee an exact
// amount of time.
func WithMaxDuration(timeout time.Duration, next Backoff) Backoff {
start := time.Now()
return BackoffFunc(func() (time.Duration, bool) {
diff := timeout - time.Since(start)
if diff <= 0 {
return 0, true
}
val, stop := next.Next()
if stop {
return 0, true
}
if val <= 0 || val > diff {
val = diff
}
return val, false
})
}

View File

@@ -0,0 +1,25 @@
package retry
import (
"context"
"time"
)
// Constant is a wrapper around Retry that uses a constant backoff. It panics if
// the given base is less than zero.
func Constant(ctx context.Context, t time.Duration, f RetryFunc) error {
return Do(ctx, NewConstant(t), f)
}
// NewConstant creates a new constant backoff using the value t. The wait time
// is the provided constant value. It panics if the given base is less than
// zero.
func NewConstant(t time.Duration) Backoff {
if t <= 0 {
panic("t must be greater than 0")
}
return BackoffFunc(func() (time.Duration, bool) {
return t, false
})
}

View File

@@ -0,0 +1,47 @@
package retry
import (
"context"
"math"
"sync/atomic"
"time"
)
type exponentialBackoff struct {
base time.Duration
attempt uint64
}
// Exponential is a wrapper around Retry that uses an exponential backoff. See
// NewExponential.
func Exponential(ctx context.Context, base time.Duration, f RetryFunc) error {
return Do(ctx, NewExponential(base), f)
}
// NewExponential creates a new exponential backoff using the starting value of
// base and doubling on each failure (1, 2, 4, 8, 16, 32, 64...), up to max.
//
// Once it overflows, the function constantly returns the maximum time.Duration
// for a 64-bit integer.
//
// It panics if the given base is less than zero.
func NewExponential(base time.Duration) Backoff {
if base <= 0 {
panic("base must be greater than 0")
}
return &exponentialBackoff{
base: base,
}
}
// Next implements Backoff. It is safe for concurrent use.
func (b *exponentialBackoff) Next() (time.Duration, bool) {
next := b.base << (atomic.AddUint64(&b.attempt, 1) - 1)
if next <= 0 {
atomic.AddUint64(&b.attempt, ^uint64(0))
next = math.MaxInt64
}
return next, false
}

View File

@@ -0,0 +1,56 @@
package retry
import (
"context"
"math"
"sync/atomic"
"time"
"unsafe"
)
type state [2]time.Duration
type fibonacciBackoff struct {
state unsafe.Pointer
}
// Fibonacci is a wrapper around Retry that uses a Fibonacci backoff. See
// NewFibonacci.
func Fibonacci(ctx context.Context, base time.Duration, f RetryFunc) error {
return Do(ctx, NewFibonacci(base), f)
}
// NewFibonacci creates a new Fibonacci backoff using the starting value of
// base. The wait time is the sum of the previous two wait times on each failed
// attempt (1, 1, 2, 3, 5, 8, 13...).
//
// Once it overflows, the function constantly returns the maximum time.Duration
// for a 64-bit integer.
//
// It panics if the given base is less than zero.
func NewFibonacci(base time.Duration) Backoff {
if base <= 0 {
panic("base must be greater than 0")
}
return &fibonacciBackoff{
state: unsafe.Pointer(&state{0, base}),
}
}
// Next implements Backoff. It is safe for concurrent use.
func (b *fibonacciBackoff) Next() (time.Duration, bool) {
for {
curr := atomic.LoadPointer(&b.state)
currState := (*state)(curr)
next := currState[0] + currState[1]
if next <= 0 {
return math.MaxInt64, false
}
if atomic.CompareAndSwapPointer(&b.state, curr, unsafe.Pointer(&state{currState[1], next})) {
return next, false
}
}
}

54
vendor/github.com/sethvargo/go-retry/rand.go generated vendored Normal file
View File

@@ -0,0 +1,54 @@
package retry
import (
"math/rand"
"sync"
)
type lockedSource struct {
src *rand.Rand
mu sync.Mutex
}
var _ rand.Source64 = (*lockedSource)(nil)
func newLockedRandom(seed int64) *lockedSource {
return &lockedSource{src: rand.New(rand.NewSource(seed))}
}
// Int63 mimics math/rand.(*Rand).Int63 with mutex locked.
func (r *lockedSource) Int63() int64 {
r.mu.Lock()
defer r.mu.Unlock()
return r.src.Int63()
}
// Seed mimics math/rand.(*Rand).Seed with mutex locked.
func (r *lockedSource) Seed(seed int64) {
r.mu.Lock()
defer r.mu.Unlock()
r.src.Seed(seed)
}
// Uint64 mimics math/rand.(*Rand).Uint64 with mutex locked.
func (r *lockedSource) Uint64() uint64 {
r.mu.Lock()
defer r.mu.Unlock()
return r.src.Uint64()
}
// Int63n mimics math/rand.(*Rand).Int63n with mutex locked.
func (r *lockedSource) Int63n(n int64) int64 {
if n <= 0 {
panic("invalid argument to Int63n")
}
if n&(n-1) == 0 { // n is power of two, can mask
return r.Int63() & (n - 1)
}
max := int64((1 << 63) - 1 - (1<<63)%uint64(n))
v := r.Int63()
for v > max {
v = r.Int63()
}
return v % n
}

92
vendor/github.com/sethvargo/go-retry/retry.go generated vendored Normal file
View File

@@ -0,0 +1,92 @@
// Package retry provides helpers for retrying.
//
// This package defines flexible interfaces for retrying Go functions that may
// be flakey or eventually consistent. It abstracts the "backoff" (how long to
// wait between tries) and "retry" (execute the function again) mechanisms for
// maximum flexibility. Furthermore, everything is an interface, so you can
// define your own implementations.
//
// The package is modeled after Go's built-in HTTP package, making it easy to
// customize the built-in backoff with your own custom logic. Additionally,
// callers specify which errors are retryable by wrapping them. This is helpful
// with complex operations where only certain results should retry.
package retry
import (
"context"
"errors"
"time"
)
// RetryFunc is a function passed to retry.
type RetryFunc func(ctx context.Context) error
type retryableError struct {
err error
}
// RetryableError marks an error as retryable.
func RetryableError(err error) error {
if err == nil {
return nil
}
return &retryableError{err}
}
// Unwrap implements error wrapping.
func (e *retryableError) Unwrap() error {
return e.err
}
// Error returns the error string.
func (e *retryableError) Error() string {
if e.err == nil {
return "retryable: <nil>"
}
return "retryable: " + e.err.Error()
}
// Do wraps a function with a backoff to retry. The provided context is the same
// context passed to the RetryFunc.
func Do(ctx context.Context, b Backoff, f RetryFunc) error {
for {
// Return immediately if ctx is canceled
select {
case <-ctx.Done():
return ctx.Err()
default:
}
err := f(ctx)
if err == nil {
return nil
}
// Not retryable
var rerr *retryableError
if !errors.As(err, &rerr) {
return err
}
next, stop := b.Next()
if stop {
return rerr.Unwrap()
}
// ctx.Done() has priority, so we test it alone first
select {
case <-ctx.Done():
return ctx.Err()
default:
}
t := time.NewTimer(next)
select {
case <-ctx.Done():
t.Stop()
return ctx.Err()
case <-t.C:
continue
}
}
}

3
vendor/modules.txt vendored
View File

@@ -46,6 +46,9 @@ github.com/miekg/dns
# github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529
## explicit
github.com/sean-/seed
# github.com/sethvargo/go-retry v0.2.4
## explicit; go 1.16
github.com/sethvargo/go-retry
# github.com/tidwall/resp v0.1.1
## explicit; go 1.19
github.com/tidwall/resp