sujit
2025-07-30 10:14:20 +05:45
parent 7fa6f3a658
commit 2829e73450
7 changed files with 338 additions and 112 deletions

View File

@@ -764,6 +764,8 @@ func (a *AdminServer) handlePauseConsumer(w http.ResponseWriter, r *http.Request
a.logger.Info("Consumer pause requested", logger.Field{Key: "consumer_id", Value: consumerID}) a.logger.Info("Consumer pause requested", logger.Field{Key: "consumer_id", Value: consumerID})
a.broker.PauseConsumer(r.Context(), consumerID)
// In a real implementation, you would find the consumer and pause it // In a real implementation, you would find the consumer and pause it
// For now, we'll just acknowledge the request // For now, we'll just acknowledge the request
@@ -799,6 +801,8 @@ func (a *AdminServer) handleResumeConsumer(w http.ResponseWriter, r *http.Reques
a.logger.Info("Consumer resume requested", logger.Field{Key: "consumer_id", Value: consumerID}) a.logger.Info("Consumer resume requested", logger.Field{Key: "consumer_id", Value: consumerID})
a.broker.ResumeConsumer(r.Context(), consumerID)
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Origin", "*")
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
@@ -831,6 +835,8 @@ func (a *AdminServer) handleStopConsumer(w http.ResponseWriter, r *http.Request)
a.logger.Info("Consumer stop requested", logger.Field{Key: "consumer_id", Value: consumerID}) a.logger.Info("Consumer stop requested", logger.Field{Key: "consumer_id", Value: consumerID})
a.broker.StopConsumer(r.Context(), consumerID)
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Origin", "*")
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
@@ -1018,59 +1024,11 @@ func (a *AdminServer) getQueues() []*AdminQueueInfo {
}
func (a *AdminServer) getConsumers() []*AdminConsumerMetrics {
-// This would need to be implemented based on how you track consumers
-// For now, return sample data as placeholder
-consumers := []*AdminConsumerMetrics{
-{
-ID: "consumer-1",
-Queue: "demo_queue",
-Status: "active",
-ProcessedTasks: 150,
-ErrorCount: 2,
-LastActivity: time.Now().Add(-30 * time.Second),
-MaxConcurrentTasks: 10,
-TaskTimeout: 30,
-MaxRetries: 3,
-},
-{
-ID: "consumer-2",
-Queue: "priority_queue",
-Status: "paused",
-ProcessedTasks: 89,
-ErrorCount: 0,
-LastActivity: time.Now().Add(-2 * time.Minute),
-MaxConcurrentTasks: 5,
-TaskTimeout: 60,
-MaxRetries: 5,
-},
-}
-return consumers
+return a.broker.GetConsumers()
}
func (a *AdminServer) getPools() []*AdminPoolMetrics {
-// This would need to be implemented based on how you track pools
-// For now, return sample data as placeholder
-pools := []*AdminPoolMetrics{
-{
-ID: "pool-1",
-Workers: 10,
-QueueSize: 100,
-ActiveTasks: 7,
-Status: "running",
-MaxMemoryLoad: 1024 * 1024 * 512, // 512MB
-LastActivity: time.Now().Add(-10 * time.Second),
-},
-{
-ID: "pool-2",
-Workers: 5,
-QueueSize: 50,
-ActiveTasks: 2,
-Status: "running",
-MaxMemoryLoad: 1024 * 1024 * 256, // 256MB
-LastActivity: time.Now().Add(-1 * time.Minute),
-},
-}
-return pools
+return a.broker.GetPools()
}
func (a *AdminServer) getBrokerInfo() *AdminBrokerInfo {

View File

@@ -37,20 +37,23 @@ type Processor interface {
}
type Consumer struct {
conn net.Conn
handler Handler
pool *Pool
opts *Options
id string
queue string
pIDs storage.IMap[string, bool]
connMutex sync.RWMutex
isConnected int32 // atomic flag
isShutdown int32 // atomic flag
shutdown chan struct{}
reconnectCh chan struct{}
healthTicker *time.Ticker
logger logger.Logger
+reconnectAttempts int32 // track consecutive reconnection attempts
+lastReconnectAttempt time.Time // track when last reconnect was attempted
+reconnectMutex sync.Mutex // protect reconnection attempt tracking
}
func NewConsumer(id string, queue string, handler Handler, opts ...Option) *Consumer {
@@ -587,9 +590,11 @@ func (c *Consumer) Consume(ctx context.Context) error {
c.logger.Info("Reconnection triggered", c.logger.Info("Reconnection triggered",
logger.Field{Key: "consumer_id", Value: c.id}) logger.Field{Key: "consumer_id", Value: c.id})
if err := c.handleReconnection(ctx); err != nil { if err := c.handleReconnection(ctx); err != nil {
c.logger.Error("Reconnection failed", c.logger.Error("Reconnection failed, will retry based on backoff policy",
logger.Field{Key: "consumer_id", Value: c.id}, logger.Field{Key: "consumer_id", Value: c.id},
logger.Field{Key: "error", Value: err.Error()}) logger.Field{Key: "error", Value: err.Error()})
// The handleReconnection method now implements its own backoff,
// so we don't need to do anything here except continue the loop
} }
default:
@@ -635,10 +640,76 @@ func (c *Consumer) processWithTimeout(ctx context.Context) error {
}
// Read message without timeout - consumer should be long-running background service
-return c.readMessage(ctx, conn)
+err := c.readMessage(ctx, conn)
+// If message was processed successfully, reset reconnection attempts
+if err == nil {
+if atomic.LoadInt32(&c.reconnectAttempts) > 0 {
+atomic.StoreInt32(&c.reconnectAttempts, 0)
+c.logger.Debug("Reset reconnection attempts after successful message processing",
+logger.Field{Key: "consumer_id", Value: c.id})
+}
+}
+return err
}
func (c *Consumer) handleReconnection(ctx context.Context) error {
c.reconnectMutex.Lock()
defer c.reconnectMutex.Unlock()
// Increment reconnection attempts
attempts := atomic.AddInt32(&c.reconnectAttempts, 1)
// Calculate backoff based on consecutive attempts
backoffDelay := utils.CalculateJitter(
c.opts.initialDelay*time.Duration(1<<minInt(int(attempts-1), 6)), // Cap exponential growth at 2^6
c.opts.jitterPercent,
)
// Cap maximum backoff
if backoffDelay > c.opts.maxBackoff {
backoffDelay = c.opts.maxBackoff
}
// If we've been reconnecting too frequently, implement circuit breaker logic
timeSinceLastAttempt := time.Since(c.lastReconnectAttempt)
if attempts > 1 && timeSinceLastAttempt < backoffDelay {
remainingWait := backoffDelay - timeSinceLastAttempt
c.logger.Warn("Throttling reconnection attempt",
logger.Field{Key: "consumer_id", Value: c.id},
logger.Field{Key: "consecutive_attempts", Value: int(attempts)},
logger.Field{Key: "wait_duration", Value: remainingWait.String()})
// Wait with context cancellation support
select {
case <-time.After(remainingWait):
case <-ctx.Done():
return ctx.Err()
case <-c.shutdown:
return fmt.Errorf("consumer is shutdown")
}
}
c.lastReconnectAttempt = time.Now()
// If we've exceeded reasonable attempts, implement longer backoff
if attempts > int32(c.opts.maxRetries*2) {
longBackoff := 5 * time.Minute // Long circuit breaker period
c.logger.Warn("Too many consecutive reconnection attempts, entering long backoff",
logger.Field{Key: "consumer_id", Value: c.id},
logger.Field{Key: "consecutive_attempts", Value: int(attempts)},
logger.Field{Key: "backoff_duration", Value: longBackoff.String()})
select {
case <-time.After(longBackoff):
case <-ctx.Done():
return ctx.Err()
case <-c.shutdown:
return fmt.Errorf("consumer is shutdown")
}
}
// Mark as disconnected
atomic.StoreInt32(&c.isConnected, 0)
@@ -650,48 +721,42 @@ func (c *Consumer) handleReconnection(ctx context.Context) error {
}
c.connMutex.Unlock()
-// Attempt reconnection with exponential backoff
-backoff := c.opts.initialDelay
-maxRetries := c.opts.maxRetries
-for attempt := 1; attempt <= maxRetries; attempt++ {
-if atomic.LoadInt32(&c.isShutdown) == 1 {
-return fmt.Errorf("consumer is shutdown")
-}
-if err := c.attemptConnect(); err != nil {
-if attempt == maxRetries {
-return fmt.Errorf("failed to reconnect after %d attempts: %w", maxRetries, err)
-}
-sleepDuration := utils.CalculateJitter(backoff, c.opts.jitterPercent)
-c.logger.Warn("Reconnection attempt failed, retrying",
-logger.Field{Key: "consumer_id", Value: c.id},
-logger.Field{Key: "attempt", Value: fmt.Sprintf("%d/%d", attempt, maxRetries)},
-logger.Field{Key: "retry_in", Value: sleepDuration.String()})
-time.Sleep(sleepDuration)
-backoff *= 2
-if backoff > c.opts.maxBackoff {
-backoff = c.opts.maxBackoff
-}
-continue
-}
-// Reconnection successful, resubscribe
-if err := c.subscribe(ctx, c.queue); err != nil {
-c.logger.Error("Failed to resubscribe after reconnection",
-logger.Field{Key: "consumer_id", Value: c.id},
-logger.Field{Key: "error", Value: err.Error()})
-continue
-}
-c.logger.Info("Successfully reconnected and resubscribed",
-logger.Field{Key: "consumer_id", Value: c.id})
-return nil
-}
-return fmt.Errorf("failed to reconnect")
+c.logger.Info("Attempting reconnection",
+logger.Field{Key: "consumer_id", Value: c.id},
+logger.Field{Key: "attempt", Value: int(attempts)},
+logger.Field{Key: "backoff_delay", Value: backoffDelay.String()})
+// Attempt reconnection
+if err := c.attemptConnect(); err != nil {
+c.logger.Error("Reconnection failed",
+logger.Field{Key: "consumer_id", Value: c.id},
+logger.Field{Key: "attempt", Value: int(attempts)},
+logger.Field{Key: "error", Value: err.Error()})
+return fmt.Errorf("failed to reconnect (attempt %d): %w", attempts, err)
+}
+// Reconnection successful, try to resubscribe
+if err := c.subscribe(ctx, c.queue); err != nil {
+c.logger.Error("Failed to resubscribe after reconnection",
+logger.Field{Key: "consumer_id", Value: c.id},
+logger.Field{Key: "error", Value: err.Error()})
+return fmt.Errorf("failed to resubscribe after reconnection: %w", err)
+}
+// Reset reconnection attempts on successful reconnection
+atomic.StoreInt32(&c.reconnectAttempts, 0)
+c.logger.Info("Successfully reconnected and resubscribed",
+logger.Field{Key: "consumer_id", Value: c.id})
+return nil
}
+// Helper function to get minimum of two integers
+func minInt(a, b int) int {
+if a < b {
+return a
+}
+return b
+}
func isConnectionError(err error) bool {
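The new reconnection path derives its delay from the consecutive-attempt counter: initialDelay scaled by 2^min(attempts-1, 6), jittered, then capped at maxBackoff. A minimal, self-contained sketch of that calculation follows; the exact behavior of `utils.CalculateJitter` is not shown in this diff, so the sketch assumes it applies a symmetric random variation of roughly ±jitterPercent.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// backoffDelay mirrors the calculation in the new handleReconnection:
// initialDelay * 2^min(attempts-1, 6), jittered, then capped at maxBackoff.
func backoffDelay(attempts int, initialDelay, maxBackoff time.Duration, jitterPercent float64) time.Duration {
	exp := attempts - 1
	if exp > 6 { // cap exponential growth at 2^6
		exp = 6
	}
	delay := initialDelay * time.Duration(1<<exp)

	// Assumed jitter semantics: scale by a random factor in [1-jitterPercent, 1+jitterPercent].
	factor := 1 + (rand.Float64()*2-1)*jitterPercent
	delay = time.Duration(float64(delay) * factor)

	if delay > maxBackoff {
		delay = maxBackoff
	}
	return delay
}

func main() {
	for attempt := 1; attempt <= 8; attempt++ {
		fmt.Printf("attempt %d -> %v\n", attempt, backoffDelay(attempt, time.Second, 30*time.Second, 0.2))
	}
}
```

Because the counter only resets after a successful reconnect or a successfully processed message, repeated failures keep climbing toward the cap instead of restarting from the initial delay on every call.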

View File

@@ -19,7 +19,7 @@ func main() {
fmt.Println("✅ Logger created") fmt.Println("✅ Logger created")
// Create broker // Create broker
broker := mq.NewBroker(mq.WithLogger(lg)) broker := mq.NewBroker(mq.WithLogger(lg), mq.WithBrokerURL(":8081"))
fmt.Println("✅ Broker created") fmt.Println("✅ Broker created")
// Start broker // Start broker

View File

@@ -810,7 +810,7 @@ class MQAdminDashboard {
const consumer = this.data.consumers.find(c => c.id === consumerId);
const action = consumer?.status === 'paused' ? 'resume' : 'pause';
-const response = await fetch(`/api/admin/consumers/${consumerId}/${action}`, {
+const response = await fetch(`/api/admin/consumers/${action}?id=${consumerId}`, {
method: 'POST'
});
@@ -828,7 +828,7 @@ class MQAdminDashboard {
async stopConsumer(consumerId) {
this.confirmAction('stop this consumer', async () => {
try {
-const response = await fetch(`/api/admin/consumers/${consumerId}/stop`, {
+const response = await fetch(`/api/admin/consumers/stop?id=${consumerId}`, {
method: 'POST'
});
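The dashboard now targets the action-first, query-parameter form of the admin endpoints. The server-side parsing of that form is not part of the hunks shown in this commit, so the following is only a sketch: it assumes the handler reads the consumer id from the `id` query parameter, and it reuses the headers, status code, and `broker.PauseConsumer` call seen in the admin server hunks above.

```go
package main

import (
	"encoding/json"
	"net/http"
)

// Sketch of a pause endpoint matching the dashboard's new URL shape:
// POST /api/admin/consumers/pause?id=<consumerID>
func handlePauseConsumer(w http.ResponseWriter, r *http.Request) {
	consumerID := r.URL.Query().Get("id")
	if consumerID == "" {
		http.Error(w, `{"error":"missing id parameter"}`, http.StatusBadRequest)
		return
	}
	// The real server logs the request and calls a.broker.PauseConsumer(r.Context(), consumerID).
	w.Header().Set("Content-Type", "application/json")
	w.Header().Set("Access-Control-Allow-Origin", "*")
	w.WriteHeader(http.StatusOK)
	json.NewEncoder(w).Encode(map[string]string{"status": "pause requested", "consumer_id": consumerID})
}

func main() {
	http.HandleFunc("/api/admin/consumers/pause", handlePauseConsumer)
	http.ListenAndServe(":8080", nil)
}
```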

View File

@@ -0,0 +1,71 @@
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/oarkflow/mq"
)
// Simple task handler for testing
func testHandler(ctx context.Context, task *mq.Task) mq.Result {
fmt.Printf("Processing task: %s\n", task.ID)
return mq.Result{
Status: "SUCCESS",
Payload: []byte(fmt.Sprintf(`{"message": "Processed task %s"}`, task.ID)),
}
}
func main() {
fmt.Println("=== Consumer Reconnection Test ===")
fmt.Println("This test demonstrates improved reconnection behavior with jitter retry.")
fmt.Println("Start a broker on :8081 and then stop it to see reconnection attempts.")
fmt.Println()
// Create consumer with custom retry configuration
consumer := mq.NewConsumer(
"test-consumer",
"test-queue",
testHandler,
mq.WithBrokerURL(":8081"),
mq.WithMaxRetries(3), // Limit initial connection retries
mq.WithInitialDelay(1*time.Second), // Start with 1 second delay
mq.WithMaxBackoff(30*time.Second), // Cap at 30 seconds
mq.WithHTTPApi(true),
mq.WithWorkerPool(10, 2, 1000),
)
// Start consumer in a goroutine so we can observe the behavior
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
fmt.Println("Starting consumer...")
if err := consumer.Consume(ctx); err != nil {
log.Printf("Consumer stopped with error: %v", err)
}
}()
// Keep the main thread alive to observe reconnection behavior
fmt.Println("Consumer started. You can now:")
fmt.Println("1. Start a broker on :8081")
fmt.Println("2. Stop the broker to trigger reconnection attempts")
fmt.Println("3. Restart the broker to see successful reconnection")
fmt.Println("4. Press Ctrl+C to exit")
fmt.Println()
fmt.Println("Watch the logs to see the improved reconnection behavior with:")
fmt.Println("- Exponential backoff with jitter")
fmt.Println("- Throttling of rapid reconnection attempts")
fmt.Println("- Long backoff periods after many failed attempts")
fmt.Println("- Reset of retry counters on successful connections")
// Sleep for a long time to observe behavior
time.Sleep(10 * time.Minute)
// Graceful shutdown
fmt.Println("Shutting down consumer...")
consumer.Close()
fmt.Println("Test completed.")
}

mq.go
View File

@@ -339,9 +339,18 @@ type QueuedTask struct {
}
type consumer struct {
conn net.Conn
id string
state consts.ConsumerState
+queue string
+pool *Pool
+metrics *ConsumerMetrics
+}
+type ConsumerMetrics struct {
+ProcessedTasks int64
+ErrorCount int64
+LastActivity time.Time
}
type publisher struct {
@@ -818,13 +827,41 @@ func (b *Broker) AddConsumer(ctx context.Context, queueName string, conn net.Con
if !ok {
q = b.NewQueue(queueName)
}
-con := &consumer{id: consumerID, conn: conn}
+// Create consumer with proper initialization
+con := &consumer{
+id: consumerID,
+conn: conn,
+state: consts.ConsumerStateActive,
+queue: queueName,
+pool: nil, // Pool will be set when consumer connects
+metrics: &ConsumerMetrics{
+ProcessedTasks: 0,
+ErrorCount: 0,
+LastActivity: time.Now(),
+},
+}
b.consumers.Set(consumerID, con)
q.consumers.Set(consumerID, con)
log.Printf("BROKER - SUBSCRIBE ~> %s on %s", consumerID, queueName)
return consumerID
}
func (b *Broker) UpdateConsumerPool(consumerID string, pool *Pool) {
if con, exists := b.consumers.Get(consumerID); exists {
con.pool = pool
}
}
func (b *Broker) UpdateConsumerMetrics(consumerID string, processedTasks, errorCount int64) {
if con, exists := b.consumers.Get(consumerID); exists && con.metrics != nil {
con.metrics.ProcessedTasks = processedTasks
con.metrics.ErrorCount = errorCount
con.metrics.LastActivity = time.Now()
}
}
func (b *Broker) RemoveConsumer(consumerID string, queues ...string) {
if len(queues) > 0 {
for _, queueName := range queues {
@@ -1709,3 +1746,94 @@ func (cb *EnhancedCircuitBreaker) Call(fn func() error) error {
type InMemoryMessageStore struct {
messages storage.IMap[string, *StoredMessage]
}
func (b *Broker) GetConsumers() []*AdminConsumerMetrics {
consumers := []*AdminConsumerMetrics{}
b.consumers.ForEach(func(id string, con *consumer) bool {
// Get status based on consumer state
status := "active"
switch con.state {
case consts.ConsumerStateActive:
status = "active"
case consts.ConsumerStatePaused:
status = "paused"
case consts.ConsumerStateStopped:
status = "stopped"
}
// Handle cases where pool might be nil
maxConcurrentTasks := 0
taskTimeout := 0
maxRetries := 0
if con.pool != nil {
config := con.pool.GetCurrentConfig()
maxConcurrentTasks = config.NumberOfWorkers
taskTimeout = int(config.Timeout.Seconds())
maxRetries = config.MaxRetries
}
// Ensure metrics is not nil
processedTasks := int64(0)
errorCount := int64(0)
lastActivity := time.Now()
if con.metrics != nil {
processedTasks = con.metrics.ProcessedTasks
errorCount = con.metrics.ErrorCount
lastActivity = con.metrics.LastActivity
}
consumers = append(consumers, &AdminConsumerMetrics{
ID: id,
Queue: con.queue,
Status: status,
ProcessedTasks: processedTasks,
ErrorCount: errorCount,
LastActivity: lastActivity,
MaxConcurrentTasks: maxConcurrentTasks,
TaskTimeout: taskTimeout,
MaxRetries: maxRetries,
})
return true
})
return consumers
}
func (b *Broker) GetPools() []*AdminPoolMetrics {
pools := []*AdminPoolMetrics{}
b.queues.ForEach(func(name string, queue *Queue) bool {
// Initialize default values
workers := 0
queueSize := 0
activeTasks := 0
maxMemoryLoad := int64(0)
lastActivity := time.Now()
// Get metrics from queue if available
if queue.metrics != nil {
workers = queue.metrics.WorkerCount
queueSize = queue.metrics.QueueDepth
activeTasks = queue.metrics.ActiveTasks
maxMemoryLoad = queue.metrics.MaxMemoryLoad
lastActivity = queue.metrics.LastActivity
}
// If metrics are empty, try to get some basic info from the queue
if queueSize == 0 && queue.tasks != nil {
queueSize = len(queue.tasks)
}
pools = append(pools, &AdminPoolMetrics{
ID: name,
Workers: workers,
QueueSize: queueSize,
ActiveTasks: activeTasks,
Status: "running", // Default status
MaxMemoryLoad: maxMemoryLoad,
LastActivity: lastActivity,
})
return true
})
return pools
}
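GetConsumers reads per-consumer counters from the metrics struct attached in AddConsumer, but this commit does not show where UpdateConsumerMetrics is actually called from. The sketch below is a hypothetical wiring, using a stand-in broker type, of how a worker loop could push its counters so the admin dashboard sees live numbers instead of placeholder data.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// metricsSink stands in for the broker's new accessor; in the real code this
// would be (*Broker).UpdateConsumerMetrics from mq.go.
type metricsSink interface {
	UpdateConsumerMetrics(consumerID string, processedTasks, errorCount int64)
}

type fakeBroker struct{ processed, errors int64 }

func (f *fakeBroker) UpdateConsumerMetrics(id string, p, e int64) {
	atomic.StoreInt64(&f.processed, p)
	atomic.StoreInt64(&f.errors, e)
	fmt.Printf("%s: processed=%d errors=%d at %s\n", id, p, e, time.Now().Format(time.RFC3339))
}

func main() {
	var broker metricsSink = &fakeBroker{}
	var processed, errorCount int64

	// A worker loop would report after each task so GetConsumers returns
	// up-to-date ProcessedTasks / ErrorCount / LastActivity values.
	for i := 0; i < 5; i++ {
		if i%4 == 3 { // pretend one task fails
			errorCount++
		} else {
			processed++
		}
		broker.UpdateConsumerMetrics("consumer-1", processed, errorCount)
	}
}
```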

View File

@@ -31,6 +31,10 @@ type QueueMetrics struct {
CurrentDepth int64 `json:"current_depth"`
AverageLatency time.Duration `json:"average_latency"`
LastActivity time.Time `json:"last_activity"`
+WorkerCount int `json:"worker_count"`
+QueueDepth int `json:"queue_depth"`
+ActiveTasks int `json:"active_tasks"`
+MaxMemoryLoad int64 `json:"max_memory_load"`
}
func newQueue(name string, queueSize int) *Queue {