feat: Add connection

This commit is contained in:
sujit
2024-10-22 10:08:18 +05:45
parent c2013827c6
commit a84ff6d831
2 changed files with 20 additions and 13 deletions

View File

@@ -94,7 +94,6 @@ type StoreData struct {
} }
func (e *StoreData) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { func (e *StoreData) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
panic("panic on store")
return mq.Result{Payload: task.Payload, Ctx: ctx} return mq.Result{Payload: task.Payload, Ctx: ctx}
} }

32
pool.go
View File

@@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time"
"github.com/oarkflow/mq/utils" "github.com/oarkflow/mq/utils"
) )
@@ -18,8 +19,6 @@ type Pool struct {
taskQueueLock sync.Mutex taskQueueLock sync.Mutex
stop chan struct{} stop chan struct{}
taskNotify chan struct{} taskNotify chan struct{}
handler Handler
callback Callback
workerAdjust chan int workerAdjust chan int
wg sync.WaitGroup wg sync.WaitGroup
totalMemoryUsed int64 totalMemoryUsed int64
@@ -31,6 +30,9 @@ type Pool struct {
scheduler *Scheduler scheduler *Scheduler
overflowBufferLock sync.RWMutex overflowBufferLock sync.RWMutex
overflowBuffer []*QueueTask overflowBuffer []*QueueTask
taskAvailableCond *sync.Cond
handler Handler
callback Callback
} }
func NewPool(numOfWorkers, taskQueueSize int, maxMemoryLoad int64, handler Handler, callback Callback, storage TaskStorage) *Pool { func NewPool(numOfWorkers, taskQueueSize int, maxMemoryLoad int64, handler Handler, callback Callback, storage TaskStorage) *Pool {
@@ -45,6 +47,7 @@ func NewPool(numOfWorkers, taskQueueSize int, maxMemoryLoad int64, handler Handl
workerAdjust: make(chan int), workerAdjust: make(chan int),
} }
pool.scheduler = NewScheduler(pool) pool.scheduler = NewScheduler(pool)
pool.taskAvailableCond = sync.NewCond(&sync.Mutex{}) // Initialize condition variable
heap.Init(&pool.taskQueue) heap.Init(&pool.taskQueue)
pool.scheduler.Start() pool.scheduler.Start()
pool.Start(numOfWorkers) pool.Start(numOfWorkers)
@@ -72,11 +75,17 @@ func (wp *Pool) Start(numWorkers int) {
func (wp *Pool) worker() { func (wp *Pool) worker() {
defer wp.wg.Done() defer wp.wg.Done()
for { for {
wp.taskAvailableCond.L.Lock() // Lock the condition variable mutex
for len(wp.taskQueue) == 0 && !wp.paused { // Wait if there are no tasks and not paused
wp.taskAvailableCond.Wait()
}
wp.taskAvailableCond.L.Unlock() // Unlock the condition variable mutex
select { select {
case <-wp.taskNotify:
wp.processNextTask()
case <-wp.stop: case <-wp.stop:
return return
default:
wp.processNextTask() // Process next task if there are any
} }
} }
} }
@@ -167,12 +176,11 @@ func (wp *Pool) EnqueueTask(ctx context.Context, payload *Task, priority int) er
} }
heap.Push(&wp.taskQueue, task) heap.Push(&wp.taskQueue, task)
// Non-blocking task notification // Notify one worker that a task has been added
select { wp.taskAvailableCond.L.Lock()
case wp.taskNotify <- struct{}{}: wp.taskAvailableCond.Signal()
default: wp.taskAvailableCond.L.Unlock()
wp.storeInOverflow(task)
}
return nil return nil
} }
@@ -191,7 +199,7 @@ func (wp *Pool) storeInOverflow(task *QueueTask) {
wp.overflowBufferLock.Unlock() wp.overflowBufferLock.Unlock()
} }
// Drains tasks from the overflow buffer when taskNotify is not full // Drains tasks from the overflow buffer
func (wp *Pool) startOverflowDrainer() { func (wp *Pool) startOverflowDrainer() {
for { for {
wp.drainOverflowBuffer() wp.drainOverflowBuffer()
@@ -199,7 +207,7 @@ func (wp *Pool) startOverflowDrainer() {
case <-wp.stop: case <-wp.stop:
return return
default: default:
continue time.Sleep(50 * time.Millisecond)
} }
} }
} }