From 28870f162961288214e7079e354a7cdec0f17b19 Mon Sep 17 00:00:00 2001 From: sujit Date: Mon, 5 May 2025 09:07:01 +0545 Subject: [PATCH] feat: use "GetTags" and "SetTags" --- examples/dedup.go | 58 +++++++++++++++++++++++++ scheduler.go | 108 +++++++++++++++++++++++++++------------------- storage.go | 20 +++++++++ task.go | 9 ++++ 4 files changed, 150 insertions(+), 45 deletions(-) create mode 100644 examples/dedup.go diff --git a/examples/dedup.go b/examples/dedup.go new file mode 100644 index 0000000..ca04122 --- /dev/null +++ b/examples/dedup.go @@ -0,0 +1,58 @@ +package main + +import ( + "context" + "fmt" + "time" + + "github.com/oarkflow/json" + + "github.com/oarkflow/mq" + "github.com/oarkflow/mq/examples/tasks" +) + +func main() { + handler := tasks.SchedulerHandler + callback := tasks.SchedulerCallback + + // Initialize the pool with various parameters. + pool := mq.NewPool(3, + mq.WithTaskQueueSize(5), + mq.WithMaxMemoryLoad(1000), + mq.WithHandler(handler), + mq.WithPoolCallback(callback), + mq.WithTaskStorage(mq.NewMemoryTaskStorage(10*time.Minute)), + ) + scheduler := mq.NewScheduler(pool) + scheduler.Start() + ctx := context.Background() + + // Example: Schedule an email task with deduplication. + // DedupKey here is set to the recipient's email (e.g., "user@example.com") to avoid duplicate email tasks. + emailPayload := json.RawMessage(`{"email": "user@example.com", "message": "Hello, Customer!"}`) + scheduler.AddTask(ctx, mq.NewTask("Email Task", emailPayload, "email", + mq.WithDedupKey("user@example.com"), + ), + mq.WithScheduleSpec("@every 1m"), // runs every minute for demonstration + mq.WithRecurring(), + ) + + scheduler.AddTask(ctx, mq.NewTask("Duplicate Email Task", emailPayload, "email", + mq.WithDedupKey("user@example.com"), + ), + mq.WithScheduleSpec("@every 1m"), + mq.WithRecurring(), + ) + + go func() { + for { + for _, task := range scheduler.ListScheduledTasks() { + fmt.Println("Scheduled.....", task) + } + time.Sleep(1 * time.Minute) + } + }() + + time.Sleep(10 * time.Minute) + +} diff --git a/scheduler.go b/scheduler.go index a3be347..2e68854 100644 --- a/scheduler.go +++ b/scheduler.go @@ -10,6 +10,7 @@ import ( "time" "github.com/oarkflow/log" + "github.com/oarkflow/xid" "github.com/oarkflow/mq/storage" "github.com/oarkflow/mq/storage/memory" @@ -351,8 +352,8 @@ type ScheduledTask struct { schedule *Schedule stop chan struct{} executionHistory []ExecutionHistory - // running is used to indicate whether the task is currently executing (if !Overlap). - running int32 + running int32 + id string } type SchedulerConfig struct { @@ -368,8 +369,6 @@ type ExecutionHistory struct { // Scheduler manages scheduling and executing tasks. type Scheduler struct { pool *Pool - tasks []*ScheduledTask - mu sync.Mutex storage storage.IMap[string, *ScheduledTask] // added storage field } @@ -397,11 +396,10 @@ func NewScheduler(pool *Pool, opts ...SchedulerOpt) *Scheduler { // Start begins executing scheduled tasks. func (s *Scheduler) Start() { - s.mu.Lock() - defer s.mu.Unlock() - for _, task := range s.tasks { + s.storage.ForEach(func(_ string, task *ScheduledTask) bool { go s.schedule(task) - } + return true + }) } func (s *Scheduler) Close() error { @@ -552,9 +550,20 @@ func (s *Scheduler) executeTask(task *ScheduledTask) { } // AddTask adds a new scheduled task using the supplied context, payload, and options. -func (s *Scheduler) AddTask(ctx context.Context, payload *Task, opts ...SchedulerOption) { - s.mu.Lock() - defer s.mu.Unlock() +func (s *Scheduler) AddTask(ctx context.Context, payload *Task, opts ...SchedulerOption) string { + var hasDuplicate bool + if payload.DedupKey != "" { + s.storage.ForEach(func(_ string, task *ScheduledTask) bool { + if task.payload.DedupKey == payload.DedupKey { + hasDuplicate = true + Logger.Warn().Str("dedup", payload.DedupKey).Msg("Duplicate scheduled task prevented") + } + return true + }) + } + if hasDuplicate { + return "" + } options := defaultSchedulerOptions() for _, opt := range opts { opt(options) @@ -581,6 +590,7 @@ func (s *Scheduler) AddTask(ctx context.Context, payload *Task, opts ...Schedule stop := make(chan struct{}) newTask := &ScheduledTask{ + id: xid.New().String(), ctx: ctx, handler: options.Handler, payload: payload, @@ -591,55 +601,43 @@ func (s *Scheduler) AddTask(ctx context.Context, payload *Task, opts ...Schedule }, schedule: sched, } - s.tasks = append(s.tasks, newTask) - // Persist the task in storage if available. - if s.storage != nil { - s.storage.Set(newTask.payload.ID, newTask) - } + s.storage.Set(newTask.id, newTask) go s.schedule(newTask) + return newTask.id } // RemoveTask stops and removes a task by its payload ID. -func (s *Scheduler) RemoveTask(payloadID string) { - s.mu.Lock() - defer s.mu.Unlock() - for i, task := range s.tasks { - if task.payload.ID == payloadID { - close(task.stop) - s.tasks = append(s.tasks[:i], s.tasks[i+1:]...) - // Remove task from storage if available. - if s.storage != nil { - s.storage.Del(payloadID) - } - break - } +func (s *Scheduler) RemoveTask(id string) error { + task, ok := s.storage.Get(id) + if !ok { + return fmt.Errorf("No task found with ID: %s\n", id) } + close(task.stop) + if s.storage != nil { + s.storage.Del(id) + } + return nil } // PrintAllTasks prints a summary of all scheduled tasks. func (s *Scheduler) PrintAllTasks() { - s.mu.Lock() - defer s.mu.Unlock() fmt.Println("Scheduled Tasks:") - for _, task := range s.tasks { + s.storage.ForEach(func(_ string, task *ScheduledTask) bool { fmt.Printf("Task ID: %s, Next Execution: %s\n", task.payload.ID, task.getNextRunTime(time.Now())) - } + return true + }) } // PrintExecutionHistory prints the execution history for a task by its ID. -func (s *Scheduler) PrintExecutionHistory(taskID string) { - s.mu.Lock() - defer s.mu.Unlock() - for _, task := range s.tasks { - if task.payload.ID == taskID { - fmt.Printf("Execution History for Task ID: %s\n", taskID) - for _, history := range task.executionHistory { - fmt.Printf("Timestamp: %s, Result: %v\n", history.Timestamp, history.Result.Error) - } - return - } +func (s *Scheduler) PrintExecutionHistory(id string) error { + task, ok := s.storage.Get(id) + if !ok { + return fmt.Errorf("No task found with ID: %s\n", id) } - fmt.Printf("No task found with ID: %s\n", taskID) + for _, history := range task.executionHistory { + fmt.Printf("Timestamp: %s, Result: %v\n", history.Timestamp, history.Result.Error) + } + return nil } // getNextRunTime computes the next run time for the task. @@ -667,6 +665,26 @@ func (task *ScheduledTask) getNextRunTime(now time.Time) time.Time { return now } +// New type to hold scheduled task information. +type TaskInfo struct { + TaskID string `json:"task_id"` + NextRunTime time.Time `json:"next_run_time"` +} + +// ListScheduledTasks returns details of all scheduled tasks along with their next run time. +func (s *Scheduler) ListScheduledTasks() []TaskInfo { + now := time.Now() + infos := make([]TaskInfo, 0, s.storage.Size()) + s.storage.ForEach(func(_ string, task *ScheduledTask) bool { + infos = append(infos, TaskInfo{ + TaskID: task.payload.ID, + NextRunTime: task.getNextRunTime(now), + }) + return true + }) + return infos +} + // -------------------------------------------------------- // Additional Helper Functions and Stubs // -------------------------------------------------------- diff --git a/storage.go b/storage.go index 0430a22..693f0da 100644 --- a/storage.go +++ b/storage.go @@ -20,18 +20,29 @@ type MemoryTaskStorage struct { tasks PriorityQueue taskLock sync.Mutex expiryTime time.Duration + dedup map[string]*QueueTask } func NewMemoryTaskStorage(expiryTime time.Duration) *MemoryTaskStorage { return &MemoryTaskStorage{ tasks: make(PriorityQueue, 0), expiryTime: expiryTime, + dedup: make(map[string]*QueueTask), } } func (m *MemoryTaskStorage) SaveTask(task *QueueTask) error { m.taskLock.Lock() defer m.taskLock.Unlock() + if key := task.payload.DedupKey; key != "" { + if existing, ok := m.dedup[key]; ok { + if existing.payload.CreatedAt.Add(m.expiryTime).After(time.Now()) { + return fmt.Errorf("duplicate task with dedup key: %s", key) + } + delete(m.dedup, key) + } + m.dedup[key] = task + } heap.Push(&m.tasks, task) return nil } @@ -52,6 +63,9 @@ func (m *MemoryTaskStorage) DeleteTask(taskID string) error { defer m.taskLock.Unlock() for i, task := range m.tasks { if task.payload.ID == taskID { + if key := task.payload.DedupKey; key != "" { + delete(m.dedup, key) + } heap.Remove(&m.tasks, i) return nil } @@ -81,6 +95,9 @@ func (m *MemoryTaskStorage) FetchNextTask() (*QueueTask, error) { m.DeleteTask(task.payload.ID) return m.FetchNextTask() } + if key := task.payload.DedupKey; key != "" { + delete(m.dedup, key) + } return task, nil } @@ -91,6 +108,9 @@ func (m *MemoryTaskStorage) CleanupExpiredTasks() error { for i := 0; i < len(m.tasks); i++ { task := m.tasks[i] if task.payload.CreatedAt.Add(m.expiryTime).Before(time.Now()) { + if key := task.payload.DedupKey; key != "" { + delete(m.dedup, key) + } heap.Remove(&m.tasks, i) i-- // Adjust index after removal } diff --git a/task.go b/task.go index e581dc7..e01e36e 100644 --- a/task.go +++ b/task.go @@ -68,6 +68,8 @@ type Task struct { Status string `json:"status"` Payload json.RawMessage `json:"payload"` dag any + // new deduplication field + DedupKey string `json:"dedup_key,omitempty"` } func (t *Task) GetFlow() any { @@ -84,3 +86,10 @@ func NewTask(id string, payload json.RawMessage, nodeKey string, opts ...TaskOpt } return task } + +// new TaskOption for deduplication: +func WithDedupKey(key string) TaskOption { + return func(t *Task) { + t.DedupKey = key + } +}