mirror of
https://github.com/oarkflow/mq.git
synced 2025-11-02 20:04:02 +08:00
init: publisher
This commit is contained in:
29
consumer.go
29
consumer.go
@@ -5,10 +5,11 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/rand"
|
|
||||||
"net"
|
"net"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/oarkflow/mq/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Consumer struct {
|
type Consumer struct {
|
||||||
@@ -109,38 +110,26 @@ func (c *Consumer) readMessage(ctx context.Context, message []byte) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
|
||||||
maxRetries = 5
|
|
||||||
initialDelay = 2 * time.Second
|
|
||||||
maxBackoff = 30 * time.Second // Upper limit for backoff delay
|
|
||||||
jitterPercent = 0.5 // 50% jitter
|
|
||||||
)
|
|
||||||
|
|
||||||
func (c *Consumer) AttemptConnect() error {
|
func (c *Consumer) AttemptConnect() error {
|
||||||
var conn net.Conn
|
var conn net.Conn
|
||||||
var err error
|
var err error
|
||||||
delay := initialDelay
|
delay := c.opts.initialDelay
|
||||||
for i := 0; i < maxRetries; i++ {
|
for i := 0; i < c.opts.maxRetries; i++ {
|
||||||
conn, err = net.Dial("tcp", c.opts.brokerAddr)
|
conn, err = net.Dial("tcp", c.opts.brokerAddr)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
c.conn = conn
|
c.conn = conn
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
sleepDuration := calculateJitter(delay)
|
sleepDuration := utils.CalculateJitter(delay, c.opts.jitterPercent)
|
||||||
fmt.Printf("Failed connecting to %s (attempt %d/%d): %v, Retrying in %v...\n", c.opts.brokerAddr, i+1, maxRetries, err, sleepDuration)
|
fmt.Printf("Failed connecting to %s (attempt %d/%d): %v, Retrying in %v...\n", c.opts.brokerAddr, i+1, c.opts.maxRetries, err, sleepDuration)
|
||||||
time.Sleep(sleepDuration)
|
time.Sleep(sleepDuration)
|
||||||
delay *= 2
|
delay *= 2
|
||||||
if delay > maxBackoff {
|
if delay > c.opts.maxBackoff {
|
||||||
delay = maxBackoff
|
delay = c.opts.maxBackoff
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return fmt.Errorf("could not connect to server %s after %d attempts: %w", c.opts.brokerAddr, maxRetries, err)
|
return fmt.Errorf("could not connect to server %s after %d attempts: %w", c.opts.brokerAddr, c.opts.maxRetries, err)
|
||||||
}
|
|
||||||
|
|
||||||
func calculateJitter(baseDelay time.Duration) time.Duration {
|
|
||||||
jitter := time.Duration(rand.Float64()*jitterPercent*float64(baseDelay)) - time.Duration(jitterPercent*float64(baseDelay)/2)
|
|
||||||
return baseDelay + jitter
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Consumer) readConn(ctx context.Context, conn net.Conn, message []byte) error {
|
func (c *Consumer) readConn(ctx context.Context, conn net.Conn, message []byte) error {
|
||||||
|
|||||||
11
utils/retry.go
Normal file
11
utils/retry.go
Normal file
@@ -0,0 +1,11 @@
|
|||||||
|
package utils
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math/rand"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func CalculateJitter(baseDelay time.Duration, percent float64) time.Duration {
|
||||||
|
jitter := time.Duration(rand.Float64()*percent*float64(baseDelay)) - time.Duration(percent*float64(baseDelay)/2)
|
||||||
|
return baseDelay + jitter
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user