diff --git a/docs/docs.go b/docs/docs.go index e478638a..24a5cd36 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -3371,10 +3371,17 @@ const docTemplate = `{ "reference": { "type": "string" }, + "scheduler": { + "type": "string" + }, "stale_timeout_seconds": { "type": "integer", "format": "uint64" }, + "timeout_seconds": { + "type": "integer", + "format": "uint64" + }, "type": { "type": "string", "enum": [ diff --git a/docs/swagger.json b/docs/swagger.json index 1c9a0a9d..74150432 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -3364,10 +3364,17 @@ "reference": { "type": "string" }, + "scheduler": { + "type": "string" + }, "stale_timeout_seconds": { "type": "integer", "format": "uint64" }, + "timeout_seconds": { + "type": "integer", + "format": "uint64" + }, "type": { "type": "string", "enum": [ diff --git a/docs/swagger.yaml b/docs/swagger.yaml index 41c84e81..4f8f9b00 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -746,9 +746,14 @@ definitions: type: integer reference: type: string + scheduler: + type: string stale_timeout_seconds: format: uint64 type: integer + timeout_seconds: + format: uint64 + type: integer type: enum: - ffmpeg diff --git a/ffmpeg/ffmpeg.go b/ffmpeg/ffmpeg.go index 2e0acc99..8fcf9744 100644 --- a/ffmpeg/ffmpeg.go +++ b/ffmpeg/ffmpeg.go @@ -32,6 +32,8 @@ type ProcessConfig struct { Reconnect bool ReconnectDelay time.Duration StaleTimeout time.Duration + Timeout time.Duration + Scheduler string Args []string Parser process.Parser Logger log.Logger @@ -115,12 +117,24 @@ func New(config Config) (FFmpeg, error) { } func (f *ffmpeg) New(config ProcessConfig) (process.Process, error) { + var scheduler process.Scheduler = nil + var err error + + if len(config.Scheduler) != 0 { + scheduler, err = process.NewScheduler(config.Scheduler) + if err != nil { + return nil, err + } + } + ffmpeg, err := process.New(process.Config{ Binary: f.binary, Args: config.Args, Reconnect: config.Reconnect, ReconnectDelay: config.ReconnectDelay, StaleTimeout: config.StaleTimeout, + Timeout: config.Timeout, + Scheduler: scheduler, Parser: config.Parser, Logger: config.Logger, OnArgs: config.OnArgs, diff --git a/go.mod b/go.mod index 2ab961f4..93c31fea 100644 --- a/go.mod +++ b/go.mod @@ -35,6 +35,7 @@ require ( require ( github.com/KyleBanks/depth v1.2.1 // indirect + github.com/adhocore/gronx v1.1.2 // indirect github.com/agnivade/levenshtein v1.1.1 // indirect github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c // indirect github.com/beorn7/perks v1.0.1 // indirect diff --git a/go.sum b/go.sum index 4f0530a5..37ddde62 100644 --- a/go.sum +++ b/go.sum @@ -8,6 +8,8 @@ github.com/Masterminds/semver/v3 v3.1.1 h1:hLg3sBzpNErnxhQtUy/mmLR2I9foDujNK030I github.com/Masterminds/semver/v3 v3.1.1/go.mod h1:VPu/7SZ7ePZ3QOrcuXROw5FAcLl4a0cBrbBpGY/8hQs= github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= +github.com/adhocore/gronx v1.1.2 h1:Hgm+d8SyGtn+rCoDkxZq3nLNFLLkzRGR7L2ziRRD1w8= +github.com/adhocore/gronx v1.1.2/go.mod h1:7oUY1WAU8rEJWmAxXR2DN0JaO4gi9khSgKjiRypqteg= github.com/agiledragon/gomonkey/v2 v2.3.1/go.mod h1:ap1AmDzcVOAz1YpeJ3TCzIgstoaWLA6jbbgxfB4w2iY= github.com/agnivade/levenshtein v1.0.1/go.mod h1:CURSv5d9Uaml+FovSIICkLbAUZ9S4RqaHDIsdSBg7lM= github.com/agnivade/levenshtein v1.1.1 h1:QY8M92nrzkmr798gCo3kmMyqXFzdQVpxLlGPRBij0P8= diff --git a/http/api/process.go b/http/api/process.go index c627a0c6..f0d25c2c 100644 --- a/http/api/process.go +++ b/http/api/process.go @@ -52,6 +52,8 @@ type ProcessConfig struct { ReconnectDelay uint64 `json:"reconnect_delay_seconds" format:"uint64"` Autostart bool `json:"autostart"` StaleTimeout uint64 `json:"stale_timeout_seconds" format:"uint64"` + Timeout uint64 `json:"timeout_seconds" format:"uint64"` + Scheduler string `json:"scheduler"` Limits ProcessConfigLimits `json:"limits"` } @@ -65,6 +67,8 @@ func (cfg *ProcessConfig) Marshal() *app.Config { ReconnectDelay: cfg.ReconnectDelay, Autostart: cfg.Autostart, StaleTimeout: cfg.StaleTimeout, + Timeout: cfg.Timeout, + Scheduler: cfg.Scheduler, LimitCPU: cfg.Limits.CPU, LimitMemory: cfg.Limits.Memory * 1024 * 1024, LimitWaitFor: cfg.Limits.WaitFor, @@ -144,6 +148,8 @@ func (cfg *ProcessConfig) Unmarshal(c *app.Config) { cfg.ReconnectDelay = c.ReconnectDelay cfg.Autostart = c.Autostart cfg.StaleTimeout = c.StaleTimeout + cfg.Timeout = c.Timeout + cfg.Scheduler = c.Scheduler cfg.Limits.CPU = c.LimitCPU cfg.Limits.Memory = c.LimitMemory / 1024 / 1024 cfg.Limits.WaitFor = c.LimitWaitFor diff --git a/process/process.go b/process/process.go index 2f8e7899..0f34a105 100644 --- a/process/process.go +++ b/process/process.go @@ -51,9 +51,11 @@ type Config struct { Reconnect bool // Whether to restart the process if it exited ReconnectDelay time.Duration // Duration to wait before restarting the process StaleTimeout time.Duration // Kill the process after this duration if it doesn't produce any output + Timeout time.Duration // Kill the process after this duration LimitCPU float64 // Kill the process if the CPU usage in percent is above this value LimitMemory uint64 // Kill the process if the memory consumption in bytes is above this value LimitDuration time.Duration // Kill the process if the limits are exceeded for this duration + Scheduler Scheduler // A scheduler Parser Parser // A parser for the output of the process OnArgs func(args []string) []string // A callback which is called right before the process will start with the command args OnStart func() // A callback which is called after the process started @@ -67,6 +69,7 @@ type Status struct { State string // State is the current state of the process. See stateType for the known states. States States // States is the cumulative history of states the process had. Order string // Order is the wanted condition of process, either "start" or "stop" + Reconnect time.Duration // Reconnect is the time until the next reconnect, negative if no reconnect is scheduled. Duration time.Duration // Duration is the time since the last change of the state Time time.Time // Time is the time of the last change of the state CPU float64 // Used CPU in percent @@ -168,11 +171,15 @@ type process struct { lock sync.Mutex } reconn struct { - enable bool - delay time.Duration - timer *time.Timer - lock sync.Mutex + enable bool + delay time.Duration + reconnectAt time.Time + timer *time.Timer + lock sync.Mutex } + timeout time.Duration + stopTimer *time.Timer + stopTimerLock sync.Mutex killTimer *time.Timer killTimerLock sync.Mutex logger log.Logger @@ -184,7 +191,8 @@ type process struct { onStateChange func(from, to string) lock sync.Mutex } - limits Limiter + limits Limiter + scheduler Scheduler } var _ Process = &process{} @@ -192,10 +200,12 @@ var _ Process = &process{} // New creates a new process wrapper func New(config Config) (Process, error) { p := &process{ - binary: config.Binary, - cmd: nil, - parser: config.Parser, - logger: config.Logger, + binary: config.Binary, + cmd: nil, + timeout: config.Timeout, + parser: config.Parser, + logger: config.Logger, + scheduler: config.Scheduler, } p.args = make([]string, len(config.Args)) @@ -352,9 +362,11 @@ func (p *process) setState(state stateType) error { p.state.time = time.Now() + p.callbacks.lock.Lock() if p.callbacks.onStateChange != nil { - go p.callbacks.onStateChange(prevState.String(), p.state.state.String()) + p.callbacks.onStateChange(prevState.String(), p.state.state.String()) } + p.callbacks.lock.Unlock() return nil } @@ -386,7 +398,7 @@ func (p *process) Status() Status { p.state.lock.Lock() stateTime := p.state.time - stateString := p.state.state.String() + state := p.state.state states := p.state.states p.state.lock.Unlock() @@ -395,18 +407,25 @@ func (p *process) Status() Status { p.order.lock.Unlock() s := Status{ - State: stateString, - States: states, - Order: order, - Duration: time.Since(stateTime), - Time: stateTime, - CPU: cpu, - Memory: memory, + State: state.String(), + States: states, + Order: order, + Reconnect: time.Duration(-1), + Duration: time.Since(stateTime), + Time: stateTime, + CPU: cpu, + Memory: memory, } s.CommandArgs = make([]string, len(p.args)) copy(s.CommandArgs, p.args) + if order == "start" && !state.IsRunning() { + p.reconn.lock.Lock() + s.Reconnect = time.Until(p.reconn.reconnectAt) + p.reconn.lock.Unlock() + } + return s } @@ -428,6 +447,17 @@ func (p *process) Start() error { p.order.order = "start" + if p.scheduler != nil { + next, err := p.scheduler.Next() + if err != nil { + return err + } + + p.reconnect(next) + + return nil + } + err := p.start() if err != nil { p.debuglogger.WithFields(log.Fields{ @@ -462,12 +492,32 @@ func (p *process) start() error { p.setState(stateStarting) args := p.args + + p.callbacks.lock.Lock() if p.callbacks.onArgs != nil { args = make([]string, len(p.args)) copy(args, p.args) args = p.callbacks.onArgs(args) } + p.callbacks.lock.Unlock() + + // Start the stop timeout if enabled + if p.timeout > time.Duration(0) { + p.stopTimerLock.Lock() + if p.stopTimer == nil { + // Only create a new timer if there isn't already one running + p.stopTimer = time.AfterFunc(p.timeout, func() { + p.Kill(false) + + p.stopTimerLock.Lock() + p.stopTimer.Stop() + p.stopTimer = nil + p.stopTimerLock.Unlock() + }) + } + p.stopTimerLock.Unlock() + } p.cmd = exec.Command(p.binary, args...) p.cmd.Env = []string{} @@ -478,7 +528,8 @@ func (p *process) start() error { p.parser.Parse(err.Error()) p.logger.WithError(err).Error().Log("Command failed") - p.reconnect() + + p.reconnect(p.delay(stateFailed)) return err } @@ -487,7 +538,7 @@ func (p *process) start() error { p.parser.Parse(err.Error()) p.logger.WithError(err).Error().Log("Command failed") - p.reconnect() + p.reconnect(p.delay(stateFailed)) return err } @@ -503,9 +554,11 @@ func (p *process) start() error { p.logger.Info().Log("Started") p.debuglogger.Debug().Log("Started") + p.callbacks.lock.Lock() if p.callbacks.onStart != nil { - go p.callbacks.onStart() + p.callbacks.onStart() } + p.callbacks.lock.Unlock() // Start the reader go p.reader() @@ -652,21 +705,21 @@ func (p *process) stop(wait bool) error { } // reconnect will setup a timer to restart the process -func (p *process) reconnect() { - // If restarting a process is not enabled, don't do anything - if !p.reconn.enable { +func (p *process) reconnect(delay time.Duration) { + if delay < time.Duration(0) { return } // Stop a currently running timer p.unreconnect() - p.logger.Info().Log("Scheduling restart in %s", p.reconn.delay) + p.logger.Info().Log("Scheduling restart in %s", delay) p.reconn.lock.Lock() defer p.reconn.lock.Unlock() - p.reconn.timer = time.AfterFunc(p.reconn.delay, func() { + p.reconn.reconnectAt = time.Now().Add(delay) + p.reconn.timer = time.AfterFunc(delay, func() { p.order.lock.Lock() defer p.order.lock.Unlock() @@ -819,6 +872,17 @@ func (p *process) waiter() { p.limits.Stop() + // Stop the stop timer + if state == stateFinished { + // Only clear the timer if the process finished normally + p.stopTimerLock.Lock() + if p.stopTimer != nil { + p.stopTimer.Stop() + p.stopTimer = nil + } + p.stopTimerLock.Unlock() + } + // Stop the kill timer p.killTimerLock.Lock() if p.killTimer != nil { @@ -844,7 +908,7 @@ func (p *process) waiter() { // Call the onExit callback p.callbacks.lock.Lock() if p.callbacks.onExit != nil { - go p.callbacks.onExit(state.String()) + p.callbacks.onExit(state.String()) } p.callbacks.lock.Unlock() @@ -858,10 +922,61 @@ func (p *process) waiter() { // Restart the process if p.order.order == "start" { - p.reconnect() + p.reconnect(p.delay(state)) } } +// delay returns the duration for the next reconnect of the process. If no reconnect is +// wanted, it returns a negative duration. +func (p *process) delay(state stateType) time.Duration { + // By default, reconnect after the configured delay. + delay := p.reconn.delay + + if p.scheduler == nil { + // No scheduler has been provided, reconnect in any case, if enabled. + if !p.reconn.enable { + return time.Duration(-1) + } + + return delay + } + + // Get the next scheduled start time. + next, err := p.scheduler.Next() + if err == nil { + // There's a next scheduled time. + if state == stateFinished { + // If the process finished without error, reconnect at the next scheduled time. + delay = next + } else { + // The process finished with an error. + if !p.reconn.enable { + // If reconnect is not enabled, reconnect at the next scheduled time. + delay = next + } else { + // If the next scheduled time is closer than the next configured delay, + // reconnect at the next scheduled time + if next < p.reconn.delay { + delay = next + } + } + } + } else { + // No next scheduled time. + if state == stateFinished { + // If the process finished without error, don't reconnect. + delay = time.Duration(-1) + } else { + // If the process finished with an error, reconnect if enabled. + if !p.reconn.enable { + delay = time.Duration(-1) + } + } + } + + return delay +} + // scanLine splits the data on \r, \n, or \r\n line endings func scanLine(data []byte, atEOF bool) (advance int, token []byte, err error) { // Skip leading spaces. diff --git a/process/process_test.go b/process/process_test.go index aa6dd429..b098b52d 100644 --- a/process/process_test.go +++ b/process/process_test.go @@ -1,6 +1,7 @@ package process import ( + "sync" "testing" "time" @@ -235,3 +236,335 @@ func TestProcessForceKill(t *testing.T) { require.Equal(t, "killed", p.Status().State) } + +func TestProcessDuration(t *testing.T) { + binary, err := testhelper.BuildBinary("sigint", "../internal/testhelper") + require.NoError(t, err, "Failed to build helper program") + + p, err := New(Config{ + Binary: binary, + Args: []string{}, + Timeout: 3 * time.Second, + }) + require.NoError(t, err) + + status := p.Status() + require.Equal(t, "stop", status.Order) + require.Equal(t, "finished", status.State) + + err = p.Start() + require.NoError(t, err) + + time.Sleep(time.Second) + + status = p.Status() + require.Equal(t, "start", status.Order) + require.Equal(t, "running", status.State) + + time.Sleep(5 * time.Second) + + status = p.Status() + require.Equal(t, "start", status.Order) + require.Equal(t, "finished", status.State) + + px := p.(*process) + + require.Nil(t, px.stopTimer) +} + +func TestProcessSchedulePointInTime(t *testing.T) { + now := time.Now() + s, err := NewScheduler(now.Add(5 * time.Second).Format(time.RFC3339)) + require.NoError(t, err) + + p, _ := New(Config{ + Binary: "sleep", + Args: []string{ + "5", + }, + Reconnect: false, + Scheduler: s, + }) + + status := p.Status() + require.Equal(t, "stop", status.Order) + require.Equal(t, "finished", status.State) + + err = p.Start() + require.NoError(t, err) + + status = p.Status() + require.Equal(t, "start", status.Order) + require.Equal(t, "finished", status.State) + require.Greater(t, status.Reconnect, time.Duration(0)) + + time.Sleep(status.Reconnect + (2 * time.Second)) + + status = p.Status() + require.Equal(t, "running", status.State) + + time.Sleep(5 * time.Second) + + status = p.Status() + require.Equal(t, "finished", status.State) + require.Less(t, status.Reconnect, time.Duration(0)) +} + +func TestProcessSchedulePointInTimeGone(t *testing.T) { + now := time.Now() + s, err := NewScheduler(now.Add(-5 * time.Second).Format(time.RFC3339)) + require.NoError(t, err) + + p, _ := New(Config{ + Binary: "sleep", + Args: []string{ + "5", + }, + Reconnect: false, + Scheduler: s, + }) + + status := p.Status() + require.Equal(t, "stop", status.Order) + require.Equal(t, "finished", status.State) + + err = p.Start() + require.Error(t, err) + + status = p.Status() + require.Equal(t, "start", status.Order) + require.Equal(t, "finished", status.State) +} + +func TestProcessScheduleCron(t *testing.T) { + s, err := NewScheduler("* * * * *") + require.NoError(t, err) + + p, _ := New(Config{ + Binary: "sleep", + Args: []string{ + "5", + }, + Reconnect: false, + Scheduler: s, + }) + + status := p.Status() + require.Equal(t, "stop", status.Order) + require.Equal(t, "finished", status.State) + + err = p.Start() + require.NoError(t, err) + + status = p.Status() + + time.Sleep(status.Reconnect + (2 * time.Second)) + + status = p.Status() + require.Equal(t, "running", status.State) + + time.Sleep(5 * time.Second) + + status = p.Status() + require.Equal(t, "finished", status.State) + require.Greater(t, status.Reconnect, time.Duration(0)) +} + +func TestProcessDelayNoScheduler(t *testing.T) { + p, _ := New(Config{ + Binary: "sleep", + Reconnect: false, + ReconnectDelay: 5 * time.Second, + }) + + px := p.(*process) + + // negative delay for finished process + d := px.delay(stateFinished) + require.Less(t, d, time.Duration(0)) + + // negative delay for failed process + d = px.delay(stateFailed) + require.Less(t, d, time.Duration(0)) + + p, _ = New(Config{ + Binary: "sleep", + Reconnect: true, + ReconnectDelay: 5 * time.Second, + }) + + px = p.(*process) + + // positive delay for finished process + d = px.delay(stateFinished) + require.Greater(t, d, time.Duration(0)) + + // positive delay for failed process + d = px.delay(stateFailed) + require.Greater(t, d, time.Duration(0)) +} + +func TestProcessDelaySchedulerNoReconnect(t *testing.T) { + now := time.Now() + s, err := NewScheduler(now.Add(5 * time.Second).Format(time.RFC3339)) + require.NoError(t, err) + + p, _ := New(Config{ + Binary: "sleep", + Reconnect: false, + ReconnectDelay: 1 * time.Second, + Scheduler: s, + }) + + px := p.(*process) + + // scheduled delay for finished process + d := px.delay(stateFinished) + require.Greater(t, d, time.Second) + + // scheduled delay for failed process + d = px.delay(stateFailed) + require.Greater(t, d, time.Second) + + now = time.Now() + s, err = NewScheduler(now.Add(-5 * time.Second).Format(time.RFC3339)) + require.NoError(t, err) + + p, _ = New(Config{ + Binary: "sleep", + Reconnect: false, + ReconnectDelay: 1 * time.Second, + Scheduler: s, + }) + + px = p.(*process) + + // negative delay for finished process + d = px.delay(stateFinished) + require.Less(t, d, time.Duration(0)) + + // negative delay for failed process + d = px.delay(stateFailed) + require.Less(t, d, time.Duration(0)) +} + +func TestProcessDelaySchedulerReconnect(t *testing.T) { + now := time.Now() + s, err := NewScheduler(now.Add(5 * time.Second).Format(time.RFC3339)) + require.NoError(t, err) + + p, _ := New(Config{ + Binary: "sleep", + Reconnect: true, + ReconnectDelay: 1 * time.Second, + Scheduler: s, + }) + + px := p.(*process) + + // scheduled delay for finished process + d := px.delay(stateFinished) + require.Greater(t, d, time.Second) + + // reconnect delay for failed process + d = px.delay(stateFailed) + require.Equal(t, d, time.Second) + + now = time.Now() + s, err = NewScheduler(now.Add(-5 * time.Second).Format(time.RFC3339)) + require.NoError(t, err) + + p, _ = New(Config{ + Binary: "sleep", + Reconnect: true, + ReconnectDelay: 1 * time.Second, + Scheduler: s, + }) + + px = p.(*process) + + // negative delay for finished process + d = px.delay(stateFinished) + require.Less(t, d, time.Duration(0)) + + // reconnect delay for failed process + d = px.delay(stateFailed) + require.Equal(t, d, time.Second) + + now = time.Now() + s, err = NewScheduler(now.Add(5 * time.Second).Format(time.RFC3339)) + require.NoError(t, err) + + p, _ = New(Config{ + Binary: "sleep", + Reconnect: true, + ReconnectDelay: 10 * time.Second, + Scheduler: s, + }) + + px = p.(*process) + + // scheduled delay for failed process + d = px.delay(stateFailed) + require.Less(t, d, 10*time.Second) +} + +func TestProcessCallbacks(t *testing.T) { + var args []string + onStart := false + onExit := "" + onState := []string{} + + lock := sync.Mutex{} + + p, err := New(Config{ + Binary: "sleep", + Args: []string{ + "2", + }, + Reconnect: false, + OnArgs: func(a []string) []string { + lock.Lock() + defer lock.Unlock() + + args = make([]string, len(a)) + copy(args, a) + return a + }, + OnStart: func() { + lock.Lock() + defer lock.Unlock() + + onStart = true + }, + OnExit: func(state string) { + lock.Lock() + defer lock.Unlock() + + onExit = state + }, + OnStateChange: func(from, to string) { + lock.Lock() + defer lock.Unlock() + + onState = append(onState, from+"/"+to) + }, + }) + require.NoError(t, err) + + err = p.Start() + require.NoError(t, err) + + time.Sleep(5 * time.Second) + + lock.Lock() + require.ElementsMatch(t, []string{"2"}, args) + require.True(t, onStart) + require.Equal(t, stateFinished.String(), onExit) + require.ElementsMatch(t, []string{ + "finished/starting", + "starting/running", + "running/finished", + }, onState) + lock.Unlock() +} diff --git a/process/scheduler.go b/process/scheduler.go new file mode 100644 index 00000000..dbed44ae --- /dev/null +++ b/process/scheduler.go @@ -0,0 +1,69 @@ +package process + +import ( + "fmt" + "time" + + "github.com/adhocore/gronx" +) + +type Scheduler interface { + // Next returns the duration until the next scheduled time in reference + // to time.Npw(). If there's no next scheduled time, a negative duration + // and an error will be returned. + Next() (time.Duration, error) + + // NextAfter returns the same as Next(), but with the given reference + // time. + NextAfter(after time.Time) (time.Duration, error) +} + +type scheduler struct { + pattern string + pit time.Time + isCron bool +} + +func NewScheduler(pattern string) (Scheduler, error) { + s := &scheduler{} + + t, err := time.Parse(time.RFC3339, pattern) + if err == nil { + s.pit = t + s.isCron = false + } else { + cron := gronx.New() + if !cron.IsValid(pattern) { + return nil, err + } + s.pattern = pattern + s.isCron = true + } + + return s, nil +} + +func (s *scheduler) Next() (time.Duration, error) { + return s.NextAfter(time.Now()) +} + +func (s *scheduler) NextAfter(after time.Time) (time.Duration, error) { + var t time.Time + var err error + + if s.isCron { + t, err = gronx.NextTickAfter(s.pattern, after, false) + if err != nil { + return time.Duration(-1), fmt.Errorf("no next time has been scheduled") + } + } else { + t = s.pit + } + + d := t.Sub(after) + if d < time.Duration(0) { + return d, fmt.Errorf("no next time has been scheduled") + } + + return d, nil +} diff --git a/process/scheduler_test.go b/process/scheduler_test.go new file mode 100644 index 00000000..c1d6da18 --- /dev/null +++ b/process/scheduler_test.go @@ -0,0 +1,45 @@ +package process + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestSchedulerPointInTime(t *testing.T) { + s, err := NewScheduler("2023-03-20T11:06:39Z") + require.NoError(t, err) + + p, err := time.Parse(time.RFC3339, "2023-03-20T11:05:39Z") + require.NoError(t, err) + + d, err := s.NextAfter(p) + require.NoError(t, err) + require.Equal(t, time.Minute, d) + + p, err = time.Parse(time.RFC3339, "2023-03-20T11:07:39Z") + require.NoError(t, err) + + _, err = s.NextAfter(p) + require.Error(t, err) +} + +func TestSchedulerCron(t *testing.T) { + s, err := NewScheduler("* * * * *") + require.NoError(t, err) + + sc := s.(*scheduler) + require.True(t, sc.isCron) + + p, err := time.Parse(time.RFC3339, "2023-03-20T11:05:39Z") + require.NoError(t, err) + + d, err := s.NextAfter(p) + require.NoError(t, err) + require.Equal(t, 21*time.Second, d) + + d, err = s.NextAfter(p.Add(21 * time.Second)) + require.NoError(t, err) + require.Equal(t, 60*time.Second, d) +} diff --git a/restream/app/process.go b/restream/app/process.go index 4ec6036a..b84da534 100644 --- a/restream/app/process.go +++ b/restream/app/process.go @@ -44,6 +44,8 @@ type Config struct { ReconnectDelay uint64 `json:"reconnect_delay_seconds"` // seconds Autostart bool `json:"autostart"` StaleTimeout uint64 `json:"stale_timeout_seconds"` // seconds + Timeout uint64 `json:"timeout_seconds"` // seconds + Scheduler string `json:"scheduler"` LimitCPU float64 `json:"limit_cpu_usage"` // percent LimitMemory uint64 `json:"limit_memory_bytes"` // bytes LimitWaitFor uint64 `json:"limit_waitfor_seconds"` // seconds @@ -58,6 +60,8 @@ func (config *Config) Clone() *Config { ReconnectDelay: config.ReconnectDelay, Autostart: config.Autostart, StaleTimeout: config.StaleTimeout, + Timeout: config.Timeout, + Scheduler: config.Scheduler, LimitCPU: config.LimitCPU, LimitMemory: config.LimitMemory, LimitWaitFor: config.LimitWaitFor, diff --git a/restream/restream.go b/restream/restream.go index 0089d529..ff4d9622 100644 --- a/restream/restream.go +++ b/restream/restream.go @@ -360,6 +360,8 @@ func (r *restream) load() error { Reconnect: t.config.Reconnect, ReconnectDelay: time.Duration(t.config.ReconnectDelay) * time.Second, StaleTimeout: time.Duration(t.config.StaleTimeout) * time.Second, + Timeout: time.Duration(t.config.Timeout) * time.Second, + Scheduler: t.config.Scheduler, Args: t.command, Parser: t.parser, Logger: t.logger, @@ -506,6 +508,8 @@ func (r *restream) createTask(config *app.Config) (*task, error) { Reconnect: t.config.Reconnect, ReconnectDelay: time.Duration(t.config.ReconnectDelay) * time.Second, StaleTimeout: time.Duration(t.config.StaleTimeout) * time.Second, + Timeout: time.Duration(t.config.Timeout) * time.Second, + Scheduler: t.config.Scheduler, Args: t.command, Parser: t.parser, Logger: t.logger, @@ -1221,6 +1225,8 @@ func (r *restream) reloadProcess(id string) error { Reconnect: t.config.Reconnect, ReconnectDelay: time.Duration(t.config.ReconnectDelay) * time.Second, StaleTimeout: time.Duration(t.config.StaleTimeout) * time.Second, + Timeout: time.Duration(t.config.Timeout) * time.Second, + Scheduler: t.config.Scheduler, Args: t.command, Parser: t.parser, Logger: t.logger, @@ -1268,12 +1274,8 @@ func (r *restream) GetProcessState(id string) (*app.State, error) { state.Command = status.CommandArgs state.LastLog = task.parser.LastLogline() - if state.Order == "start" && !task.ffmpeg.IsRunning() && task.config.Reconnect { - state.Reconnect = float64(task.config.ReconnectDelay) - state.Duration - - if state.Reconnect < 0 { - state.Reconnect = 0 - } + if status.Reconnect >= time.Duration(0) { + state.Reconnect = status.Reconnect.Round(10 * time.Millisecond).Seconds() } convertProgressFromParser(&state.Progress, task.parser.Progress()) diff --git a/vendor/github.com/adhocore/gronx/.editorconfig b/vendor/github.com/adhocore/gronx/.editorconfig new file mode 100644 index 00000000..e0e124ae --- /dev/null +++ b/vendor/github.com/adhocore/gronx/.editorconfig @@ -0,0 +1,13 @@ +root = true + +[*] +indent_style = space +indent_size = 4 +end_of_line = lf +charset = utf-8 +trim_trailing_whitespace = true +insert_final_newline = true + +[*.go] +indent_style = tab +tab_width = 2 diff --git a/vendor/github.com/adhocore/gronx/.gitignore b/vendor/github.com/adhocore/gronx/.gitignore new file mode 100644 index 00000000..b085b1d8 --- /dev/null +++ b/vendor/github.com/adhocore/gronx/.gitignore @@ -0,0 +1,7 @@ +.idea/ +.DS_Store +*~ +*.out +vendor/ +dist/ +.env diff --git a/vendor/github.com/adhocore/gronx/CHANGELOG.md b/vendor/github.com/adhocore/gronx/CHANGELOG.md new file mode 100644 index 00000000..9224d685 --- /dev/null +++ b/vendor/github.com/adhocore/gronx/CHANGELOG.md @@ -0,0 +1,119 @@ +## [v0.2.7](https://github.com/adhocore/gronx/releases/tag/v0.2.7) (2022-06-28) + +### Miscellaneous +- **Workflow**: Run tests on 1.18x (Jitendra) +- Tests for go v1.17.x, add codecov (Jitendra) + + +## [v0.2.6](https://github.com/adhocore/gronx/releases/tag/v0.2.6) (2021-10-14) + +### Miscellaneous +- Fix 'with' languages (Jitendra Adhikari) [_a813b55_](https://github.com/adhocore/gronx/commit/a813b55) +- Init/setup github codeql (Jitendra Adhikari) [_fe2aa5a_](https://github.com/adhocore/gronx/commit/fe2aa5a) + + +## [v0.2.5](https://github.com/adhocore/gronx/releases/tag/v0.2.5) (2021-07-25) + +### Bug Fixes +- **Tasker**: The clause should be using OR (Jitendra Adhikari) [_b813b85_](https://github.com/adhocore/gronx/commit/b813b85) + + +## [v0.2.4](https://github.com/adhocore/gronx/releases/tag/v0.2.4) (2021-05-05) + +### Features +- **Pkg.tasker**: Capture cmd output in tasker logger, error in stderr (Jitendra Adhikari) [_0da0aae_](https://github.com/adhocore/gronx/commit/0da0aae) + +### Internal Refactors +- **Cmd.tasker**: Taskify is now method of tasker (Jitendra Adhikari) [_8b1373b_](https://github.com/adhocore/gronx/commit/8b1373b) + + +## [v0.2.3](https://github.com/adhocore/gronx/releases/tag/v0.2.3) (2021-05-04) + +### Bug Fixes +- **Pkg.tasker**: Sleep 100ms so abort can be bailed asap, remove dup msg (Jitendra Adhikari) [_d868920_](https://github.com/adhocore/gronx/commit/d868920) + +### Miscellaneous +- Allow leeway period at the end (Jitendra Adhikari) [_5ebf923_](https://github.com/adhocore/gronx/commit/5ebf923) + + +## [v0.2.2](https://github.com/adhocore/gronx/releases/tag/v0.2.2) (2021-05-03) + +### Bug Fixes +- **Pkg.tasker**: DoRun checks if timed out before run (Jitendra Adhikari) [_f27a657_](https://github.com/adhocore/gronx/commit/f27a657) + +### Internal Refactors +- **Pkg.tasker**: Use dateFormat var, update final tick phrase (Jitendra Adhikari) [_fad0271_](https://github.com/adhocore/gronx/commit/fad0271) + + +## [v0.2.1](https://github.com/adhocore/gronx/releases/tag/v0.2.1) (2021-05-02) + +### Bug Fixes +- **Pkg.tasker**: Deprecate sleep dur if next tick timeout (Jitendra Adhikari) [_3de45a1_](https://github.com/adhocore/gronx/commit/3de45a1) + + +## [v0.2.0](https://github.com/adhocore/gronx/releases/tag/v0.2.0) (2021-05-02) + +### Features +- **Cmd.tasker**: Add tasker for standalone usage as task daemon (Jitendra Adhikari) [_0d99409_](https://github.com/adhocore/gronx/commit/0d99409) +- **Pkg.tasker**: Add parser for tasker pkg (Jitendra Adhikari) [_e7f1811_](https://github.com/adhocore/gronx/commit/e7f1811) +- **Pkg.tasker**: Add tasker pkg (Jitendra Adhikari) [_a57b1c4_](https://github.com/adhocore/gronx/commit/a57b1c4) + +### Bug Fixes +- **Pkg.tasker**: Use log.New() instead (Jitendra Adhikari) [_0cf2c07_](https://github.com/adhocore/gronx/commit/0cf2c07) +- **Validator**: This check is not really required (Jitendra Adhikari) [_c3d75e3_](https://github.com/adhocore/gronx/commit/c3d75e3) + +### Internal Refactors +- **Gronx**: Add public methods for internal usage, expose spaceRe (Jitendra Adhikari) [_94eb20b_](https://github.com/adhocore/gronx/commit/94eb20b) + +### Miscellaneous +- **Pkg.tasker**: Use file perms as octal (Jitendra Adhikari) [_83f258d_](https://github.com/adhocore/gronx/commit/83f258d) +- **Workflow**: Include all tests in action (Jitendra Adhikari) [_7328cbf_](https://github.com/adhocore/gronx/commit/7328cbf) + +### Documentations +- Add task mangager and tasker docs/usages (Jitendra Adhikari) [_e77aa5f_](https://github.com/adhocore/gronx/commit/e77aa5f) + + +## [v0.1.4](https://github.com/adhocore/gronx/releases/tag/v0.1.4) (2021-04-25) + +### Miscellaneous +- **Mod**: 1.13 is okay too (Jitendra Adhikari) [_6c328e7_](https://github.com/adhocore/gronx/commit/6c328e7) +- Try go 1.13.x (Jitendra Adhikari) [_b017ec4_](https://github.com/adhocore/gronx/commit/b017ec4) + +### Documentations +- Practical usage (Jitendra Adhikari) [_9572e61_](https://github.com/adhocore/gronx/commit/9572e61) + + +## [v0.1.3](https://github.com/adhocore/gronx/releases/tag/v0.1.3) (2021-04-22) + +### Internal Refactors +- **Checker**: Preserve error, for pos 2 & 4 bail only on due or err (Jitendra Adhikari) [_39a9cd5_](https://github.com/adhocore/gronx/commit/39a9cd5) +- **Validator**: Do not discard error from strconv (Jitendra Adhikari) [_3b0f444_](https://github.com/adhocore/gronx/commit/3b0f444) + + +## [v0.1.2](https://github.com/adhocore/gronx/releases/tag/v0.1.2) (2021-04-21) + +### Features +- Add IsValid() (Jitendra Adhikari) [_150687b_](https://github.com/adhocore/gronx/commit/150687b) + +### Documentations +- IsValid usage (Jitendra Adhikari) [_b747116_](https://github.com/adhocore/gronx/commit/b747116) + + +## [v0.1.1](https://github.com/adhocore/gronx/releases/tag/v0.1.1) (2021-04-21) + +### Features +- Add main gronx api (Jitendra Adhikari) [_1b3b108_](https://github.com/adhocore/gronx/commit/1b3b108) +- Add cron segment checker (Jitendra Adhikari) [_a56be7c_](https://github.com/adhocore/gronx/commit/a56be7c) +- Add validator (Jitendra Adhikari) [_455a024_](https://github.com/adhocore/gronx/commit/455a024) + +### Miscellaneous +- **Workflow**: Update actions (Jitendra Adhikari) [_8b54cc3_](https://github.com/adhocore/gronx/commit/8b54cc3) +- Init module (Jitendra Adhikari) [_bada37d_](https://github.com/adhocore/gronx/commit/bada37d) +- Add license (Jitendra Adhikari) [_5f20b96_](https://github.com/adhocore/gronx/commit/5f20b96) +- **Gh**: Add meta files (Jitendra Adhikari) [_35a1310_](https://github.com/adhocore/gronx/commit/35a1310) +- **Workflow**: Add lint/test actions (Jitendra Adhikari) [_884d5cb_](https://github.com/adhocore/gronx/commit/884d5cb) +- Add editorconfig (Jitendra Adhikari) [_8b75494_](https://github.com/adhocore/gronx/commit/8b75494) + +### Documentations +- On cron expressions (Jitendra Adhikari) [_547fd72_](https://github.com/adhocore/gronx/commit/547fd72) +- Add readme (Jitendra Adhikari) [_3955e88_](https://github.com/adhocore/gronx/commit/3955e88) diff --git a/vendor/github.com/adhocore/gronx/LICENSE b/vendor/github.com/adhocore/gronx/LICENSE new file mode 100644 index 00000000..f1148764 --- /dev/null +++ b/vendor/github.com/adhocore/gronx/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2021-2099 Jitendra Adhikari + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/vendor/github.com/adhocore/gronx/README.md b/vendor/github.com/adhocore/gronx/README.md new file mode 100644 index 00000000..59edd0aa --- /dev/null +++ b/vendor/github.com/adhocore/gronx/README.md @@ -0,0 +1,250 @@ +# adhocore/gronx + +[![Latest Version](https://img.shields.io/github/release/adhocore/gronx.svg?style=flat-square)](https://github.com/adhocore/gronx/releases) +[![Software License](https://img.shields.io/badge/license-MIT-brightgreen.svg?style=flat-square)](LICENSE) +[![Go Report](https://goreportcard.com/badge/github.com/adhocore/gronx)](https://goreportcard.com/report/github.com/adhocore/gronx) +[![Test](https://github.com/adhocore/gronx/actions/workflows/test-action.yml/badge.svg)](https://github.com/adhocore/gronx/actions/workflows/test-action.yml) +[![Lint](https://github.com/adhocore/gronx/actions/workflows/lint-action.yml/badge.svg)](https://github.com/adhocore/gronx/actions/workflows/lint-action.yml) +[![Codecov](https://img.shields.io/codecov/c/github/adhocore/gronx/main.svg?style=flat-square)](https://codecov.io/gh/adhocore/gronx) +[![Donate 15](https://img.shields.io/badge/donate-paypal-blue.svg?style=flat-square&label=donate+15)](https://www.paypal.me/ji10/15usd) +[![Donate 25](https://img.shields.io/badge/donate-paypal-blue.svg?style=flat-square&label=donate+25)](https://www.paypal.me/ji10/25usd) +[![Donate 50](https://img.shields.io/badge/donate-paypal-blue.svg?style=flat-square&label=donate+50)](https://www.paypal.me/ji10/50usd) +[![Tweet](https://img.shields.io/twitter/url/http/shields.io.svg?style=social)](https://twitter.com/intent/tweet?text=Lightweight+fast+and+deps+free+cron+expression+parser+for+Golang&url=https://github.com/adhocore/gronx&hashtags=go,golang,parser,cron,cronexpr,cronparser) + +`gronx` is Golang cron expression parser ported from [adhocore/cron-expr](https://github.com/adhocore/php-cron-expr) with task runner +and daemon that supports crontab like task list file. Use it programatically in Golang or as standalone binary instead of crond. + +- Zero dependency. +- Very **fast** because it bails early in case a segment doesn't match. + +Find gronx in [pkg.go.dev](https://pkg.go.dev/github.com/adhocore/gronx). + +## Installation + +```sh +go get -u github.com/adhocore/gronx +``` + +## Usage + +```go +import ( + "time" + + "github.com/adhocore/gronx" +) + +gron := gronx.New() +expr := "* * * * *" + +// check if expr is even valid, returns bool +gron.IsValid(expr) // true + +// check if expr is due for current time, returns bool and error +gron.IsDue(expr) // true|false, nil + +// check if expr is due for given time +gron.IsDue(expr, time.Date(2021, time.April, 1, 1, 1, 0, 0, time.UTC)) // true|false, nil +``` + +### Next Tick + +To find out when is the cron due next (onwards): +```go +allowCurrent = true // includes current time as well +nextTime, err := gron.NextTick(expr, allowCurrent) // gives time.Time, error + +// OR, next tick after certain reference time +refTime = time.Date(2022, time.November, 1, 1, 1, 0, 0, time.UTC) +allowCurrent = false // excludes the ref time +nextTime, err := gron.NextTickAfter(expr, refTime, allowCurrent) // gives time.Time, error +``` + +### Standalone Daemon + +In a more practical level, you would use this tool to manage and invoke jobs in app itself and not +mess around with `crontab` for each and every new tasks/jobs. ~~It doesn't yet replace that but rather supplements it. +There is a plan though [#1](https://github.com/adhocore/gronx/issues/1)~~. + +In crontab just put one entry with `* * * * *` which points to your Go entry point that uses this tool. +Then in that entry point you would invoke different tasks if the corresponding Cron expr is due. +Simple map structure would work for this. + +Check the section below for more sophisticated way of managing tasks automatically using `gronx` daemon called `tasker`. + +--- +### Go Tasker + +Tasker is a task manager that can be programatically used in Golang applications. It runs as a daemon and and invokes tasks scheduled with cron expression: +```go +package main + +import ( + "context" + "time" + + "github.com/adhocore/gronx/pkg/tasker" +) + +func main() { + taskr := tasker.New(tasker.Option{ + Verbose: true, + // optional: defaults to local + Tz: "Asia/Bangkok", + // optional: defaults to stderr log stream + Out: "/full/path/to/output-file", + }) + + // add task to run every minute + taskr.Task("* * * * *", func(ctx context.Context) (int, error) { + // do something ... + + // then return exit code and error, for eg: if everything okay + return 0, nil + }).Task("*/5 * * * *", func(ctx context.Context) (int, error) { // every 5 minutes + // you can also log the output to Out file as configured in Option above: + taskr.Log.Printf("done something in %d s", 2) + + return 0, nil + }) + + // every 10 minute with arbitrary command + taskr.Task("@10minutes", taskr.Taskify("command --option val -- args")) + + // ... add more tasks + + // optionally if you want tasker to stop after 2 hour, pass the duration with Until(): + taskr.Until(2 * time.Hour) + + // finally run the tasker, it ticks sharply on every minute and runs all the tasks due on that time! + // it exits gracefully when ctrl+c is received making sure pending tasks are completed. + taskr.Run() +} +``` + +### Task Daemon +It can also be used as standalone task daemon instead of programmatic usage for Golang application. + +First, just install tasker command: +```sh +go get -u github.com/adhocore/gronx/cmd/tasker +``` + +Then prepare a taskfile ([example](./tests/../test/taskfile.txt)) in crontab format +(or can even point to existing crontab). +> `user` is not supported: it is just cron expr followed by the command. + +Finally run the task daemon like so +``` +tasker -file path/to/taskfile +``` +> You can pass more options to control the behavior of task daemon, see below. + +#### Tasker command options: +```txt +-file string + The task file in crontab format +-out string + The fullpath to file where output from tasks are sent to +-shell string + The shell to use for running tasks (default "/usr/bin/bash") +-tz string + The timezone to use for tasks (default "Local") +-until int + The timeout for task daemon in minutes +-verbose + The verbose mode outputs as much as possible +``` + +Examples: +```sh +tasker -verbose -file path/to/taskfile -until 120 # run until next 120min (i.e 2hour) with all feedbacks echoed back +tasker -verbose -file path/to/taskfile -out path/to/output # with all feedbacks echoed to the output file +tasker -tz America/New_York -file path/to/taskfile -shell zsh # run all tasks using zsh shell based on NY timezone +``` + +> File extension of taskfile for (`-file` option) does not matter: can be any or none. +> The directory for outfile (`-out` option) must exist, file is created by task daemon. + +> Same timezone applies for all tasks currently and it might support overriding timezone per task in future release. + +#### Notes on Windows +In Windows if it doesn't find `bash.exe` or `git-bash.exe` it will use `powershell`. +`powershell` may not be compatible with Unix flavored commands. Also to note: +you can't do chaining with `cmd1 && cmd2` but rather `cmd1 ; cmd2`. + +--- +### Cron Expression + +Cron expression normally consists of 5 segments viz: +``` + +``` +and sometimes there can be 6th segment for `` at the end. + +For each segments you can have multiple choices separated by comma: +> Eg: `0,30 * * * *` means either 0th or 30th minute. + +To specify range of values you can use dash: +> Eg: `10-15 * * * *` means 10th, 11th, 12th, 13th, 14th and 15th minute. + +To specify range of step you can combine a dash and slash: +> Eg: `10-15/2 * * * *` means every 2 minutes between 10 and 15 i.e 10th, 12th and 14th minute. + +For the 3rd and 5th segment, there are additional [modifiers](#modifiers) (optional). + +And if you want, you can mix them up: +> `5,12-20/4,55 * * * *` matches if any one of `5` or `12-20/4` or `55` matches the minute. + +### Real Abbreviations + +You can use real abbreviations for month and week days. eg: `JAN`, `dec`, `fri`, `SUN` + +### Tags + +Following tags are available and they are converted to real cron expressions before parsing: + +- *@yearly* or *@annually* - every year +- *@monthly* - every month +- *@daily* - every day +- *@weekly* - every week +- *@hourly* - every hour +- *@5minutes* - every 5 minutes +- *@10minutes* - every 10 minutes +- *@15minutes* - every 15 minutes +- *@30minutes* - every 30 minutes +- *@always* - every minute + +```go +gron.IsDue("@5minutes") +``` + +### Modifiers + +Following modifiers supported + +- *Day of Month / 3rd segment:* + - `L` stands for last day of month (eg: `L` could mean 29th for February in leap year) + - `W` stands for closest week day (eg: `10W` is closest week days (MON-FRI) to 10th date) +- *Day of Week / 5th segment:* + - `L` stands for last weekday of month (eg: `2L` is last monday) + - `#` stands for nth day of week in the month (eg: `1#2` is second sunday) + +--- +## License + +> © [MIT](./LICENSE) | 2021-2099, Jitendra Adhikari + +## Credits + +This project is ported from [adhocore/cron-expr](https://github.com/adhocore/php-cron-expr) and +release managed by [please](https://github.com/adhocore/please). + +--- +### Other projects +My other golang projects you might find interesting and useful: + +- [**urlsh**](https://github.com/adhocore/urlsh) - URL shortener and bookmarker service with UI, API, Cache, Hits Counter and forwarder using postgres and redis in backend, bulma in frontend; has [web](https://urlssh.xyz) and cli client +- [**fast**](https://github.com/adhocore/fast) - Check your internet speed with ease and comfort right from the terminal +- [**goic**](https://github.com/adhocore/goic) - Go Open ID Connect, is OpenID connect client library for Golang, supports the Authorization Code Flow of OpenID Connect specification. +- [**chin**](https://github.com/adhocore/chin) - A GO lang command line tool to show a spinner as user waits for some long running jobs to finish. diff --git a/vendor/github.com/adhocore/gronx/VERSION b/vendor/github.com/adhocore/gronx/VERSION new file mode 100644 index 00000000..34707cbb --- /dev/null +++ b/vendor/github.com/adhocore/gronx/VERSION @@ -0,0 +1 @@ +v0.2.7 diff --git a/vendor/github.com/adhocore/gronx/checker.go b/vendor/github.com/adhocore/gronx/checker.go new file mode 100644 index 00000000..46988d4a --- /dev/null +++ b/vendor/github.com/adhocore/gronx/checker.go @@ -0,0 +1,110 @@ +package gronx + +import ( + "strconv" + "strings" + "time" +) + +// Checker is interface for cron segment due check. +type Checker interface { + GetRef() time.Time + SetRef(ref time.Time) + CheckDue(segment string, pos int) (bool, error) +} + +// SegmentChecker is factory implementation of Checker. +type SegmentChecker struct { + ref time.Time +} + +// GetRef returns the current reference time +func (c *SegmentChecker) GetRef() time.Time { + return c.ref +} + +// SetRef sets the reference time for which to check if a cron expression is due. +func (c *SegmentChecker) SetRef(ref time.Time) { + c.ref = ref +} + +// CheckDue checks if the cron segment at given position is due. +// It returns bool or error if any. +func (c *SegmentChecker) CheckDue(segment string, pos int) (bool, error) { + ref, last := c.GetRef(), -1 + val, loc := valueByPos(ref, pos), ref.Location() + + for _, offset := range strings.Split(segment, ",") { + mod := pos == 2 || pos == 4 + due, err := c.isOffsetDue(offset, val, pos) + + if due || (!mod && err != nil) { + return due, err + } + if mod && !strings.ContainsAny(offset, "LW#") { + continue + } + if last == -1 { + last = time.Date(ref.Year(), ref.Month(), 1, 0, 0, 0, 0, loc).AddDate(0, 1, 0).Add(-time.Nanosecond).Day() + } + if pos == 2 { + due, err = isValidMonthDay(offset, last, ref) + } else if pos == 4 { + due, err = isValidWeekDay(offset, last, ref) + } + if due || err != nil { + return due, err + } + } + + return false, nil +} + +func (c *SegmentChecker) isOffsetDue(offset string, val, pos int) (bool, error) { + if offset == "*" || offset == "?" { + return true, nil + } + if strings.Contains(offset, "/") { + return inStep(val, offset) + } + if strings.Contains(offset, "-") { + if pos == 4 { + offset = strings.Replace(offset, "7-", "0-", 1) + } + return inRange(val, offset) + } + + if pos != 4 && (val == 0 || offset == "0") { + return offset == "0" && val == 0, nil + } + + nval, err := strconv.Atoi(offset) + if err != nil { + return false, err + } + + if pos == 4 && nval == 7 { + nval = 0 + } + + return nval == val, nil +} + +func valueByPos(ref time.Time, pos int) int { + switch pos { + case 0: + return ref.Minute() + case 1: + return ref.Hour() + case 2: + return ref.Day() + case 3: + return int(ref.Month()) + case 4: + return int(ref.Weekday()) + case 5: + return ref.Year() + } + + return 0 +} diff --git a/vendor/github.com/adhocore/gronx/gronx.go b/vendor/github.com/adhocore/gronx/gronx.go new file mode 100644 index 00000000..4b57da2a --- /dev/null +++ b/vendor/github.com/adhocore/gronx/gronx.go @@ -0,0 +1,105 @@ +package gronx + +import ( + "errors" + "regexp" + "strings" + "time" +) + +var literals = strings.NewReplacer( + "SUN", "0", "MON", "1", "TUE", "2", "WED", "3", "THU", "4", "FRI", "5", "SAT", "6", + "JAN", "1", "FEB", "2", "MAR", "3", "APR", "4", "MAY", "5", "JUN", "6", "JUL", "7", + "AUG", "8", "SEP", "9", "OCT", "10", "NOV", "11", "DEC", "12", +) + +var expressions = map[string]string{ + "@yearly": "0 0 1 1 *", + "@annually": "0 0 1 1 *", + "@monthly": "0 0 1 * *", + "@weekly": "0 0 * * 0", + "@daily": "0 0 * * *", + "@hourly": "0 * * * *", + "@always": "* * * * *", + "@5minutes": "*/5 * * * *", + "@10minutes": "*/10 * * * *", + "@15minutes": "*/15 * * * *", + "@30minutes": "0,30 * * * *", +} + +// SpaceRe is regex for whitespace. +var SpaceRe = regexp.MustCompile(`\s+`) + +func normalize(expr string) []string { + expr = strings.Trim(expr, " \t") + if e, ok := expressions[strings.ToLower(expr)]; ok { + expr = e + } + + expr = SpaceRe.ReplaceAllString(expr, " ") + expr = literals.Replace(strings.ToUpper(expr)) + + return strings.Split(strings.ReplaceAll(expr, " ", " "), " ") +} + +// Gronx is the main program. +type Gronx struct { + C Checker +} + +// New initializes Gronx with factory defaults. +func New() Gronx { + return Gronx{&SegmentChecker{}} +} + +// IsDue checks if cron expression is due for given reference time (or now). +// It returns bool or error if any. +func (g *Gronx) IsDue(expr string, ref ...time.Time) (bool, error) { + if len(ref) > 0 { + g.C.SetRef(ref[0]) + } else { + g.C.SetRef(time.Now()) + } + + segs, err := Segments(expr) + if err != nil { + return false, err + } + + return g.SegmentsDue(segs) +} + +// Segments splits expr into array array of cron parts. +// It returns array or error. +func Segments(expr string) ([]string, error) { + segs := normalize(expr) + if len(segs) < 5 || len(segs) > 6 { + return []string{}, errors.New("expr should contain 5-6 segments separated by space") + } + + return segs, nil +} + +// SegmentsDue checks if all cron parts are due. +// It returns bool. You should use IsDue(expr) instead. +func (g *Gronx) SegmentsDue(segments []string) (bool, error) { + for pos, seg := range segments { + if seg == "*" || seg == "?" { + continue + } + + if due, err := g.C.CheckDue(seg, pos); !due { + return due, err + } + } + + return true, nil +} + +// IsValid checks if cron expression is valid. +// It returns bool. +func (g *Gronx) IsValid(expr string) bool { + _, err := g.IsDue(expr) + + return err == nil +} diff --git a/vendor/github.com/adhocore/gronx/next.go b/vendor/github.com/adhocore/gronx/next.go new file mode 100644 index 00000000..7700fe91 --- /dev/null +++ b/vendor/github.com/adhocore/gronx/next.go @@ -0,0 +1,122 @@ +package gronx + +import ( + "errors" + "fmt" + "regexp" + "strconv" + "strings" + "time" +) + +// CronDateFormat is Y-m-d H:i (seconds are not significant) +const CronDateFormat = "2006-01-02 15:04" + +// FullDateFormat is Y-m-d H:i:s (with seconds) +const FullDateFormat = "2006-01-02 15:04:05" + +// NextTick gives next run time from now +func NextTick(expr string, inclRefTime bool) (time.Time, error) { + return NextTickAfter(expr, time.Now(), inclRefTime) +} + +// NextTickAfter gives next run time from the provided time.Time +func NextTickAfter(expr string, start time.Time, inclRefTime bool) (time.Time, error) { + gron, next := New(), start.Truncate(time.Minute) + due, err := gron.IsDue(expr, start) + if err != nil || (due && inclRefTime) { + return start, err + } + + segments, _ := Segments(expr) + if len(segments) > 5 && isPastYear(segments[5], next, inclRefTime) { + return next, fmt.Errorf("unreachable year segment: %s", segments[5]) + } + + if next, err = loop(gron, segments, next, inclRefTime); err != nil { + // Ignore superfluous err + if due, _ = gron.IsDue(expr, next); due { + err = nil + } + } + return next, err +} + +func loop(gron Gronx, segments []string, start time.Time, incl bool) (next time.Time, err error) { + iter, next, bumped := 1000, start, false + for iter > 0 { + over: + iter-- + for pos, seg := range segments { + if seg == "*" || seg == "?" { + continue + } + if next, bumped, err = bumpUntilDue(gron.C, seg, pos, next); bumped { + goto over + } + } + if !incl && next.Format(CronDateFormat) == start.Format(CronDateFormat) { + next, _, err = bumpUntilDue(gron.C, segments[0], 0, next.Add(time.Minute)) + continue + } + return next, err + } + return start, errors.New("tried so hard") +} + +var dashRe = regexp.MustCompile(`/.*$`) + +func isPastYear(year string, ref time.Time, incl bool) bool { + if year == "*" || year == "?" { + return false + } + + min := ref.Year() + if !incl { + min++ + } + for _, offset := range strings.Split(year, ",") { + if strings.Index(offset, "*/") == 0 || strings.Index(offset, "0/") == 0 { + return false + } + for _, part := range strings.Split(dashRe.ReplaceAllString(offset, ""), "-") { + val, err := strconv.Atoi(part) + if err != nil || val >= min { + return false + } + } + } + return true +} + +var limit = map[int]int{0: 60, 1: 24, 2: 31, 3: 12, 4: 366, 5: 100} + +func bumpUntilDue(c Checker, segment string, pos int, ref time.Time) (time.Time, bool, error) { + // + iter := limit[pos] + for iter > 0 { + c.SetRef(ref) + if ok, _ := c.CheckDue(segment, pos); ok { + return ref, iter != limit[pos], nil + } + ref = bump(ref, pos) + iter-- + } + return ref, false, errors.New("tried so hard") +} + +func bump(ref time.Time, pos int) time.Time { + switch pos { + case 0: + ref = ref.Add(time.Minute) + case 1: + ref = ref.Add(time.Hour) + case 2, 4: + ref = ref.AddDate(0, 0, 1) + case 3: + ref = ref.AddDate(0, 1, 0) + case 5: + ref = ref.AddDate(1, 0, 0) + } + return ref +} diff --git a/vendor/github.com/adhocore/gronx/validator.go b/vendor/github.com/adhocore/gronx/validator.go new file mode 100644 index 00000000..074415b8 --- /dev/null +++ b/vendor/github.com/adhocore/gronx/validator.go @@ -0,0 +1,136 @@ +package gronx + +import ( + "errors" + "strconv" + "strings" + "time" +) + +func inStep(val int, s string) (bool, error) { + parts := strings.Split(s, "/") + step, err := strconv.Atoi(parts[1]) + if err != nil { + return false, err + } + if step == 0 { + return false, errors.New("step can't be 0") + } + + if strings.Index(s, "*/") == 0 || strings.Index(s, "0/") == 0 { + return val%step == 0, nil + } + + sub, end := strings.Split(parts[0], "-"), val + start, err := strconv.Atoi(sub[0]) + if err != nil { + return false, err + } + + if len(sub) > 1 { + end, err = strconv.Atoi(sub[1]) + if err != nil { + return false, err + } + } + + return inStepRange(val, start, end, step), nil +} + +func inRange(val int, s string) (bool, error) { + parts := strings.Split(s, "-") + start, err := strconv.Atoi(parts[0]) + if err != nil { + return false, err + } + + end, err := strconv.Atoi(parts[1]) + if err != nil { + return false, err + } + + return start <= val && val <= end, nil +} + +func inStepRange(val, start, end, step int) bool { + for i := start; i <= end && i <= val; i += step { + if i == val { + return true + } + } + return false +} + +func isValidMonthDay(val string, last int, ref time.Time) (bool, error) { + day, loc := ref.Day(), ref.Location() + if val == "L" { + return day == last, nil + } + + pos := strings.Index(val, "W") + if pos < 1 { + return false, errors.New("invalid offset value: " + val) + } + + nval, err := strconv.Atoi(val[0:pos]) + if err != nil { + return false, err + } + + for _, i := range []int{0, -1, 1, -2, 2} { + incr := i + nval + if incr > 0 && incr <= last { + iref := time.Date(ref.Year(), ref.Month(), incr, ref.Hour(), ref.Minute(), ref.Second(), 0, loc) + week := int(iref.Weekday()) + + if week > 0 && week < 6 && iref.Month() == ref.Month() { + return day == iref.Day(), nil + } + } + } + + return false, nil +} + +func isValidWeekDay(val string, last int, ref time.Time) (bool, error) { + loc := ref.Location() + if pos := strings.Index(strings.ReplaceAll(val, "7L", "0L"), "L"); pos > 0 { + nval, err := strconv.Atoi(val[0:pos]) + if err != nil { + return false, err + } + + for i := 0; i < 7; i++ { + decr := last - i + dref := time.Date(ref.Year(), ref.Month(), decr, ref.Hour(), ref.Minute(), ref.Second(), ref.Nanosecond(), loc) + + if int(dref.Weekday()) == nval { + return ref.Day() == decr, nil + } + } + + return false, nil + } + + pos := strings.Index(val, "#") + parts := strings.Split(strings.ReplaceAll(val, "7#", "0#"), "#") + if pos < 1 || len(parts) < 2 { + return false, errors.New("invalid offset value: " + val) + } + + day, err := strconv.Atoi(parts[0]) + if err != nil { + return false, err + } + + nth, err := strconv.Atoi(parts[1]) + if err != nil { + return false, err + } + + if day < 0 || day > 7 || nth < 1 || nth > 5 || int(ref.Weekday()) != day { + return false, nil + } + + return ref.Day()/7 == nth-1, nil +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 3f677268..bb331e75 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -30,6 +30,9 @@ github.com/KyleBanks/depth # github.com/Masterminds/semver/v3 v3.1.1 ## explicit; go 1.12 github.com/Masterminds/semver/v3 +# github.com/adhocore/gronx v1.1.2 +## explicit; go 1.13 +github.com/adhocore/gronx # github.com/agnivade/levenshtein v1.1.1 ## explicit; go 1.13 github.com/agnivade/levenshtein