mirror of
https://github.com/oarkflow/mq.git
synced 2025-10-29 08:42:29 +08:00
feat: add enhancements
pool.go (92 changed lines)
@@ -5,6 +5,7 @@ import (
    "context"
    "fmt"
    "log"
    "math/rand"
    "sync"
    "sync/atomic"
    "time"
@@ -49,6 +50,17 @@ type Pool struct {
    taskQueueLock    sync.Mutex
    numOfWorkers     int32
    paused           bool
    logger           *log.Logger
    gracefulShutdown bool

    // New fields for enhancements:
    thresholds                 ThresholdConfig
    diagnosticsEnabled         bool
    metricsRegistry            MetricsRegistry
    circuitBreaker             CircuitBreakerConfig
    circuitBreakerOpen         bool
    circuitBreakerFailureCount int32
    gracefulShutdownTimeout    time.Duration
}

func NewPool(numOfWorkers int, opts ...PoolOption) *Pool {
@@ -60,6 +72,7 @@ func NewPool(numOfWorkers int, opts ...PoolOption) *Pool {
        idleTimeout:     5 * time.Minute,
        backoffDuration: 2 * time.Second,
        maxRetries:      3, // Set max retries for failed tasks
        logger:          log.Default(),
    }
    pool.scheduler = NewScheduler(pool)
    pool.taskAvailableCond = sync.NewCond(&sync.Mutex{})
@@ -147,17 +160,51 @@ func (wp *Pool) handleTask(task *QueueTask) {
    result := wp.handler(ctx, task.payload)
    executionTime := time.Since(startTime).Milliseconds()
    atomic.AddInt64(&wp.metrics.ExecutionTime, executionTime)

    // Warning thresholds check
    if wp.thresholds.LongExecution > 0 && executionTime > int64(wp.thresholds.LongExecution.Milliseconds()) {
        wp.logger.Printf("Warning: Task %s exceeded execution time threshold: %d ms", task.payload.ID, executionTime)
    }
    if wp.thresholds.HighMemory > 0 && taskSize > wp.thresholds.HighMemory {
        wp.logger.Printf("Warning: Task %s memory usage %d exceeded threshold", task.payload.ID, taskSize)
    }

    if result.Error != nil {
        atomic.AddInt64(&wp.metrics.ErrorCount, 1)
        log.Printf("Error processing task %s: %v", task.payload.ID, result.Error)
        wp.logger.Printf("Error processing task %s: %v", task.payload.ID, result.Error)
        wp.backoffAndStore(task)

        // Circuit breaker check
        if wp.circuitBreaker.Enabled {
            newCount := atomic.AddInt32(&wp.circuitBreakerFailureCount, 1)
            if newCount >= int32(wp.circuitBreaker.FailureThreshold) {
                wp.circuitBreakerOpen = true
                wp.logger.Println("Circuit breaker opened due to errors")
                go func() {
                    time.Sleep(wp.circuitBreaker.ResetTimeout)
                    atomic.StoreInt32(&wp.circuitBreakerFailureCount, 0)
                    wp.circuitBreakerOpen = false
                    wp.logger.Println("Circuit breaker reset")
                }()
            }
        }
    } else {
        atomic.AddInt64(&wp.metrics.CompletedTasks, 1)
        // Reset failure count on success if using circuit breaker
        if wp.circuitBreaker.Enabled {
            atomic.StoreInt32(&wp.circuitBreakerFailureCount, 0)
        }
    }

    // Diagnostics logging if enabled
    if wp.diagnosticsEnabled {
        wp.logger.Printf("Task %s executed in %d ms", task.payload.ID, executionTime)
    }

    if wp.callback != nil {
        if err := wp.callback(ctx, result); err != nil {
            atomic.AddInt64(&wp.metrics.ErrorCount, 1)
            log.Printf("Error in callback for task %s: %v", task.payload.ID, err)
            wp.logger.Printf("Error in callback for task %s: %v", task.payload.ID, err)
        }
    }
    _ = wp.taskStorage.DeleteTask(task.payload.ID)
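Note that circuitBreakerOpen is a plain bool written from a background goroutine and read by EnqueueTask, so it is not race-safe as committed. A small sketch of an alternative using sync/atomic from Go 1.19+ (not part of this commit; the variable name is illustrative):

    // atomic.Bool gives race-free reads and writes for the breaker flag.
    var breakerOpen atomic.Bool

    // where the breaker trips:
    breakerOpen.Store(true)

    // where EnqueueTask checks it:
    if breakerOpen.Load() {
        return fmt.Errorf("circuit breaker open, task rejected")
    }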
@@ -168,9 +215,14 @@ func (wp *Pool) backoffAndStore(task *QueueTask) {
    if task.retryCount < wp.maxRetries {
        task.retryCount++
        wp.storeInOverflow(task)
        time.Sleep(wp.backoffDuration)
        // Exponential backoff with jitter:
        backoff := wp.backoffDuration * (1 << (task.retryCount - 1))
        jitter := time.Duration(rand.Int63n(int64(backoff) / 2))
        sleepDuration := backoff + jitter
        wp.logger.Printf("Task %s retry %d: sleeping for %s", task.payload.ID, task.retryCount, sleepDuration)
        time.Sleep(sleepDuration)
    } else {
        log.Printf("Task %s failed after maximum retries", task.payload.ID)
        wp.logger.Printf("Task %s failed after maximum retries", task.payload.ID)
    }
}
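The replacement backoff grows geometrically with the retry count and adds up to 50% random jitter on top. A standalone sketch of the resulting schedule, assuming the defaults set in NewPool (backoffDuration of 2s, maxRetries of 3):

    package main

    import (
        "fmt"
        "math/rand"
        "time"
    )

    func main() {
        base := 2 * time.Second // wp.backoffDuration default
        for retry := 1; retry <= 3; retry++ {
            backoff := base * (1 << (retry - 1)) // 2s, 4s, 8s
            // jitter is drawn uniformly from [0, backoff/2)
            jitter := time.Duration(rand.Int63n(int64(backoff) / 2))
            fmt.Printf("retry %d: sleep %s\n", retry, backoff+jitter)
        }
    }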
@@ -226,6 +278,11 @@ func (wp *Pool) adjustWorkers(newWorkerCount int) {
}

func (wp *Pool) EnqueueTask(ctx context.Context, payload *Task, priority int) error {
    // Check circuit breaker state
    if wp.circuitBreaker.Enabled && wp.circuitBreakerOpen {
        return fmt.Errorf("circuit breaker open, task rejected")
    }

    if payload.ID == "" {
        payload.ID = NewID()
    }
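With the breaker open, EnqueueTask fails fast instead of queueing. A hedged sketch of caller-side handling (the handling policy is an assumption, not part of the commit):

    if err := pool.EnqueueTask(ctx, &Task{}, 1); err != nil {
        // Rejected while the breaker is open; drop, park, or retry later.
        log.Printf("enqueue rejected: %v", err)
    }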
@@ -303,9 +360,23 @@ func (wp *Pool) drainOverflowBuffer() {
}

func (wp *Pool) Stop() {
    wp.gracefulShutdown = true
    wp.Pause()
    close(wp.stop)
    wp.wg.Wait()
    wp.taskCompletionNotifier.Wait()

    // Graceful shutdown with timeout support
    done := make(chan struct{})
    go func() {
        wp.wg.Wait()
        wp.taskCompletionNotifier.Wait()
        close(done)
    }()
    select {
    case <-done:
        // All workers finished gracefully.
    case <-time.After(wp.gracefulShutdownTimeout):
        wp.logger.Println("Graceful shutdown timeout reached")
    }
    if wp.completionCallback != nil {
        wp.completionCallback()
    }
@@ -324,3 +395,12 @@ func (wp *Pool) Metrics() Metrics {
}

func (wp *Pool) Scheduler() *Scheduler { return wp.scheduler }
// [Enhancement Suggestions]
// - Introduce a graceful shutdown sequence that prevents accepting new tasks.
// - Integrate a circuit breaker pattern to temporarily pause task processing if errors spike.
// - Emit events or metrics (with timestamps/retries) for monitoring worker health.
// - Refine the worker autoscaling or rate limiting to respond to load changes.
// - Add proper context propagation (including cancellation and deadlines) across goroutines.
// - Incorporate centralized error logging and alerting based on failure types.
// - Consider unit-test and stress-test hooks to simulate production load.
@@ -4,6 +4,23 @@ import (
    "time"
)

// New type definitions for enhancements
type ThresholdConfig struct {
    HighMemory    int64         // e.g. in bytes
    LongExecution time.Duration // e.g. warn if task execution exceeds this
}

type MetricsRegistry interface {
    Register(metricName string, value interface{})
    // ...other methods as needed...
}

type CircuitBreakerConfig struct {
    Enabled          bool
    FailureThreshold int
    ResetTimeout     time.Duration
}
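The commit declares MetricsRegistry but ships no implementation. A minimal in-memory sketch that satisfies the interface (the mutex-guarded map is an assumption, not part of the commit; it also relies on a sync import):

    type inMemoryRegistry struct {
        mu sync.Mutex
        m  map[string]interface{}
    }

    func NewInMemoryRegistry() *inMemoryRegistry {
        return &inMemoryRegistry{m: make(map[string]interface{})}
    }

    // Register stores or overwrites the latest value for a metric name.
    func (r *inMemoryRegistry) Register(metricName string, value interface{}) {
        r.mu.Lock()
        defer r.mu.Unlock()
        r.m[metricName] = value
    }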
type PoolOption func(*Pool)

func WithTaskQueueSize(size int) PoolOption {
@@ -54,3 +71,42 @@ func WithTaskStorage(storage TaskStorage) PoolOption {
        p.taskStorage = storage
    }
}

// New option functions:

func WithWarningThresholds(thresholds ThresholdConfig) PoolOption {
    return func(p *Pool) {
        p.thresholds = thresholds
    }
}

func WithDiagnostics(enabled bool) PoolOption {
    return func(p *Pool) {
        p.diagnosticsEnabled = enabled
    }
}

func WithMetricsRegistry(registry MetricsRegistry) PoolOption {
    return func(p *Pool) {
        p.metricsRegistry = registry
    }
}

func WithCircuitBreaker(config CircuitBreakerConfig) PoolOption {
    return func(p *Pool) {
        p.circuitBreaker = config
    }
}

func WithGracefulShutdown(timeout time.Duration) PoolOption {
    return func(p *Pool) {
        p.gracefulShutdownTimeout = timeout
    }
}

// [Enhancements]
// - Add WithWarningThresholds(thresholds ThresholdConfig) option to flag high-memory or long-running tasks.
// - Add WithDiagnostics(enabled bool) to toggle detailed diagnostics logging.
// - Add WithMetricsRegistry(registry MetricsRegistry) to inject a metrics/monitoring system.
// - Add WithCircuitBreaker(config CircuitBreakerConfig) for pausing task intake on error spikes.
// - Add WithGracefulShutdown(timeout time.Duration) to configure the shutdown phase.
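Taken together, the new options compose at construction time. A usage sketch assuming only the NewPool signature and option names from this commit (the numeric values are illustrative):

    pool := NewPool(5,
        WithWarningThresholds(ThresholdConfig{
            HighMemory:    64 << 20,        // warn above 64 MiB per task
            LongExecution: 2 * time.Second, // warn above 2 s per task
        }),
        WithDiagnostics(true),
        WithCircuitBreaker(CircuitBreakerConfig{
            Enabled:          true,
            FailureThreshold: 5,
            ResetTimeout:     30 * time.Second,
        }),
        WithGracefulShutdown(10*time.Second),
    )
    defer pool.Stop() // Stop waits up to the configured graceful-shutdown timeout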
scheduler.go (71 changed lines)
@@ -6,6 +6,7 @@ import (
    "strconv"
    "strings"
    "sync"
    "sync/atomic"
    "time"
)

@@ -228,6 +229,10 @@ func (s *Scheduler) Start() {
}

func (s *Scheduler) schedule(task ScheduledTask) {
    // Check for graceful shutdown
    if s.pool.gracefulShutdown {
        return
    }
    if task.schedule.Interval > 0 {
        ticker := time.NewTicker(task.schedule.Interval)
        defer ticker.Stop()
@@ -236,6 +241,10 @@ func (s *Scheduler) schedule(task ScheduledTask) {
    for {
        select {
        case <-ticker.C:
            // Check for graceful shutdown before executing
            if s.pool.gracefulShutdown {
                return
            }
            s.executeTask(task)
        case <-task.stop:
            return
@@ -245,6 +254,9 @@ func (s *Scheduler) schedule(task ScheduledTask) {
    // Execute once and return
    select {
    case <-ticker.C:
        if s.pool.gracefulShutdown {
            return
        }
        s.executeTask(task)
    case <-task.stop:
        return
@@ -256,6 +268,9 @@ func (s *Scheduler) schedule(task ScheduledTask) {
    nextRun := task.getNextRunTime(now)
    select {
    case <-time.After(nextRun.Sub(now)):
        if s.pool.gracefulShutdown {
            return
        }
        s.executeTask(task)
    case <-task.stop:
        return
@@ -264,6 +279,49 @@ func (s *Scheduler) schedule(task ScheduledTask) {
    }
}

// Enhance executeTask with circuit breaker and diagnostics logging support.
func (s *Scheduler) executeTask(task ScheduledTask) {
    if !task.config.Overlap && !s.pool.gracefulShutdown {
        // Prevent overlapping execution if not allowed.
        // ...existing code...
    }
    go func() {
        // Recover from panic to keep scheduler running.
        defer func() {
            if r := recover(); r != nil {
                fmt.Printf("Recovered from panic in scheduled task %s: %v\n", task.payload.ID, r)
            }
        }()
        start := time.Now()
        result := task.handler(task.ctx, task.payload)
        execTime := time.Since(start).Milliseconds()
        // Diagnostics logging if enabled.
        if s.pool.diagnosticsEnabled {
            s.pool.logger.Printf("Scheduled task %s executed in %d ms", task.payload.ID, execTime)
        }
        // Circuit breaker check similar to pool.
        if result.Error != nil && s.pool.circuitBreaker.Enabled {
            newCount := atomic.AddInt32(&s.pool.circuitBreakerFailureCount, 1)
            if newCount >= int32(s.pool.circuitBreaker.FailureThreshold) {
                s.pool.circuitBreakerOpen = true
                s.pool.logger.Println("Scheduler: circuit breaker opened due to errors")
                go func() {
                    time.Sleep(s.pool.circuitBreaker.ResetTimeout)
                    atomic.StoreInt32(&s.pool.circuitBreakerFailureCount, 0)
                    s.pool.circuitBreakerOpen = false
                    s.pool.logger.Println("Scheduler: circuit breaker reset")
                }()
            }
        }
        // Invoke callback if defined.
        if task.config.Callback != nil {
            _ = task.config.Callback(task.ctx, result)
        }
        task.executionHistory = append(task.executionHistory, ExecutionHistory{Timestamp: time.Now(), Result: result})
        fmt.Printf("Executed scheduled task: %s\n", task.payload.ID)
    }()
}

func NewScheduler(pool *Pool) *Scheduler {
    return &Scheduler{pool: pool}
}
@@ -353,19 +411,6 @@ func nextWeekday(t time.Time, weekday time.Weekday) time.Time {
    return t.AddDate(0, 0, daysUntil)
}

func (s *Scheduler) executeTask(task ScheduledTask) {
    if task.config.Overlap || len(task.schedule.DayOfWeek) == 0 {
        go func() {
            result := task.handler(task.ctx, task.payload)
            task.executionHistory = append(task.executionHistory, ExecutionHistory{Timestamp: time.Now(), Result: result})
            if task.config.Callback != nil {
                _ = task.config.Callback(task.ctx, result)
            }
            fmt.Printf("Executed scheduled task: %s\n", task.payload.ID)
        }()
    }
}

func (s *Scheduler) AddTask(ctx context.Context, payload *Task, opts ...SchedulerOption) {
    s.mu.Lock()
    defer s.mu.Unlock()
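The truncated AddTask hunk above is the entry point for scheduled work; together with the Scheduler() accessor added in pool.go it can be driven as in this hypothetical sketch (the Task literal is an assumption):

    ctx := context.Background()
    pool.Scheduler().AddTask(ctx, &Task{ID: "nightly-report"})
    // Tasks added after Stop() sets gracefulShutdown will not be scheduled.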