feat: add task completion

This commit is contained in:
sujit
2024-10-23 14:45:05 +05:45
parent a96c7d78f6
commit 63bb97c3b6
2 changed files with 27 additions and 10 deletions

View File

@@ -47,6 +47,7 @@ type Pool struct {
completionCallback CompletionCallback completionCallback CompletionCallback
taskCompletionNotifier sync.WaitGroup taskCompletionNotifier sync.WaitGroup
idleTimeout time.Duration idleTimeout time.Duration
backoffDuration time.Duration // Duration to back off for overflow tasks
} }
func NewPool(numOfWorkers int, opts ...PoolOption) *Pool { func NewPool(numOfWorkers int, opts ...PoolOption) *Pool {
@@ -56,6 +57,7 @@ func NewPool(numOfWorkers int, opts ...PoolOption) *Pool {
batchSize: 1, batchSize: 1,
timeout: 10 * time.Second, timeout: 10 * time.Second,
idleTimeout: 5 * time.Minute, idleTimeout: 5 * time.Minute,
backoffDuration: 2 * time.Second, // Initial backoff duration
} }
pool.scheduler = NewScheduler(pool) pool.scheduler = NewScheduler(pool)
pool.taskAvailableCond = sync.NewCond(&sync.Mutex{}) pool.taskAvailableCond = sync.NewCond(&sync.Mutex{})
@@ -145,6 +147,7 @@ func (wp *Pool) handleTask(task *QueueTask) {
if result.Error != nil { if result.Error != nil {
wp.metrics.ErrorCount++ wp.metrics.ErrorCount++
log.Printf("Error processing task %s: %v", task.payload.ID, result.Error) log.Printf("Error processing task %s: %v", task.payload.ID, result.Error)
wp.backoffAndStore(task) // Backoff and store in overflow
} else { } else {
wp.metrics.CompletedTasks++ wp.metrics.CompletedTasks++
} }
@@ -158,6 +161,11 @@ func (wp *Pool) handleTask(task *QueueTask) {
wp.metrics.TotalMemoryUsed -= taskSize wp.metrics.TotalMemoryUsed -= taskSize
} }
func (wp *Pool) backoffAndStore(task *QueueTask) {
wp.storeInOverflow(task)
time.Sleep(wp.backoffDuration) // Apply backoff delay
}
func (wp *Pool) monitorIdleWorkers() { func (wp *Pool) monitorIdleWorkers() {
for { for {
select { select {

View File

@@ -46,6 +46,7 @@ type QueueTask struct {
ctx context.Context ctx context.Context
payload *Task payload *Task
priority int priority int
index int // The index in the heap
} }
type PriorityQueue []*QueueTask type PriorityQueue []*QueueTask
@@ -56,17 +57,25 @@ func (pq PriorityQueue) Less(i, j int) bool {
return pq[i].priority > pq[j].priority return pq[i].priority > pq[j].priority
} }
func (pq PriorityQueue) Swap(i, j int) { pq[i], pq[j] = pq[j], pq[i] } func (pq PriorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}
func (pq *PriorityQueue) Push(x interface{}) { func (pq *PriorityQueue) Push(x interface{}) {
item := x.(*QueueTask) n := len(*pq)
*pq = append(*pq, item) task := x.(*QueueTask)
task.index = n
*pq = append(*pq, task)
} }
func (pq *PriorityQueue) Pop() interface{} { func (pq *PriorityQueue) Pop() interface{} {
old := *pq old := *pq
n := len(old) n := len(old)
item := old[n-1] task := old[n-1]
old[n-1] = nil // avoid memory leak
task.index = -1 // for safety
*pq = old[0 : n-1] *pq = old[0 : n-1]
return item return task
} }