From a84ff6d831b2c33ff94101eb5e136fc469e690e8 Mon Sep 17 00:00:00 2001 From: sujit Date: Tue, 22 Oct 2024 10:08:18 +0545 Subject: [PATCH] feat: Add connection --- examples/tasks/operations.go | 1 - pool.go | 32 ++++++++++++++++++++------------ 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/examples/tasks/operations.go b/examples/tasks/operations.go index 86cd592..be5158f 100644 --- a/examples/tasks/operations.go +++ b/examples/tasks/operations.go @@ -94,7 +94,6 @@ type StoreData struct { } func (e *StoreData) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { - panic("panic on store") return mq.Result{Payload: task.Payload, Ctx: ctx} } diff --git a/pool.go b/pool.go index 465c542..212fb92 100644 --- a/pool.go +++ b/pool.go @@ -6,6 +6,7 @@ import ( "fmt" "sync" "sync/atomic" + "time" "github.com/oarkflow/mq/utils" ) @@ -18,8 +19,6 @@ type Pool struct { taskQueueLock sync.Mutex stop chan struct{} taskNotify chan struct{} - handler Handler - callback Callback workerAdjust chan int wg sync.WaitGroup totalMemoryUsed int64 @@ -31,6 +30,9 @@ type Pool struct { scheduler *Scheduler overflowBufferLock sync.RWMutex overflowBuffer []*QueueTask + taskAvailableCond *sync.Cond + handler Handler + callback Callback } func NewPool(numOfWorkers, taskQueueSize int, maxMemoryLoad int64, handler Handler, callback Callback, storage TaskStorage) *Pool { @@ -45,6 +47,7 @@ func NewPool(numOfWorkers, taskQueueSize int, maxMemoryLoad int64, handler Handl workerAdjust: make(chan int), } pool.scheduler = NewScheduler(pool) + pool.taskAvailableCond = sync.NewCond(&sync.Mutex{}) // Initialize condition variable heap.Init(&pool.taskQueue) pool.scheduler.Start() pool.Start(numOfWorkers) @@ -72,11 +75,17 @@ func (wp *Pool) Start(numWorkers int) { func (wp *Pool) worker() { defer wp.wg.Done() for { + wp.taskAvailableCond.L.Lock() // Lock the condition variable mutex + for len(wp.taskQueue) == 0 && !wp.paused { // Wait if there are no tasks and not paused + wp.taskAvailableCond.Wait() + } + wp.taskAvailableCond.L.Unlock() // Unlock the condition variable mutex + select { - case <-wp.taskNotify: - wp.processNextTask() case <-wp.stop: return + default: + wp.processNextTask() // Process next task if there are any } } } @@ -167,12 +176,11 @@ func (wp *Pool) EnqueueTask(ctx context.Context, payload *Task, priority int) er } heap.Push(&wp.taskQueue, task) - // Non-blocking task notification - select { - case wp.taskNotify <- struct{}{}: - default: - wp.storeInOverflow(task) - } + // Notify one worker that a task has been added + wp.taskAvailableCond.L.Lock() + wp.taskAvailableCond.Signal() + wp.taskAvailableCond.L.Unlock() + return nil } @@ -191,7 +199,7 @@ func (wp *Pool) storeInOverflow(task *QueueTask) { wp.overflowBufferLock.Unlock() } -// Drains tasks from the overflow buffer when taskNotify is not full +// Drains tasks from the overflow buffer func (wp *Pool) startOverflowDrainer() { for { wp.drainOverflowBuffer() @@ -199,7 +207,7 @@ func (wp *Pool) startOverflowDrainer() { case <-wp.stop: return default: - continue + time.Sleep(50 * time.Millisecond) } } }