feat: Add connection

This commit is contained in:
sujit
2024-10-22 16:56:59 +05:45
parent d703f5f1d3
commit c47f8c9acc
7 changed files with 48 additions and 39 deletions

View File

@@ -22,6 +22,6 @@ func main() {
}
time.Sleep(15 * time.Second)
pool.PrintMetrics()
pool.Metrics()
pool.Stop()
}

View File

@@ -20,6 +20,6 @@ func main() {
time.Sleep(10 * time.Minute)
pool.Scheduler().RemoveTask("Every Minute Task")
time.Sleep(5 * time.Minute)
pool.PrintMetrics()
pool.Metrics()
pool.Stop()
}

View File

@@ -3,7 +3,6 @@ package tasks
import (
"context"
"github.com/oarkflow/errors"
"github.com/oarkflow/json"
"github.com/oarkflow/mq"
@@ -36,7 +35,6 @@ func (e *Condition) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
if err != nil {
panic(err)
}
return mq.Result{Error: errors.New("Condition error")}
switch email := data["email"].(type) {
case string:
if email == "abc.xyz@gmail.com" {

View File

@@ -93,10 +93,12 @@ func Callback(_ context.Context, task mq.Result) mq.Result {
return mq.Result{}
}
func NotifyResponse(_ context.Context, result mq.Result) {
func NotifyResponse(_ context.Context, result mq.Result) error {
log.Printf("DAG - FINAL_RESPONSE ~> TaskID: %s, Payload: %s, Topic: %s, Error: %v, Latency: %s", result.TaskID, result.Payload, result.Topic, result.Error, result.Latency)
return nil
}
func NotifySubDAGResponse(_ context.Context, result mq.Result) {
func NotifySubDAGResponse(_ context.Context, result mq.Result) error {
log.Printf("SUB DAG - FINAL_RESPONSE ~> TaskID: %s, Payload: %s, Topic: %s, Error: %v, Latency: %s", result.TaskID, result.Payload, result.Topic, result.Error, result.Latency)
return nil
}

View File

@@ -107,7 +107,7 @@ type TLSConfig struct {
type Options struct {
consumerOnSubscribe func(ctx context.Context, topic, consumerName string)
consumerOnClose func(ctx context.Context, topic, consumerName string)
notifyResponse func(context.Context, Result)
notifyResponse func(context.Context, Result) error
tlsConfig TLSConfig
brokerAddr string
callback []func(context.Context, Result) Result
@@ -175,9 +175,9 @@ func SetupOptions(opts ...Option) *Options {
return options
}
func WithNotifyResponse(handler func(ctx context.Context, result Result)) Option {
func WithNotifyResponse(callback Callback) Option {
return func(opts *Options) {
opts.notifyResponse = handler
opts.notifyResponse = callback
}
}

67
pool.go
View File

@@ -13,26 +13,35 @@ import (
type Callback func(ctx context.Context, result Result) error
type Metrics struct {
TotalTasks int64
CompletedTasks int64
ErrorCount int64
TotalMemoryUsed int64
TotalScheduled int64
}
type PoolOption struct {
}
type Pool struct {
taskStorage TaskStorage
taskQueue PriorityQueue
taskQueueLock sync.Mutex
stop chan struct{}
taskNotify chan struct{}
workerAdjust chan int
wg sync.WaitGroup
totalMemoryUsed int64
completedTasks int
errorCount, maxMemoryLoad int64
totalTasks int
numOfWorkers int32
paused bool
scheduler *Scheduler
overflowBufferLock sync.RWMutex
overflowBuffer []*QueueTask
taskAvailableCond *sync.Cond
handler Handler
callback Callback
taskStorage TaskStorage
taskQueue PriorityQueue
taskQueueLock sync.Mutex
stop chan struct{}
taskNotify chan struct{}
workerAdjust chan int
wg sync.WaitGroup
maxMemoryLoad int64
numOfWorkers int32
metrics Metrics
paused bool
scheduler *Scheduler
overflowBufferLock sync.RWMutex
overflowBuffer []*QueueTask
taskAvailableCond *sync.Cond
handler Handler
callback Callback
}
func NewPool(numOfWorkers, taskQueueSize int, maxMemoryLoad int64, handler Handler, callback Callback, storage TaskStorage) *Pool {
@@ -111,23 +120,23 @@ func (wp *Pool) processNextTask() {
func (wp *Pool) handleTask(task *QueueTask) {
taskSize := int64(utils.SizeOf(task.payload))
wp.totalMemoryUsed += taskSize
wp.totalTasks++
wp.metrics.TotalMemoryUsed += taskSize
wp.metrics.TotalTasks++
result := wp.handler(task.ctx, task.payload)
if result.Error != nil {
wp.errorCount++
wp.metrics.ErrorCount++
} else {
wp.completedTasks++
wp.metrics.CompletedTasks++
}
if wp.callback != nil {
if err := wp.callback(task.ctx, result); err != nil {
wp.errorCount++
wp.metrics.ErrorCount++
}
}
if err := wp.taskStorage.DeleteTask(task.payload.ID); err != nil {
// Handle deletion error
}
wp.totalMemoryUsed -= taskSize
wp.metrics.TotalMemoryUsed -= taskSize
}
func (wp *Pool) monitorWorkerAdjustments() {
@@ -171,7 +180,7 @@ func (wp *Pool) EnqueueTask(ctx context.Context, payload *Task, priority int) er
wp.taskQueueLock.Lock()
defer wp.taskQueueLock.Unlock()
taskSize := int64(utils.SizeOf(payload))
if wp.totalMemoryUsed+taskSize > wp.maxMemoryLoad && wp.maxMemoryLoad > 0 {
if wp.metrics.TotalMemoryUsed+taskSize > wp.maxMemoryLoad && wp.maxMemoryLoad > 0 {
return fmt.Errorf("max memory load reached, cannot add task of size %d", taskSize)
}
heap.Push(&wp.taskQueue, task)
@@ -243,9 +252,9 @@ func (wp *Pool) AdjustWorkerCount(newWorkerCount int) {
}
}
func (wp *Pool) PrintMetrics() {
fmt.Printf("Total Tasks: %d, Completed Tasks: %d, Error Count: %d, Total Memory Used: %d bytes, Total Scheduled Tasks: %d\n",
wp.totalTasks, wp.completedTasks, wp.errorCount, wp.totalMemoryUsed, len(wp.scheduler.tasks))
func (wp *Pool) Metrics() Metrics {
wp.metrics.TotalScheduled = int64(len(wp.scheduler.tasks))
return wp.metrics
}
func (wp *Pool) Scheduler() *Scheduler {

View File

@@ -15,7 +15,7 @@ func (b *Broker) SyncMode() bool {
return b.opts.syncMode
}
func (b *Broker) NotifyHandler() func(context.Context, Result) {
func (b *Broker) NotifyHandler() func(context.Context, Result) error {
return b.opts.notifyResponse
}