diff --git a/pool.go b/pool.go index e939157..9dd5d42 100644 --- a/pool.go +++ b/pool.go @@ -47,15 +47,17 @@ type Pool struct { completionCallback CompletionCallback taskCompletionNotifier sync.WaitGroup idleTimeout time.Duration + backoffDuration time.Duration // Duration to back off for overflow tasks } func NewPool(numOfWorkers int, opts ...PoolOption) *Pool { pool := &Pool{ - stop: make(chan struct{}), - taskNotify: make(chan struct{}, numOfWorkers), - batchSize: 1, - timeout: 10 * time.Second, - idleTimeout: 5 * time.Minute, + stop: make(chan struct{}), + taskNotify: make(chan struct{}, numOfWorkers), + batchSize: 1, + timeout: 10 * time.Second, + idleTimeout: 5 * time.Minute, + backoffDuration: 2 * time.Second, // Initial backoff duration } pool.scheduler = NewScheduler(pool) pool.taskAvailableCond = sync.NewCond(&sync.Mutex{}) @@ -145,6 +147,7 @@ func (wp *Pool) handleTask(task *QueueTask) { if result.Error != nil { wp.metrics.ErrorCount++ log.Printf("Error processing task %s: %v", task.payload.ID, result.Error) + wp.backoffAndStore(task) // Backoff and store in overflow } else { wp.metrics.CompletedTasks++ } @@ -158,6 +161,11 @@ func (wp *Pool) handleTask(task *QueueTask) { wp.metrics.TotalMemoryUsed -= taskSize } +func (wp *Pool) backoffAndStore(task *QueueTask) { + wp.storeInOverflow(task) + time.Sleep(wp.backoffDuration) // Apply backoff delay +} + func (wp *Pool) monitorIdleWorkers() { for { select { diff --git a/queue.go b/queue.go index 6d4fcdd..b677897 100644 --- a/queue.go +++ b/queue.go @@ -46,6 +46,7 @@ type QueueTask struct { ctx context.Context payload *Task priority int + index int // The index in the heap } type PriorityQueue []*QueueTask @@ -56,17 +57,25 @@ func (pq PriorityQueue) Less(i, j int) bool { return pq[i].priority > pq[j].priority } -func (pq PriorityQueue) Swap(i, j int) { pq[i], pq[j] = pq[j], pq[i] } +func (pq PriorityQueue) Swap(i, j int) { + pq[i], pq[j] = pq[j], pq[i] + pq[i].index = i + pq[j].index = j +} func (pq *PriorityQueue) Push(x interface{}) { - item := x.(*QueueTask) - *pq = append(*pq, item) + n := len(*pq) + task := x.(*QueueTask) + task.index = n + *pq = append(*pq, task) } func (pq *PriorityQueue) Pop() interface{} { old := *pq n := len(old) - item := old[n-1] + task := old[n-1] + old[n-1] = nil // avoid memory leak + task.index = -1 // for safety *pq = old[0 : n-1] - return item + return task }