feat: use "GetTags" and "SetTags"

This commit is contained in:
sujit
2025-05-05 10:04:29 +05:45
parent cee16a04a8
commit b2085d72e1

View File

@@ -2,7 +2,9 @@ package mq
import ( import (
"context" "context"
"expvar"
"fmt" "fmt"
"math"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@@ -17,6 +19,9 @@ import (
) )
var Logger = log.DefaultLogger var Logger = log.DefaultLogger
var totalTasks = expvar.NewInt("totalTasks")
var failedTasks = expvar.NewInt("failedTasks")
var execTimeExp = expvar.NewInt("executionTimeMs")
type ScheduleOptions struct { type ScheduleOptions struct {
Handler Handler Handler Handler
@@ -29,49 +34,42 @@ type ScheduleOptions struct {
type SchedulerOption func(*ScheduleOptions) type SchedulerOption func(*ScheduleOptions)
// WithSchedulerHandler sets the handler.
func WithSchedulerHandler(handler Handler) SchedulerOption { func WithSchedulerHandler(handler Handler) SchedulerOption {
return func(opts *ScheduleOptions) { return func(opts *ScheduleOptions) {
opts.Handler = handler opts.Handler = handler
} }
} }
// WithSchedulerCallback sets the callback.
func WithSchedulerCallback(callback Callback) SchedulerOption { func WithSchedulerCallback(callback Callback) SchedulerOption {
return func(opts *ScheduleOptions) { return func(opts *ScheduleOptions) {
opts.Callback = callback opts.Callback = callback
} }
} }
// WithOverlap indicates that overlapping executions are allowed.
func WithOverlap() SchedulerOption { func WithOverlap() SchedulerOption {
return func(opts *ScheduleOptions) { return func(opts *ScheduleOptions) {
opts.Overlap = true opts.Overlap = true
} }
} }
// WithInterval sets a fixed interval.
func WithInterval(interval time.Duration) SchedulerOption { func WithInterval(interval time.Duration) SchedulerOption {
return func(opts *ScheduleOptions) { return func(opts *ScheduleOptions) {
opts.Interval = interval opts.Interval = interval
} }
} }
// WithRecurring indicates that the task should be rescheduled after execution.
func WithRecurring() SchedulerOption { func WithRecurring() SchedulerOption {
return func(opts *ScheduleOptions) { return func(opts *ScheduleOptions) {
opts.Recurring = true opts.Recurring = true
} }
} }
// WithScheduleSpec provides a schedule string (e.g., cron expression, @daily, @every 1h30m, etc.)
func WithScheduleSpec(spec string) SchedulerOption { func WithScheduleSpec(spec string) SchedulerOption {
return func(opts *ScheduleOptions) { return func(opts *ScheduleOptions) {
opts.ScheduleSpec = spec opts.ScheduleSpec = spec
} }
} }
// defaultSchedulerOptions returns the default scheduling options.
func defaultSchedulerOptions() *ScheduleOptions { func defaultSchedulerOptions() *ScheduleOptions {
return &ScheduleOptions{ return &ScheduleOptions{
Interval: time.Minute, Interval: time.Minute,
@@ -79,24 +77,15 @@ func defaultSchedulerOptions() *ScheduleOptions {
} }
} }
// --------------------------------------------------------
// Schedule and Cron Structures
// --------------------------------------------------------
// Schedule holds a schedule. It may be defined via:
// - A fixed time-interval (Interval)
// - A cron spec (CronSpec) string (which may be 5 or 6 fields)
// - A specific time of day, specific days of week or month.
type Schedule struct { type Schedule struct {
TimeOfDay time.Time // Optional: time of day for one-off daily recurrence. TimeOfDay time.Time
CronSpec string // For cron-based scheduling. CronSpec string
DayOfWeek []time.Weekday // Optional: days of the week. DayOfWeek []time.Weekday
DayOfMonth []int // Optional: days of the month. DayOfMonth []int
Interval time.Duration // For duration-based scheduling (e.g. @every). Interval time.Duration
Recurring bool // Indicates if schedule recurs. Recurring bool
} }
// ToHumanReadable returns a humanreadable description of the schedule.
func (s *Schedule) ToHumanReadable() string { func (s *Schedule) ToHumanReadable() string {
var sb strings.Builder var sb strings.Builder
if s.CronSpec != "" { if s.CronSpec != "" {
@@ -143,9 +132,8 @@ func (s *Schedule) ToHumanReadable() string {
return sb.String() return sb.String()
} }
// CronSchedule represents a parsed cron expression. It supports both 5field and 6field (extended) formats.
type CronSchedule struct { type CronSchedule struct {
Seconds string // Optional; if empty, assume "0" Seconds string
Minute string Minute string
Hour string Hour string
DayOfMonth string DayOfMonth string
@@ -153,34 +141,21 @@ type CronSchedule struct {
DayOfWeek string DayOfWeek string
} }
// String returns a summary string for the cron schedule.
func (c CronSchedule) String() string { func (c CronSchedule) String() string {
if c.Seconds != "" && c.Seconds != "0" { if c.Seconds != "" && c.Seconds != "0" {
return fmt.Sprintf("At %s seconds, %s minutes past %s, on %s, during %s, every %s", return fmt.Sprintf("At %s seconds, %s minutes past %s, on %s, during %s, every %s", c.Seconds, c.Minute, c.Hour, c.DayOfWeek, c.Month, c.DayOfMonth)
c.Seconds, c.Minute, c.Hour, c.DayOfWeek, c.Month, c.DayOfMonth)
} }
return fmt.Sprintf("At %s minutes past %s, on %s, during %s, every %s", return fmt.Sprintf("At %s minutes past %s, on %s, during %s, every %s", c.Minute, c.Hour, c.DayOfWeek, c.Month, c.DayOfMonth)
c.Minute, c.Hour, c.DayOfWeek, c.Month, c.DayOfMonth)
} }
// --------------------------------------------------------
// Parsing: Special Schedule Strings and Cron Specs
// --------------------------------------------------------
// parseScheduleSpec inspects a schedule spec string and returns a Schedule.
// The spec may be a special keyword (starting with '@') or a standard cron expression.
func parseScheduleSpec(spec string) (*Schedule, error) { func parseScheduleSpec(spec string) (*Schedule, error) {
s := &Schedule{Recurring: true} // default recurring s := &Schedule{Recurring: true}
if strings.HasPrefix(spec, "@") { if strings.HasPrefix(spec, "@") {
// Handle special cases.
switch { switch {
case strings.HasPrefix(spec, "@every"): case strings.HasPrefix(spec, "@every"):
// Format: "@every <duration>", use time.ParseDuration.
durationStr := strings.TrimSpace(strings.TrimPrefix(spec, "@every")) durationStr := strings.TrimSpace(strings.TrimPrefix(spec, "@every"))
d, err := time.ParseDuration(durationStr) d, err := time.ParseDuration(durationStr)
if err != nil { if err != nil {
// If duration parsing fails, try to support days or weeks.
// For example: "1d", "1w".
if strings.HasSuffix(durationStr, "d") || strings.HasSuffix(durationStr, "w") { if strings.HasSuffix(durationStr, "d") || strings.HasSuffix(durationStr, "w") {
numStr := durationStr[:len(durationStr)-1] numStr := durationStr[:len(durationStr)-1]
num, err2 := strconv.Atoi(numStr) num, err2 := strconv.Atoi(numStr)
@@ -211,20 +186,17 @@ func parseScheduleSpec(spec string) (*Schedule, error) {
s.CronSpec = "0 0 1 1 *" s.CronSpec = "0 0 1 1 *"
return s, nil return s, nil
case spec == "@reboot": case spec == "@reboot":
// For @reboot, you might want to run the task once at startup.
s.Recurring = false s.Recurring = false
return s, nil return s, nil
default: default:
return nil, fmt.Errorf("unknown special schedule: %s", spec) return nil, fmt.Errorf("unknown special schedule: %s", spec)
} }
} else { } else {
// Assume a standard cron spec
s.CronSpec = spec s.CronSpec = spec
return s, nil return s, nil
} }
} }
// parseCronSpecDescription parses a cron spec and returns a humanreadable description.
func parseCronSpecDescription(cronSpec string) (string, error) { func parseCronSpecDescription(cronSpec string) (string, error) {
cs, err := parseCronSpec(cronSpec) cs, err := parseCronSpec(cronSpec)
if err != nil { if err != nil {
@@ -233,46 +205,52 @@ func parseCronSpecDescription(cronSpec string) (string, error) {
return cs.String(), nil return cs.String(), nil
} }
// parseCronSpec parses a cron specification string, supporting either 5 fields or 6 (with seconds).
func parseCronSpec(spec string) (CronSchedule, error) { func parseCronSpec(spec string) (CronSchedule, error) {
parts := strings.Fields(spec) parts := strings.Fields(spec)
if len(parts) == 5 { if len(parts) == 5 {
// Assume no seconds provided; use default "0"
return CronSchedule{ return CronSchedule{
Seconds: "0", Seconds: "0",
Minute: parts[0], Minute: parts[0],
Hour: parts[1], Hour: parts[1],
DayOfMonth: parts[2], DayOfMonth: parts[2],
Month: parts[3], Month: parts[3],
DayOfWeek: parts[4], DayOfWeek: parseCronFieldNames(parts[4], "dow"),
}, nil }, nil
} else if len(parts) == 6 { } else if len(parts) == 6 {
// Extended spec with seconds.
return CronSchedule{ return CronSchedule{
Seconds: parts[0], Seconds: parts[0],
Minute: parts[1], Minute: parts[1],
Hour: parts[2], Hour: parts[2],
DayOfMonth: parts[3], DayOfMonth: parts[3],
Month: parts[4], Month: parts[4],
DayOfWeek: parts[5], DayOfWeek: parseCronFieldNames(parts[5], "dow"),
}, nil }, nil
} }
return CronSchedule{}, fmt.Errorf("invalid CRON spec: expected 5 or 6 fields, got %d", len(parts)) return CronSchedule{}, fmt.Errorf("invalid CRON spec: expected 5 or 6 fields, got %d", len(parts))
} }
// -------------------------------------------------------- func parseCronFieldNames(field string, fieldType string) string {
// Helper: Checking if a time matches a cron field value. lower := strings.ToLower(field)
// (Supports "*" and comma separated list of integers.) if fieldType == "dow" {
// mapping := map[string]string{"sun": "0", "mon": "1", "tue": "2", "wed": "3", "thu": "4", "fri": "5", "sat": "6"}
// For simplicity, we assume all fields in the cron spec represent numeric values. parts := strings.Split(lower, ",")
for i, p := range parts {
p = strings.TrimSpace(p)
if val, ok := mapping[p]; ok {
parts[i] = val
}
}
return strings.Join(parts, ",")
}
return field
}
func matchesCronField(val int, field string) bool { func matchesCronField(val int, field string) bool {
if field == "*" { if field == "*" {
return true return true
} }
// Support lists separated by commas.
parts := strings.Split(field, ",") parts := strings.Split(field, ",")
for _, p := range parts { for _, p := range parts {
// Trim any potential spaces.
p = strings.TrimSpace(p) p = strings.TrimSpace(p)
ival, err := strconv.Atoi(p) ival, err := strconv.Atoi(p)
if err == nil && ival == val { if err == nil && ival == val {
@@ -282,16 +260,13 @@ func matchesCronField(val int, field string) bool {
return false return false
} }
// checkTimeMatchesCron tests whether a given time t satisfies the cron expression.
func checkTimeMatchesCron(t time.Time, cs CronSchedule) bool { func checkTimeMatchesCron(t time.Time, cs CronSchedule) bool {
// Check seconds, minutes, hour, day, month, weekday.
sec := t.Second() sec := t.Second()
minute := t.Minute() minute := t.Minute()
hour := t.Hour() hour := t.Hour()
day := t.Day() day := t.Day()
month := int(t.Month()) month := int(t.Month())
weekday := int(t.Weekday()) // Sunday==0, match cron where Sunday==0 weekday := int(t.Weekday())
if !matchesCronField(sec, cs.Seconds) { if !matchesCronField(sec, cs.Seconds) {
return false return false
} }
@@ -313,37 +288,25 @@ func checkTimeMatchesCron(t time.Time, cs CronSchedule) bool {
return true return true
} }
// nextCronRunTime computes the next time after now that matches the cron spec.
// For simplicity, it iterates minute by minute (or second by second if extended).
func nextCronRunTime(now time.Time, cs CronSchedule) time.Time { func nextCronRunTime(now time.Time, cs CronSchedule) time.Time {
// We'll search up to a year ahead.
searchLimit := now.AddDate(1, 0, 0) searchLimit := now.AddDate(1, 0, 0)
t := now.Add(time.Second) // start a second later t := now.Add(time.Second)
for !t.After(now) { for !t.After(now) {
t = t.Add(time.Second) t = t.Add(time.Second)
} }
// If seconds field is in use (not just "0"), iterate second-by-second.
for t.Before(searchLimit) { for t.Before(searchLimit) {
if checkTimeMatchesCron(t, cs) { if checkTimeMatchesCron(t, cs) {
return t return t
} }
// Increment by one second if seconds precision is needed;
// otherwise, for minute-level precision, you can use t = t.Add(time.Minute)
if cs.Seconds != "0" && cs.Seconds != "*" { if cs.Seconds != "0" && cs.Seconds != "*" {
t = t.Add(time.Second) t = t.Add(time.Second)
} else { } else {
t = t.Add(time.Minute) t = t.Add(time.Minute)
} }
} }
// Fallback: return the search limit if no matching time is found.
return searchLimit return searchLimit
} }
// --------------------------------------------------------
// Scheduled Task and Scheduler Structures
// --------------------------------------------------------
// ScheduledTask represents a scheduled job.
type ScheduledTask struct { type ScheduledTask struct {
ctx context.Context ctx context.Context
handler Handler handler Handler
@@ -357,8 +320,9 @@ type ScheduledTask struct {
} }
type SchedulerConfig struct { type SchedulerConfig struct {
Callback Callback Callback Callback
Overlap bool Overlap bool
MaxRetries int
} }
type ExecutionHistory struct { type ExecutionHistory struct {
@@ -366,23 +330,20 @@ type ExecutionHistory struct {
Result Result Result Result
} }
// Scheduler manages scheduling and executing tasks.
type Scheduler struct { type Scheduler struct {
pool *Pool pool *Pool
storage storage.IMap[string, *ScheduledTask] // added storage field storage storage.IMap[string, *ScheduledTask]
wg sync.WaitGroup
} }
// SchedulerOpt functional option type for Scheduler.
type SchedulerOpt func(*Scheduler) type SchedulerOpt func(*Scheduler)
// WithStorage sets the storage for ScheduledTasks.
func WithStorage(sm storage.IMap[string, *ScheduledTask]) SchedulerOpt { func WithStorage(sm storage.IMap[string, *ScheduledTask]) SchedulerOpt {
return func(s *Scheduler) { return func(s *Scheduler) {
s.storage = sm s.storage = sm
} }
} }
// NewScheduler constructor to use SchedulerOpt.
func NewScheduler(pool *Pool, opts ...SchedulerOpt) *Scheduler { func NewScheduler(pool *Pool, opts ...SchedulerOpt) *Scheduler {
s := &Scheduler{ s := &Scheduler{
pool: pool, pool: pool,
@@ -394,9 +355,9 @@ func NewScheduler(pool *Pool, opts ...SchedulerOpt) *Scheduler {
return s return s
} }
// Start begins executing scheduled tasks.
func (s *Scheduler) Start() { func (s *Scheduler) Start() {
s.storage.ForEach(func(_ string, task *ScheduledTask) bool { s.storage.ForEach(func(_ string, task *ScheduledTask) bool {
s.wg.Add(1)
go s.schedule(task) go s.schedule(task)
return true return true
}) })
@@ -404,16 +365,14 @@ func (s *Scheduler) Start() {
func (s *Scheduler) Close() error { func (s *Scheduler) Close() error {
s.pool.Stop() s.pool.Stop()
s.wg.Wait()
return nil return nil
} }
// schedule dispatches task execution based on its schedule.
func (s *Scheduler) schedule(task *ScheduledTask) { func (s *Scheduler) schedule(task *ScheduledTask) {
// Use the task context for cancellation. defer s.wg.Done()
ctx := task.ctx ctx := task.ctx
// Main scheduling loop.
if task.schedule.Interval > 0 { if task.schedule.Interval > 0 {
// Duration-based scheduling (@every).
ticker := time.NewTicker(task.schedule.Interval) ticker := time.NewTicker(task.schedule.Interval)
defer ticker.Stop() defer ticker.Stop()
if task.schedule.Recurring { if task.schedule.Recurring {
@@ -444,7 +403,6 @@ func (s *Scheduler) schedule(task *ScheduledTask) {
} }
} }
} else if task.schedule.CronSpec != "" { } else if task.schedule.CronSpec != "" {
// Cron-based scheduling.
cs, err := parseCronSpec(task.schedule.CronSpec) cs, err := parseCronSpec(task.schedule.CronSpec)
if err != nil { if err != nil {
Logger.Error().Err(err).Msg("Invalid CRON spec") Logger.Error().Err(err).Msg("Invalid CRON spec")
@@ -466,11 +424,9 @@ func (s *Scheduler) schedule(task *ScheduledTask) {
} }
} }
} else if !task.schedule.TimeOfDay.IsZero() { } else if !task.schedule.TimeOfDay.IsZero() {
// A one-off daily time-of-day scheduling.
for { for {
now := time.Now() now := time.Now()
nextRun := time.Date(now.Year(), now.Month(), now.Day(), nextRun := time.Date(now.Year(), now.Month(), now.Day(), task.schedule.TimeOfDay.Hour(), task.schedule.TimeOfDay.Minute(), 0, 0, now.Location())
task.schedule.TimeOfDay.Hour(), task.schedule.TimeOfDay.Minute(), 0, 0, now.Location())
if !nextRun.After(now) { if !nextRun.After(now) {
nextRun = nextRun.AddDate(0, 0, 1) nextRun = nextRun.AddDate(0, 0, 1)
} }
@@ -492,35 +448,48 @@ func (s *Scheduler) schedule(task *ScheduledTask) {
} }
} }
// executeTask runs the task. It checks the overlap setting and uses context cancellation.
func (s *Scheduler) executeTask(task *ScheduledTask) { func (s *Scheduler) executeTask(task *ScheduledTask) {
// If overlapping executions are not allowed, use an atomic flag.
if !task.config.Overlap { if !task.config.Overlap {
if !atomic.CompareAndSwapInt32(&task.running, 0, 1) { if !atomic.CompareAndSwapInt32(&task.running, 0, 1) {
Logger.Warn().Str("taskID", task.payload.ID).Msg("Skipping execution due to overlap configuration") Logger.Warn().Str("taskID", task.payload.ID).Msg("Skipping execution due to overlap configuration")
return return
} }
} }
s.wg.Add(1)
go func() { go func() {
defer s.wg.Done()
_, cancelSpan := startSpan("executeTask") _, cancelSpan := startSpan("executeTask")
defer cancelSpan() defer cancelSpan()
defer RecoverPanic(RecoverTitle) defer RecoverPanic(RecoverTitle)
start := time.Now() var attempt int
for _, plug := range s.pool.plugins { var result Result
plug.BeforeTask(getQueueTask()) var delay time.Duration = 100 * time.Millisecond
} for {
if !acquireDistributedLock(task.payload.ID) { start := time.Now()
Logger.Warn().Str("taskID", task.payload.ID).Msg("Failed to acquire distributed lock") asyncPluginsBefore(s.pool.plugins)
if !task.config.Overlap { if !acquireLockWithRetry(task.payload.ID, 3) {
atomic.StoreInt32(&task.running, 0) Logger.Warn().Str("taskID", task.payload.ID).Msg("Failed to acquire distributed lock after retries")
if !task.config.Overlap {
atomic.StoreInt32(&task.running, 0)
}
return
} }
return result = task.handler(task.ctx, task.payload)
releaseDistributedLock(task.payload.ID)
elapsed := time.Since(start).Milliseconds()
execTimeExp.Add(elapsed)
if s.pool.diagnosticsEnabled {
Logger.Info().Str("taskID", task.payload.ID).Msgf("Executed in %d ms", elapsed)
}
if result.Error == nil || attempt >= task.config.MaxRetries {
break
}
attempt++
time.Sleep(delay)
delay = time.Duration(math.Min(float64(delay*2), float64(5*time.Second)))
} }
defer releaseDistributedLock(task.payload.ID) if result.Error != nil {
result := task.handler(task.ctx, task.payload) failedTasks.Add(1)
execTime := time.Since(start).Milliseconds()
if s.pool.diagnosticsEnabled {
Logger.Info().Str("taskID", task.payload.ID).Msgf("Executed in %d ms", execTime)
} }
if result.Error != nil && s.pool.circuitBreaker.Enabled { if result.Error != nil && s.pool.circuitBreaker.Enabled {
newCount := atomic.AddInt32(&s.pool.circuitBreakerFailureCount, 1) newCount := atomic.AddInt32(&s.pool.circuitBreakerFailureCount, 1)
@@ -539,9 +508,7 @@ func (s *Scheduler) executeTask(task *ScheduledTask) {
_ = task.config.Callback(task.ctx, result) _ = task.config.Callback(task.ctx, result)
} }
task.executionHistory = append(task.executionHistory, ExecutionHistory{Timestamp: time.Now(), Result: result}) task.executionHistory = append(task.executionHistory, ExecutionHistory{Timestamp: time.Now(), Result: result})
for _, plug := range s.pool.plugins { asyncPluginsAfter(s.pool.plugins, result)
plug.AfterTask(getQueueTask(), result)
}
Logger.Info().Str("taskID", task.payload.ID).Msg("Scheduled task executed") Logger.Info().Str("taskID", task.payload.ID).Msg("Scheduled task executed")
if !task.config.Overlap { if !task.config.Overlap {
atomic.StoreInt32(&task.running, 0) atomic.StoreInt32(&task.running, 0)
@@ -549,7 +516,6 @@ func (s *Scheduler) executeTask(task *ScheduledTask) {
}() }()
} }
// AddTask adds a new scheduled task using the supplied context, payload, and options.
func (s *Scheduler) AddTask(ctx context.Context, payload *Task, opts ...SchedulerOption) string { func (s *Scheduler) AddTask(ctx context.Context, payload *Task, opts ...SchedulerOption) string {
var hasDuplicate bool var hasDuplicate bool
if payload.DedupKey != "" { if payload.DedupKey != "" {
@@ -574,8 +540,6 @@ func (s *Scheduler) AddTask(ctx context.Context, payload *Task, opts ...Schedule
if options.Callback == nil { if options.Callback == nil {
options.Callback = s.pool.callback options.Callback = s.pool.callback
} }
// Determine the schedule from ScheduleSpec or Interval.
var sched *Schedule var sched *Schedule
var err error var err error
if options.ScheduleSpec != "" { if options.ScheduleSpec != "" {
@@ -587,7 +551,6 @@ func (s *Scheduler) AddTask(ctx context.Context, payload *Task, opts ...Schedule
} else { } else {
sched = &Schedule{Interval: options.Interval, Recurring: options.Recurring} sched = &Schedule{Interval: options.Interval, Recurring: options.Recurring}
} }
stop := make(chan struct{}) stop := make(chan struct{})
newTask := &ScheduledTask{ newTask := &ScheduledTask{
id: xid.New().String(), id: xid.New().String(),
@@ -596,30 +559,29 @@ func (s *Scheduler) AddTask(ctx context.Context, payload *Task, opts ...Schedule
payload: payload, payload: payload,
stop: stop, stop: stop,
config: SchedulerConfig{ config: SchedulerConfig{
Callback: options.Callback, Callback: options.Callback,
Overlap: options.Overlap, Overlap: options.Overlap,
MaxRetries: 3,
}, },
schedule: sched, schedule: sched,
} }
s.storage.Set(newTask.id, newTask) s.storage.Set(newTask.id, newTask)
s.wg.Add(1)
go s.schedule(newTask) go s.schedule(newTask)
totalTasks.Add(1)
return newTask.id return newTask.id
} }
// RemoveTask stops and removes a task by its payload ID.
func (s *Scheduler) RemoveTask(id string) error { func (s *Scheduler) RemoveTask(id string) error {
task, ok := s.storage.Get(id) task, ok := s.storage.Get(id)
if !ok { if !ok {
return fmt.Errorf("No task found with ID: %s\n", id) return fmt.Errorf("No task found with ID: %s", id)
} }
close(task.stop) close(task.stop)
if s.storage != nil { s.storage.Del(id)
s.storage.Del(id)
}
return nil return nil
} }
// PrintAllTasks prints a summary of all scheduled tasks.
func (s *Scheduler) PrintAllTasks() { func (s *Scheduler) PrintAllTasks() {
fmt.Println("Scheduled Tasks:") fmt.Println("Scheduled Tasks:")
s.storage.ForEach(func(_ string, task *ScheduledTask) bool { s.storage.ForEach(func(_ string, task *ScheduledTask) bool {
@@ -628,11 +590,10 @@ func (s *Scheduler) PrintAllTasks() {
}) })
} }
// PrintExecutionHistory prints the execution history for a task by its ID.
func (s *Scheduler) PrintExecutionHistory(id string) error { func (s *Scheduler) PrintExecutionHistory(id string) error {
task, ok := s.storage.Get(id) task, ok := s.storage.Get(id)
if !ok { if !ok {
return fmt.Errorf("No task found with ID: %s\n", id) return fmt.Errorf("No task found with ID: %s", id)
} }
for _, history := range task.executionHistory { for _, history := range task.executionHistory {
fmt.Printf("Timestamp: %s, Result: %v\n", history.Timestamp, history.Result.Error) fmt.Printf("Timestamp: %s, Result: %v\n", history.Timestamp, history.Result.Error)
@@ -640,38 +601,6 @@ func (s *Scheduler) PrintExecutionHistory(id string) error {
return nil return nil
} }
// getNextRunTime computes the next run time for the task.
func (task *ScheduledTask) getNextRunTime(now time.Time) time.Time {
if task.schedule.Interval > 0 {
return now.Add(task.schedule.Interval)
}
if task.schedule.CronSpec != "" {
cs, err := parseCronSpec(task.schedule.CronSpec)
if err != nil {
Logger.Error().Err(err).Msg("Invalid CRON spec")
return now
}
return nextCronRunTime(now, cs)
}
if !task.schedule.TimeOfDay.IsZero() {
nextRun := time.Date(now.Year(), now.Month(), now.Day(),
task.schedule.TimeOfDay.Hour(), task.schedule.TimeOfDay.Minute(), 0, 0, now.Location())
if !nextRun.After(now) {
nextRun = nextRun.AddDate(0, 0, 1)
}
return nextRun
}
// For DayOfWeek or DayOfMonth based scheduling, you could add additional logic.
return now
}
// New type to hold scheduled task information.
type TaskInfo struct {
TaskID string `json:"task_id"`
NextRunTime time.Time `json:"next_run_time"`
}
// ListScheduledTasks returns details of all scheduled tasks along with their next run time.
func (s *Scheduler) ListScheduledTasks() []TaskInfo { func (s *Scheduler) ListScheduledTasks() []TaskInfo {
now := time.Now() now := time.Now()
infos := make([]TaskInfo, 0, s.storage.Size()) infos := make([]TaskInfo, 0, s.storage.Size())
@@ -685,11 +614,42 @@ func (s *Scheduler) ListScheduledTasks() []TaskInfo {
return infos return infos
} }
// -------------------------------------------------------- func (s *Scheduler) UpdateTask(id string, newSched *Schedule) error {
// Additional Helper Functions and Stubs task, ok := s.storage.Get(id)
// -------------------------------------------------------- if !ok {
return fmt.Errorf("No task found with ID: %s", id)
}
task.schedule = newSched
return nil
}
func (task *ScheduledTask) getNextRunTime(now time.Time) time.Time {
if task.schedule.Interval > 0 {
return now.Add(task.schedule.Interval)
}
if task.schedule.CronSpec != "" {
cs, err := parseCronSpec(task.schedule.CronSpec)
if err != nil {
Logger.Error().Err(err).Msg("Invalid CRON spec")
return now
}
return nextCronRunTime(now, cs)
}
if !task.schedule.TimeOfDay.IsZero() {
nextRun := time.Date(now.Year(), now.Month(), now.Day(), task.schedule.TimeOfDay.Hour(), task.schedule.TimeOfDay.Minute(), 0, 0, now.Location())
if !nextRun.After(now) {
nextRun = nextRun.AddDate(0, 0, 1)
}
return nextRun
}
return now
}
type TaskInfo struct {
TaskID string `json:"task_id"`
NextRunTime time.Time `json:"next_run_time"`
}
// startSpan is a stub for tracing span creation.
func startSpan(operation string) (context.Context, func()) { func startSpan(operation string) (context.Context, func()) {
startTime := time.Now() startTime := time.Now()
Logger.Info().Str("operation", operation).Msg("Span started") Logger.Info().Str("operation", operation).Msg("Span started")
@@ -700,15 +660,98 @@ func startSpan(operation string) (context.Context, func()) {
} }
} }
// acquireDistributedLock is a stub for distributed locking. var distributedLockMap sync.Map
func acquireDistributedLock(taskID string) bool {
Logger.Info().Str("taskID", taskID).Msg("Acquiring distributed lock (stub)") type LockEntry struct {
return true mu sync.Mutex
expiry int64
}
func acquireDistributedLock(taskID string) bool {
now := time.Now().UnixNano()
value, _ := distributedLockMap.LoadOrStore(taskID, &LockEntry{expiry: 0})
entry := value.(*LockEntry)
entry.mu.Lock()
if entry.expiry < now {
entry.expiry = now + int64(30*time.Second)
entry.mu.Unlock()
return true
}
entry.mu.Unlock()
return false
} }
// releaseDistributedLock is a stub for releasing a distributed lock.
func releaseDistributedLock(taskID string) { func releaseDistributedLock(taskID string) {
Logger.Info().Str("taskID", taskID).Msg("Releasing distributed lock (stub)") now := time.Now().UnixNano()
if value, ok := distributedLockMap.Load(taskID); ok {
entry := value.(*LockEntry)
entry.mu.Lock()
if entry.expiry > now {
entry.expiry = 0
}
entry.mu.Unlock()
}
}
func acquireLockWithRetry(taskID string, retries int) bool {
var i int
delay := 10 * time.Millisecond
for i = 0; i < retries; i++ {
if acquireDistributedLock(taskID) {
return true
}
time.Sleep(delay)
delay = time.Duration(math.Min(float64(delay*2), float64(200*time.Millisecond)))
}
return false
}
func asyncPluginsBefore(plugins []Plugin, timeout ...time.Duration) {
var wg sync.WaitGroup
dur := 1 * time.Second
if len(timeout) > 0 {
dur = timeout[0]
}
for _, plug := range plugins {
wg.Add(1)
go func(p Plugin) {
defer wg.Done()
p.BeforeTask(getQueueTask())
}(plug)
}
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-done:
case <-time.After(dur):
}
}
func asyncPluginsAfter(plugins []Plugin, result Result, timeout ...time.Duration) {
var wg sync.WaitGroup
dur := 1 * time.Second
if len(timeout) > 0 {
dur = timeout[0]
}
for _, plug := range plugins {
wg.Add(1)
go func(p Plugin) {
defer wg.Done()
p.AfterTask(getQueueTask(), result)
}(plug)
}
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-done:
case <-time.After(dur):
}
} }
var taskPool = sync.Pool{ var taskPool = sync.Pool{
@@ -723,7 +766,6 @@ func getQueueTask() *QueueTask {
return queueTaskPool.Get().(*QueueTask) return queueTaskPool.Get().(*QueueTask)
} }
// CircuitBreaker holds configuration for error threshold detection.
type CircuitBreaker struct { type CircuitBreaker struct {
Enabled bool Enabled bool
FailureThreshold int FailureThreshold int