From bcf5fff33f1de6dc7895ff7f6cb08a3b8f91a095 Mon Sep 17 00:00:00 2001 From: Oarkflow Date: Sat, 29 Mar 2025 15:27:50 +0545 Subject: [PATCH] update: use `oarkflow/json` --- examples/pool.go | 2 +- examples/publisher.go | 10 ++++++---- pool.go | 32 +++++++++++++------------------- pool_options.go | 6 ++++++ scheduler.go | 12 ------------ 5 files changed, 26 insertions(+), 36 deletions(-) diff --git a/examples/pool.go b/examples/pool.go index cee6ab9..33a3cec 100644 --- a/examples/pool.go +++ b/examples/pool.go @@ -45,7 +45,7 @@ func main() { metrics := pool.Metrics() v1.Logger.Info().Msgf("Metrics: %+v", metrics) pool.Stop() - v1.Logger.Info().Msgf("Dead Letter Queue has %d tasks", len(pool.DLQ.Task())) + v1.Logger.Info().Msgf("Dead Letter Queue has %d tasks", len(pool.DLQ().Tasks())) }() go func() { diff --git a/examples/publisher.go b/examples/publisher.go index 58a1b63..749f2e4 100644 --- a/examples/publisher.go +++ b/examples/publisher.go @@ -13,10 +13,12 @@ func main() { Payload: payload, } publisher := mq.NewPublisher("publish-1") - // publisher := mq.NewPublisher("publish-1", mq.WithTLS(true, "./certs/server.crt", "./certs/server.key")) - err := publisher.Publish(context.Background(), task, "queue") - if err != nil { - panic(err) + for i := 0; i < 10000000; i++ { + // publisher := mq.NewPublisher("publish-1", mq.WithTLS(true, "./certs/server.crt", "./certs/server.key")) + err := publisher.Publish(context.Background(), task, "queue1") + if err != nil { + panic(err) + } } fmt.Println("Async task published successfully") } diff --git a/pool.go b/pool.go index 2706e79..c9ab92d 100644 --- a/pool.go +++ b/pool.go @@ -55,7 +55,7 @@ func NewDeadLetterQueue() *DeadLetterQueue { } } -func (dlq *DeadLetterQueue) Task() []*QueueTask { +func (dlq *DeadLetterQueue) Tasks() []*QueueTask { return dlq.tasks } @@ -138,7 +138,7 @@ type Pool struct { completionCallback CompletionCallback taskAvailableCond *sync.Cond callback Callback - DLQ *DeadLetterQueue + dlq *DeadLetterQueue taskQueue PriorityQueue overflowBuffer []*QueueTask metrics Metrics @@ -164,6 +164,7 @@ type Pool struct { circuitBreakerFailureCount int32 gracefulShutdownTimeout time.Duration plugins []Plugin + port int } func NewPool(numOfWorkers int, opts ...PoolOption) *Pool { @@ -176,7 +177,8 @@ func NewPool(numOfWorkers int, opts ...PoolOption) *Pool { backoffDuration: Config.BackoffDuration, maxRetries: Config.MaxRetries, logger: Logger, - DLQ: NewDeadLetterQueue(), + port: 1234, + dlq: NewDeadLetterQueue(), metricsRegistry: NewInMemoryMetricsRegistry(), diagnosticsEnabled: true, gracefulShutdownTimeout: 10 * time.Second, @@ -200,7 +202,7 @@ func NewPool(numOfWorkers int, opts ...PoolOption) *Pool { func validateDynamicConfig(c *DynamicConfig) error { if c.Timeout <= 0 { - return errors.New("Timeout must be positive") + return errors.New("timeout must be positive") } if c.BatchSize <= 0 { return errors.New("BatchSize must be > 0") @@ -259,6 +261,10 @@ func (wp *Pool) Start(numWorkers int) { go wp.monitorIdleWorkers() } +func (wp *Pool) DLQ() *DeadLetterQueue { + return wp.dlq +} + func (wp *Pool) worker() { defer wp.wg.Done() for { @@ -299,10 +305,6 @@ func (wp *Pool) processNextBatch() { } func (wp *Pool) handleTask(task *QueueTask) { - if err := validateTaskInput(task.payload); err != nil { - wp.logger.Error().Str("taskID", task.payload.ID).Msgf("Validation failed: %v", err) - return - } ctx, cancel := context.WithTimeout(task.ctx, wp.timeout) defer cancel() taskSize := int64(utils.SizeOf(task.payload)) @@ -312,9 +314,7 @@ func (wp *Pool) handleTask(task *QueueTask) { result := wp.handler(ctx, task.payload) executionTime := time.Since(startTime).Milliseconds() atomic.AddInt64(&wp.metrics.ExecutionTime, executionTime) - - // Warning thresholds check - if wp.thresholds.LongExecution > 0 && executionTime > int64(wp.thresholds.LongExecution.Milliseconds()) { + if wp.thresholds.LongExecution > 0 && executionTime > wp.thresholds.LongExecution.Milliseconds() { wp.logger.Warn().Str("taskID", task.payload.ID).Msgf("Exceeded execution time threshold: %d ms", executionTime) } if wp.thresholds.HighMemory > 0 && taskSize > wp.thresholds.HighMemory { @@ -325,8 +325,6 @@ func (wp *Pool) handleTask(task *QueueTask) { atomic.AddInt64(&wp.metrics.ErrorCount, 1) wp.logger.Error().Str("taskID", task.payload.ID).Msgf("Error processing task: %v", result.Error) wp.backoffAndStore(task) - - // Circuit breaker check if wp.circuitBreaker.Enabled { newCount := atomic.AddInt32(&wp.circuitBreakerFailureCount, 1) if newCount >= int32(wp.circuitBreaker.FailureThreshold) { @@ -375,7 +373,7 @@ func (wp *Pool) backoffAndStore(task *QueueTask) { time.Sleep(sleepDuration) } else { wp.logger.Error().Str("taskID", task.payload.ID).Msg("Task failed after maximum retries") - wp.DLQ.Add(task) + wp.dlq.Add(task) } } @@ -431,13 +429,9 @@ func (wp *Pool) adjustWorkers(newWorkerCount int) { } func (wp *Pool) EnqueueTask(ctx context.Context, payload *Task, priority int) error { - // Check circuit breaker state if wp.circuitBreaker.Enabled && wp.circuitBreakerOpen { return fmt.Errorf("circuit breaker open, task rejected") } - if err := validateTaskInput(payload); err != nil { - return fmt.Errorf("invalid task input: %w", err) - } if payload.ID == "" { payload.ID = NewID() } @@ -572,7 +566,7 @@ func (wp *Pool) startHealthServer() { if wp.gracefulShutdown { status = "shutting down" } - fmt.Fprintf(w, "status: %s\nworkers: %d\nqueueLength: %d\n", + _, _ = fmt.Fprintf(w, "status: %s\nworkers: %d\nqueueLength: %d\n", status, atomic.LoadInt32(&wp.numOfWorkers), len(wp.taskQueue)) }) server := &http.Server{ diff --git a/pool_options.go b/pool_options.go index 9921783..6b4a10d 100644 --- a/pool_options.go +++ b/pool_options.go @@ -54,6 +54,12 @@ func WithBatchSize(batchSize int) PoolOption { } } +func WithHealthServicePort(port int) PoolOption { + return func(p *Pool) { + p.port = port + } +} + func WithHandler(handler Handler) PoolOption { return func(p *Pool) { p.handler = handler diff --git a/scheduler.go b/scheduler.go index 550b254..75dc1ab 100644 --- a/scheduler.go +++ b/scheduler.go @@ -2,7 +2,6 @@ package mq import ( "context" - "errors" "fmt" "strconv" "strings" @@ -448,21 +447,10 @@ func nextWeekday(t time.Time, weekday time.Weekday) time.Time { } return t.AddDate(0, 0, daysUntil) } -func validateTaskInput(task *Task) error { - if task.Payload == nil { - return errors.New("task payload cannot be nil") - } - Logger.Info().Str("taskID", task.ID).Msg("Task validated") - return nil -} func (s *Scheduler) AddTask(ctx context.Context, payload *Task, opts ...SchedulerOption) { s.mu.Lock() defer s.mu.Unlock() - if err := validateTaskInput(payload); err != nil { - Logger.Error().Err(err).Msg("Invalid task input") - return - } options := defaultSchedulerOptions() for _, opt := range opts { opt(options)