From b2085d72e162dba99c8cc8b7f0de518cf095d459 Mon Sep 17 00:00:00 2001 From: sujit Date: Mon, 5 May 2025 10:04:29 +0545 Subject: [PATCH] feat: use "GetTags" and "SetTags" --- scheduler.go | 370 ++++++++++++++++++++++++++++----------------------- 1 file changed, 206 insertions(+), 164 deletions(-) diff --git a/scheduler.go b/scheduler.go index 1a5a0f4..42bc568 100644 --- a/scheduler.go +++ b/scheduler.go @@ -2,7 +2,9 @@ package mq import ( "context" + "expvar" "fmt" + "math" "strconv" "strings" "sync" @@ -17,6 +19,9 @@ import ( ) var Logger = log.DefaultLogger +var totalTasks = expvar.NewInt("totalTasks") +var failedTasks = expvar.NewInt("failedTasks") +var execTimeExp = expvar.NewInt("executionTimeMs") type ScheduleOptions struct { Handler Handler @@ -29,49 +34,42 @@ type ScheduleOptions struct { type SchedulerOption func(*ScheduleOptions) -// WithSchedulerHandler sets the handler. func WithSchedulerHandler(handler Handler) SchedulerOption { return func(opts *ScheduleOptions) { opts.Handler = handler } } -// WithSchedulerCallback sets the callback. func WithSchedulerCallback(callback Callback) SchedulerOption { return func(opts *ScheduleOptions) { opts.Callback = callback } } -// WithOverlap indicates that overlapping executions are allowed. func WithOverlap() SchedulerOption { return func(opts *ScheduleOptions) { opts.Overlap = true } } -// WithInterval sets a fixed interval. func WithInterval(interval time.Duration) SchedulerOption { return func(opts *ScheduleOptions) { opts.Interval = interval } } -// WithRecurring indicates that the task should be rescheduled after execution. func WithRecurring() SchedulerOption { return func(opts *ScheduleOptions) { opts.Recurring = true } } -// WithScheduleSpec provides a schedule string (e.g., cron expression, @daily, @every 1h30m, etc.) func WithScheduleSpec(spec string) SchedulerOption { return func(opts *ScheduleOptions) { opts.ScheduleSpec = spec } } -// defaultSchedulerOptions returns the default scheduling options. func defaultSchedulerOptions() *ScheduleOptions { return &ScheduleOptions{ Interval: time.Minute, @@ -79,24 +77,15 @@ func defaultSchedulerOptions() *ScheduleOptions { } } -// -------------------------------------------------------- -// Schedule and Cron Structures -// -------------------------------------------------------- - -// Schedule holds a schedule. It may be defined via: -// - A fixed time-interval (Interval) -// - A cron spec (CronSpec) string (which may be 5 or 6 fields) -// - A specific time of day, specific days of week or month. type Schedule struct { - TimeOfDay time.Time // Optional: time of day for one-off daily recurrence. - CronSpec string // For cron-based scheduling. - DayOfWeek []time.Weekday // Optional: days of the week. - DayOfMonth []int // Optional: days of the month. - Interval time.Duration // For duration-based scheduling (e.g. @every). - Recurring bool // Indicates if schedule recurs. + TimeOfDay time.Time + CronSpec string + DayOfWeek []time.Weekday + DayOfMonth []int + Interval time.Duration + Recurring bool } -// ToHumanReadable returns a human‑readable description of the schedule. func (s *Schedule) ToHumanReadable() string { var sb strings.Builder if s.CronSpec != "" { @@ -143,9 +132,8 @@ func (s *Schedule) ToHumanReadable() string { return sb.String() } -// CronSchedule represents a parsed cron expression. It supports both 5‑field and 6‑field (extended) formats. type CronSchedule struct { - Seconds string // Optional; if empty, assume "0" + Seconds string Minute string Hour string DayOfMonth string @@ -153,34 +141,21 @@ type CronSchedule struct { DayOfWeek string } -// String returns a summary string for the cron schedule. func (c CronSchedule) String() string { if c.Seconds != "" && c.Seconds != "0" { - return fmt.Sprintf("At %s seconds, %s minutes past %s, on %s, during %s, every %s", - c.Seconds, c.Minute, c.Hour, c.DayOfWeek, c.Month, c.DayOfMonth) + return fmt.Sprintf("At %s seconds, %s minutes past %s, on %s, during %s, every %s", c.Seconds, c.Minute, c.Hour, c.DayOfWeek, c.Month, c.DayOfMonth) } - return fmt.Sprintf("At %s minutes past %s, on %s, during %s, every %s", - c.Minute, c.Hour, c.DayOfWeek, c.Month, c.DayOfMonth) + return fmt.Sprintf("At %s minutes past %s, on %s, during %s, every %s", c.Minute, c.Hour, c.DayOfWeek, c.Month, c.DayOfMonth) } -// -------------------------------------------------------- -// Parsing: Special Schedule Strings and Cron Specs -// -------------------------------------------------------- - -// parseScheduleSpec inspects a schedule spec string and returns a Schedule. -// The spec may be a special keyword (starting with '@') or a standard cron expression. func parseScheduleSpec(spec string) (*Schedule, error) { - s := &Schedule{Recurring: true} // default recurring + s := &Schedule{Recurring: true} if strings.HasPrefix(spec, "@") { - // Handle special cases. switch { case strings.HasPrefix(spec, "@every"): - // Format: "@every ", use time.ParseDuration. durationStr := strings.TrimSpace(strings.TrimPrefix(spec, "@every")) d, err := time.ParseDuration(durationStr) if err != nil { - // If duration parsing fails, try to support days or weeks. - // For example: "1d", "1w". if strings.HasSuffix(durationStr, "d") || strings.HasSuffix(durationStr, "w") { numStr := durationStr[:len(durationStr)-1] num, err2 := strconv.Atoi(numStr) @@ -211,20 +186,17 @@ func parseScheduleSpec(spec string) (*Schedule, error) { s.CronSpec = "0 0 1 1 *" return s, nil case spec == "@reboot": - // For @reboot, you might want to run the task once at startup. s.Recurring = false return s, nil default: return nil, fmt.Errorf("unknown special schedule: %s", spec) } } else { - // Assume a standard cron spec s.CronSpec = spec return s, nil } } -// parseCronSpecDescription parses a cron spec and returns a human‑readable description. func parseCronSpecDescription(cronSpec string) (string, error) { cs, err := parseCronSpec(cronSpec) if err != nil { @@ -233,46 +205,52 @@ func parseCronSpecDescription(cronSpec string) (string, error) { return cs.String(), nil } -// parseCronSpec parses a cron specification string, supporting either 5 fields or 6 (with seconds). func parseCronSpec(spec string) (CronSchedule, error) { parts := strings.Fields(spec) if len(parts) == 5 { - // Assume no seconds provided; use default "0" return CronSchedule{ Seconds: "0", Minute: parts[0], Hour: parts[1], DayOfMonth: parts[2], Month: parts[3], - DayOfWeek: parts[4], + DayOfWeek: parseCronFieldNames(parts[4], "dow"), }, nil } else if len(parts) == 6 { - // Extended spec with seconds. return CronSchedule{ Seconds: parts[0], Minute: parts[1], Hour: parts[2], DayOfMonth: parts[3], Month: parts[4], - DayOfWeek: parts[5], + DayOfWeek: parseCronFieldNames(parts[5], "dow"), }, nil } return CronSchedule{}, fmt.Errorf("invalid CRON spec: expected 5 or 6 fields, got %d", len(parts)) } -// -------------------------------------------------------- -// Helper: Checking if a time matches a cron field value. -// (Supports "*" and comma separated list of integers.) -// -// For simplicity, we assume all fields in the cron spec represent numeric values. +func parseCronFieldNames(field string, fieldType string) string { + lower := strings.ToLower(field) + if fieldType == "dow" { + mapping := map[string]string{"sun": "0", "mon": "1", "tue": "2", "wed": "3", "thu": "4", "fri": "5", "sat": "6"} + parts := strings.Split(lower, ",") + for i, p := range parts { + p = strings.TrimSpace(p) + if val, ok := mapping[p]; ok { + parts[i] = val + } + } + return strings.Join(parts, ",") + } + return field +} + func matchesCronField(val int, field string) bool { if field == "*" { return true } - // Support lists separated by commas. parts := strings.Split(field, ",") for _, p := range parts { - // Trim any potential spaces. p = strings.TrimSpace(p) ival, err := strconv.Atoi(p) if err == nil && ival == val { @@ -282,16 +260,13 @@ func matchesCronField(val int, field string) bool { return false } -// checkTimeMatchesCron tests whether a given time t satisfies the cron expression. func checkTimeMatchesCron(t time.Time, cs CronSchedule) bool { - // Check seconds, minutes, hour, day, month, weekday. sec := t.Second() minute := t.Minute() hour := t.Hour() day := t.Day() month := int(t.Month()) - weekday := int(t.Weekday()) // Sunday==0, match cron where Sunday==0 - + weekday := int(t.Weekday()) if !matchesCronField(sec, cs.Seconds) { return false } @@ -313,37 +288,25 @@ func checkTimeMatchesCron(t time.Time, cs CronSchedule) bool { return true } -// nextCronRunTime computes the next time after now that matches the cron spec. -// For simplicity, it iterates minute by minute (or second by second if extended). func nextCronRunTime(now time.Time, cs CronSchedule) time.Time { - // We'll search up to a year ahead. searchLimit := now.AddDate(1, 0, 0) - t := now.Add(time.Second) // start a second later + t := now.Add(time.Second) for !t.After(now) { t = t.Add(time.Second) } - // If seconds field is in use (not just "0"), iterate second-by-second. for t.Before(searchLimit) { if checkTimeMatchesCron(t, cs) { return t } - // Increment by one second if seconds precision is needed; - // otherwise, for minute-level precision, you can use t = t.Add(time.Minute) if cs.Seconds != "0" && cs.Seconds != "*" { t = t.Add(time.Second) } else { t = t.Add(time.Minute) } } - // Fallback: return the search limit if no matching time is found. return searchLimit } -// -------------------------------------------------------- -// Scheduled Task and Scheduler Structures -// -------------------------------------------------------- - -// ScheduledTask represents a scheduled job. type ScheduledTask struct { ctx context.Context handler Handler @@ -357,8 +320,9 @@ type ScheduledTask struct { } type SchedulerConfig struct { - Callback Callback - Overlap bool + Callback Callback + Overlap bool + MaxRetries int } type ExecutionHistory struct { @@ -366,23 +330,20 @@ type ExecutionHistory struct { Result Result } -// Scheduler manages scheduling and executing tasks. type Scheduler struct { pool *Pool - storage storage.IMap[string, *ScheduledTask] // added storage field + storage storage.IMap[string, *ScheduledTask] + wg sync.WaitGroup } -// SchedulerOpt functional option type for Scheduler. type SchedulerOpt func(*Scheduler) -// WithStorage sets the storage for ScheduledTasks. func WithStorage(sm storage.IMap[string, *ScheduledTask]) SchedulerOpt { return func(s *Scheduler) { s.storage = sm } } -// NewScheduler constructor to use SchedulerOpt. func NewScheduler(pool *Pool, opts ...SchedulerOpt) *Scheduler { s := &Scheduler{ pool: pool, @@ -394,9 +355,9 @@ func NewScheduler(pool *Pool, opts ...SchedulerOpt) *Scheduler { return s } -// Start begins executing scheduled tasks. func (s *Scheduler) Start() { s.storage.ForEach(func(_ string, task *ScheduledTask) bool { + s.wg.Add(1) go s.schedule(task) return true }) @@ -404,16 +365,14 @@ func (s *Scheduler) Start() { func (s *Scheduler) Close() error { s.pool.Stop() + s.wg.Wait() return nil } -// schedule dispatches task execution based on its schedule. func (s *Scheduler) schedule(task *ScheduledTask) { - // Use the task context for cancellation. + defer s.wg.Done() ctx := task.ctx - // Main scheduling loop. if task.schedule.Interval > 0 { - // Duration-based scheduling (@every). ticker := time.NewTicker(task.schedule.Interval) defer ticker.Stop() if task.schedule.Recurring { @@ -444,7 +403,6 @@ func (s *Scheduler) schedule(task *ScheduledTask) { } } } else if task.schedule.CronSpec != "" { - // Cron-based scheduling. cs, err := parseCronSpec(task.schedule.CronSpec) if err != nil { Logger.Error().Err(err).Msg("Invalid CRON spec") @@ -466,11 +424,9 @@ func (s *Scheduler) schedule(task *ScheduledTask) { } } } else if !task.schedule.TimeOfDay.IsZero() { - // A one-off daily time-of-day scheduling. for { now := time.Now() - nextRun := time.Date(now.Year(), now.Month(), now.Day(), - task.schedule.TimeOfDay.Hour(), task.schedule.TimeOfDay.Minute(), 0, 0, now.Location()) + nextRun := time.Date(now.Year(), now.Month(), now.Day(), task.schedule.TimeOfDay.Hour(), task.schedule.TimeOfDay.Minute(), 0, 0, now.Location()) if !nextRun.After(now) { nextRun = nextRun.AddDate(0, 0, 1) } @@ -492,35 +448,48 @@ func (s *Scheduler) schedule(task *ScheduledTask) { } } -// executeTask runs the task. It checks the overlap setting and uses context cancellation. func (s *Scheduler) executeTask(task *ScheduledTask) { - // If overlapping executions are not allowed, use an atomic flag. if !task.config.Overlap { if !atomic.CompareAndSwapInt32(&task.running, 0, 1) { Logger.Warn().Str("taskID", task.payload.ID).Msg("Skipping execution due to overlap configuration") return } } + s.wg.Add(1) go func() { + defer s.wg.Done() _, cancelSpan := startSpan("executeTask") defer cancelSpan() defer RecoverPanic(RecoverTitle) - start := time.Now() - for _, plug := range s.pool.plugins { - plug.BeforeTask(getQueueTask()) - } - if !acquireDistributedLock(task.payload.ID) { - Logger.Warn().Str("taskID", task.payload.ID).Msg("Failed to acquire distributed lock") - if !task.config.Overlap { - atomic.StoreInt32(&task.running, 0) + var attempt int + var result Result + var delay time.Duration = 100 * time.Millisecond + for { + start := time.Now() + asyncPluginsBefore(s.pool.plugins) + if !acquireLockWithRetry(task.payload.ID, 3) { + Logger.Warn().Str("taskID", task.payload.ID).Msg("Failed to acquire distributed lock after retries") + if !task.config.Overlap { + atomic.StoreInt32(&task.running, 0) + } + return } - return + result = task.handler(task.ctx, task.payload) + releaseDistributedLock(task.payload.ID) + elapsed := time.Since(start).Milliseconds() + execTimeExp.Add(elapsed) + if s.pool.diagnosticsEnabled { + Logger.Info().Str("taskID", task.payload.ID).Msgf("Executed in %d ms", elapsed) + } + if result.Error == nil || attempt >= task.config.MaxRetries { + break + } + attempt++ + time.Sleep(delay) + delay = time.Duration(math.Min(float64(delay*2), float64(5*time.Second))) } - defer releaseDistributedLock(task.payload.ID) - result := task.handler(task.ctx, task.payload) - execTime := time.Since(start).Milliseconds() - if s.pool.diagnosticsEnabled { - Logger.Info().Str("taskID", task.payload.ID).Msgf("Executed in %d ms", execTime) + if result.Error != nil { + failedTasks.Add(1) } if result.Error != nil && s.pool.circuitBreaker.Enabled { newCount := atomic.AddInt32(&s.pool.circuitBreakerFailureCount, 1) @@ -539,9 +508,7 @@ func (s *Scheduler) executeTask(task *ScheduledTask) { _ = task.config.Callback(task.ctx, result) } task.executionHistory = append(task.executionHistory, ExecutionHistory{Timestamp: time.Now(), Result: result}) - for _, plug := range s.pool.plugins { - plug.AfterTask(getQueueTask(), result) - } + asyncPluginsAfter(s.pool.plugins, result) Logger.Info().Str("taskID", task.payload.ID).Msg("Scheduled task executed") if !task.config.Overlap { atomic.StoreInt32(&task.running, 0) @@ -549,7 +516,6 @@ func (s *Scheduler) executeTask(task *ScheduledTask) { }() } -// AddTask adds a new scheduled task using the supplied context, payload, and options. func (s *Scheduler) AddTask(ctx context.Context, payload *Task, opts ...SchedulerOption) string { var hasDuplicate bool if payload.DedupKey != "" { @@ -574,8 +540,6 @@ func (s *Scheduler) AddTask(ctx context.Context, payload *Task, opts ...Schedule if options.Callback == nil { options.Callback = s.pool.callback } - - // Determine the schedule from ScheduleSpec or Interval. var sched *Schedule var err error if options.ScheduleSpec != "" { @@ -587,7 +551,6 @@ func (s *Scheduler) AddTask(ctx context.Context, payload *Task, opts ...Schedule } else { sched = &Schedule{Interval: options.Interval, Recurring: options.Recurring} } - stop := make(chan struct{}) newTask := &ScheduledTask{ id: xid.New().String(), @@ -596,30 +559,29 @@ func (s *Scheduler) AddTask(ctx context.Context, payload *Task, opts ...Schedule payload: payload, stop: stop, config: SchedulerConfig{ - Callback: options.Callback, - Overlap: options.Overlap, + Callback: options.Callback, + Overlap: options.Overlap, + MaxRetries: 3, }, schedule: sched, } s.storage.Set(newTask.id, newTask) + s.wg.Add(1) go s.schedule(newTask) + totalTasks.Add(1) return newTask.id } -// RemoveTask stops and removes a task by its payload ID. func (s *Scheduler) RemoveTask(id string) error { task, ok := s.storage.Get(id) if !ok { - return fmt.Errorf("No task found with ID: %s\n", id) + return fmt.Errorf("No task found with ID: %s", id) } close(task.stop) - if s.storage != nil { - s.storage.Del(id) - } + s.storage.Del(id) return nil } -// PrintAllTasks prints a summary of all scheduled tasks. func (s *Scheduler) PrintAllTasks() { fmt.Println("Scheduled Tasks:") s.storage.ForEach(func(_ string, task *ScheduledTask) bool { @@ -628,11 +590,10 @@ func (s *Scheduler) PrintAllTasks() { }) } -// PrintExecutionHistory prints the execution history for a task by its ID. func (s *Scheduler) PrintExecutionHistory(id string) error { task, ok := s.storage.Get(id) if !ok { - return fmt.Errorf("No task found with ID: %s\n", id) + return fmt.Errorf("No task found with ID: %s", id) } for _, history := range task.executionHistory { fmt.Printf("Timestamp: %s, Result: %v\n", history.Timestamp, history.Result.Error) @@ -640,38 +601,6 @@ func (s *Scheduler) PrintExecutionHistory(id string) error { return nil } -// getNextRunTime computes the next run time for the task. -func (task *ScheduledTask) getNextRunTime(now time.Time) time.Time { - if task.schedule.Interval > 0 { - return now.Add(task.schedule.Interval) - } - if task.schedule.CronSpec != "" { - cs, err := parseCronSpec(task.schedule.CronSpec) - if err != nil { - Logger.Error().Err(err).Msg("Invalid CRON spec") - return now - } - return nextCronRunTime(now, cs) - } - if !task.schedule.TimeOfDay.IsZero() { - nextRun := time.Date(now.Year(), now.Month(), now.Day(), - task.schedule.TimeOfDay.Hour(), task.schedule.TimeOfDay.Minute(), 0, 0, now.Location()) - if !nextRun.After(now) { - nextRun = nextRun.AddDate(0, 0, 1) - } - return nextRun - } - // For DayOfWeek or DayOfMonth based scheduling, you could add additional logic. - return now -} - -// New type to hold scheduled task information. -type TaskInfo struct { - TaskID string `json:"task_id"` - NextRunTime time.Time `json:"next_run_time"` -} - -// ListScheduledTasks returns details of all scheduled tasks along with their next run time. func (s *Scheduler) ListScheduledTasks() []TaskInfo { now := time.Now() infos := make([]TaskInfo, 0, s.storage.Size()) @@ -685,11 +614,42 @@ func (s *Scheduler) ListScheduledTasks() []TaskInfo { return infos } -// -------------------------------------------------------- -// Additional Helper Functions and Stubs -// -------------------------------------------------------- +func (s *Scheduler) UpdateTask(id string, newSched *Schedule) error { + task, ok := s.storage.Get(id) + if !ok { + return fmt.Errorf("No task found with ID: %s", id) + } + task.schedule = newSched + return nil +} + +func (task *ScheduledTask) getNextRunTime(now time.Time) time.Time { + if task.schedule.Interval > 0 { + return now.Add(task.schedule.Interval) + } + if task.schedule.CronSpec != "" { + cs, err := parseCronSpec(task.schedule.CronSpec) + if err != nil { + Logger.Error().Err(err).Msg("Invalid CRON spec") + return now + } + return nextCronRunTime(now, cs) + } + if !task.schedule.TimeOfDay.IsZero() { + nextRun := time.Date(now.Year(), now.Month(), now.Day(), task.schedule.TimeOfDay.Hour(), task.schedule.TimeOfDay.Minute(), 0, 0, now.Location()) + if !nextRun.After(now) { + nextRun = nextRun.AddDate(0, 0, 1) + } + return nextRun + } + return now +} + +type TaskInfo struct { + TaskID string `json:"task_id"` + NextRunTime time.Time `json:"next_run_time"` +} -// startSpan is a stub for tracing span creation. func startSpan(operation string) (context.Context, func()) { startTime := time.Now() Logger.Info().Str("operation", operation).Msg("Span started") @@ -700,15 +660,98 @@ func startSpan(operation string) (context.Context, func()) { } } -// acquireDistributedLock is a stub for distributed locking. -func acquireDistributedLock(taskID string) bool { - Logger.Info().Str("taskID", taskID).Msg("Acquiring distributed lock (stub)") - return true +var distributedLockMap sync.Map + +type LockEntry struct { + mu sync.Mutex + expiry int64 +} + +func acquireDistributedLock(taskID string) bool { + now := time.Now().UnixNano() + value, _ := distributedLockMap.LoadOrStore(taskID, &LockEntry{expiry: 0}) + entry := value.(*LockEntry) + entry.mu.Lock() + if entry.expiry < now { + entry.expiry = now + int64(30*time.Second) + entry.mu.Unlock() + return true + } + entry.mu.Unlock() + return false } -// releaseDistributedLock is a stub for releasing a distributed lock. func releaseDistributedLock(taskID string) { - Logger.Info().Str("taskID", taskID).Msg("Releasing distributed lock (stub)") + now := time.Now().UnixNano() + if value, ok := distributedLockMap.Load(taskID); ok { + entry := value.(*LockEntry) + entry.mu.Lock() + if entry.expiry > now { + entry.expiry = 0 + } + entry.mu.Unlock() + } +} + +func acquireLockWithRetry(taskID string, retries int) bool { + var i int + delay := 10 * time.Millisecond + for i = 0; i < retries; i++ { + if acquireDistributedLock(taskID) { + return true + } + time.Sleep(delay) + delay = time.Duration(math.Min(float64(delay*2), float64(200*time.Millisecond))) + } + return false +} + +func asyncPluginsBefore(plugins []Plugin, timeout ...time.Duration) { + var wg sync.WaitGroup + dur := 1 * time.Second + if len(timeout) > 0 { + dur = timeout[0] + } + for _, plug := range plugins { + wg.Add(1) + go func(p Plugin) { + defer wg.Done() + p.BeforeTask(getQueueTask()) + }(plug) + } + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + select { + case <-done: + case <-time.After(dur): + } +} + +func asyncPluginsAfter(plugins []Plugin, result Result, timeout ...time.Duration) { + var wg sync.WaitGroup + dur := 1 * time.Second + if len(timeout) > 0 { + dur = timeout[0] + } + for _, plug := range plugins { + wg.Add(1) + go func(p Plugin) { + defer wg.Done() + p.AfterTask(getQueueTask(), result) + }(plug) + } + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + select { + case <-done: + case <-time.After(dur): + } } var taskPool = sync.Pool{ @@ -723,7 +766,6 @@ func getQueueTask() *QueueTask { return queueTaskPool.Get().(*QueueTask) } -// CircuitBreaker holds configuration for error threshold detection. type CircuitBreaker struct { Enabled bool FailureThreshold int