diff --git a/go.mod b/go.mod index 255a68e..cd4aa36 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,9 @@ module github.com/kelvinmwinuka/memstore go 1.20 require ( - github.com/hashicorp/go-hclog v1.5.0 + github.com/hashicorp/memberlist v0.5.0 github.com/hashicorp/raft v1.5.0 + github.com/sethvargo/go-retry v0.2.4 github.com/tidwall/resp v0.1.1 gopkg.in/yaml.v3 v3.0.1 ) @@ -14,12 +15,12 @@ require ( github.com/fatih/color v1.13.0 // indirect github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c // indirect github.com/hashicorp/errwrap v1.0.0 // indirect + github.com/hashicorp/go-hclog v1.5.0 // indirect github.com/hashicorp/go-immutable-radix v1.0.0 // indirect github.com/hashicorp/go-msgpack v0.5.5 // indirect github.com/hashicorp/go-multierror v1.0.0 // indirect github.com/hashicorp/go-sockaddr v1.0.0 // indirect github.com/hashicorp/golang-lru v0.5.0 // indirect - github.com/hashicorp/memberlist v0.5.0 // indirect github.com/mattn/go-colorable v0.1.12 // indirect github.com/mattn/go-isatty v0.0.14 // indirect github.com/miekg/dns v1.1.26 // indirect diff --git a/go.sum b/go.sum index 4c0df19..b794aae 100644 --- a/go.sum +++ b/go.sum @@ -98,6 +98,8 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= +github.com/sethvargo/go-retry v0.2.4 h1:T+jHEQy/zKJf5s95UkguisicE0zuF9y7+/vgz08Ocec= +github.com/sethvargo/go-retry v0.2.4/go.mod h1:1afjQuvh7s4gflMObvjLPaWgluLLyhA1wmVZ6KLpICw= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -128,6 +130,7 @@ golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/server/main.go b/server/main.go index 8045f22..ff3c39e 100644 --- a/server/main.go +++ b/server/main.go @@ -23,12 +23,13 @@ type Data struct { } type Server struct { - config Config - data Data - commands []Command - raft *raft.Raft - memberList *memberlist.Memberlist - cancelCh *chan (os.Signal) + config Config + data Data + commands []Command + raft *raft.Raft + memberList *memberlist.Memberlist + broadcastQueue *memberlist.TransmitLimitedQueue + cancelCh *chan (os.Signal) } func (server *Server) Lock() { @@ -165,7 +166,7 @@ func (server *Server) Start() { } server.MemberListInit() - server.RaftInit() + // server.RaftInit() if conf.HTTP { server.StartHTTP() diff --git a/server/memberlist.go b/server/memberlist.go index a1f3d7c..f574852 100644 --- a/server/memberlist.go +++ b/server/memberlist.go @@ -1,17 +1,57 @@ package main import ( + "context" + "encoding/json" "fmt" "log" + "time" "github.com/hashicorp/memberlist" + retry "github.com/sethvargo/go-retry" ) +type BroadcastMessage struct { + Action string `json:"Action"` + ServerID string `json:"ServerID"` + ServerAddr string `json:"ServerAddr"` +} + +func (broadcastMessage *BroadcastMessage) Invalidates(other memberlist.Broadcast) bool { + mb, ok := other.(*BroadcastMessage) + + if !ok { + return false + } + + if mb.ServerID == broadcastMessage.ServerID && mb.Action == "RaftJoin" { + return true + } + + return false +} + +func (broadcastMessage *BroadcastMessage) Message() []byte { + msg, err := json.Marshal(broadcastMessage) + + if err != nil { + fmt.Println(err) + return []byte{} + } + + return msg +} + +func (broadcastMessage *BroadcastMessage) Finished() { + // No-Op +} + func (server *Server) MemberListInit() { // Triggered before RaftInit cfg := memberlist.DefaultLocalConfig() cfg.BindAddr = server.config.BindAddr cfg.BindPort = int(server.config.MemberListBindPort) + cfg.Delegate = server list, err := memberlist.Create(cfg) server.memberList = list @@ -21,16 +61,42 @@ func (server *Server) MemberListInit() { } if server.config.JoinAddr != "" { - n, err := server.memberList.Join([]string{server.config.JoinAddr}) + ctx := context.Background() + + backoffPolicy := RetryBackoff(retry.NewFibonacci(1*time.Second), 0, 0, 0, 0) + + err := retry.Do(ctx, backoffPolicy, func(ctx context.Context) error { + fmt.Printf("%s trying to joing the cluster...\n", server.config.ServerID) + return retry.RetryableError(fmt.Errorf("there's a retryable error")) + }) if err != nil { log.Fatal(err) } - - fmt.Printf("Joined cluster. Contacted %d nodes.\n", n) } } +func (server *Server) NodeMeta(limit int) []byte { + return []byte{} +} + +func (server *Server) NotifyMsg(msg []byte) { + fmt.Println(string(msg)) +} + +func (server *Server) GetBroadcasts(overhead, limit int) [][]byte { + return server.broadcastQueue.GetBroadcasts(overhead, limit) +} + +func (server *Server) LocalState(join bool) []byte { + // No-Op + return []byte{} +} + +func (server *Server) MergeRemoteState(buf []byte, join bool) { + // No-Op +} + func (server *Server) MemberListShutdown() { // Triggered after RaftShutdown // Gracefully leave memberlist cluster diff --git a/server/utils.go b/server/utils.go index 887d5db..0cda2f7 100644 --- a/server/utils.go +++ b/server/utils.go @@ -10,7 +10,9 @@ import ( "math/big" "reflect" "strings" + "time" + "github.com/sethvargo/go-retry" "github.com/tidwall/resp" ) @@ -164,3 +166,25 @@ func ReadMessage(r *bufio.ReadWriter) (message string, err error) { return fmt.Sprintf("%s\r\n", string(bytes.Join(line, []byte("\r\n")))), nil } + +func RetryBackoff(b retry.Backoff, maxRetries uint64, jitter, cappedDuration, maxDuration time.Duration) retry.Backoff { + backoff := b + + if maxRetries > 0 { + backoff = retry.WithMaxRetries(maxRetries, backoff) + } + + if jitter > 0 { + backoff = retry.WithJitter(jitter, backoff) + } + + if cappedDuration > 0 { + backoff = retry.WithCappedDuration(cappedDuration, backoff) + } + + if maxDuration > 0 { + backoff = retry.WithMaxDuration(maxDuration, backoff) + } + + return backoff +} diff --git a/vendor/github.com/sethvargo/go-retry/LICENSE b/vendor/github.com/sethvargo/go-retry/LICENSE new file mode 100644 index 0000000..d645695 --- /dev/null +++ b/vendor/github.com/sethvargo/go-retry/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vendor/github.com/sethvargo/go-retry/Makefile b/vendor/github.com/sethvargo/go-retry/Makefile new file mode 100644 index 0000000..247d4a2 --- /dev/null +++ b/vendor/github.com/sethvargo/go-retry/Makefile @@ -0,0 +1,8 @@ +test: + @go test \ + -count=1 \ + -race \ + -short \ + -timeout=5m \ + ./... +.PHONY: test diff --git a/vendor/github.com/sethvargo/go-retry/README.md b/vendor/github.com/sethvargo/go-retry/README.md new file mode 100644 index 0000000..6721618 --- /dev/null +++ b/vendor/github.com/sethvargo/go-retry/README.md @@ -0,0 +1,186 @@ +# Retry + +[![GoDoc](https://img.shields.io/badge/go-documentation-blue.svg?style=flat-square)](https://pkg.go.dev/mod/github.com/sethvargo/go-retry) +[![GitHub Actions](https://img.shields.io/github/workflow/status/sethvargo/go-retry/Test?style=flat-square)](https://github.com/sethvargo/go-retry/actions?query=workflow%3ATest) + +Retry is a Go library for facilitating retry logic and backoff. It's highly +extensible with full control over how and when retries occur. You can also write +your own custom backoff functions by implementing the Backoff interface. + +## Features + +- **Extensible** - Inspired by Go's built-in HTTP package, this Go backoff and + retry library is extensible via middleware. You can write custom backoff + functions or use a provided filter. + +- **Independent** - No external dependencies besides the Go standard library, + meaning it won't bloat your project. + +- **Concurrent** - Unless otherwise specified, everything is safe for concurrent + use. + +- **Context-aware** - Use native Go contexts to control cancellation. + +## Usage + +Here is an example use for connecting to a database using Go's `database/sql` +package: + +```golang +package main + +import ( + "context" + "database/sql" + "log" + "time" + + "github.com/sethvargo/go-retry" +) + +func main() { + db, err := sql.Open("mysql", "...") + if err != nil { + log.Fatal(err) + } + + ctx := context.Background() + if err := retry.Fibonacci(ctx, 1*time.Second, func(ctx context.Context) error { + if err := db.PingContext(ctx); err != nil { + // This marks the error as retryable + return retry.RetryableError(err) + } + return nil + }); err != nil { + log.Fatal(err) + } +} +``` + +## Backoffs + +In addition to your own custom algorithms, there are built-in algorithms for +backoff in the library. + +### Constant + +A very rudimentary backoff, just returns a constant value. Here is an example: + +```text +1s -> 1s -> 1s -> 1s -> 1s -> 1s +``` + +Usage: + +```golang +NewConstant(1 * time.Second) +``` + +### Exponential + +Arguably the most common backoff, the next value is double the previous value. +Here is an example: + +```text +1s -> 2s -> 4s -> 8s -> 16s -> 32s -> 64s +``` + +Usage: + +```golang +NewExponential(1 * time.Second) +``` + +### Fibonacci + +The Fibonacci backoff uses the Fibonacci sequence to calculate the backoff. The +next value is the sum of the current value and the previous value. This means +retires happen quickly at first, but then gradually take slower, ideal for +network-type issues. Here is an example: + +```text +1s -> 1s -> 2s -> 3s -> 5s -> 8s -> 13s +``` + +Usage: + +```golang +NewFibonacci(1 * time.Second) +``` + +## Modifiers (Middleware) + +The built-in backoff algorithms never terminate and have no caps or limits - you +control their behavior with middleware. There's built-in middleware, but you can +also write custom middleware. + +### Jitter + +To reduce the changes of a thundering herd, add random jitter to the returned +value. + +```golang +b := NewFibonacci(1 * time.Second) + +// Return the next value, +/- 500ms +b = WithJitter(500*time.Millisecond, b) + +// Return the next value, +/- 5% of the result +b = WithJitterPercent(5, b) +``` + +### MaxRetries + +To terminate a retry, specify the maximum number of _retries_. Note this +is _retries_, not _attempts_. Attempts is retries + 1. + +```golang +b := NewFibonacci(1 * time.Second) + +// Stop after 4 retries, when the 5th attempt has failed. In this example, the worst case elapsed +// time would be 1s + 1s + 2s + 3s = 7s. +b = WithMaxRetries(4, b) +``` + +### CappedDuration + +To ensure an individual calculated duration never exceeds a value, use a cap: + +```golang +b := NewFibonacci(1 * time.Second) + +// Ensure the maximum value is 2s. In this example, the sleep values would be +// 1s, 1s, 2s, 2s, 2s, 2s... +b = WithCappedDuration(2 * time.Second, b) +``` + +### WithMaxDuration + +For a best-effort limit on the total execution time, specify a max duration: + +```golang +b := NewFibonacci(1 * time.Second) + +// Ensure the maximum total retry time is 5s. +b = WithMaxDuration(5 * time.Second, b) +``` + +## Benchmarks + +Here are benchmarks against some other popular Go backoff and retry libraries. +You can run these benchmarks yourself via the `benchmark/` folder. Commas and +spacing fixed for clarity. + +```text +Benchmark/cenkalti-7 13,052,668 87.3 ns/op +Benchmark/lestrrat-7 902,044 1,355 ns/op +Benchmark/sethvargo-7 203,914,245 5.73 ns/op +``` + +## Notes and Caveats + +- Randomization uses `math/rand` seeded with the Unix timestamp instead of + `crypto/rand`. +- Ordering of addition of multiple modifiers will make a difference. + For example; ensure you add `CappedDuration` before `WithMaxDuration`, otherwise it may early out too early. + Another example is you could add `Jitter` before or after capping depending on your desired outcome. diff --git a/vendor/github.com/sethvargo/go-retry/backoff.go b/vendor/github.com/sethvargo/go-retry/backoff.go new file mode 100644 index 0000000..de3974f --- /dev/null +++ b/vendor/github.com/sethvargo/go-retry/backoff.go @@ -0,0 +1,134 @@ +package retry + +import ( + "sync" + "time" +) + +// Backoff is an interface that backs off. +type Backoff interface { + // Next returns the time duration to wait and whether to stop. + Next() (next time.Duration, stop bool) +} + +var _ Backoff = (BackoffFunc)(nil) + +// BackoffFunc is a backoff expressed as a function. +type BackoffFunc func() (time.Duration, bool) + +// Next implements Backoff. +func (b BackoffFunc) Next() (time.Duration, bool) { + return b() +} + +// WithJitter wraps a backoff function and adds the specified jitter. j can be +// interpreted as "+/- j". For example, if j were 5 seconds and the backoff +// returned 20s, the value could be between 15 and 25 seconds. The value can +// never be less than 0. +func WithJitter(j time.Duration, next Backoff) Backoff { + r := newLockedRandom(time.Now().UnixNano()) + + return BackoffFunc(func() (time.Duration, bool) { + val, stop := next.Next() + if stop { + return 0, true + } + + diff := time.Duration(r.Int63n(int64(j)*2) - int64(j)) + val = val + diff + if val < 0 { + val = 0 + } + return val, false + }) +} + +// WithJitterPercent wraps a backoff function and adds the specified jitter +// percentage. j can be interpreted as "+/- j%". For example, if j were 5 and +// the backoff returned 20s, the value could be between 19 and 21 seconds. The +// value can never be less than 0 or greater than 100. +func WithJitterPercent(j uint64, next Backoff) Backoff { + r := newLockedRandom(time.Now().UnixNano()) + + return BackoffFunc(func() (time.Duration, bool) { + val, stop := next.Next() + if stop { + return 0, true + } + + // Get a value between -j and j, the convert to a percentage + top := r.Int63n(int64(j)*2) - int64(j) + pct := 1 - float64(top)/100.0 + + val = time.Duration(float64(val) * pct) + if val < 0 { + val = 0 + } + return val, false + }) +} + +// WithMaxRetries executes the backoff function up until the maximum attempts. +func WithMaxRetries(max uint64, next Backoff) Backoff { + var l sync.Mutex + var attempt uint64 + + return BackoffFunc(func() (time.Duration, bool) { + l.Lock() + defer l.Unlock() + + if attempt >= max { + return 0, true + } + attempt++ + + val, stop := next.Next() + if stop { + return 0, true + } + + return val, false + }) +} + +// WithCappedDuration sets a maximum on the duration returned from the next +// backoff. This is NOT a total backoff time, but rather a cap on the maximum +// value a backoff can return. Without another middleware, the backoff will +// continue infinitely. +func WithCappedDuration(cap time.Duration, next Backoff) Backoff { + return BackoffFunc(func() (time.Duration, bool) { + val, stop := next.Next() + if stop { + return 0, true + } + + if val <= 0 || val > cap { + val = cap + } + return val, false + }) +} + +// WithMaxDuration sets a maximum on the total amount of time a backoff should +// execute. It's best-effort, and should not be used to guarantee an exact +// amount of time. +func WithMaxDuration(timeout time.Duration, next Backoff) Backoff { + start := time.Now() + + return BackoffFunc(func() (time.Duration, bool) { + diff := timeout - time.Since(start) + if diff <= 0 { + return 0, true + } + + val, stop := next.Next() + if stop { + return 0, true + } + + if val <= 0 || val > diff { + val = diff + } + return val, false + }) +} diff --git a/vendor/github.com/sethvargo/go-retry/backoff_constant.go b/vendor/github.com/sethvargo/go-retry/backoff_constant.go new file mode 100644 index 0000000..ef01fa0 --- /dev/null +++ b/vendor/github.com/sethvargo/go-retry/backoff_constant.go @@ -0,0 +1,25 @@ +package retry + +import ( + "context" + "time" +) + +// Constant is a wrapper around Retry that uses a constant backoff. It panics if +// the given base is less than zero. +func Constant(ctx context.Context, t time.Duration, f RetryFunc) error { + return Do(ctx, NewConstant(t), f) +} + +// NewConstant creates a new constant backoff using the value t. The wait time +// is the provided constant value. It panics if the given base is less than +// zero. +func NewConstant(t time.Duration) Backoff { + if t <= 0 { + panic("t must be greater than 0") + } + + return BackoffFunc(func() (time.Duration, bool) { + return t, false + }) +} diff --git a/vendor/github.com/sethvargo/go-retry/backoff_exponential.go b/vendor/github.com/sethvargo/go-retry/backoff_exponential.go new file mode 100644 index 0000000..a85b498 --- /dev/null +++ b/vendor/github.com/sethvargo/go-retry/backoff_exponential.go @@ -0,0 +1,47 @@ +package retry + +import ( + "context" + "math" + "sync/atomic" + "time" +) + +type exponentialBackoff struct { + base time.Duration + attempt uint64 +} + +// Exponential is a wrapper around Retry that uses an exponential backoff. See +// NewExponential. +func Exponential(ctx context.Context, base time.Duration, f RetryFunc) error { + return Do(ctx, NewExponential(base), f) +} + +// NewExponential creates a new exponential backoff using the starting value of +// base and doubling on each failure (1, 2, 4, 8, 16, 32, 64...), up to max. +// +// Once it overflows, the function constantly returns the maximum time.Duration +// for a 64-bit integer. +// +// It panics if the given base is less than zero. +func NewExponential(base time.Duration) Backoff { + if base <= 0 { + panic("base must be greater than 0") + } + + return &exponentialBackoff{ + base: base, + } +} + +// Next implements Backoff. It is safe for concurrent use. +func (b *exponentialBackoff) Next() (time.Duration, bool) { + next := b.base << (atomic.AddUint64(&b.attempt, 1) - 1) + if next <= 0 { + atomic.AddUint64(&b.attempt, ^uint64(0)) + next = math.MaxInt64 + } + + return next, false +} diff --git a/vendor/github.com/sethvargo/go-retry/backoff_fibonacci.go b/vendor/github.com/sethvargo/go-retry/backoff_fibonacci.go new file mode 100644 index 0000000..250a026 --- /dev/null +++ b/vendor/github.com/sethvargo/go-retry/backoff_fibonacci.go @@ -0,0 +1,56 @@ +package retry + +import ( + "context" + "math" + "sync/atomic" + "time" + "unsafe" +) + +type state [2]time.Duration + +type fibonacciBackoff struct { + state unsafe.Pointer +} + +// Fibonacci is a wrapper around Retry that uses a Fibonacci backoff. See +// NewFibonacci. +func Fibonacci(ctx context.Context, base time.Duration, f RetryFunc) error { + return Do(ctx, NewFibonacci(base), f) +} + +// NewFibonacci creates a new Fibonacci backoff using the starting value of +// base. The wait time is the sum of the previous two wait times on each failed +// attempt (1, 1, 2, 3, 5, 8, 13...). +// +// Once it overflows, the function constantly returns the maximum time.Duration +// for a 64-bit integer. +// +// It panics if the given base is less than zero. +func NewFibonacci(base time.Duration) Backoff { + if base <= 0 { + panic("base must be greater than 0") + } + + return &fibonacciBackoff{ + state: unsafe.Pointer(&state{0, base}), + } +} + +// Next implements Backoff. It is safe for concurrent use. +func (b *fibonacciBackoff) Next() (time.Duration, bool) { + for { + curr := atomic.LoadPointer(&b.state) + currState := (*state)(curr) + next := currState[0] + currState[1] + + if next <= 0 { + return math.MaxInt64, false + } + + if atomic.CompareAndSwapPointer(&b.state, curr, unsafe.Pointer(&state{currState[1], next})) { + return next, false + } + } +} diff --git a/vendor/github.com/sethvargo/go-retry/rand.go b/vendor/github.com/sethvargo/go-retry/rand.go new file mode 100644 index 0000000..4799fb0 --- /dev/null +++ b/vendor/github.com/sethvargo/go-retry/rand.go @@ -0,0 +1,54 @@ +package retry + +import ( + "math/rand" + "sync" +) + +type lockedSource struct { + src *rand.Rand + mu sync.Mutex +} + +var _ rand.Source64 = (*lockedSource)(nil) + +func newLockedRandom(seed int64) *lockedSource { + return &lockedSource{src: rand.New(rand.NewSource(seed))} +} + +// Int63 mimics math/rand.(*Rand).Int63 with mutex locked. +func (r *lockedSource) Int63() int64 { + r.mu.Lock() + defer r.mu.Unlock() + return r.src.Int63() +} + +// Seed mimics math/rand.(*Rand).Seed with mutex locked. +func (r *lockedSource) Seed(seed int64) { + r.mu.Lock() + defer r.mu.Unlock() + r.src.Seed(seed) +} + +// Uint64 mimics math/rand.(*Rand).Uint64 with mutex locked. +func (r *lockedSource) Uint64() uint64 { + r.mu.Lock() + defer r.mu.Unlock() + return r.src.Uint64() +} + +// Int63n mimics math/rand.(*Rand).Int63n with mutex locked. +func (r *lockedSource) Int63n(n int64) int64 { + if n <= 0 { + panic("invalid argument to Int63n") + } + if n&(n-1) == 0 { // n is power of two, can mask + return r.Int63() & (n - 1) + } + max := int64((1 << 63) - 1 - (1<<63)%uint64(n)) + v := r.Int63() + for v > max { + v = r.Int63() + } + return v % n +} diff --git a/vendor/github.com/sethvargo/go-retry/retry.go b/vendor/github.com/sethvargo/go-retry/retry.go new file mode 100644 index 0000000..a7e10b9 --- /dev/null +++ b/vendor/github.com/sethvargo/go-retry/retry.go @@ -0,0 +1,92 @@ +// Package retry provides helpers for retrying. +// +// This package defines flexible interfaces for retrying Go functions that may +// be flakey or eventually consistent. It abstracts the "backoff" (how long to +// wait between tries) and "retry" (execute the function again) mechanisms for +// maximum flexibility. Furthermore, everything is an interface, so you can +// define your own implementations. +// +// The package is modeled after Go's built-in HTTP package, making it easy to +// customize the built-in backoff with your own custom logic. Additionally, +// callers specify which errors are retryable by wrapping them. This is helpful +// with complex operations where only certain results should retry. +package retry + +import ( + "context" + "errors" + "time" +) + +// RetryFunc is a function passed to retry. +type RetryFunc func(ctx context.Context) error + +type retryableError struct { + err error +} + +// RetryableError marks an error as retryable. +func RetryableError(err error) error { + if err == nil { + return nil + } + return &retryableError{err} +} + +// Unwrap implements error wrapping. +func (e *retryableError) Unwrap() error { + return e.err +} + +// Error returns the error string. +func (e *retryableError) Error() string { + if e.err == nil { + return "retryable: " + } + return "retryable: " + e.err.Error() +} + +// Do wraps a function with a backoff to retry. The provided context is the same +// context passed to the RetryFunc. +func Do(ctx context.Context, b Backoff, f RetryFunc) error { + for { + // Return immediately if ctx is canceled + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + err := f(ctx) + if err == nil { + return nil + } + + // Not retryable + var rerr *retryableError + if !errors.As(err, &rerr) { + return err + } + + next, stop := b.Next() + if stop { + return rerr.Unwrap() + } + + // ctx.Done() has priority, so we test it alone first + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + t := time.NewTimer(next) + select { + case <-ctx.Done(): + t.Stop() + return ctx.Err() + case <-t.C: + continue + } + } +} diff --git a/vendor/modules.txt b/vendor/modules.txt index e98b3e9..e673842 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -46,6 +46,9 @@ github.com/miekg/dns # github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 ## explicit github.com/sean-/seed +# github.com/sethvargo/go-retry v0.2.4 +## explicit; go 1.16 +github.com/sethvargo/go-retry # github.com/tidwall/resp v0.1.1 ## explicit; go 1.19 github.com/tidwall/resp