update: use oarkflow/json

Oarkflow
2025-03-29 15:27:50 +05:45
parent abc07e9360
commit bcf5fff33f
5 changed files with 26 additions and 36 deletions
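The commit title refers to swapping the project's JSON handling to the oarkflow/json package; the hunks below are the accompanying pool and scheduler cleanup. As a minimal sketch of what the swap itself usually looks like, assuming github.com/oarkflow/json is a drop-in replacement for encoding/json (the JSON call sites are not shown in this diff):

	package main

	import (
		"fmt"

		"github.com/oarkflow/json" // assumption: drop-in replacement for encoding/json
	)

	func main() {
		// Marshal mirrors the encoding/json API, so existing call sites keep working.
		b, err := json.Marshal(map[string]any{"queue": "queue1"})
		if err != nil {
			panic(err)
		}
		fmt.Println(string(b))
	}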

View File

@@ -45,7 +45,7 @@ func main() {
 	metrics := pool.Metrics()
 	v1.Logger.Info().Msgf("Metrics: %+v", metrics)
 	pool.Stop()
-	v1.Logger.Info().Msgf("Dead Letter Queue has %d tasks", len(pool.DLQ.Task()))
+	v1.Logger.Info().Msgf("Dead Letter Queue has %d tasks", len(pool.DLQ().Tasks()))
 	}()
 	go func() {

View File

@@ -13,10 +13,12 @@ func main() {
 		Payload: payload,
 	}
 	publisher := mq.NewPublisher("publish-1")
-	// publisher := mq.NewPublisher("publish-1", mq.WithTLS(true, "./certs/server.crt", "./certs/server.key"))
-	err := publisher.Publish(context.Background(), task, "queue")
-	if err != nil {
-		panic(err)
+	for i := 0; i < 10000000; i++ {
+		// publisher := mq.NewPublisher("publish-1", mq.WithTLS(true, "./certs/server.crt", "./certs/server.key"))
+		err := publisher.Publish(context.Background(), task, "queue1")
+		if err != nil {
+			panic(err)
+		}
 	}
 	fmt.Println("Async task published successfully")
 }
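The example publisher is now a crude load test: Publish runs ten million times against "queue1", any error still panics immediately, and the success message prints once after the whole loop completes.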

32 pool.go
View File

@@ -55,7 +55,7 @@ func NewDeadLetterQueue() *DeadLetterQueue {
 	}
 }
-func (dlq *DeadLetterQueue) Task() []*QueueTask {
+func (dlq *DeadLetterQueue) Tasks() []*QueueTask {
 	return dlq.tasks
 }
@@ -138,7 +138,7 @@ type Pool struct {
 	completionCallback CompletionCallback
 	taskAvailableCond  *sync.Cond
 	callback           Callback
-	DLQ                *DeadLetterQueue
+	dlq                *DeadLetterQueue
 	taskQueue          PriorityQueue
 	overflowBuffer     []*QueueTask
 	metrics            Metrics
@@ -164,6 +164,7 @@ type Pool struct {
 	circuitBreakerFailureCount int32
 	gracefulShutdownTimeout    time.Duration
 	plugins                    []Plugin
+	port                       int
 }

 func NewPool(numOfWorkers int, opts ...PoolOption) *Pool {
@@ -176,7 +177,8 @@ func NewPool(numOfWorkers int, opts ...PoolOption) *Pool {
 		backoffDuration:         Config.BackoffDuration,
 		maxRetries:              Config.MaxRetries,
 		logger:                  Logger,
-		DLQ:                     NewDeadLetterQueue(),
+		port:                    1234,
+		dlq:                     NewDeadLetterQueue(),
 		metricsRegistry:         NewInMemoryMetricsRegistry(),
 		diagnosticsEnabled:      true,
 		gracefulShutdownTimeout: 10 * time.Second,
@@ -200,7 +202,7 @@ func NewPool(numOfWorkers int, opts ...PoolOption) *Pool {
 func validateDynamicConfig(c *DynamicConfig) error {
 	if c.Timeout <= 0 {
-		return errors.New("Timeout must be positive")
+		return errors.New("timeout must be positive")
 	}
 	if c.BatchSize <= 0 {
 		return errors.New("BatchSize must be > 0")
@@ -259,6 +261,10 @@ func (wp *Pool) Start(numWorkers int) {
 	go wp.monitorIdleWorkers()
 }
+
+func (wp *Pool) DLQ() *DeadLetterQueue {
+	return wp.dlq
+}

 func (wp *Pool) worker() {
 	defer wp.wg.Done()
 	for {
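Unexporting the field makes this accessor the single entry point to the dead letter queue: callers read it rather than assign to it. Usage matches the first example file above, e.g. as a sketch:

	// Count tasks that exhausted their retries.
	fmt.Printf("DLQ holds %d tasks\n", len(pool.DLQ().Tasks()))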
@@ -299,10 +305,6 @@ func (wp *Pool) processNextBatch() {
 }

 func (wp *Pool) handleTask(task *QueueTask) {
-	if err := validateTaskInput(task.payload); err != nil {
-		wp.logger.Error().Str("taskID", task.payload.ID).Msgf("Validation failed: %v", err)
-		return
-	}
 	ctx, cancel := context.WithTimeout(task.ctx, wp.timeout)
 	defer cancel()
 	taskSize := int64(utils.SizeOf(task.payload))
@@ -312,9 +314,7 @@ func (wp *Pool) handleTask(task *QueueTask) {
 	result := wp.handler(ctx, task.payload)
 	executionTime := time.Since(startTime).Milliseconds()
 	atomic.AddInt64(&wp.metrics.ExecutionTime, executionTime)
-	// Warning thresholds check
-	if wp.thresholds.LongExecution > 0 && executionTime > int64(wp.thresholds.LongExecution.Milliseconds()) {
+	if wp.thresholds.LongExecution > 0 && executionTime > wp.thresholds.LongExecution.Milliseconds() {
 		wp.logger.Warn().Str("taskID", task.payload.ID).Msgf("Exceeded execution time threshold: %d ms", executionTime)
 	}
 	if wp.thresholds.HighMemory > 0 && taskSize > wp.thresholds.HighMemory {
@@ -325,8 +325,6 @@ func (wp *Pool) handleTask(task *QueueTask) {
 		atomic.AddInt64(&wp.metrics.ErrorCount, 1)
 		wp.logger.Error().Str("taskID", task.payload.ID).Msgf("Error processing task: %v", result.Error)
 		wp.backoffAndStore(task)
-		// Circuit breaker check
 		if wp.circuitBreaker.Enabled {
 			newCount := atomic.AddInt32(&wp.circuitBreakerFailureCount, 1)
 			if newCount >= int32(wp.circuitBreaker.FailureThreshold) {
@@ -375,7 +373,7 @@ func (wp *Pool) backoffAndStore(task *QueueTask) {
 		time.Sleep(sleepDuration)
 	} else {
 		wp.logger.Error().Str("taskID", task.payload.ID).Msg("Task failed after maximum retries")
-		wp.DLQ.Add(task)
+		wp.dlq.Add(task)
 	}
 }
@@ -431,13 +429,9 @@ func (wp *Pool) adjustWorkers(newWorkerCount int) {
 }

 func (wp *Pool) EnqueueTask(ctx context.Context, payload *Task, priority int) error {
-	// Check circuit breaker state
 	if wp.circuitBreaker.Enabled && wp.circuitBreakerOpen {
 		return fmt.Errorf("circuit breaker open, task rejected")
 	}
-	if err := validateTaskInput(payload); err != nil {
-		return fmt.Errorf("invalid task input: %w", err)
-	}
 	if payload.ID == "" {
 		payload.ID = NewID()
 	}
@@ -572,7 +566,7 @@ func (wp *Pool) startHealthServer() {
 		if wp.gracefulShutdown {
 			status = "shutting down"
 		}
-		fmt.Fprintf(w, "status: %s\nworkers: %d\nqueueLength: %d\n",
+		_, _ = fmt.Fprintf(w, "status: %s\nworkers: %d\nqueueLength: %d\n",
 			status, atomic.LoadInt32(&wp.numOfWorkers), len(wp.taskQueue))
 	})
 	server := &http.Server{

View File

@@ -54,6 +54,12 @@ func WithBatchSize(batchSize int) PoolOption {
 	}
 }

+func WithHealthServicePort(port int) PoolOption {
+	return func(p *Pool) {
+		p.port = port
+	}
+}
+
 func WithHandler(handler Handler) PoolOption {
 	return func(p *Pool) {
 		p.handler = handler
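A sketch of wiring the new option, assuming the pool lives in the same mq package as the publisher shown earlier (NewPool, WithBatchSize, and WithHealthServicePort all appear in this diff; the default port stays 1234, set in NewPool):

	pool := mq.NewPool(4,
		mq.WithBatchSize(16),
		mq.WithHealthServicePort(8080), // health endpoint port is now configurable
	)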

View File

@@ -2,7 +2,6 @@ package mq
 import (
 	"context"
-	"errors"
 	"fmt"
 	"strconv"
 	"strings"
@@ -448,21 +447,10 @@ func nextWeekday(t time.Time, weekday time.Weekday) time.Time {
 	}
 	return t.AddDate(0, 0, daysUntil)
 }
-func validateTaskInput(task *Task) error {
-	if task.Payload == nil {
-		return errors.New("task payload cannot be nil")
-	}
-	Logger.Info().Str("taskID", task.ID).Msg("Task validated")
-	return nil
-}
 func (s *Scheduler) AddTask(ctx context.Context, payload *Task, opts ...SchedulerOption) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
-	if err := validateTaskInput(payload); err != nil {
-		Logger.Error().Err(err).Msg("Invalid task input")
-		return
-	}
 	options := defaultSchedulerOptions()
 	for _, opt := range opts {
 		opt(options)
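With validateTaskInput and both of its call sites removed, neither EnqueueTask nor Scheduler.AddTask rejects a nil payload any longer. Callers that relied on that guard need their own check; a minimal sketch using only names from this diff:

	if task.Payload == nil {
		return errors.New("task payload cannot be nil") // guard no longer enforced by the library
	}
	if err := pool.EnqueueTask(ctx, task, 1); err != nil {
		return err
	}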