From 2829e7345018d17339be9b3fd5d8cb2e46997899 Mon Sep 17 00:00:00 2001
From: sujit
Date: Wed, 30 Jul 2025 10:14:20 +0545
Subject: [PATCH] update

---
 admin_server.go                             |  58 +-----
 consumer.go                                 | 175 ++++++++++++------
 examples/minimal_admin/main.go              |   2 +-
 .../minimal_admin/static/admin/js/admin.js  |   4 +-
 examples/reconnection_test/main.go          |  71 +++++++
 mq.go                                       | 136 +++++++++++++-
 task.go                                     |   4 +
 7 files changed, 338 insertions(+), 112 deletions(-)
 create mode 100644 examples/reconnection_test/main.go

diff --git a/admin_server.go b/admin_server.go
index 5ca9cad..141037c 100644
--- a/admin_server.go
+++ b/admin_server.go
@@ -764,6 +764,8 @@ func (a *AdminServer) handlePauseConsumer(w http.ResponseWriter, r *http.Request
 	a.logger.Info("Consumer pause requested",
 		logger.Field{Key: "consumer_id", Value: consumerID})
 
+	a.broker.PauseConsumer(r.Context(), consumerID)
+
 	// In a real implementation, you would find the consumer and pause it
 	// For now, we'll just acknowledge the request
 
@@ -799,6 +801,8 @@ func (a *AdminServer) handleResumeConsumer(w http.ResponseWriter, r *http.Reques
 	a.logger.Info("Consumer resume requested",
 		logger.Field{Key: "consumer_id", Value: consumerID})
 
+	a.broker.ResumeConsumer(r.Context(), consumerID)
+
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
 	w.WriteHeader(http.StatusOK)
@@ -831,6 +835,8 @@ func (a *AdminServer) handleStopConsumer(w http.ResponseWriter, r *http.Request)
 	a.logger.Info("Consumer stop requested",
 		logger.Field{Key: "consumer_id", Value: consumerID})
 
+	a.broker.StopConsumer(r.Context(), consumerID)
+
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
 	w.WriteHeader(http.StatusOK)
@@ -1018,59 +1024,11 @@ func (a *AdminServer) getQueues() []*AdminQueueInfo {
 }
 
 func (a *AdminServer) getConsumers() []*AdminConsumerMetrics {
-	// This would need to be implemented based on how you track consumers
-	// For now, return sample data as placeholder
-	consumers := []*AdminConsumerMetrics{
-		{
-			ID:                 "consumer-1",
-			Queue:              "demo_queue",
-			Status:             "active",
-			ProcessedTasks:     150,
-			ErrorCount:         2,
-			LastActivity:       time.Now().Add(-30 * time.Second),
-			MaxConcurrentTasks: 10,
-			TaskTimeout:        30,
-			MaxRetries:         3,
-		},
-		{
-			ID:                 "consumer-2",
-			Queue:              "priority_queue",
-			Status:             "paused",
-			ProcessedTasks:     89,
-			ErrorCount:         0,
-			LastActivity:       time.Now().Add(-2 * time.Minute),
-			MaxConcurrentTasks: 5,
-			TaskTimeout:        60,
-			MaxRetries:         5,
-		},
-	}
-	return consumers
+	return a.broker.GetConsumers()
 }
 
 func (a *AdminServer) getPools() []*AdminPoolMetrics {
-	// This would need to be implemented based on how you track pools
-	// For now, return sample data as placeholder
-	pools := []*AdminPoolMetrics{
-		{
-			ID:            "pool-1",
-			Workers:       10,
-			QueueSize:     100,
-			ActiveTasks:   7,
-			Status:        "running",
-			MaxMemoryLoad: 1024 * 1024 * 512, // 512MB
-			LastActivity:  time.Now().Add(-10 * time.Second),
-		},
-		{
-			ID:            "pool-2",
-			Workers:       5,
-			QueueSize:     50,
-			ActiveTasks:   2,
-			Status:        "running",
-			MaxMemoryLoad: 1024 * 1024 * 256, // 256MB
-			LastActivity:  time.Now().Add(-1 * time.Minute),
-		},
-	}
-	return pools
+	return a.broker.GetPools()
 }
 
 func (a *AdminServer) getBrokerInfo() *AdminBrokerInfo {
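
The pause/resume/stop handlers now forward to broker methods, and the dashboard (see the admin.js change further down) addresses them as `POST /api/admin/consumers/{action}?id={consumerID}`. A minimal sketch of driving one of these endpoints from Go; the host and port (`localhost:8080`) and the consumer ID are assumptions for illustration, not taken from the patch:

```go
package main

import (
	"fmt"
	"net/http"
)

func main() {
	// POST /api/admin/consumers/{action}?id={consumerID} — action is one of
	// pause, resume, stop; the handler forwards it to the broker.
	url := "http://localhost:8080/api/admin/consumers/pause?id=test-consumer"
	resp, err := http.Post(url, "application/json", nil)
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("pause response:", resp.Status)
}
```
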
diff --git a/consumer.go b/consumer.go
index 569fa62..181f0b2 100644
--- a/consumer.go
+++ b/consumer.go
@@ -37,20 +37,23 @@ type Processor interface {
 }
 
 type Consumer struct {
-	conn         net.Conn
-	handler      Handler
-	pool         *Pool
-	opts         *Options
-	id           string
-	queue        string
-	pIDs         storage.IMap[string, bool]
-	connMutex    sync.RWMutex
-	isConnected  int32 // atomic flag
-	isShutdown   int32 // atomic flag
-	shutdown     chan struct{}
-	reconnectCh  chan struct{}
-	healthTicker *time.Ticker
-	logger       logger.Logger
+	conn                 net.Conn
+	handler              Handler
+	pool                 *Pool
+	opts                 *Options
+	id                   string
+	queue                string
+	pIDs                 storage.IMap[string, bool]
+	connMutex            sync.RWMutex
+	isConnected          int32 // atomic flag
+	isShutdown           int32 // atomic flag
+	shutdown             chan struct{}
+	reconnectCh          chan struct{}
+	healthTicker         *time.Ticker
+	logger               logger.Logger
+	reconnectAttempts    int32      // track consecutive reconnection attempts
+	lastReconnectAttempt time.Time  // track when last reconnect was attempted
+	reconnectMutex       sync.Mutex // protect reconnection attempt tracking
 }
 
 func NewConsumer(id string, queue string, handler Handler, opts ...Option) *Consumer {
@@ -587,9 +590,11 @@ func (c *Consumer) Consume(ctx context.Context) error {
 				c.logger.Info("Reconnection triggered",
 					logger.Field{Key: "consumer_id", Value: c.id})
 				if err := c.handleReconnection(ctx); err != nil {
-					c.logger.Error("Reconnection failed",
+					c.logger.Error("Reconnection failed, will retry based on backoff policy",
 						logger.Field{Key: "consumer_id", Value: c.id},
 						logger.Field{Key: "error", Value: err.Error()})
+					// The handleReconnection method now implements its own backoff,
+					// so we don't need to do anything here except continue the loop.
 				}
 
 			default:
@@ -635,10 +640,76 @@ func (c *Consumer) processWithTimeout(ctx context.Context) error {
 	}
 
 	// Read message without timeout - consumer should be long-running background service
-	return c.readMessage(ctx, conn)
+	err := c.readMessage(ctx, conn)
+
+	// If the message was processed successfully, reset reconnection attempts
+	if err == nil {
+		if atomic.LoadInt32(&c.reconnectAttempts) > 0 {
+			atomic.StoreInt32(&c.reconnectAttempts, 0)
+			c.logger.Debug("Reset reconnection attempts after successful message processing",
+				logger.Field{Key: "consumer_id", Value: c.id})
+		}
+	}
+
+	return err
 }
 
 func (c *Consumer) handleReconnection(ctx context.Context) error {
+	c.reconnectMutex.Lock()
+	defer c.reconnectMutex.Unlock()
+
+	// Increment reconnection attempts
+	attempts := atomic.AddInt32(&c.reconnectAttempts, 1)
+
+	// Calculate backoff based on consecutive attempts
+	backoffDelay := utils.CalculateJitter(
+		c.opts.initialDelay*time.Duration(1<<uint(minInt(int(attempts-1), 10))),
+		c.opts.jitterPercent)
+
+	// Cap the delay at the configured maximum backoff
+	if backoffDelay > c.opts.maxBackoff {
+		backoffDelay = c.opts.maxBackoff
+	}
+
+	// If we've been reconnecting too frequently, implement circuit breaker logic
+	timeSinceLastAttempt := time.Since(c.lastReconnectAttempt)
+	if attempts > 1 && timeSinceLastAttempt < backoffDelay {
+		remainingWait := backoffDelay - timeSinceLastAttempt
+		c.logger.Warn("Throttling reconnection attempt",
+			logger.Field{Key: "consumer_id", Value: c.id},
+			logger.Field{Key: "consecutive_attempts", Value: int(attempts)},
+			logger.Field{Key: "wait_duration", Value: remainingWait.String()})
+
+		// Wait with context cancellation support
+		select {
+		case <-time.After(remainingWait):
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-c.shutdown:
+			return fmt.Errorf("consumer is shutdown")
+		}
+	}
+
+	c.lastReconnectAttempt = time.Now()
+
+	// If we've exceeded reasonable attempts, implement longer backoff
+	if attempts > int32(c.opts.maxRetries*2) {
+		longBackoff := 5 * time.Minute // long circuit-breaker period
+		c.logger.Warn("Too many consecutive reconnection attempts, entering long backoff",
+			logger.Field{Key: "consumer_id", Value: c.id},
+			logger.Field{Key: "consecutive_attempts", Value: int(attempts)},
+			logger.Field{Key: "backoff_duration", Value: longBackoff.String()})
+
+		select {
+		case <-time.After(longBackoff):
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-c.shutdown:
+			return fmt.Errorf("consumer is shutdown")
+		}
+	}
+
 	// Mark as disconnected
 	atomic.StoreInt32(&c.isConnected, 0)
 
@@ -650,48 +721,42 @@ func (c *Consumer) handleReconnection(ctx context.Context) error {
 	}
 	c.connMutex.Unlock()
 
-	// Attempt reconnection with exponential backoff
-	backoff := c.opts.initialDelay
-	maxRetries := c.opts.maxRetries
+	c.logger.Info("Attempting reconnection",
+		logger.Field{Key: "consumer_id", Value: c.id},
+		logger.Field{Key: "attempt", Value: int(attempts)},
+		logger.Field{Key: "backoff_delay", Value: backoffDelay.String()})
 
-	for attempt := 1; attempt <= maxRetries; attempt++ {
-		if atomic.LoadInt32(&c.isShutdown) == 1 {
-			return fmt.Errorf("consumer is shutdown")
-		}
-
-		if err := c.attemptConnect(); err != nil {
-			if attempt == maxRetries {
-				return fmt.Errorf("failed to reconnect after %d attempts: %w", maxRetries, err)
-			}
-
-			sleepDuration := utils.CalculateJitter(backoff, c.opts.jitterPercent)
-			c.logger.Warn("Reconnection attempt failed, retrying",
-				logger.Field{Key: "consumer_id", Value: c.id},
-				logger.Field{Key: "attempt", Value: fmt.Sprintf("%d/%d", attempt, maxRetries)},
-				logger.Field{Key: "retry_in", Value: sleepDuration.String()})
-
-			time.Sleep(sleepDuration)
-			backoff *= 2
-			if backoff > c.opts.maxBackoff {
-				backoff = c.opts.maxBackoff
-			}
-			continue
-		}
-
-		// Reconnection successful, resubscribe
-		if err := c.subscribe(ctx, c.queue); err != nil {
-			c.logger.Error("Failed to resubscribe after reconnection",
-				logger.Field{Key: "consumer_id", Value: c.id},
-				logger.Field{Key: "error", Value: err.Error()})
-			continue
-		}
-
-		c.logger.Info("Successfully reconnected and resubscribed",
-			logger.Field{Key: "consumer_id", Value: c.id})
-		return nil
+	// Attempt reconnection
+	if err := c.attemptConnect(); err != nil {
+		c.logger.Error("Reconnection failed",
+			logger.Field{Key: "consumer_id", Value: c.id},
+			logger.Field{Key: "attempt", Value: int(attempts)},
+			logger.Field{Key: "error", Value: err.Error()})
+		return fmt.Errorf("failed to reconnect (attempt %d): %w", attempts, err)
 	}
 
-	return fmt.Errorf("failed to reconnect")
+	// Reconnection successful, try to resubscribe
+	if err := c.subscribe(ctx, c.queue); err != nil {
+		c.logger.Error("Failed to resubscribe after reconnection",
+			logger.Field{Key: "consumer_id", Value: c.id},
+			logger.Field{Key: "error", Value: err.Error()})
+		return fmt.Errorf("failed to resubscribe after reconnection: %w", err)
+	}
+
+	// Reset reconnection attempts on successful reconnection
+	atomic.StoreInt32(&c.reconnectAttempts, 0)
+
+	c.logger.Info("Successfully reconnected and resubscribed",
+		logger.Field{Key: "consumer_id", Value: c.id})
+	return nil
+}
+
+// minInt returns the smaller of two integers.
+func minInt(a, b int) int {
+	if a < b {
+		return a
+	}
+	return b
 }
 
 func isConnectionError(err error) bool {
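
The new `handleReconnection` computes its delay as exponential backoff (doubling per consecutive attempt, exponent capped via `minInt` so the shift cannot overflow), applies jitter, and clamps the result at `maxBackoff`. A standalone sketch of that schedule, assuming `CalculateJitter` returns a random value within ±percent of the base delay; the real helper lives in the package's `utils` and may differ:

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// calculateJitter is a stand-in for utils.CalculateJitter: a random
// duration within ±percent of the base (an assumption for this sketch).
func calculateJitter(base time.Duration, percent float64) time.Duration {
	delta := float64(base) * percent / 100.0
	return base + time.Duration((rand.Float64()*2-1)*delta)
}

func minInt(a, b int) int {
	if a < b {
		return a
	}
	return b
}

func main() {
	initialDelay := 1 * time.Second
	maxBackoff := 30 * time.Second

	for attempt := 1; attempt <= 8; attempt++ {
		// Exponential growth, shift capped, 10% jitter, clamped at maxBackoff.
		delay := calculateJitter(initialDelay*time.Duration(1<<uint(minInt(attempt-1, 10))), 10)
		if delay > maxBackoff {
			delay = maxBackoff
		}
		fmt.Printf("attempt %d -> wait ~%v\n", attempt, delay.Round(time.Millisecond))
	}
}
```
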
diff --git a/examples/minimal_admin/main.go b/examples/minimal_admin/main.go
index 32b1256..0d7a695 100644
--- a/examples/minimal_admin/main.go
+++ b/examples/minimal_admin/main.go
@@ -19,7 +19,7 @@ func main() {
 	fmt.Println("✅ Logger created")
 
 	// Create broker
-	broker := mq.NewBroker(mq.WithLogger(lg))
+	broker := mq.NewBroker(mq.WithLogger(lg), mq.WithBrokerURL(":8081"))
 	fmt.Println("✅ Broker created")
 
 	// Start broker
diff --git a/examples/minimal_admin/static/admin/js/admin.js b/examples/minimal_admin/static/admin/js/admin.js
index b2ca3a3..6fdb104 100644
--- a/examples/minimal_admin/static/admin/js/admin.js
+++ b/examples/minimal_admin/static/admin/js/admin.js
@@ -810,7 +810,7 @@ class MQAdminDashboard {
             const consumer = this.data.consumers.find(c => c.id === consumerId);
             const action = consumer?.status === 'paused' ? 'resume' : 'pause';
 
-            const response = await fetch(`/api/admin/consumers/${consumerId}/${action}`, {
+            const response = await fetch(`/api/admin/consumers/${action}?id=${consumerId}`, {
                 method: 'POST'
             });
 
@@ -828,7 +828,7 @@ class MQAdminDashboard {
     async stopConsumer(consumerId) {
         this.confirmAction('stop this consumer', async () => {
             try {
-                const response = await fetch(`/api/admin/consumers/${consumerId}/stop`, {
+                const response = await fetch(`/api/admin/consumers/stop?id=${consumerId}`, {
                     method: 'POST'
                 });
 
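
The dashboard now passes the consumer ID as a query parameter rather than a path segment. A sketch of the matching server-side pattern, reading `id` from the query string; the handler body and mux wiring here are illustrative, not copied from admin_server.go:

```go
package main

import (
	"fmt"
	"log"
	"net/http"
)

// handlePauseConsumer shows the query-parameter style the dashboard now uses.
func handlePauseConsumer(w http.ResponseWriter, r *http.Request) {
	consumerID := r.URL.Query().Get("id")
	if consumerID == "" {
		http.Error(w, `{"error":"missing id"}`, http.StatusBadRequest)
		return
	}
	// The real handler would call broker.PauseConsumer(r.Context(), consumerID) here.
	w.Header().Set("Content-Type", "application/json")
	fmt.Fprintf(w, `{"status":"ok","consumer_id":%q}`, consumerID)
}

func main() {
	http.HandleFunc("/api/admin/consumers/pause", handlePauseConsumer)
	log.Fatal(http.ListenAndServe(":8080", nil))
}
```
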
diff --git a/examples/reconnection_test/main.go b/examples/reconnection_test/main.go
new file mode 100644
index 0000000..8be80ab
--- /dev/null
+++ b/examples/reconnection_test/main.go
@@ -0,0 +1,71 @@
+package main
+
+import (
+	"context"
+	"fmt"
+	"log"
+	"time"
+
+	"github.com/oarkflow/mq"
+)
+
+// Simple task handler for testing
+func testHandler(ctx context.Context, task *mq.Task) mq.Result {
+	fmt.Printf("Processing task: %s\n", task.ID)
+	return mq.Result{
+		Status:  "SUCCESS",
+		Payload: []byte(fmt.Sprintf(`{"message": "Processed task %s"}`, task.ID)),
+	}
+}
+
+func main() {
+	fmt.Println("=== Consumer Reconnection Test ===")
+	fmt.Println("This test demonstrates improved reconnection behavior with jitter retry.")
+	fmt.Println("Start a broker on :8081 and then stop it to see reconnection attempts.")
+	fmt.Println()
+
+	// Create consumer with custom retry configuration
+	consumer := mq.NewConsumer(
+		"test-consumer",
+		"test-queue",
+		testHandler,
+		mq.WithBrokerURL(":8081"),
+		mq.WithMaxRetries(3),               // Limit initial connection retries
+		mq.WithInitialDelay(1*time.Second), // Start with 1 second delay
+		mq.WithMaxBackoff(30*time.Second),  // Cap at 30 seconds
+		mq.WithHTTPApi(true),
+		mq.WithWorkerPool(10, 2, 1000),
+	)
+
+	// Start consumer in a goroutine so we can observe the behavior
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	go func() {
+		fmt.Println("Starting consumer...")
+		if err := consumer.Consume(ctx); err != nil {
+			log.Printf("Consumer stopped with error: %v", err)
+		}
+	}()
+
+	// Keep the main thread alive to observe reconnection behavior
+	fmt.Println("Consumer started. You can now:")
+	fmt.Println("1. Start a broker on :8081")
+	fmt.Println("2. Stop the broker to trigger reconnection attempts")
+	fmt.Println("3. Restart the broker to see successful reconnection")
+	fmt.Println("4. Press Ctrl+C to exit")
+	fmt.Println()
+	fmt.Println("Watch the logs to see the improved reconnection behavior with:")
+	fmt.Println("- Exponential backoff with jitter")
+	fmt.Println("- Throttling of rapid reconnection attempts")
+	fmt.Println("- Long backoff periods after many failed attempts")
+	fmt.Println("- Reset of retry counters on successful connections")
+
+	// Sleep for a long time to observe behavior
+	time.Sleep(10 * time.Minute)
+
+	// Graceful shutdown
+	fmt.Println("Shutting down consumer...")
+	consumer.Close()
+	fmt.Println("Test completed.")
+}
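
The example keeps the process alive with a fixed ten-minute sleep. An alternative ending that blocks until Ctrl+C instead — a suggestion only, not part of the patch; in the example it would replace the `time.Sleep` call:

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

func main() {
	// Block until an interrupt or termination signal arrives.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)

	fmt.Println("Press Ctrl+C to stop...")
	<-sigCh

	fmt.Println("Shutting down consumer...")
	// consumer.Close() would go here in the real example.
	fmt.Println("Test completed.")
}
```
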
diff --git a/mq.go b/mq.go
index 8b21639..3126617 100644
--- a/mq.go
+++ b/mq.go
@@ -339,9 +339,18 @@ type QueuedTask struct {
 }
 
 type consumer struct {
-	conn  net.Conn
-	id    string
-	state consts.ConsumerState
+	conn    net.Conn
+	id      string
+	state   consts.ConsumerState
+	queue   string
+	pool    *Pool
+	metrics *ConsumerMetrics
+}
+
+type ConsumerMetrics struct {
+	ProcessedTasks int64
+	ErrorCount     int64
+	LastActivity   time.Time
 }
 
 type publisher struct {
@@ -818,13 +827,41 @@ func (b *Broker) AddConsumer(ctx context.Context, queueName string, conn net.Con
 	if !ok {
 		q = b.NewQueue(queueName)
 	}
-	con := &consumer{id: consumerID, conn: conn}
+
+	// Create consumer with proper initialization
+	con := &consumer{
+		id:    consumerID,
+		conn:  conn,
+		state: consts.ConsumerStateActive,
+		queue: queueName,
+		pool:  nil, // Pool will be set when consumer connects
+		metrics: &ConsumerMetrics{
+			ProcessedTasks: 0,
+			ErrorCount:     0,
+			LastActivity:   time.Now(),
+		},
+	}
+
 	b.consumers.Set(consumerID, con)
 	q.consumers.Set(consumerID, con)
 	log.Printf("BROKER - SUBSCRIBE ~> %s on %s", consumerID, queueName)
 	return consumerID
 }
 
+func (b *Broker) UpdateConsumerPool(consumerID string, pool *Pool) {
+	if con, exists := b.consumers.Get(consumerID); exists {
+		con.pool = pool
+	}
+}
+
+func (b *Broker) UpdateConsumerMetrics(consumerID string, processedTasks, errorCount int64) {
+	if con, exists := b.consumers.Get(consumerID); exists && con.metrics != nil {
+		con.metrics.ProcessedTasks = processedTasks
+		con.metrics.ErrorCount = errorCount
+		con.metrics.LastActivity = time.Now()
+	}
+}
+
 func (b *Broker) RemoveConsumer(consumerID string, queues ...string) {
 	if len(queues) > 0 {
 		for _, queueName := range queues {
@@ -1709,3 +1746,94 @@ func (cb *EnhancedCircuitBreaker) Call(fn func() error) error {
 type InMemoryMessageStore struct {
 	messages storage.IMap[string, *StoredMessage]
 }
+
+func (b *Broker) GetConsumers() []*AdminConsumerMetrics {
+	consumers := []*AdminConsumerMetrics{}
+	b.consumers.ForEach(func(id string, con *consumer) bool {
+		// Get status based on consumer state
+		status := "active"
+		switch con.state {
+		case consts.ConsumerStateActive:
+			status = "active"
+		case consts.ConsumerStatePaused:
+			status = "paused"
+		case consts.ConsumerStateStopped:
+			status = "stopped"
+		}
+
+		// Handle cases where pool might be nil
+		maxConcurrentTasks := 0
+		taskTimeout := 0
+		maxRetries := 0
+
+		if con.pool != nil {
+			config := con.pool.GetCurrentConfig()
+			maxConcurrentTasks = config.NumberOfWorkers
+			taskTimeout = int(config.Timeout.Seconds())
+			maxRetries = config.MaxRetries
+		}
+
+		// Ensure metrics is not nil
+		processedTasks := int64(0)
+		errorCount := int64(0)
+		lastActivity := time.Now()
+
+		if con.metrics != nil {
+			processedTasks = con.metrics.ProcessedTasks
+			errorCount = con.metrics.ErrorCount
+			lastActivity = con.metrics.LastActivity
+		}
+
+		consumers = append(consumers, &AdminConsumerMetrics{
+			ID:                 id,
+			Queue:              con.queue,
+			Status:             status,
+			ProcessedTasks:     processedTasks,
+			ErrorCount:         errorCount,
+			LastActivity:       lastActivity,
+			MaxConcurrentTasks: maxConcurrentTasks,
+			TaskTimeout:        taskTimeout,
+			MaxRetries:         maxRetries,
+		})
+		return true
+	})
+	return consumers
+}
+
+func (b *Broker) GetPools() []*AdminPoolMetrics {
+	pools := []*AdminPoolMetrics{}
+	b.queues.ForEach(func(name string, queue *Queue) bool {
+		// Initialize default values
+		workers := 0
+		queueSize := 0
+		activeTasks := 0
+		maxMemoryLoad := int64(0)
+		lastActivity := time.Now()
+
+		// Get metrics from queue if available
+		if queue.metrics != nil {
+			workers = queue.metrics.WorkerCount
+			queueSize = queue.metrics.QueueDepth
+			activeTasks = queue.metrics.ActiveTasks
+			maxMemoryLoad = queue.metrics.MaxMemoryLoad
+			lastActivity = queue.metrics.LastActivity
+		}
+
+		// If metrics are empty, try to get some basic info from the queue
+		if queueSize == 0 && queue.tasks != nil {
+			queueSize = len(queue.tasks)
+		}
+
+		pools = append(pools, &AdminPoolMetrics{
+			ID:            name,
+			Workers:       workers,
+			QueueSize:     queueSize,
+			ActiveTasks:   activeTasks,
+			Status:        "running", // Default status
+			MaxMemoryLoad: maxMemoryLoad,
+			LastActivity:  lastActivity,
+		})
+		return true
+	})
+	return pools
+}
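
With `GetConsumers` exposed on the broker, external code can poll the same live view the admin server now returns. A sketch of a periodic reporter, assuming a constructed `*mq.Broker`; the ticker loop and log format are illustrative, and field names match the patch:

```go
package main

import (
	"log"
	"time"

	"github.com/oarkflow/mq"
)

// reportConsumers polls the broker's live consumer metrics added in this patch.
func reportConsumers(b *mq.Broker) {
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()
	for range ticker.C {
		for _, c := range b.GetConsumers() {
			log.Printf("consumer=%s queue=%s status=%s processed=%d errors=%d",
				c.ID, c.Queue, c.Status, c.ProcessedTasks, c.ErrorCount)
		}
	}
}

func main() {
	broker := mq.NewBroker(mq.WithBrokerURL(":8081"))
	go reportConsumers(broker)
	// Starting the broker and wiring consumers is omitted here.
	select {}
}
```
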
diff --git a/task.go b/task.go
index 9cf544c..8b67851 100644
--- a/task.go
+++ b/task.go
@@ -31,6 +31,10 @@ type QueueMetrics struct {
 	CurrentDepth   int64         `json:"current_depth"`
 	AverageLatency time.Duration `json:"average_latency"`
 	LastActivity   time.Time     `json:"last_activity"`
+	WorkerCount    int           `json:"worker_count"`
+	QueueDepth     int           `json:"queue_depth"`
+	ActiveTasks    int           `json:"active_tasks"`
+	MaxMemoryLoad  int64         `json:"max_memory_load"`
 }
 
 func newQueue(name string, queueSize int) *Queue {
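
The four new `QueueMetrics` fields feed `GetPools` and serialize with snake_case JSON tags. A quick check of the resulting JSON shape, using a local mirror of just the relevant fields:

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// QueueMetrics here mirrors only the fields added (plus LastActivity) in this patch.
type QueueMetrics struct {
	WorkerCount   int       `json:"worker_count"`
	QueueDepth    int       `json:"queue_depth"`
	ActiveTasks   int       `json:"active_tasks"`
	MaxMemoryLoad int64     `json:"max_memory_load"`
	LastActivity  time.Time `json:"last_activity"`
}

func main() {
	m := QueueMetrics{
		WorkerCount:   4,
		QueueDepth:    12,
		ActiveTasks:   3,
		MaxMemoryLoad: 512 << 20, // 512MB, same convention as the old placeholder data
		LastActivity:  time.Now(),
	}
	out, _ := json.MarshalIndent(m, "", "  ")
	fmt.Println(string(out))
}
```
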