From c47f8c9acc69d5157bd88ccd8e287eef824d0b5f Mon Sep 17 00:00:00 2001 From: sujit Date: Tue, 22 Oct 2024 16:56:59 +0545 Subject: [PATCH] feat: Add connection --- examples/priority.go | 2 +- examples/scheduler.go | 2 +- examples/tasks/operations.go | 2 -- examples/tasks/tasks.go | 6 ++-- options.go | 6 ++-- pool.go | 67 ++++++++++++++++++++---------------- util.go | 2 +- 7 files changed, 48 insertions(+), 39 deletions(-) diff --git a/examples/priority.go b/examples/priority.go index 7d21558..1140531 100644 --- a/examples/priority.go +++ b/examples/priority.go @@ -22,6 +22,6 @@ func main() { } time.Sleep(15 * time.Second) - pool.PrintMetrics() + pool.Metrics() pool.Stop() } diff --git a/examples/scheduler.go b/examples/scheduler.go index f854286..f2c8e11 100644 --- a/examples/scheduler.go +++ b/examples/scheduler.go @@ -20,6 +20,6 @@ func main() { time.Sleep(10 * time.Minute) pool.Scheduler().RemoveTask("Every Minute Task") time.Sleep(5 * time.Minute) - pool.PrintMetrics() + pool.Metrics() pool.Stop() } diff --git a/examples/tasks/operations.go b/examples/tasks/operations.go index 898a3d3..df44834 100644 --- a/examples/tasks/operations.go +++ b/examples/tasks/operations.go @@ -3,7 +3,6 @@ package tasks import ( "context" - "github.com/oarkflow/errors" "github.com/oarkflow/json" "github.com/oarkflow/mq" @@ -36,7 +35,6 @@ func (e *Condition) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { if err != nil { panic(err) } - return mq.Result{Error: errors.New("Condition error")} switch email := data["email"].(type) { case string: if email == "abc.xyz@gmail.com" { diff --git a/examples/tasks/tasks.go b/examples/tasks/tasks.go index a001504..52c9879 100644 --- a/examples/tasks/tasks.go +++ b/examples/tasks/tasks.go @@ -93,10 +93,12 @@ func Callback(_ context.Context, task mq.Result) mq.Result { return mq.Result{} } -func NotifyResponse(_ context.Context, result mq.Result) { +func NotifyResponse(_ context.Context, result mq.Result) error { log.Printf("DAG - FINAL_RESPONSE ~> TaskID: %s, Payload: %s, Topic: %s, Error: %v, Latency: %s", result.TaskID, result.Payload, result.Topic, result.Error, result.Latency) + return nil } -func NotifySubDAGResponse(_ context.Context, result mq.Result) { +func NotifySubDAGResponse(_ context.Context, result mq.Result) error { log.Printf("SUB DAG - FINAL_RESPONSE ~> TaskID: %s, Payload: %s, Topic: %s, Error: %v, Latency: %s", result.TaskID, result.Payload, result.Topic, result.Error, result.Latency) + return nil } diff --git a/options.go b/options.go index b683715..ee59b24 100644 --- a/options.go +++ b/options.go @@ -107,7 +107,7 @@ type TLSConfig struct { type Options struct { consumerOnSubscribe func(ctx context.Context, topic, consumerName string) consumerOnClose func(ctx context.Context, topic, consumerName string) - notifyResponse func(context.Context, Result) + notifyResponse func(context.Context, Result) error tlsConfig TLSConfig brokerAddr string callback []func(context.Context, Result) Result @@ -175,9 +175,9 @@ func SetupOptions(opts ...Option) *Options { return options } -func WithNotifyResponse(handler func(ctx context.Context, result Result)) Option { +func WithNotifyResponse(callback Callback) Option { return func(opts *Options) { - opts.notifyResponse = handler + opts.notifyResponse = callback } } diff --git a/pool.go b/pool.go index 212fb92..67c77a3 100644 --- a/pool.go +++ b/pool.go @@ -13,26 +13,35 @@ import ( type Callback func(ctx context.Context, result Result) error +type Metrics struct { + TotalTasks int64 + CompletedTasks int64 + ErrorCount int64 + TotalMemoryUsed int64 + TotalScheduled int64 +} + +type PoolOption struct { +} + type Pool struct { - taskStorage TaskStorage - taskQueue PriorityQueue - taskQueueLock sync.Mutex - stop chan struct{} - taskNotify chan struct{} - workerAdjust chan int - wg sync.WaitGroup - totalMemoryUsed int64 - completedTasks int - errorCount, maxMemoryLoad int64 - totalTasks int - numOfWorkers int32 - paused bool - scheduler *Scheduler - overflowBufferLock sync.RWMutex - overflowBuffer []*QueueTask - taskAvailableCond *sync.Cond - handler Handler - callback Callback + taskStorage TaskStorage + taskQueue PriorityQueue + taskQueueLock sync.Mutex + stop chan struct{} + taskNotify chan struct{} + workerAdjust chan int + wg sync.WaitGroup + maxMemoryLoad int64 + numOfWorkers int32 + metrics Metrics + paused bool + scheduler *Scheduler + overflowBufferLock sync.RWMutex + overflowBuffer []*QueueTask + taskAvailableCond *sync.Cond + handler Handler + callback Callback } func NewPool(numOfWorkers, taskQueueSize int, maxMemoryLoad int64, handler Handler, callback Callback, storage TaskStorage) *Pool { @@ -111,23 +120,23 @@ func (wp *Pool) processNextTask() { func (wp *Pool) handleTask(task *QueueTask) { taskSize := int64(utils.SizeOf(task.payload)) - wp.totalMemoryUsed += taskSize - wp.totalTasks++ + wp.metrics.TotalMemoryUsed += taskSize + wp.metrics.TotalTasks++ result := wp.handler(task.ctx, task.payload) if result.Error != nil { - wp.errorCount++ + wp.metrics.ErrorCount++ } else { - wp.completedTasks++ + wp.metrics.CompletedTasks++ } if wp.callback != nil { if err := wp.callback(task.ctx, result); err != nil { - wp.errorCount++ + wp.metrics.ErrorCount++ } } if err := wp.taskStorage.DeleteTask(task.payload.ID); err != nil { // Handle deletion error } - wp.totalMemoryUsed -= taskSize + wp.metrics.TotalMemoryUsed -= taskSize } func (wp *Pool) monitorWorkerAdjustments() { @@ -171,7 +180,7 @@ func (wp *Pool) EnqueueTask(ctx context.Context, payload *Task, priority int) er wp.taskQueueLock.Lock() defer wp.taskQueueLock.Unlock() taskSize := int64(utils.SizeOf(payload)) - if wp.totalMemoryUsed+taskSize > wp.maxMemoryLoad && wp.maxMemoryLoad > 0 { + if wp.metrics.TotalMemoryUsed+taskSize > wp.maxMemoryLoad && wp.maxMemoryLoad > 0 { return fmt.Errorf("max memory load reached, cannot add task of size %d", taskSize) } heap.Push(&wp.taskQueue, task) @@ -243,9 +252,9 @@ func (wp *Pool) AdjustWorkerCount(newWorkerCount int) { } } -func (wp *Pool) PrintMetrics() { - fmt.Printf("Total Tasks: %d, Completed Tasks: %d, Error Count: %d, Total Memory Used: %d bytes, Total Scheduled Tasks: %d\n", - wp.totalTasks, wp.completedTasks, wp.errorCount, wp.totalMemoryUsed, len(wp.scheduler.tasks)) +func (wp *Pool) Metrics() Metrics { + wp.metrics.TotalScheduled = int64(len(wp.scheduler.tasks)) + return wp.metrics } func (wp *Pool) Scheduler() *Scheduler { diff --git a/util.go b/util.go index c2e9fd2..1f393bf 100644 --- a/util.go +++ b/util.go @@ -15,7 +15,7 @@ func (b *Broker) SyncMode() bool { return b.opts.syncMode } -func (b *Broker) NotifyHandler() func(context.Context, Result) { +func (b *Broker) NotifyHandler() func(context.Context, Result) error { return b.opts.notifyResponse }