mirror of
https://github.com/oarkflow/mq.git
synced 2025-10-13 00:33:41 +08:00
feat: add task completion
This commit is contained in:
44
pool.go
44
pool.go
@@ -47,7 +47,8 @@ type Pool struct {
|
|||||||
completionCallback CompletionCallback
|
completionCallback CompletionCallback
|
||||||
taskCompletionNotifier sync.WaitGroup
|
taskCompletionNotifier sync.WaitGroup
|
||||||
idleTimeout time.Duration
|
idleTimeout time.Duration
|
||||||
backoffDuration time.Duration // Duration to back off for overflow tasks
|
backoffDuration time.Duration
|
||||||
|
maxRetries int // Max retries for tasks
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPool(numOfWorkers int, opts ...PoolOption) *Pool {
|
func NewPool(numOfWorkers int, opts ...PoolOption) *Pool {
|
||||||
@@ -57,7 +58,8 @@ func NewPool(numOfWorkers int, opts ...PoolOption) *Pool {
|
|||||||
batchSize: 1,
|
batchSize: 1,
|
||||||
timeout: 10 * time.Second,
|
timeout: 10 * time.Second,
|
||||||
idleTimeout: 5 * time.Minute,
|
idleTimeout: 5 * time.Minute,
|
||||||
backoffDuration: 2 * time.Second, // Initial backoff duration
|
backoffDuration: 2 * time.Second,
|
||||||
|
maxRetries: 3, // Set max retries for failed tasks
|
||||||
}
|
}
|
||||||
pool.scheduler = NewScheduler(pool)
|
pool.scheduler = NewScheduler(pool)
|
||||||
pool.taskAvailableCond = sync.NewCond(&sync.Mutex{})
|
pool.taskAvailableCond = sync.NewCond(&sync.Mutex{})
|
||||||
@@ -138,32 +140,37 @@ func (wp *Pool) handleTask(task *QueueTask) {
|
|||||||
ctx, cancel := context.WithTimeout(task.ctx, wp.timeout)
|
ctx, cancel := context.WithTimeout(task.ctx, wp.timeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
taskSize := int64(utils.SizeOf(task.payload))
|
taskSize := int64(utils.SizeOf(task.payload))
|
||||||
wp.metrics.TotalMemoryUsed += taskSize
|
atomic.AddInt64(&wp.metrics.TotalMemoryUsed, taskSize)
|
||||||
wp.metrics.TotalTasks++
|
atomic.AddInt64(&wp.metrics.TotalTasks, 1)
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
result := wp.handler(ctx, task.payload)
|
result := wp.handler(ctx, task.payload)
|
||||||
executionTime := time.Since(startTime).Milliseconds()
|
executionTime := time.Since(startTime).Milliseconds()
|
||||||
atomic.AddInt64(&wp.metrics.ExecutionTime, executionTime)
|
atomic.AddInt64(&wp.metrics.ExecutionTime, executionTime)
|
||||||
if result.Error != nil {
|
if result.Error != nil {
|
||||||
wp.metrics.ErrorCount++
|
atomic.AddInt64(&wp.metrics.ErrorCount, 1)
|
||||||
log.Printf("Error processing task %s: %v", task.payload.ID, result.Error)
|
log.Printf("Error processing task %s: %v", task.payload.ID, result.Error)
|
||||||
wp.backoffAndStore(task) // Backoff and store in overflow
|
wp.backoffAndStore(task)
|
||||||
} else {
|
} else {
|
||||||
wp.metrics.CompletedTasks++
|
atomic.AddInt64(&wp.metrics.CompletedTasks, 1)
|
||||||
}
|
}
|
||||||
if wp.callback != nil {
|
if wp.callback != nil {
|
||||||
if err := wp.callback(ctx, result); err != nil {
|
if err := wp.callback(ctx, result); err != nil {
|
||||||
wp.metrics.ErrorCount++
|
atomic.AddInt64(&wp.metrics.ErrorCount, 1)
|
||||||
log.Printf("Error in callback for task %s: %v", task.payload.ID, err)
|
log.Printf("Error in callback for task %s: %v", task.payload.ID, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_ = wp.taskStorage.DeleteTask(task.payload.ID)
|
_ = wp.taskStorage.DeleteTask(task.payload.ID)
|
||||||
wp.metrics.TotalMemoryUsed -= taskSize
|
atomic.AddInt64(&wp.metrics.TotalMemoryUsed, -taskSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wp *Pool) backoffAndStore(task *QueueTask) {
|
func (wp *Pool) backoffAndStore(task *QueueTask) {
|
||||||
wp.storeInOverflow(task)
|
if task.retryCount < wp.maxRetries {
|
||||||
time.Sleep(wp.backoffDuration) // Apply backoff delay
|
task.retryCount++
|
||||||
|
wp.storeInOverflow(task)
|
||||||
|
time.Sleep(wp.backoffDuration)
|
||||||
|
} else {
|
||||||
|
log.Printf("Task %s failed after maximum retries", task.payload.ID)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wp *Pool) monitorIdleWorkers() {
|
func (wp *Pool) monitorIdleWorkers() {
|
||||||
@@ -228,9 +235,9 @@ func (wp *Pool) EnqueueTask(ctx context.Context, payload *Task, priority int) er
|
|||||||
wp.taskQueueLock.Lock()
|
wp.taskQueueLock.Lock()
|
||||||
defer wp.taskQueueLock.Unlock()
|
defer wp.taskQueueLock.Unlock()
|
||||||
taskSize := int64(utils.SizeOf(payload))
|
taskSize := int64(utils.SizeOf(payload))
|
||||||
if wp.metrics.TotalMemoryUsed+taskSize > wp.maxMemoryLoad && wp.maxMemoryLoad > 0 {
|
if atomic.LoadInt64(&wp.metrics.TotalMemoryUsed)+taskSize > wp.maxMemoryLoad && wp.maxMemoryLoad > 0 {
|
||||||
wp.storeInOverflow(task)
|
wp.storeInOverflow(task)
|
||||||
return fmt.Errorf("max memory load reached, task stored in overflow buffer of size %d", taskSize)
|
return fmt.Errorf("max memory load reached, task stored in overflow buffer")
|
||||||
}
|
}
|
||||||
heap.Push(&wp.taskQueue, task)
|
heap.Push(&wp.taskQueue, task)
|
||||||
wp.Dispatch(wp.taskAvailableCond.Signal)
|
wp.Dispatch(wp.taskAvailableCond.Signal)
|
||||||
@@ -278,13 +285,15 @@ func (wp *Pool) startOverflowDrainer() {
|
|||||||
|
|
||||||
func (wp *Pool) drainOverflowBuffer() {
|
func (wp *Pool) drainOverflowBuffer() {
|
||||||
wp.overflowBufferLock.Lock()
|
wp.overflowBufferLock.Lock()
|
||||||
defer wp.overflowBufferLock.Unlock()
|
overflowTasks := wp.overflowBuffer
|
||||||
for len(wp.overflowBuffer) > 0 {
|
wp.overflowBuffer = nil // Clear buffer
|
||||||
|
wp.overflowBufferLock.Unlock()
|
||||||
|
|
||||||
|
for _, task := range overflowTasks {
|
||||||
select {
|
select {
|
||||||
case wp.taskNotify <- struct{}{}:
|
case wp.taskNotify <- struct{}{}:
|
||||||
wp.taskQueueLock.Lock()
|
wp.taskQueueLock.Lock()
|
||||||
heap.Push(&wp.taskQueue, wp.overflowBuffer[0])
|
heap.Push(&wp.taskQueue, task)
|
||||||
wp.overflowBuffer = wp.overflowBuffer[1:]
|
|
||||||
wp.taskQueueLock.Unlock()
|
wp.taskQueueLock.Unlock()
|
||||||
default:
|
default:
|
||||||
return
|
return
|
||||||
@@ -295,7 +304,6 @@ func (wp *Pool) drainOverflowBuffer() {
|
|||||||
func (wp *Pool) Stop() {
|
func (wp *Pool) Stop() {
|
||||||
close(wp.stop)
|
close(wp.stop)
|
||||||
wp.wg.Wait()
|
wp.wg.Wait()
|
||||||
|
|
||||||
wp.taskCompletionNotifier.Wait()
|
wp.taskCompletionNotifier.Wait()
|
||||||
if wp.completionCallback != nil {
|
if wp.completionCallback != nil {
|
||||||
wp.completionCallback()
|
wp.completionCallback()
|
||||||
|
9
queue.go
9
queue.go
@@ -43,10 +43,11 @@ func (b *Broker) NewQueue(name string) *Queue {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type QueueTask struct {
|
type QueueTask struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
payload *Task
|
payload *Task
|
||||||
priority int
|
retryCount int
|
||||||
index int // The index in the heap
|
priority int
|
||||||
|
index int // The index in the heap
|
||||||
}
|
}
|
||||||
|
|
||||||
type PriorityQueue []*QueueTask
|
type PriorityQueue []*QueueTask
|
||||||
|
Reference in New Issue
Block a user