diff --git a/examples/pool.go b/examples/pool.go index ca06803..cee6ab9 100644 --- a/examples/pool.go +++ b/examples/pool.go @@ -8,14 +8,16 @@ import ( "syscall" "time" - v1 "github.com/oarkflow/mq/v1" + "github.com/oarkflow/json" + + v1 "github.com/oarkflow/mq" ) func main() { ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer stop() pool := v1.NewPool(5, - v1.WithTaskStorage(v1.NewInMemoryTaskStorage()), + v1.WithTaskStorage(v1.NewMemoryTaskStorage(10*time.Minute)), v1.WithHandler(func(ctx context.Context, payload *v1.Task) v1.Result { v1.Logger.Info().Str("taskID", payload.ID).Msg("Processing task payload") time.Sleep(500 * time.Millisecond) @@ -43,14 +45,14 @@ func main() { metrics := pool.Metrics() v1.Logger.Info().Msgf("Metrics: %+v", metrics) pool.Stop() - v1.Logger.Info().Msgf("Dead Letter Queue has %d tasks", len(v1.DLQ.Task())) + v1.Logger.Info().Msgf("Dead Letter Queue has %d tasks", len(pool.DLQ.Task())) }() go func() { for i := 0; i < 50; i++ { task := &v1.Task{ ID: "", - Payload: fmt.Sprintf("Task Payload %d", i), + Payload: json.RawMessage(fmt.Sprintf("Task Payload %d", i)), } if err := pool.EnqueueTask(context.Background(), task, rand.Intn(10)); err != nil { v1.Logger.Error().Err(err).Msg("Failed to enqueue task") diff --git a/pool.go b/pool.go index cf5c0fd..2706e79 100644 --- a/pool.go +++ b/pool.go @@ -3,14 +3,17 @@ package mq import ( "container/heap" "context" + "errors" "fmt" - "log" "math/rand" + "net/http" "sync" "sync/atomic" "time" "github.com/oarkflow/mq/utils" + + "github.com/oarkflow/log" ) type Callback func(ctx context.Context, result Result) error @@ -25,35 +28,134 @@ type Metrics struct { ExecutionTime int64 } -type Pool struct { - taskStorage TaskStorage - scheduler *Scheduler - stop chan struct{} - taskNotify chan struct{} - workerAdjust chan int - handler Handler - completionCallback CompletionCallback - taskAvailableCond *sync.Cond - callback Callback - taskQueue PriorityQueue - overflowBuffer []*QueueTask - metrics Metrics - wg sync.WaitGroup - taskCompletionNotifier sync.WaitGroup - timeout time.Duration - batchSize int - maxMemoryLoad int64 - idleTimeout time.Duration - backoffDuration time.Duration - maxRetries int - overflowBufferLock sync.RWMutex - taskQueueLock sync.Mutex - numOfWorkers int32 - paused bool - logger *log.Logger - gracefulShutdown bool +type Plugin interface { + Initialize(config interface{}) error + BeforeTask(task *QueueTask) + AfterTask(task *QueueTask, result Result) +} - // New fields for enhancements: +type DefaultPlugin struct{} + +func (dp *DefaultPlugin) Initialize(config interface{}) error { return nil } +func (dp *DefaultPlugin) BeforeTask(task *QueueTask) { + Logger.Info().Str("taskID", task.payload.ID).Msg("BeforeTask plugin invoked") +} +func (dp *DefaultPlugin) AfterTask(task *QueueTask, result Result) { + Logger.Info().Str("taskID", task.payload.ID).Msg("AfterTask plugin invoked") +} + +type DeadLetterQueue struct { + tasks []*QueueTask + mu sync.Mutex +} + +func NewDeadLetterQueue() *DeadLetterQueue { + return &DeadLetterQueue{ + tasks: make([]*QueueTask, 0), + } +} + +func (dlq *DeadLetterQueue) Task() []*QueueTask { + return dlq.tasks +} + +func (dlq *DeadLetterQueue) Add(task *QueueTask) { + dlq.mu.Lock() + defer dlq.mu.Unlock() + dlq.tasks = append(dlq.tasks, task) + Logger.Warn().Str("taskID", task.payload.ID).Msg("Task added to Dead Letter Queue") +} + +type InMemoryMetricsRegistry struct { + metrics map[string]int64 + mu sync.RWMutex +} + +func 
NewInMemoryMetricsRegistry() *InMemoryMetricsRegistry { + return &InMemoryMetricsRegistry{ + metrics: make(map[string]int64), + } +} + +func (m *InMemoryMetricsRegistry) Register(metricName string, value interface{}) { + m.mu.Lock() + defer m.mu.Unlock() + if v, ok := value.(int64); ok { + m.metrics[metricName] = v + Logger.Info().Str("metric", metricName).Msgf("Registered metric: %d", v) + } +} + +func (m *InMemoryMetricsRegistry) Increment(metricName string) { + m.mu.Lock() + defer m.mu.Unlock() + m.metrics[metricName]++ +} + +func (m *InMemoryMetricsRegistry) Get(metricName string) interface{} { + m.mu.RLock() + defer m.mu.RUnlock() + return m.metrics[metricName] +} + +type WarningThresholds struct { + HighMemory int64 + LongExecution time.Duration +} + +type DynamicConfig struct { + Timeout time.Duration + BatchSize int + MaxMemoryLoad int64 + IdleTimeout time.Duration + BackoffDuration time.Duration + MaxRetries int + ReloadInterval time.Duration + WarningThreshold WarningThresholds +} + +var Config = &DynamicConfig{ + Timeout: 10 * time.Second, + BatchSize: 1, + MaxMemoryLoad: 100 * 1024 * 1024, + IdleTimeout: 5 * time.Minute, + BackoffDuration: 2 * time.Second, + MaxRetries: 3, + ReloadInterval: 30 * time.Second, + WarningThreshold: WarningThresholds{ + HighMemory: 1 * 1024 * 1024, + LongExecution: 2 * time.Second, + }, +} + +type Pool struct { + taskStorage TaskStorage + scheduler *Scheduler + stop chan struct{} + taskNotify chan struct{} + workerAdjust chan int + handler Handler + completionCallback CompletionCallback + taskAvailableCond *sync.Cond + callback Callback + DLQ *DeadLetterQueue + taskQueue PriorityQueue + overflowBuffer []*QueueTask + metrics Metrics + wg sync.WaitGroup + taskCompletionNotifier sync.WaitGroup + timeout time.Duration + batchSize int + maxMemoryLoad int64 + idleTimeout time.Duration + backoffDuration time.Duration + maxRetries int + overflowBufferLock sync.RWMutex + taskQueueLock sync.Mutex + numOfWorkers int32 + paused bool + logger log.Logger + gracefulShutdown bool thresholds ThresholdConfig diagnosticsEnabled bool metricsRegistry MetricsRegistry @@ -61,33 +163,83 @@ type Pool struct { circuitBreakerOpen bool circuitBreakerFailureCount int32 gracefulShutdownTimeout time.Duration + plugins []Plugin } func NewPool(numOfWorkers int, opts ...PoolOption) *Pool { pool := &Pool{ - stop: make(chan struct{}), - taskNotify: make(chan struct{}, numOfWorkers), - batchSize: 1, - timeout: 10 * time.Second, - idleTimeout: 5 * time.Minute, - backoffDuration: 2 * time.Second, - maxRetries: 3, // Set max retries for failed tasks - logger: log.Default(), + stop: make(chan struct{}), + taskNotify: make(chan struct{}, numOfWorkers), + batchSize: Config.BatchSize, + timeout: Config.Timeout, + idleTimeout: Config.IdleTimeout, + backoffDuration: Config.BackoffDuration, + maxRetries: Config.MaxRetries, + logger: Logger, + DLQ: NewDeadLetterQueue(), + metricsRegistry: NewInMemoryMetricsRegistry(), + diagnosticsEnabled: true, + gracefulShutdownTimeout: 10 * time.Second, } pool.scheduler = NewScheduler(pool) pool.taskAvailableCond = sync.NewCond(&sync.Mutex{}) for _, opt := range opts { opt(pool) } - if len(pool.taskQueue) == 0 { + if pool.taskQueue == nil { pool.taskQueue = make(PriorityQueue, 0, 10) } heap.Init(&pool.taskQueue) pool.scheduler.Start() pool.Start(numOfWorkers) + startConfigReloader(pool) + go pool.dynamicWorkerScaler() + go pool.startHealthServer() return pool } +func validateDynamicConfig(c *DynamicConfig) error { + if c.Timeout <= 0 { + return 
errors.New("Timeout must be positive") + } + if c.BatchSize <= 0 { + return errors.New("BatchSize must be > 0") + } + if c.MaxMemoryLoad <= 0 { + return errors.New("MaxMemoryLoad must be > 0") + } + return nil +} + +func startConfigReloader(pool *Pool) { + go func() { + ticker := time.NewTicker(Config.ReloadInterval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + if err := validateDynamicConfig(Config); err != nil { + Logger.Error().Err(err).Msg("Invalid dynamic config, skipping reload") + continue + } + pool.timeout = Config.Timeout + pool.batchSize = Config.BatchSize + pool.maxMemoryLoad = Config.MaxMemoryLoad + pool.idleTimeout = Config.IdleTimeout + pool.backoffDuration = Config.BackoffDuration + pool.maxRetries = Config.MaxRetries + pool.thresholds = ThresholdConfig{ + HighMemory: Config.WarningThreshold.HighMemory, + LongExecution: Config.WarningThreshold.LongExecution, + } + Logger.Info().Msg("Dynamic configuration reloaded") + case <-pool.stop: + return + } + } + }() +} + func (wp *Pool) Start(numWorkers int) { storedTasks, err := wp.taskStorage.GetAllTasks() if err == nil { @@ -144,13 +296,13 @@ func (wp *Pool) processNextBatch() { wp.handleTask(task) } } - // @TODO - Why was this done? - //if len(tasks) > 0 { - // wp.taskCompletionNotifier.Done() - //} } func (wp *Pool) handleTask(task *QueueTask) { + if err := validateTaskInput(task.payload); err != nil { + wp.logger.Error().Str("taskID", task.payload.ID).Msgf("Validation failed: %v", err) + return + } ctx, cancel := context.WithTimeout(task.ctx, wp.timeout) defer cancel() taskSize := int64(utils.SizeOf(task.payload)) @@ -163,15 +315,15 @@ func (wp *Pool) handleTask(task *QueueTask) { // Warning thresholds check if wp.thresholds.LongExecution > 0 && executionTime > int64(wp.thresholds.LongExecution.Milliseconds()) { - wp.logger.Printf("Warning: Task %s exceeded execution time threshold: %d ms", task.payload.ID, executionTime) + wp.logger.Warn().Str("taskID", task.payload.ID).Msgf("Exceeded execution time threshold: %d ms", executionTime) } if wp.thresholds.HighMemory > 0 && taskSize > wp.thresholds.HighMemory { - wp.logger.Printf("Warning: Task %s memory usage %d exceeded threshold", task.payload.ID, taskSize) + wp.logger.Warn().Str("taskID", task.payload.ID).Msgf("Memory usage %d exceeded threshold", taskSize) } if result.Error != nil { atomic.AddInt64(&wp.metrics.ErrorCount, 1) - wp.logger.Printf("Error processing task %s: %v", task.payload.ID, result.Error) + wp.logger.Error().Str("taskID", task.payload.ID).Msgf("Error processing task: %v", result.Error) wp.backoffAndStore(task) // Circuit breaker check @@ -179,12 +331,12 @@ func (wp *Pool) handleTask(task *QueueTask) { newCount := atomic.AddInt32(&wp.circuitBreakerFailureCount, 1) if newCount >= int32(wp.circuitBreaker.FailureThreshold) { wp.circuitBreakerOpen = true - wp.logger.Println("Circuit breaker opened due to errors") + wp.logger.Warn().Msg("Circuit breaker opened due to errors") go func() { time.Sleep(wp.circuitBreaker.ResetTimeout) atomic.StoreInt32(&wp.circuitBreakerFailureCount, 0) wp.circuitBreakerOpen = false - wp.logger.Println("Circuit breaker reset") + wp.logger.Info().Msg("Circuit breaker reset to closed state") }() } } @@ -198,17 +350,17 @@ func (wp *Pool) handleTask(task *QueueTask) { // Diagnostics logging if enabled if wp.diagnosticsEnabled { - wp.logger.Printf("Task %s executed in %d ms", task.payload.ID, executionTime) + wp.logger.Info().Str("taskID", task.payload.ID).Msgf("Task executed in %d ms", executionTime) } - if wp.callback != 
nil { if err := wp.callback(ctx, result); err != nil { atomic.AddInt64(&wp.metrics.ErrorCount, 1) - wp.logger.Printf("Error in callback for task %s: %v", task.payload.ID, err) + wp.logger.Error().Str("taskID", task.payload.ID).Msgf("Callback error: %v", err) } } _ = wp.taskStorage.DeleteTask(task.payload.ID) atomic.AddInt64(&wp.metrics.TotalMemoryUsed, -taskSize) + wp.metricsRegistry.Register("task_execution_time", executionTime) } func (wp *Pool) backoffAndStore(task *QueueTask) { @@ -219,10 +371,11 @@ func (wp *Pool) backoffAndStore(task *QueueTask) { backoff := wp.backoffDuration * (1 << (task.retryCount - 1)) jitter := time.Duration(rand.Int63n(int64(backoff) / 2)) sleepDuration := backoff + jitter - wp.logger.Printf("Task %s retry %d: sleeping for %s", task.payload.ID, task.retryCount, sleepDuration) + wp.logger.Info().Str("taskID", task.payload.ID).Msgf("Retry %d: sleeping for %s", task.retryCount, sleepDuration) time.Sleep(sleepDuration) } else { - wp.logger.Printf("Task %s failed after maximum retries", task.payload.ID) + wp.logger.Error().Str("taskID", task.payload.ID).Msg("Task failed after maximum retries") + wp.DLQ.Add(task) } } @@ -282,7 +435,9 @@ func (wp *Pool) EnqueueTask(ctx context.Context, payload *Task, priority int) er if wp.circuitBreaker.Enabled && wp.circuitBreakerOpen { return fmt.Errorf("circuit breaker open, task rejected") } - + if err := validateTaskInput(payload); err != nil { + return fmt.Errorf("invalid task input: %w", err) + } if payload.ID == "" { payload.ID = NewID() } @@ -344,9 +499,8 @@ func (wp *Pool) startOverflowDrainer() { func (wp *Pool) drainOverflowBuffer() { wp.overflowBufferLock.Lock() overflowTasks := wp.overflowBuffer - wp.overflowBuffer = nil // Clear buffer + wp.overflowBuffer = nil wp.overflowBufferLock.Unlock() - for _, task := range overflowTasks { select { case wp.taskNotify <- struct{}{}: @@ -363,8 +517,6 @@ func (wp *Pool) Stop() { wp.gracefulShutdown = true wp.Pause() close(wp.stop) - - // Graceful shutdown with timeout support done := make(chan struct{}) go func() { wp.wg.Wait() @@ -373,9 +525,8 @@ func (wp *Pool) Stop() { }() select { case <-done: - // All workers finished gracefully. 
case <-time.After(wp.gracefulShutdownTimeout): - wp.logger.Println("Graceful shutdown timeout reached") + wp.logger.Warn().Msg("Graceful shutdown timeout reached") } if wp.completionCallback != nil { wp.completionCallback() @@ -395,3 +546,54 @@ func (wp *Pool) Metrics() Metrics { } func (wp *Pool) Scheduler() *Scheduler { return wp.scheduler } + +func (wp *Pool) dynamicWorkerScaler() { + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + for { + select { + case <-ticker.C: + wp.taskQueueLock.Lock() + queueLen := len(wp.taskQueue) + wp.taskQueueLock.Unlock() + newWorkers := queueLen/5 + 1 + wp.logger.Info().Msgf("Auto-scaling: queue length %d, adjusting workers to %d", queueLen, newWorkers) + wp.AdjustWorkerCount(newWorkers) + case <-wp.stop: + return + } + } +} + +func (wp *Pool) startHealthServer() { + mux := http.NewServeMux() + mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { + status := "OK" + if wp.gracefulShutdown { + status = "shutting down" + } + fmt.Fprintf(w, "status: %s\nworkers: %d\nqueueLength: %d\n", + status, atomic.LoadInt32(&wp.numOfWorkers), len(wp.taskQueue)) + }) + server := &http.Server{ + Addr: ":8080", + Handler: mux, + } + go func() { + wp.logger.Info().Msg("Starting health server on :8080") + if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + wp.logger.Error().Err(err).Msg("Health server failed") + } + }() + + go func() { + <-wp.stop + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := server.Shutdown(ctx); err != nil { + wp.logger.Error().Err(err).Msg("Health server shutdown failed") + } else { + wp.logger.Info().Msg("Health server shutdown gracefully") + } + }() +} diff --git a/pool_options.go b/pool_options.go index 032092f..9921783 100644 --- a/pool_options.go +++ b/pool_options.go @@ -6,13 +6,14 @@ import ( // New type definitions for enhancements type ThresholdConfig struct { - HighMemory int64 // e.g. in bytes - LongExecution time.Duration // e.g. warning if task execution exceeds + HighMemory int64 + LongExecution time.Duration } type MetricsRegistry interface { Register(metricName string, value interface{}) - // ...other methods as needed... 
+ Increment(metricName string) + Get(metricName string) interface{} } type CircuitBreakerConfig struct { @@ -25,7 +26,6 @@ type PoolOption func(*Pool) func WithTaskQueueSize(size int) PoolOption { return func(p *Pool) { - // Initialize the task queue with the specified size p.taskQueue = make(PriorityQueue, 0, size) } } @@ -72,8 +72,6 @@ func WithTaskStorage(storage TaskStorage) PoolOption { } } -// New option functions: - func WithWarningThresholds(thresholds ThresholdConfig) PoolOption { return func(p *Pool) { p.thresholds = thresholds @@ -103,3 +101,9 @@ func WithGracefulShutdown(timeout time.Duration) PoolOption { p.gracefulShutdownTimeout = timeout } } + +func WithPlugin(plugin Plugin) PoolOption { + return func(p *Pool) { + p.plugins = append(p.plugins, plugin) + } +} diff --git a/publisher.go b/publisher.go index 2fc6bd5..2631a1e 100644 --- a/publisher.go +++ b/publisher.go @@ -10,6 +10,7 @@ import ( "github.com/oarkflow/json" "github.com/oarkflow/json/jsonparser" + "github.com/oarkflow/mq/codec" "github.com/oarkflow/mq/consts" ) @@ -77,16 +78,18 @@ func (p *Publisher) Publish(ctx context.Context, task Task, queue string) error if err != nil { return fmt.Errorf("failed to connect to broker: %w", err) } - defer conn.Close() + defer func() { + _ = conn.Close() + }() return p.send(ctx, queue, task, conn, consts.PUBLISH) } -func (p *Publisher) onClose(ctx context.Context, conn net.Conn) error { +func (p *Publisher) onClose(_ context.Context, conn net.Conn) error { fmt.Println("Publisher Connection closed", p.id, conn.RemoteAddr()) return nil } -func (p *Publisher) onError(ctx context.Context, conn net.Conn, err error) { +func (p *Publisher) onError(_ context.Context, conn net.Conn, err error) { fmt.Println("Error reading from publisher connection:", err, conn.RemoteAddr()) } @@ -99,7 +102,9 @@ func (p *Publisher) Request(ctx context.Context, task Task, queue string) Result err = fmt.Errorf("failed to connect to broker: %w", err) return Result{Error: err} } - defer conn.Close() + defer func() { + _ = conn.Close() + }() err = p.send(ctx, queue, task, conn, consts.PUBLISH) resultCh := make(chan Result) go func() { diff --git a/queue.go b/queue.go index 70f8e2a..acd0d74 100644 --- a/queue.go +++ b/queue.go @@ -45,38 +45,33 @@ func (b *Broker) NewQueue(name string) *Queue { type QueueTask struct { ctx context.Context payload *Task - retryCount int priority int - index int // The index in the heap + retryCount int + index int } type PriorityQueue []*QueueTask func (pq PriorityQueue) Len() int { return len(pq) } - func (pq PriorityQueue) Less(i, j int) bool { return pq[i].priority > pq[j].priority } - func (pq PriorityQueue) Swap(i, j int) { pq[i], pq[j] = pq[j], pq[i] pq[i].index = i pq[j].index = j } - func (pq *PriorityQueue) Push(x interface{}) { n := len(*pq) task := x.(*QueueTask) task.index = n *pq = append(*pq, task) } - func (pq *PriorityQueue) Pop() interface{} { old := *pq n := len(old) task := old[n-1] - old[n-1] = nil // avoid memory leak - task.index = -1 // for safety + task.index = -1 *pq = old[0 : n-1] return task } diff --git a/scheduler.go b/scheduler.go index 8bfd3ff..550b254 100644 --- a/scheduler.go +++ b/scheduler.go @@ -2,14 +2,19 @@ package mq import ( "context" + "errors" "fmt" "strconv" "strings" "sync" "sync/atomic" "time" + + "github.com/oarkflow/log" ) +var Logger = log.DefaultLogger + type ScheduleOptions struct { Handler Handler Callback Callback @@ -20,7 +25,7 @@ type ScheduleOptions struct { type SchedulerOption func(*ScheduleOptions) -// Helper functions to 
create SchedulerOptions +// WithSchedulerHandler Helper functions to create SchedulerOptions func WithSchedulerHandler(handler Handler) SchedulerOption { return func(opts *ScheduleOptions) { opts.Handler = handler @@ -177,16 +182,14 @@ func parseCronSpec(cronSpec string) (CronSchedule, error) { } func cronFieldToString(field string, fieldName string) (string, error) { - switch field { - case "*": + if field == "*" { return fmt.Sprintf("every %s", fieldName), nil - default: - values, err := parseCronValue(field) - if err != nil { - return "", fmt.Errorf("invalid %s field: %s", fieldName, err.Error()) - } - return fmt.Sprintf("%s %s", strings.Join(values, ", "), fieldName), nil } + values, err := parseCronValue(field) + if err != nil { + return "", fmt.Errorf("invalid %s field: %s", fieldName, err.Error()) + } + return fmt.Sprintf("%s %s", strings.Join(values, ", "), fieldName), nil } func parseCronValue(field string) ([]string, error) { @@ -223,6 +226,8 @@ type Scheduler struct { } func (s *Scheduler) Start() { + s.mu.Lock() + defer s.mu.Unlock() for _, task := range s.tasks { go s.schedule(task) } @@ -279,46 +284,81 @@ func (s *Scheduler) schedule(task ScheduledTask) { } } +func startSpan(operation string) (context.Context, func()) { + startTime := time.Now() + Logger.Info().Str("operation", operation).Msg("Span started") + ctx := context.WithValue(context.Background(), "traceID", fmt.Sprintf("%d", startTime.UnixNano())) + return ctx, func() { + duration := time.Since(startTime) + Logger.Info().Str("operation", operation).Msgf("Span ended; duration: %v", duration) + } +} + +func acquireDistributedLock(taskID string) bool { + Logger.Info().Str("taskID", taskID).Msg("Acquiring distributed lock (stub)") + return true +} + +func releaseDistributedLock(taskID string) { + Logger.Info().Str("taskID", taskID).Msg("Releasing distributed lock (stub)") +} + +var taskPool = sync.Pool{ + New: func() interface{} { return new(Task) }, +} +var queueTaskPool = sync.Pool{ + New: func() interface{} { return new(QueueTask) }, +} + +func getQueueTask() *QueueTask { + return queueTaskPool.Get().(*QueueTask) +} + // Enhance executeTask with circuit breaker and diagnostics logging support. func (s *Scheduler) executeTask(task ScheduledTask) { - if !task.config.Overlap && !s.pool.gracefulShutdown { - // Prevent overlapping execution if not allowed. - // ...existing code... - } go func() { - // Recover from panic to keep scheduler running. + _, cancelSpan := startSpan("executeTask") + defer cancelSpan() defer func() { if r := recover(); r != nil { - fmt.Printf("Recovered from panic in scheduled task %s: %v\n", task.payload.ID, r) + Logger.Error().Str("taskID", task.payload.ID).Msgf("Recovered from panic: %v", r) } }() start := time.Now() + for _, plug := range s.pool.plugins { + plug.BeforeTask(getQueueTask()) + } + if !acquireDistributedLock(task.payload.ID) { + Logger.Warn().Str("taskID", task.payload.ID).Msg("Failed to acquire distributed lock") + return + } + defer releaseDistributedLock(task.payload.ID) result := task.handler(task.ctx, task.payload) execTime := time.Since(start).Milliseconds() - // Diagnostics logging if enabled. if s.pool.diagnosticsEnabled { - s.pool.logger.Printf("Scheduled task %s executed in %d ms", task.payload.ID, execTime) + Logger.Info().Str("taskID", task.payload.ID).Msgf("Executed in %d ms", execTime) } - // Circuit breaker check similar to pool. 
if result.Error != nil && s.pool.circuitBreaker.Enabled { newCount := atomic.AddInt32(&s.pool.circuitBreakerFailureCount, 1) if newCount >= int32(s.pool.circuitBreaker.FailureThreshold) { s.pool.circuitBreakerOpen = true - s.pool.logger.Println("Scheduler: circuit breaker opened due to errors") + Logger.Warn().Msg("Circuit breaker opened due to errors") go func() { time.Sleep(s.pool.circuitBreaker.ResetTimeout) atomic.StoreInt32(&s.pool.circuitBreakerFailureCount, 0) s.pool.circuitBreakerOpen = false - s.pool.logger.Println("Scheduler: circuit breaker reset") + Logger.Info().Msg("Circuit breaker reset to closed state") }() } } - // Invoke callback if defined. if task.config.Callback != nil { _ = task.config.Callback(task.ctx, result) } task.executionHistory = append(task.executionHistory, ExecutionHistory{Timestamp: time.Now(), Result: result}) - fmt.Printf("Executed scheduled task: %s\n", task.payload.ID) + for _, plug := range s.pool.plugins { + plug.AfterTask(getQueueTask(), result) + } + Logger.Info().Str("taskID", task.payload.ID).Msg("Scheduled task executed") }() } @@ -354,7 +394,7 @@ func (task ScheduledTask) getNextRunTime(now time.Time) time.Time { func (task ScheduledTask) getNextCronRunTime(now time.Time) time.Time { cronSpecs, err := parseCronSpec(task.schedule.CronSpec) if err != nil { - fmt.Println(fmt.Sprintf("Invalid CRON spec format: %s", err.Error())) + Logger.Error().Err(err).Msg("Invalid CRON spec") return now } nextRun := now @@ -367,40 +407,38 @@ func (task ScheduledTask) getNextCronRunTime(now time.Time) time.Time { } func (task ScheduledTask) applyCronField(t time.Time, fieldSpec string, unit string) time.Time { - switch fieldSpec { - case "*": - return t - default: - value, _ := strconv.Atoi(fieldSpec) - switch unit { - case "minute": - if t.Minute() > value { - t = t.Add(time.Hour) - } - t = time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), value, 0, 0, t.Location()) - case "hour": - if t.Hour() > value { - t = t.AddDate(0, 0, 1) - } - t = time.Date(t.Year(), t.Month(), t.Day(), value, t.Minute(), 0, 0, t.Location()) - case "day": - if t.Day() > value { - t = t.AddDate(0, 1, 0) - } - t = time.Date(t.Year(), t.Month(), value, t.Hour(), t.Minute(), 0, 0, t.Location()) - case "month": - if int(t.Month()) > value { - t = t.AddDate(1, 0, 0) - } - t = time.Date(t.Year(), time.Month(value), t.Day(), t.Hour(), t.Minute(), 0, 0, t.Location()) - case "weekday": - weekday := time.Weekday(value) - for t.Weekday() != weekday { - t = t.AddDate(0, 0, 1) - } - } + if fieldSpec == "*" { return t } + value, _ := strconv.Atoi(fieldSpec) + switch unit { + case "minute": + if t.Minute() > value { + t = t.Add(time.Hour) + } + t = time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), value, 0, 0, t.Location()) + case "hour": + if t.Hour() > value { + t = t.AddDate(0, 0, 1) + } + t = time.Date(t.Year(), t.Month(), t.Day(), value, t.Minute(), 0, 0, t.Location()) + case "day": + if t.Day() > value { + t = t.AddDate(0, 1, 0) + } + t = time.Date(t.Year(), t.Month(), value, t.Hour(), t.Minute(), 0, 0, t.Location()) + case "month": + if int(t.Month()) > value { + t = t.AddDate(1, 0, 0) + } + t = time.Date(t.Year(), time.Month(value), t.Day(), t.Hour(), t.Minute(), 0, 0, t.Location()) + case "weekday": + weekday := time.Weekday(value) + for t.Weekday() != weekday { + t = t.AddDate(0, 0, 1) + } + } + return t } func nextWeekday(t time.Time, weekday time.Weekday) time.Time { @@ -410,12 +448,21 @@ func nextWeekday(t time.Time, weekday time.Weekday) time.Time { } return t.AddDate(0, 0, daysUntil) } 
+func validateTaskInput(task *Task) error { + if task.Payload == nil { + return errors.New("task payload cannot be nil") + } + Logger.Info().Str("taskID", task.ID).Msg("Task validated") + return nil +} func (s *Scheduler) AddTask(ctx context.Context, payload *Task, opts ...SchedulerOption) { s.mu.Lock() defer s.mu.Unlock() - - // Create a default options instance + if err := validateTaskInput(payload); err != nil { + Logger.Error().Err(err).Msg("Invalid task input") + return + } options := defaultSchedulerOptions() for _, opt := range opts { opt(options) @@ -427,9 +474,7 @@ func (s *Scheduler) AddTask(ctx context.Context, payload *Task, opts ...Schedule options.Callback = s.pool.callback } stop := make(chan struct{}) - - // Create a new ScheduledTask using the provided options - s.tasks = append(s.tasks, ScheduledTask{ + newTask := ScheduledTask{ ctx: ctx, handler: options.Handler, payload: payload, @@ -442,10 +487,9 @@ func (s *Scheduler) AddTask(ctx context.Context, payload *Task, opts ...Schedule Interval: options.Interval, Recurring: options.Recurring, }, - }) - - // Start scheduling the task - go s.schedule(s.tasks[len(s.tasks)-1]) + } + s.tasks = append(s.tasks, newTask) + go s.schedule(newTask) } func (s *Scheduler) RemoveTask(payloadID string) { diff --git a/v1/v1.go b/v1/v1.go deleted file mode 100644 index ac9722d..0000000 --- a/v1/v1.go +++ /dev/null @@ -1,1289 +0,0 @@ -package v1 - -import ( - "container/heap" - "context" - "errors" - "fmt" - "math/rand" - "net/http" - "strconv" - "strings" - "sync" - "sync/atomic" - "time" - - "github.com/oarkflow/log" -) - -var Logger = log.DefaultLogger - -func startSpan(operation string) (context.Context, func()) { - startTime := time.Now() - Logger.Info().Str("operation", operation).Msg("Span started") - ctx := context.WithValue(context.Background(), "traceID", fmt.Sprintf("%d", startTime.UnixNano())) - return ctx, func() { - duration := time.Since(startTime) - Logger.Info().Str("operation", operation).Msgf("Span ended; duration: %v", duration) - } -} - -var taskPool = sync.Pool{ - New: func() interface{} { return new(Task) }, -} -var queueTaskPool = sync.Pool{ - New: func() interface{} { return new(QueueTask) }, -} - -func getQueueTask() *QueueTask { - return queueTaskPool.Get().(*QueueTask) -} - -type MetricsRegistry interface { - Register(metricName string, value interface{}) - Increment(metricName string) - Get(metricName string) interface{} -} - -type InMemoryMetricsRegistry struct { - metrics map[string]int64 - mu sync.RWMutex -} - -func NewInMemoryMetricsRegistry() *InMemoryMetricsRegistry { - return &InMemoryMetricsRegistry{ - metrics: make(map[string]int64), - } -} - -func (m *InMemoryMetricsRegistry) Register(metricName string, value interface{}) { - m.mu.Lock() - defer m.mu.Unlock() - if v, ok := value.(int64); ok { - m.metrics[metricName] = v - Logger.Info().Str("metric", metricName).Msgf("Registered metric: %d", v) - } -} - -func (m *InMemoryMetricsRegistry) Increment(metricName string) { - m.mu.Lock() - defer m.mu.Unlock() - m.metrics[metricName]++ -} - -func (m *InMemoryMetricsRegistry) Get(metricName string) interface{} { - m.mu.RLock() - defer m.mu.RUnlock() - return m.metrics[metricName] -} - -type DeadLetterQueue struct { - tasks []*QueueTask - mu sync.Mutex -} - -func NewDeadLetterQueue() *DeadLetterQueue { - return &DeadLetterQueue{ - tasks: make([]*QueueTask, 0), - } -} - -func (dlq *DeadLetterQueue) Task() []*QueueTask { - return dlq.tasks -} - -func (dlq *DeadLetterQueue) Add(task *QueueTask) { - dlq.mu.Lock() - 
defer dlq.mu.Unlock() - dlq.tasks = append(dlq.tasks, task) - Logger.Warn().Str("taskID", task.payload.ID).Msg("Task added to Dead Letter Queue") -} - -var DLQ = NewDeadLetterQueue() - -type WarningThresholds struct { - HighMemory int64 - LongExecution time.Duration -} - -type DynamicConfig struct { - Timeout time.Duration - BatchSize int - MaxMemoryLoad int64 - IdleTimeout time.Duration - BackoffDuration time.Duration - MaxRetries int - ReloadInterval time.Duration - WarningThreshold WarningThresholds -} - -var Config = &DynamicConfig{ - Timeout: 10 * time.Second, - BatchSize: 1, - - MaxMemoryLoad: 100 * 1024 * 1024, - IdleTimeout: 5 * time.Minute, - BackoffDuration: 2 * time.Second, - MaxRetries: 3, - ReloadInterval: 30 * time.Second, - WarningThreshold: WarningThresholds{ - HighMemory: 1 * 1024 * 1024, - LongExecution: 2 * time.Second, - }, -} - -func validateDynamicConfig(c *DynamicConfig) error { - if c.Timeout <= 0 { - return errors.New("Timeout must be positive") - } - if c.BatchSize <= 0 { - return errors.New("BatchSize must be > 0") - } - if c.MaxMemoryLoad <= 0 { - return errors.New("MaxMemoryLoad must be > 0") - } - return nil -} - -func startConfigReloader(pool *Pool) { - go func() { - ticker := time.NewTicker(Config.ReloadInterval) - defer ticker.Stop() - for { - select { - case <-ticker.C: - if err := validateDynamicConfig(Config); err != nil { - Logger.Error().Err(err).Msg("Invalid dynamic config, skipping reload") - continue - } - pool.timeout = Config.Timeout - pool.batchSize = Config.BatchSize - pool.maxMemoryLoad = Config.MaxMemoryLoad - pool.idleTimeout = Config.IdleTimeout - pool.backoffDuration = Config.BackoffDuration - pool.maxRetries = Config.MaxRetries - pool.thresholds = ThresholdConfig{ - HighMemory: Config.WarningThreshold.HighMemory, - LongExecution: Config.WarningThreshold.LongExecution, - } - Logger.Info().Msg("Dynamic configuration reloaded") - case <-pool.stop: - return - } - } - }() -} - -type Plugin interface { - Initialize(config interface{}) error - BeforeTask(task *QueueTask) - AfterTask(task *QueueTask, result Result) -} - -type DefaultPlugin struct{} - -func (dp *DefaultPlugin) Initialize(config interface{}) error { return nil } -func (dp *DefaultPlugin) BeforeTask(task *QueueTask) { - Logger.Info().Str("taskID", task.payload.ID).Msg("BeforeTask plugin invoked") -} -func (dp *DefaultPlugin) AfterTask(task *QueueTask, result Result) { - Logger.Info().Str("taskID", task.payload.ID).Msg("AfterTask plugin invoked") -} - -func acquireDistributedLock(taskID string) bool { - Logger.Info().Str("taskID", taskID).Msg("Acquiring distributed lock (stub)") - return true -} - -func releaseDistributedLock(taskID string) { - Logger.Info().Str("taskID", taskID).Msg("Releasing distributed lock (stub)") -} - -func validateTaskInput(task *Task) error { - if task.Payload == nil { - return errors.New("task payload cannot be nil") - } - Logger.Info().Str("taskID", task.ID).Msg("Task validated") - return nil -} - -type Callback func(ctx context.Context, result Result) error -type CompletionCallback func() - -type Metrics struct { - TotalTasks int64 - CompletedTasks int64 - ErrorCount int64 - TotalMemoryUsed int64 - TotalScheduled int64 - ExecutionTime int64 -} - -type ScheduleOptions struct { - Handler Handler - Callback Callback - Interval time.Duration - Overlap bool - Recurring bool -} - -type SchedulerOption func(*ScheduleOptions) - -func WithSchedulerHandler(handler Handler) SchedulerOption { - return func(opts *ScheduleOptions) { - opts.Handler = handler - } -} - 
-func WithSchedulerCallback(callback Callback) SchedulerOption { - return func(opts *ScheduleOptions) { - opts.Callback = callback - } -} - -func WithOverlap() SchedulerOption { - return func(opts *ScheduleOptions) { - opts.Overlap = true - } -} - -func WithInterval(interval time.Duration) SchedulerOption { - return func(opts *ScheduleOptions) { - opts.Interval = interval - } -} - -func WithRecurring() SchedulerOption { - return func(opts *ScheduleOptions) { - opts.Recurring = true - } -} - -func defaultSchedulerOptions() *ScheduleOptions { - return &ScheduleOptions{ - Interval: time.Minute, - Recurring: true, - } -} - -type SchedulerConfig struct { - Callback Callback - Overlap bool -} - -type ScheduledTask struct { - ctx context.Context - handler Handler - payload *Task - config SchedulerConfig - schedule *Schedule - stop chan struct{} - executionHistory []ExecutionHistory -} - -type Schedule struct { - TimeOfDay time.Time - CronSpec string - DayOfWeek []time.Weekday - DayOfMonth []int - Interval time.Duration - Recurring bool -} - -func (s *Schedule) ToHumanReadable() string { - var sb strings.Builder - if s.CronSpec != "" { - cronDescription, err := parseCronSpec(s.CronSpec) - if err != nil { - sb.WriteString(fmt.Sprintf("Invalid CRON spec: %s\n", err.Error())) - } else { - sb.WriteString(fmt.Sprintf("CRON-based schedule: %s\n", cronDescription)) - } - } - if s.Interval > 0 { - sb.WriteString(fmt.Sprintf("Recurring every %s\n", s.Interval)) - } - if len(s.DayOfMonth) > 0 { - sb.WriteString("Occurs on the following days of the month: ") - for i, day := range s.DayOfMonth { - if i > 0 { - sb.WriteString(", ") - } - sb.WriteString(fmt.Sprintf("%d", day)) - } - sb.WriteString("\n") - } - if len(s.DayOfWeek) > 0 { - sb.WriteString("Occurs on the following days of the week: ") - for i, day := range s.DayOfWeek { - if i > 0 { - sb.WriteString(", ") - } - sb.WriteString(day.String()) - } - sb.WriteString("\n") - } - if !s.TimeOfDay.IsZero() { - sb.WriteString(fmt.Sprintf("Time of day: %s\n", s.TimeOfDay.Format("15:04"))) - } - if s.Recurring { - sb.WriteString("This schedule is recurring.\n") - } else { - sb.WriteString("This schedule is one-time.\n") - } - if sb.Len() == 0 { - sb.WriteString("No schedule defined.") - } - return sb.String() -} - -type CronSchedule struct { - Minute string - Hour string - DayOfMonth string - Month string - DayOfWeek string -} - -func (c CronSchedule) String() string { - return fmt.Sprintf("At %s minute(s) past %s, on %s, during %s, every %s", c.Minute, c.Hour, c.DayOfWeek, c.Month, c.DayOfMonth) -} - -func parseCronSpec(cronSpec string) (CronSchedule, error) { - parts := strings.Fields(cronSpec) - if len(parts) != 5 { - return CronSchedule{}, fmt.Errorf("invalid CRON spec: expected 5 fields, got %d", len(parts)) - } - minute, err := cronFieldToString(parts[0], "minute") - if err != nil { - return CronSchedule{}, err - } - hour, err := cronFieldToString(parts[1], "hour") - if err != nil { - return CronSchedule{}, err - } - dayOfMonth, err := cronFieldToString(parts[2], "day of the month") - if err != nil { - return CronSchedule{}, err - } - month, err := cronFieldToString(parts[3], "month") - if err != nil { - return CronSchedule{}, err - } - dayOfWeek, err := cronFieldToString(parts[4], "day of the week") - if err != nil { - return CronSchedule{}, err - } - return CronSchedule{ - Minute: minute, - Hour: hour, - DayOfMonth: dayOfMonth, - Month: month, - DayOfWeek: dayOfWeek, - }, nil -} - -func cronFieldToString(field string, fieldName string) (string, error) { 
- if field == "*" { - return fmt.Sprintf("every %s", fieldName), nil - } - values, err := parseCronValue(field) - if err != nil { - return "", fmt.Errorf("invalid %s field: %s", fieldName, err.Error()) - } - return fmt.Sprintf("%s %s", strings.Join(values, ", "), fieldName), nil -} - -func parseCronValue(field string) ([]string, error) { - var values []string - ranges := strings.Split(field, ",") - for _, r := range ranges { - if strings.Contains(r, "-") { - bounds := strings.Split(r, "-") - if len(bounds) != 2 { - return nil, fmt.Errorf("invalid range: %s", r) - } - start, err := strconv.Atoi(bounds[0]) - if err != nil { - return nil, err - } - end, err := strconv.Atoi(bounds[1]) - if err != nil { - return nil, err - } - for i := start; i <= end; i++ { - values = append(values, strconv.Itoa(i)) - } - } else { - values = append(values, r) - } - } - return values, nil -} - -type ExecutionHistory struct { - Timestamp time.Time - Result Result -} - -type Handler func(ctx context.Context, payload *Task) Result - -type Scheduler struct { - pool *Pool - tasks []ScheduledTask - mu sync.Mutex -} - -func NewScheduler(pool *Pool) *Scheduler { - return &Scheduler{pool: pool} -} - -func (s *Scheduler) Start() { - s.mu.Lock() - defer s.mu.Unlock() - for _, task := range s.tasks { - go s.schedule(task) - } -} - -func (s *Scheduler) schedule(task ScheduledTask) { - if s.pool.gracefulShutdown { - return - } - if task.schedule.Interval > 0 { - ticker := time.NewTicker(task.schedule.Interval) - defer ticker.Stop() - if task.schedule.Recurring { - for { - select { - case <-ticker.C: - if s.pool.gracefulShutdown { - return - } - s.executeTask(task) - case <-task.stop: - return - } - } - } else { - select { - case <-ticker.C: - if s.pool.gracefulShutdown { - return - } - s.executeTask(task) - case <-task.stop: - return - } - } - } else if task.schedule.Recurring { - for { - now := time.Now() - nextRun := task.getNextRunTime(now) - select { - case <-time.After(nextRun.Sub(now)): - if s.pool.gracefulShutdown { - return - } - s.executeTask(task) - case <-task.stop: - return - } - } - } -} - -func (s *Scheduler) executeTask(task ScheduledTask) { - go func() { - _, cancelSpan := startSpan("executeTask") - defer cancelSpan() - defer func() { - if r := recover(); r != nil { - Logger.Error().Str("taskID", task.payload.ID).Msgf("Recovered from panic: %v", r) - } - }() - start := time.Now() - for _, plug := range s.pool.plugins { - plug.BeforeTask(getQueueTask()) - } - if !acquireDistributedLock(task.payload.ID) { - Logger.Warn().Str("taskID", task.payload.ID).Msg("Failed to acquire distributed lock") - return - } - defer releaseDistributedLock(task.payload.ID) - result := task.handler(task.ctx, task.payload) - execTime := time.Since(start).Milliseconds() - if s.pool.diagnosticsEnabled { - Logger.Info().Str("taskID", task.payload.ID).Msgf("Executed in %d ms", execTime) - } - if result.Error != nil && s.pool.circuitBreaker.Enabled { - newCount := atomic.AddInt32(&s.pool.circuitBreakerFailureCount, 1) - if newCount >= int32(s.pool.circuitBreaker.FailureThreshold) { - s.pool.circuitBreakerOpen = true - Logger.Warn().Msg("Circuit breaker opened due to errors") - go func() { - time.Sleep(s.pool.circuitBreaker.ResetTimeout) - atomic.StoreInt32(&s.pool.circuitBreakerFailureCount, 0) - s.pool.circuitBreakerOpen = false - Logger.Info().Msg("Circuit breaker reset to closed state") - }() - } - } - if task.config.Callback != nil { - _ = task.config.Callback(task.ctx, result) - } - task.executionHistory = 
append(task.executionHistory, ExecutionHistory{Timestamp: time.Now(), Result: result}) - for _, plug := range s.pool.plugins { - plug.AfterTask(getQueueTask(), result) - } - Logger.Info().Str("taskID", task.payload.ID).Msg("Scheduled task executed") - }() -} - -func (task ScheduledTask) getNextRunTime(now time.Time) time.Time { - if task.schedule.CronSpec != "" { - return task.getNextCronRunTime(now) - } - if len(task.schedule.DayOfMonth) > 0 { - for _, day := range task.schedule.DayOfMonth { - nextRun := time.Date(now.Year(), now.Month(), day, task.schedule.TimeOfDay.Hour(), task.schedule.TimeOfDay.Minute(), 0, 0, now.Location()) - if nextRun.After(now) { - return nextRun - } - } - nextMonth := now.AddDate(0, 1, 0) - return time.Date(nextMonth.Year(), nextMonth.Month(), task.schedule.DayOfMonth[0], task.schedule.TimeOfDay.Hour(), task.schedule.TimeOfDay.Minute(), 0, 0, now.Location()) - } - if len(task.schedule.DayOfWeek) > 0 { - for _, weekday := range task.schedule.DayOfWeek { - nextRun := nextWeekday(now, weekday).Truncate(time.Minute).Add(task.schedule.TimeOfDay.Sub(time.Time{})) - if nextRun.After(now) { - return nextRun - } - } - } - return now -} - -func (task ScheduledTask) getNextCronRunTime(now time.Time) time.Time { - cronSpecs, err := parseCronSpec(task.schedule.CronSpec) - if err != nil { - Logger.Error().Err(err).Msg("Invalid CRON spec") - return now - } - nextRun := now - nextRun = task.applyCronField(nextRun, cronSpecs.Minute, "minute") - nextRun = task.applyCronField(nextRun, cronSpecs.Hour, "hour") - nextRun = task.applyCronField(nextRun, cronSpecs.DayOfMonth, "day") - nextRun = task.applyCronField(nextRun, cronSpecs.Month, "month") - nextRun = task.applyCronField(nextRun, cronSpecs.DayOfWeek, "weekday") - return nextRun -} - -func (task ScheduledTask) applyCronField(t time.Time, fieldSpec string, unit string) time.Time { - if fieldSpec == "*" { - return t - } - value, _ := strconv.Atoi(fieldSpec) - switch unit { - case "minute": - if t.Minute() > value { - t = t.Add(time.Hour) - } - t = time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), value, 0, 0, t.Location()) - case "hour": - if t.Hour() > value { - t = t.AddDate(0, 0, 1) - } - t = time.Date(t.Year(), t.Month(), t.Day(), value, t.Minute(), 0, 0, t.Location()) - case "day": - if t.Day() > value { - t = t.AddDate(0, 1, 0) - } - t = time.Date(t.Year(), t.Month(), value, t.Hour(), t.Minute(), 0, 0, t.Location()) - case "month": - if int(t.Month()) > value { - t = t.AddDate(1, 0, 0) - } - t = time.Date(t.Year(), time.Month(value), t.Day(), t.Hour(), t.Minute(), 0, 0, t.Location()) - case "weekday": - weekday := time.Weekday(value) - for t.Weekday() != weekday { - t = t.AddDate(0, 0, 1) - } - } - return t -} - -func nextWeekday(t time.Time, weekday time.Weekday) time.Time { - daysUntil := (int(weekday) - int(t.Weekday()) + 7) % 7 - if daysUntil == 0 { - daysUntil = 7 - } - return t.AddDate(0, 0, daysUntil) -} - -func (s *Scheduler) AddTask(ctx context.Context, payload *Task, opts ...SchedulerOption) { - s.mu.Lock() - defer s.mu.Unlock() - if err := validateTaskInput(payload); err != nil { - Logger.Error().Err(err).Msg("Invalid task input") - return - } - options := defaultSchedulerOptions() - for _, opt := range opts { - opt(options) - } - if options.Handler == nil { - options.Handler = s.pool.handler - } - if options.Callback == nil { - options.Callback = s.pool.callback - } - stop := make(chan struct{}) - newTask := ScheduledTask{ - ctx: ctx, - handler: options.Handler, - payload: payload, - stop: stop, - config: 
SchedulerConfig{ - Callback: options.Callback, - Overlap: options.Overlap, - }, - schedule: &Schedule{ - Interval: options.Interval, - Recurring: options.Recurring, - }, - } - s.tasks = append(s.tasks, newTask) - go s.schedule(newTask) -} - -func (s *Scheduler) RemoveTask(payloadID string) { - s.mu.Lock() - defer s.mu.Unlock() - for i, task := range s.tasks { - if task.payload.ID == payloadID { - close(task.stop) - s.tasks = append(s.tasks[:i], s.tasks[i+1:]...) - break - } - } -} - -type Pool struct { - taskStorage TaskStorage - scheduler *Scheduler - stop chan struct{} - taskNotify chan struct{} - workerAdjust chan int - handler Handler - completionCallback CompletionCallback - taskAvailableCond *sync.Cond - callback Callback - taskQueue PriorityQueue - overflowBuffer []*QueueTask - metrics Metrics - wg sync.WaitGroup - taskCompletionNotifier sync.WaitGroup - timeout time.Duration - batchSize int - maxMemoryLoad int64 - idleTimeout time.Duration - backoffDuration time.Duration - maxRetries int - overflowBufferLock sync.RWMutex - taskQueueLock sync.Mutex - numOfWorkers int32 - paused bool - logger log.Logger - gracefulShutdown bool - thresholds ThresholdConfig - diagnosticsEnabled bool - metricsRegistry MetricsRegistry - circuitBreaker CircuitBreakerConfig - circuitBreakerOpen bool - circuitBreakerFailureCount int32 - gracefulShutdownTimeout time.Duration - plugins []Plugin -} - -func NewPool(numOfWorkers int, opts ...PoolOption) *Pool { - pool := &Pool{ - stop: make(chan struct{}), - taskNotify: make(chan struct{}, numOfWorkers), - batchSize: Config.BatchSize, - timeout: Config.Timeout, - idleTimeout: Config.IdleTimeout, - backoffDuration: Config.BackoffDuration, - maxRetries: Config.MaxRetries, - logger: Logger, - metricsRegistry: NewInMemoryMetricsRegistry(), - diagnosticsEnabled: true, - gracefulShutdownTimeout: 10 * time.Second, - } - pool.scheduler = NewScheduler(pool) - pool.taskAvailableCond = sync.NewCond(&sync.Mutex{}) - for _, opt := range opts { - opt(pool) - } - if pool.taskQueue == nil { - pool.taskQueue = make(PriorityQueue, 0, 10) - } - heap.Init(&pool.taskQueue) - pool.scheduler.Start() - pool.Start(numOfWorkers) - startConfigReloader(pool) - go pool.dynamicWorkerScaler() - go pool.startHealthServer() - return pool -} - -func (wp *Pool) Start(numWorkers int) { - storedTasks, err := wp.taskStorage.GetAllTasks() - if err == nil { - wp.taskQueueLock.Lock() - for _, task := range storedTasks { - heap.Push(&wp.taskQueue, task) - } - wp.taskQueueLock.Unlock() - } - for i := 0; i < numWorkers; i++ { - wp.wg.Add(1) - go wp.worker() - } - atomic.StoreInt32(&wp.numOfWorkers, int32(numWorkers)) - go wp.monitorWorkerAdjustments() - go wp.startOverflowDrainer() - go wp.monitorIdleWorkers() -} - -func (wp *Pool) worker() { - defer wp.wg.Done() - for { - for len(wp.taskQueue) == 0 && !wp.paused { - wp.Dispatch(wp.taskAvailableCond.Wait) - } - select { - case <-wp.stop: - return - default: - wp.processNextBatch() - } - } -} - -func (wp *Pool) processNextBatch() { - wp.taskQueueLock.Lock() - defer wp.taskQueueLock.Unlock() - tasks := make([]*QueueTask, 0, wp.batchSize) - for len(wp.taskQueue) > 0 && !wp.paused && len(tasks) < wp.batchSize { - task := heap.Pop(&wp.taskQueue).(*QueueTask) - tasks = append(tasks, task) - } - if len(tasks) == 0 && !wp.paused { - for len(tasks) < wp.batchSize { - task, err := wp.taskStorage.FetchNextTask() - if err != nil { - break - } - tasks = append(tasks, task) - } - } - for _, task := range tasks { - if task != nil { - wp.handleTask(task) - } - } -} - 
-func (wp *Pool) handleTask(task *QueueTask) { - if err := validateTaskInput(task.payload); err != nil { - Logger.Error().Str("taskID", task.payload.ID).Msgf("Validation failed: %v", err) - return - } - ctx, cancel := context.WithTimeout(task.ctx, wp.timeout) - defer cancel() - taskSize := int64(SizeOf(task.payload)) - atomic.AddInt64(&wp.metrics.TotalMemoryUsed, taskSize) - atomic.AddInt64(&wp.metrics.TotalTasks, 1) - startTime := time.Now() - result := wp.handler(ctx, task.payload) - executionTime := time.Since(startTime).Milliseconds() - atomic.AddInt64(&wp.metrics.ExecutionTime, executionTime) - if wp.thresholds.LongExecution > 0 && executionTime > int64(wp.thresholds.LongExecution.Milliseconds()) { - Logger.Warn().Str("taskID", task.payload.ID).Msgf("Exceeded execution time threshold: %d ms", executionTime) - } - if wp.thresholds.HighMemory > 0 && taskSize > wp.thresholds.HighMemory { - Logger.Warn().Str("taskID", task.payload.ID).Msgf("Memory usage %d exceeded threshold", taskSize) - } - if result.Error != nil { - atomic.AddInt64(&wp.metrics.ErrorCount, 1) - Logger.Error().Str("taskID", task.payload.ID).Msgf("Error processing task: %v", result.Error) - wp.backoffAndStore(task) - if wp.circuitBreaker.Enabled { - newCount := atomic.AddInt32(&wp.circuitBreakerFailureCount, 1) - if newCount >= int32(wp.circuitBreaker.FailureThreshold) { - wp.circuitBreakerOpen = true - Logger.Warn().Msg("Circuit breaker opened due to errors") - go func() { - time.Sleep(wp.circuitBreaker.ResetTimeout) - atomic.StoreInt32(&wp.circuitBreakerFailureCount, 0) - wp.circuitBreakerOpen = false - Logger.Info().Msg("Circuit breaker reset to closed state") - }() - } - } - } else { - atomic.AddInt64(&wp.metrics.CompletedTasks, 1) - if wp.circuitBreaker.Enabled { - atomic.StoreInt32(&wp.circuitBreakerFailureCount, 0) - } - } - if wp.diagnosticsEnabled { - Logger.Info().Str("taskID", task.payload.ID).Msgf("Task executed in %d ms", executionTime) - } - if wp.callback != nil { - if err := wp.callback(ctx, result); err != nil { - atomic.AddInt64(&wp.metrics.ErrorCount, 1) - Logger.Error().Str("taskID", task.payload.ID).Msgf("Callback error: %v", err) - } - } - _ = wp.taskStorage.DeleteTask(task.payload.ID) - atomic.AddInt64(&wp.metrics.TotalMemoryUsed, -taskSize) - wp.metricsRegistry.Register("task_execution_time", executionTime) -} - -func (wp *Pool) backoffAndStore(task *QueueTask) { - if task.retryCount < wp.maxRetries { - task.retryCount++ - wp.storeInOverflow(task) - backoff := wp.backoffDuration * (1 << (task.retryCount - 1)) - jitter := time.Duration(rand.Int63n(int64(backoff) / 2)) - sleepDuration := backoff + jitter - Logger.Info().Str("taskID", task.payload.ID).Msgf("Retry %d: sleeping for %s", task.retryCount, sleepDuration) - time.Sleep(sleepDuration) - } else { - Logger.Error().Str("taskID", task.payload.ID).Msg("Task failed after maximum retries") - DLQ.Add(task) - } -} - -func (wp *Pool) monitorIdleWorkers() { - for { - select { - case <-wp.stop: - return - default: - time.Sleep(wp.idleTimeout) - wp.adjustIdleWorkers() - } - } -} - -func (wp *Pool) adjustIdleWorkers() { - currentWorkers := atomic.LoadInt32(&wp.numOfWorkers) - if currentWorkers > 1 { - atomic.StoreInt32(&wp.numOfWorkers, currentWorkers-1) - wp.wg.Add(1) - go wp.worker() - } -} - -func (wp *Pool) monitorWorkerAdjustments() { - for { - select { - case adjustment := <-wp.workerAdjust: - currentWorkers := atomic.LoadInt32(&wp.numOfWorkers) - newWorkerCount := int(currentWorkers) + adjustment - if newWorkerCount > 0 { - 
wp.adjustWorkers(newWorkerCount) - } - case <-wp.stop: - return - } - } -} - -func (wp *Pool) adjustWorkers(newWorkerCount int) { - currentWorkers := int(atomic.LoadInt32(&wp.numOfWorkers)) - if newWorkerCount > currentWorkers { - for i := 0; i < newWorkerCount-currentWorkers; i++ { - wp.wg.Add(1) - go wp.worker() - } - } else if newWorkerCount < currentWorkers { - for i := 0; i < currentWorkers-newWorkerCount; i++ { - wp.stop <- struct{}{} - } - } - atomic.StoreInt32(&wp.numOfWorkers, int32(newWorkerCount)) -} - -func (wp *Pool) EnqueueTask(ctx context.Context, payload *Task, priority int) error { - if wp.circuitBreaker.Enabled && wp.circuitBreakerOpen { - return fmt.Errorf("circuit breaker open, task rejected") - } - if err := validateTaskInput(payload); err != nil { - return fmt.Errorf("invalid task input: %w", err) - } - if payload.ID == "" { - payload.ID = NewID() - } - task := getQueueTask() - task.ctx = ctx - task.payload = payload - task.priority = priority - task.retryCount = 0 - Logger.Info().Str("taskID", payload.ID).Msg("Enqueuing task") - if err := wp.taskStorage.SaveTask(task); err != nil { - return err - } - wp.taskQueueLock.Lock() - defer wp.taskQueueLock.Unlock() - heap.Push(&wp.taskQueue, task) - wp.Dispatch(wp.taskAvailableCond.Signal) - wp.taskCompletionNotifier.Add(1) - return nil -} - -func (wp *Pool) Dispatch(event func()) { - wp.taskAvailableCond.L.Lock() - event() - wp.taskAvailableCond.L.Unlock() -} - -func (wp *Pool) Pause() { - wp.paused = true - wp.Dispatch(wp.taskAvailableCond.Broadcast) -} - -func (wp *Pool) SetBatchSize(size int) { - wp.batchSize = size -} - -func (wp *Pool) Resume() { - wp.paused = false - wp.Dispatch(wp.taskAvailableCond.Broadcast) -} - -func (wp *Pool) storeInOverflow(task *QueueTask) { - wp.overflowBufferLock.Lock() - wp.overflowBuffer = append(wp.overflowBuffer, task) - wp.overflowBufferLock.Unlock() -} - -func (wp *Pool) startOverflowDrainer() { - for { - wp.drainOverflowBuffer() - select { - case <-wp.stop: - return - default: - time.Sleep(100 * time.Millisecond) - } - } -} - -func (wp *Pool) drainOverflowBuffer() { - wp.overflowBufferLock.Lock() - overflowTasks := wp.overflowBuffer - wp.overflowBuffer = nil - wp.overflowBufferLock.Unlock() - for _, task := range overflowTasks { - select { - case wp.taskNotify <- struct{}{}: - wp.taskQueueLock.Lock() - heap.Push(&wp.taskQueue, task) - wp.taskQueueLock.Unlock() - default: - return - } - } -} - -func (wp *Pool) Stop() { - wp.gracefulShutdown = true - wp.Pause() - close(wp.stop) - done := make(chan struct{}) - go func() { - wp.wg.Wait() - wp.taskCompletionNotifier.Wait() - close(done) - }() - select { - case <-done: - case <-time.After(wp.gracefulShutdownTimeout): - Logger.Warn().Msg("Graceful shutdown timeout reached") - } - if wp.completionCallback != nil { - wp.completionCallback() - } -} - -func (wp *Pool) AdjustWorkerCount(newWorkerCount int) { - adjustment := newWorkerCount - int(atomic.LoadInt32(&wp.numOfWorkers)) - if adjustment != 0 { - wp.workerAdjust <- adjustment - } -} - -func (wp *Pool) Metrics() Metrics { - wp.metrics.TotalScheduled = int64(len(wp.scheduler.tasks)) - return wp.metrics -} - -func (wp *Pool) Scheduler() *Scheduler { return wp.scheduler } - -func (wp *Pool) dynamicWorkerScaler() { - ticker := time.NewTicker(10 * time.Second) - defer ticker.Stop() - for { - select { - case <-ticker.C: - wp.taskQueueLock.Lock() - queueLen := len(wp.taskQueue) - wp.taskQueueLock.Unlock() - newWorkers := queueLen/5 + 1 - Logger.Info().Msgf("Auto-scaling: queue length %d, 
adjusting workers to %d", queueLen, newWorkers) - wp.AdjustWorkerCount(newWorkers) - case <-wp.stop: - return - } - } -} - -func (wp *Pool) startHealthServer() { - mux := http.NewServeMux() - mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { - status := "OK" - if wp.gracefulShutdown { - status = "shutting down" - } - fmt.Fprintf(w, "status: %s\nworkers: %d\nqueueLength: %d\n", - status, atomic.LoadInt32(&wp.numOfWorkers), len(wp.taskQueue)) - }) - server := &http.Server{ - Addr: ":8080", - Handler: mux, - } - go func() { - Logger.Info().Msg("Starting health server on :8080") - if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { - Logger.Error().Err(err).Msg("Health server failed") - } - }() - - go func() { - <-wp.stop - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - if err := server.Shutdown(ctx); err != nil { - Logger.Error().Err(err).Msg("Health server shutdown failed") - } else { - Logger.Info().Msg("Health server shutdown gracefully") - } - }() -} - -type ThresholdConfig struct { - HighMemory int64 - LongExecution time.Duration -} - -type CircuitBreakerConfig struct { - Enabled bool - FailureThreshold int - ResetTimeout time.Duration -} - -type PoolOption func(*Pool) - -func WithTaskQueueSize(size int) PoolOption { - return func(p *Pool) { - p.taskQueue = make(PriorityQueue, 0, size) - } -} - -func WithTaskTimeout(t time.Duration) PoolOption { - return func(p *Pool) { - p.timeout = t - } -} - -func WithCompletionCallback(callback func()) PoolOption { - return func(p *Pool) { - p.completionCallback = callback - } -} - -func WithMaxMemoryLoad(maxMemoryLoad int64) PoolOption { - return func(p *Pool) { - p.maxMemoryLoad = maxMemoryLoad - } -} - -func WithBatchSize(batchSize int) PoolOption { - return func(p *Pool) { - p.batchSize = batchSize - } -} - -func WithHandler(handler Handler) PoolOption { - return func(p *Pool) { - p.handler = handler - } -} - -func WithPoolCallback(callback Callback) PoolOption { - return func(p *Pool) { - p.callback = callback - } -} - -func WithTaskStorage(storage TaskStorage) PoolOption { - return func(p *Pool) { - p.taskStorage = storage - } -} - -func WithWarningThresholds(thresholds ThresholdConfig) PoolOption { - return func(p *Pool) { - p.thresholds = thresholds - } -} - -func WithDiagnostics(enabled bool) PoolOption { - return func(p *Pool) { - p.diagnosticsEnabled = enabled - } -} - -func WithMetricsRegistry(registry MetricsRegistry) PoolOption { - return func(p *Pool) { - p.metricsRegistry = registry - } -} - -func WithCircuitBreaker(config CircuitBreakerConfig) PoolOption { - return func(p *Pool) { - p.circuitBreaker = config - } -} - -func WithGracefulShutdown(timeout time.Duration) PoolOption { - return func(p *Pool) { - p.gracefulShutdownTimeout = timeout - } -} - -func WithPlugin(plugin Plugin) PoolOption { - return func(p *Pool) { - p.plugins = append(p.plugins, plugin) - } -} - -type Task struct { - ID string - Payload interface{} -} - -type Result struct { - Error error -} - -type QueueTask struct { - ctx context.Context - payload *Task - priority int - retryCount int - index int -} - -type PriorityQueue []*QueueTask - -func (pq PriorityQueue) Len() int { return len(pq) } -func (pq PriorityQueue) Less(i, j int) bool { - return pq[i].priority > pq[j].priority -} -func (pq PriorityQueue) Swap(i, j int) { - pq[i], pq[j] = pq[j], pq[i] - pq[i].index = i - pq[j].index = j -} -func (pq *PriorityQueue) Push(x interface{}) { - n := len(*pq) - task := 
x.(*QueueTask) - task.index = n - *pq = append(*pq, task) -} -func (pq *PriorityQueue) Pop() interface{} { - old := *pq - n := len(old) - task := old[n-1] - task.index = -1 - *pq = old[0 : n-1] - return task -} - -func NewID() string { - return fmt.Sprintf("%d", time.Now().UnixNano()) -} - -func SizeOf(payload interface{}) int { - - return 100 -} - -type TaskStorage interface { - SaveTask(task *QueueTask) error - FetchNextTask() (*QueueTask, error) - DeleteTask(taskID string) error - GetAllTasks() ([]*QueueTask, error) -} - -type InMemoryTaskStorage struct { - tasks map[string]*QueueTask - mu sync.RWMutex -} - -func NewInMemoryTaskStorage() *InMemoryTaskStorage { - return &InMemoryTaskStorage{ - tasks: make(map[string]*QueueTask), - } -} - -func (s *InMemoryTaskStorage) SaveTask(task *QueueTask) error { - s.mu.Lock() - defer s.mu.Unlock() - s.tasks[task.payload.ID] = task - return nil -} - -func (s *InMemoryTaskStorage) FetchNextTask() (*QueueTask, error) { - s.mu.Lock() - defer s.mu.Unlock() - for id, task := range s.tasks { - delete(s.tasks, id) - return task, nil - } - return nil, errors.New("no tasks") -} - -func (s *InMemoryTaskStorage) DeleteTask(taskID string) error { - s.mu.Lock() - defer s.mu.Unlock() - delete(s.tasks, taskID) - return nil -} - -func (s *InMemoryTaskStorage) GetAllTasks() ([]*QueueTask, error) { - s.mu.RLock() - defer s.mu.RUnlock() - tasks := make([]*QueueTask, 0, len(s.tasks)) - for _, task := range s.tasks { - tasks = append(tasks, task) - } - return tasks, nil -}
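
For reference, a minimal usage sketch of the API surface this patch moves into the root `github.com/oarkflow/mq` package: the `WithPlugin` option, the per-pool `DLQ` field, and `json.RawMessage` payloads. It mirrors the updated `examples/pool.go` above and assumes `NewMemoryTaskStorage(ttl)` exists as used there; the counting plugin is a hypothetical illustration, and in this patch the Before/After hooks are wired around scheduler-executed tasks.

package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"

	"github.com/oarkflow/json"
	"github.com/oarkflow/mq"
)

// countingPlugin implements mq.Plugin. It only counts hook invocations,
// since QueueTask fields are unexported outside package mq.
type countingPlugin struct {
	before, after int64
}

func (p *countingPlugin) Initialize(config interface{}) error { return nil }
func (p *countingPlugin) BeforeTask(task *mq.QueueTask)       { atomic.AddInt64(&p.before, 1) }
func (p *countingPlugin) AfterTask(task *mq.QueueTask, result mq.Result) {
	atomic.AddInt64(&p.after, 1)
}

func main() {
	plugin := &countingPlugin{}
	pool := mq.NewPool(3,
		mq.WithTaskStorage(mq.NewMemoryTaskStorage(10*time.Minute)), // same storage as examples/pool.go
		mq.WithPlugin(plugin),                                       // new option added in pool_options.go
		mq.WithHandler(func(ctx context.Context, task *mq.Task) mq.Result {
			mq.Logger.Info().Str("taskID", task.ID).Msg("handled")
			return mq.Result{}
		}),
	)

	// Payloads are json.RawMessage, matching the updated example.
	task := &mq.Task{Payload: json.RawMessage(`{"n":1}`)}
	if err := pool.EnqueueTask(context.Background(), task, 1); err != nil {
		mq.Logger.Error().Err(err).Msg("enqueue failed")
	}

	time.Sleep(2 * time.Second)
	pool.Stop()

	// DLQ is a field on the pool now, not a package-level variable.
	fmt.Println("dead-lettered tasks:", len(pool.DLQ.Task()))
	fmt.Println("plugin hooks (before/after):", atomic.LoadInt64(&plugin.before), atomic.LoadInt64(&plugin.after))
}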