feat: implement Validate to check for cycle

Author: sujit
Date: 2024-10-23 14:11:39 +05:45
parent 9ca4e1c2f4
commit 1ec5630d3e
6 changed files with 112 additions and 57 deletions


@@ -232,7 +232,14 @@ func (c *Consumer) Consume(ctx context.Context) error {
 	if err != nil {
 		return err
 	}
-	c.pool = NewPool(c.opts.numOfWorkers, c.opts.queueSize, c.opts.maxMemoryLoad, c.ProcessTask, c.OnResponse, c.opts.storage)
+	c.pool = NewPool(
+		c.opts.numOfWorkers,
+		WithTaskQueueSize(c.opts.queueSize),
+		WithMaxMemoryLoad(c.opts.maxMemoryLoad),
+		WithHandler(c.ProcessTask),
+		WithPoolCallback(c.OnResponse),
+		WithTaskStorage(c.opts.storage),
+	)
 	if err := c.subscribe(ctx, c.queue); err != nil {
 		return fmt.Errorf("failed to connect to server for queue %s: %v", c.queue, err)
 	}


@@ -3,12 +3,13 @@ package dag
 import (
 	"context"
 	"fmt"
-	"github.com/oarkflow/mq/sio"
 	"log"
 	"net/http"
 	"sync"
 	"time"
 
+	"github.com/oarkflow/mq/sio"
 	"golang.org/x/time/rate"
 
 	"github.com/oarkflow/mq"
@@ -143,7 +144,14 @@ func NewDAG(name, key string, opts ...mq.Option) *DAG {
 	d.server = mq.NewBroker(opts...)
 	d.opts = opts
 	options := d.server.Options()
-	d.pool = mq.NewPool(options.NumOfWorkers(), options.QueueSize(), options.MaxMemoryLoad(), d.ProcessTask, callback, options.Storage())
+	d.pool = mq.NewPool(
+		options.NumOfWorkers(),
+		mq.WithTaskQueueSize(options.QueueSize()),
+		mq.WithMaxMemoryLoad(options.MaxMemoryLoad()),
+		mq.WithHandler(d.ProcessTask),
+		mq.WithPoolCallback(callback),
+		mq.WithTaskStorage(options.Storage()),
+	)
 	d.pool.Start(d.server.Options().NumOfWorkers())
 	go d.listenForTaskCleanup()
 	return d


@@ -9,7 +9,13 @@ import (
 )
 
 func main() {
-	pool := mq.NewPool(2, 5, 1000, tasks.SchedulerHandler, tasks.SchedulerCallback, mq.NewMemoryTaskStorage(10*time.Minute))
+	pool := mq.NewPool(2,
+		mq.WithTaskQueueSize(5),
+		mq.WithMaxMemoryLoad(1000),
+		mq.WithHandler(tasks.SchedulerHandler),
+		mq.WithPoolCallback(tasks.SchedulerCallback),
+		mq.WithTaskStorage(mq.NewMemoryTaskStorage(10*time.Minute)),
+	)
 	for i := 0; i < 100; i++ {
 		if i%10 == 0 {


@@ -11,7 +11,13 @@ import (
 func main() {
 	handler := tasks.SchedulerHandler
 	callback := tasks.SchedulerCallback
-	pool := mq.NewPool(3, 5, 1000, handler, callback, mq.NewMemoryTaskStorage(10*time.Minute))
+	pool := mq.NewPool(3,
+		mq.WithTaskQueueSize(5),
+		mq.WithMaxMemoryLoad(1000),
+		mq.WithHandler(handler),
+		mq.WithPoolCallback(callback),
+		mq.WithTaskStorage(mq.NewMemoryTaskStorage(10*time.Minute)),
+	)
 	ctx := context.Background()
 	pool.EnqueueTask(context.Background(), &mq.Task{ID: "Task 1"}, 1)
 	time.Sleep(1 * time.Second)

pool.go · 92 lines changed

@@ -21,9 +21,6 @@ type Metrics struct {
 	TotalScheduled int64
 }
 
-type PoolOption struct {
-}
-
 type Pool struct {
 	taskStorage TaskStorage
 	taskQueue   PriorityQueue
@@ -42,21 +39,23 @@ type Pool struct {
 	taskAvailableCond *sync.Cond
 	handler           Handler
 	callback          Callback
+	batchSize         int
 }
 
-func NewPool(numOfWorkers, taskQueueSize int, maxMemoryLoad int64, handler Handler, callback Callback, storage TaskStorage) *Pool {
+func NewPool(numOfWorkers int, opts ...PoolOption) *Pool {
 	pool := &Pool{
-		taskQueue:     make(PriorityQueue, 0, taskQueueSize),
-		stop:          make(chan struct{}),
-		taskNotify:    make(chan struct{}, numOfWorkers), // Buffer for workers
-		maxMemoryLoad: maxMemoryLoad,
-		handler:       handler,
-		callback:      callback,
-		taskStorage:   storage,
-		workerAdjust:  make(chan int),
+		stop:       make(chan struct{}),
+		taskNotify: make(chan struct{}, numOfWorkers),
+		batchSize:  1,
 	}
 	pool.scheduler = NewScheduler(pool)
-	pool.taskAvailableCond = sync.NewCond(&sync.Mutex{}) // Initialize condition variable
+	pool.taskAvailableCond = sync.NewCond(&sync.Mutex{})
+	for _, opt := range opts {
+		opt(pool)
+	}
+	if len(pool.taskQueue) == 0 {
+		pool.taskQueue = make(PriorityQueue, 0, 10)
+	}
 	heap.Init(&pool.taskQueue)
 	pool.scheduler.Start()
 	pool.Start(numOfWorkers)
@@ -84,37 +83,41 @@ func (wp *Pool) Start(numWorkers int) {
 func (wp *Pool) worker() {
 	defer wp.wg.Done()
 	for {
-		wp.taskAvailableCond.L.Lock() // Lock the condition variable mutex
-		for len(wp.taskQueue) == 0 && !wp.paused { // Wait if there are no tasks and not paused
+		wp.taskAvailableCond.L.Lock()
+		for len(wp.taskQueue) == 0 && !wp.paused {
 			wp.taskAvailableCond.Wait()
 		}
-		wp.taskAvailableCond.L.Unlock() // Unlock the condition variable mutex
+		wp.taskAvailableCond.L.Unlock()
 		select {
 		case <-wp.stop:
 			return
 		default:
-			wp.processNextTask() // Process next task if there are any
+			wp.processNextBatch()
 		}
 	}
 }
 
-func (wp *Pool) processNextTask() {
+func (wp *Pool) processNextBatch() {
 	wp.taskQueueLock.Lock()
-	var task *QueueTask
-	if len(wp.taskQueue) > 0 && !wp.paused {
-		task = heap.Pop(&wp.taskQueue).(*QueueTask)
-	}
-	wp.taskQueueLock.Unlock()
-	if task == nil && !wp.paused {
-		var err error
-		task, err = wp.taskStorage.FetchNextTask()
-		if err != nil {
-			return
-		}
-	}
-	if task != nil {
-		wp.handleTask(task)
-	}
+	defer wp.taskQueueLock.Unlock()
+	tasks := make([]*QueueTask, 0, wp.batchSize)
+	for len(wp.taskQueue) > 0 && !wp.paused && len(tasks) < wp.batchSize {
+		task := heap.Pop(&wp.taskQueue).(*QueueTask)
+		tasks = append(tasks, task)
+	}
+	if len(tasks) == 0 && !wp.paused {
+		for len(tasks) < wp.batchSize {
+			task, err := wp.taskStorage.FetchNextTask()
+			if err != nil {
+				break
+			}
+			tasks = append(tasks, task)
+		}
+	}
+	for _, task := range tasks {
+		if task != nil {
+			wp.handleTask(task)
+		}
+	}
 }
@@ -133,9 +136,7 @@ func (wp *Pool) handleTask(task *QueueTask) {
 			wp.metrics.ErrorCount++
 		}
 	}
-	if err := wp.taskStorage.DeleteTask(task.payload.ID); err != nil {
-		// Handle deletion error
-	}
+	_ = wp.taskStorage.DeleteTask(task.payload.ID)
 	wp.metrics.TotalMemoryUsed -= taskSize
 }
@@ -181,34 +182,26 @@ func (wp *Pool) EnqueueTask(ctx context.Context, payload *Task, priority int) error {
 	defer wp.taskQueueLock.Unlock()
 	taskSize := int64(utils.SizeOf(payload))
 	if wp.metrics.TotalMemoryUsed+taskSize > wp.maxMemoryLoad && wp.maxMemoryLoad > 0 {
-		return fmt.Errorf("max memory load reached, cannot add task of size %d", taskSize)
+		wp.storeInOverflow(task)
+		return fmt.Errorf("max memory load reached, task stored in overflow buffer of size %d", taskSize)
 	}
 	heap.Push(&wp.taskQueue, task)
-	// Notify one worker that a task has been added
 	wp.taskAvailableCond.L.Lock()
 	wp.taskAvailableCond.Signal()
 	wp.taskAvailableCond.L.Unlock()
 	return nil
 }
 
-func (wp *Pool) Pause() {
-	wp.paused = true
-}
+func (wp *Pool) Pause() { wp.paused = true }
 
-func (wp *Pool) Resume() {
-	wp.paused = false
-}
+func (wp *Pool) Resume() { wp.paused = false }
 
-// Overflow Handling
 func (wp *Pool) storeInOverflow(task *QueueTask) {
 	wp.overflowBufferLock.Lock()
 	wp.overflowBuffer = append(wp.overflowBuffer, task)
 	wp.overflowBufferLock.Unlock()
 }
 
-// Drains tasks from the overflow buffer
 func (wp *Pool) startOverflowDrainer() {
 	for {
 		wp.drainOverflowBuffer()
@@ -216,7 +209,7 @@ func (wp *Pool) startOverflowDrainer() {
 		case <-wp.stop:
 			return
 		default:
-			time.Sleep(50 * time.Millisecond)
+			time.Sleep(100 * time.Millisecond)
 		}
 	}
 }
@@ -224,17 +217,14 @@ func (wp *Pool) startOverflowDrainer() {
 func (wp *Pool) drainOverflowBuffer() {
 	wp.overflowBufferLock.Lock()
 	defer wp.overflowBufferLock.Unlock()
 	for len(wp.overflowBuffer) > 0 {
 		select {
 		case wp.taskNotify <- struct{}{}:
-			// Move the first task from the overflow buffer to the queue
 			wp.taskQueueLock.Lock()
 			heap.Push(&wp.taskQueue, wp.overflowBuffer[0])
 			wp.overflowBuffer = wp.overflowBuffer[1:]
 			wp.taskQueueLock.Unlock()
 		default:
-			// Stop if taskNotify is full
 			return
 		}
 	}
@@ -257,6 +247,4 @@ func (wp *Pool) Metrics() Metrics {
 	return wp.metrics
 }
 
-func (wp *Pool) Scheduler() *Scheduler {
-	return wp.scheduler
-}
+func (wp *Pool) Scheduler() *Scheduler { return wp.scheduler }
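
Note on the EnqueueTask hunk above: a task that would push TotalMemoryUsed past maxMemoryLoad is now parked in the overflow buffer instead of being rejected outright, and the returned error says so; drainOverflowBuffer later moves buffered tasks back onto the priority queue as taskNotify capacity allows. A minimal caller-side sketch of that path (handler and callback options are left out because the task never reaches a worker here; the tiny memory cap exists only to force the overflow branch):

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/oarkflow/mq"
)

func main() {
	pool := mq.NewPool(1,
		mq.WithMaxMemoryLoad(1), // deliberately tiny cap so any task trips the overflow path
		mq.WithTaskStorage(mq.NewMemoryTaskStorage(time.Minute)),
	)

	// With this commit the oversized task is parked in the overflow buffer
	// instead of being dropped, and the returned error reflects that.
	if err := pool.EnqueueTask(context.Background(), &mq.Task{ID: "big-task"}, 1); err != nil {
		fmt.Println("enqueue:", err)
	}
}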

pool_options.go · new file, 40 lines added

@@ -0,0 +1,40 @@
+package mq
+
+type PoolOption func(*Pool)
+
+func WithTaskQueueSize(size int) PoolOption {
+	return func(p *Pool) {
+		// Initialize the task queue with the specified size
+		p.taskQueue = make(PriorityQueue, 0, size)
+	}
+}
+
+func WithMaxMemoryLoad(maxMemoryLoad int64) PoolOption {
+	return func(p *Pool) {
+		p.maxMemoryLoad = maxMemoryLoad
+	}
+}
+
+func WithBatchSize(batchSize int) PoolOption {
+	return func(p *Pool) {
+		p.batchSize = batchSize
+	}
+}
+
+func WithHandler(handler Handler) PoolOption {
+	return func(p *Pool) {
+		p.handler = handler
+	}
+}
+
+func WithPoolCallback(callback Callback) PoolOption {
+	return func(p *Pool) {
+		p.callback = callback
+	}
+}
+
+func WithTaskStorage(storage TaskStorage) PoolOption {
+	return func(p *Pool) {
+		p.taskStorage = storage
+	}
+}
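
None of the call sites touched in this commit pass WithBatchSize, so processNextBatch runs with its default batch size of 1 everywhere above. A sketch of how a larger batch could be wired up with the new constructor; the inline handler and callback signatures below are assumptions made for a self-contained example and are not taken from this diff (the shipped examples use tasks.SchedulerHandler and tasks.SchedulerCallback instead):

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/oarkflow/mq"
)

func main() {
	// Assumed signatures, for illustration only; the real Handler and Callback
	// types are defined elsewhere in the mq package.
	handler := func(ctx context.Context, task *mq.Task) mq.Result {
		fmt.Println("processing", task.ID)
		return mq.Result{}
	}
	callback := func(ctx context.Context, result mq.Result) error { return nil }

	pool := mq.NewPool(2,
		mq.WithTaskQueueSize(16),
		mq.WithMaxMemoryLoad(1<<20),
		mq.WithBatchSize(4), // each worker wake-up now drains up to 4 queued tasks
		mq.WithHandler(handler),
		mq.WithPoolCallback(callback),
		mq.WithTaskStorage(mq.NewMemoryTaskStorage(10*time.Minute)),
	)

	for i := 0; i < 8; i++ {
		_ = pool.EnqueueTask(context.Background(), &mq.Task{ID: fmt.Sprintf("task-%d", i)}, i%3)
	}
	time.Sleep(500 * time.Millisecond) // give the workers time to drain the queue
}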