// Mirror of https://github.com/rfyiamcool/cronlib.git
// (synced 2025-09-27 08:12:07 +08:00; 489 lines, 8.1 KiB, Go).

package cronlib
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// The crontab parser was copied from robfig/cron into cron_parser.go,
// replacing the import of "github.com/robfig/cron".
|
|
|
|
// Readable names for the two boolean values.
const (
	// OnMode is the untyped boolean constant true.
	OnMode = true
	// OffMode is the untyped boolean constant false.
	OffMode = !OnMode
)
|
|
|
|
// Sentinel errors returned by this package.
var (
	// ErrNotFoundJob reports that no job is registered under the given name.
	ErrNotFoundJob = errors.New("not found job")

	// ErrAlreadyRegister reports that a job with the same name already exists.
	ErrAlreadyRegister = errors.New("the job already in pool")

	// ErrJobDOFuncNil reports that a job was built without a callback.
	ErrJobDOFuncNil = errors.New("callback func is nil")

	// ErrCronSpecInvalid describes an invalid crontab spec.
	ErrCronSpecInvalid = errors.New("crontab spec is invalid")
)
|
|
|
|
// defualtLogger is the package-wide log sink. Until SetLogger installs a
// replacement it discards every message.
var defualtLogger = func(level, s string) {}

// loggerType is the signature of a log sink: it receives a level string
// (this file passes "info" and "error") and the formatted message.
type loggerType func(level, s string)

// SetLogger replaces the package-wide log sink.
//
// Passing nil restores the discard-everything sink instead of storing a nil
// function, so later log calls can never panic on a nil call.
func SetLogger(logger loggerType) {
	if logger == nil {
		defualtLogger = func(level, s string) {}
		return
	}
	defualtLogger = logger
}
|
|
|
|
// panicCaller is the package-wide hook invoked by tryCatch with the service
// name and the recovered panic value rendered as text. It does nothing until
// SetPanicCaller installs a replacement.
var panicCaller = func(srv, err string) {}

// panicType is the signature of a panic hook.
type panicType func(srv, err string)

// SetPanicCaller replaces the package-wide panic hook.
//
// Passing nil restores the do-nothing hook instead of storing a nil function,
// so the recover path can never itself panic on a nil call.
func SetPanicCaller(p panicType) {
	if p == nil {
		panicCaller = func(srv, err string) {}
		return
	}
	panicCaller = p
}
|
|
|
|
// New - create CronSchduler
|
|
func New() *CronSchduler {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
return &CronSchduler{
|
|
tasks: make(map[string]*JobModel),
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
wg: &sync.WaitGroup{},
|
|
once: &sync.Once{},
|
|
}
|
|
}
|
|
|
|
// CronSchduler
|
|
type CronSchduler struct {
|
|
tasks map[string]*JobModel
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
|
|
wg *sync.WaitGroup
|
|
once *sync.Once
|
|
|
|
sync.RWMutex
|
|
}
|
|
|
|
// Register - only register srv's job model, don't start auto.
|
|
func (c *CronSchduler) Register(srv string, model *JobModel) error {
|
|
return c.reset(srv, model, true, false)
|
|
}
|
|
|
|
// UpdateJobModel - stop old job, update srv's job model
|
|
func (c *CronSchduler) UpdateJobModel(srv string, model *JobModel) error {
|
|
return c.reset(srv, model, false, true)
|
|
}
|
|
|
|
// DynamicRegister - after cronlib already run, dynamic add a job, the job autostart by cronlib.
|
|
func (c *CronSchduler) DynamicRegister(srv string, model *JobModel) error {
|
|
return c.reset(srv, model, false, true)
|
|
}
|
|
|
|
// reset - reset srv model
|
|
func (c *CronSchduler) reset(srv string, model *JobModel, denyReplace, autoStart bool) error {
|
|
c.Lock()
|
|
defer c.Unlock()
|
|
|
|
// validate model
|
|
err := model.validate()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
cctx, cancel := context.WithCancel(c.ctx)
|
|
model.ctx = cctx
|
|
model.cancel = cancel
|
|
model.srv = srv
|
|
|
|
oldModel, ok := c.tasks[srv]
|
|
if denyReplace && ok {
|
|
return ErrAlreadyRegister
|
|
}
|
|
|
|
if ok {
|
|
oldModel.kill()
|
|
}
|
|
|
|
c.tasks[srv] = model
|
|
if autoStart {
|
|
c.wg.Add(1)
|
|
go c.tasks[srv].runLoop(c.wg)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// UnRegister - stop and delete srv
|
|
func (c *CronSchduler) UnRegister(srv string) error {
|
|
c.Lock()
|
|
defer c.Unlock()
|
|
|
|
oldModel, ok := c.tasks[srv]
|
|
if !ok {
|
|
return ErrNotFoundJob
|
|
}
|
|
|
|
oldModel.kill()
|
|
delete(c.tasks, srv)
|
|
return nil
|
|
}
|
|
|
|
// Stop - stop all cron job
|
|
func (c *CronSchduler) Stop() {
|
|
c.Lock()
|
|
defer c.Unlock()
|
|
|
|
for srv, job := range c.tasks {
|
|
job.kill()
|
|
delete(c.tasks, srv)
|
|
}
|
|
c.cancel()
|
|
}
|
|
|
|
// StopService - stop job by serviceName
|
|
func (c *CronSchduler) StopService(srv string) {
|
|
c.Lock()
|
|
defer c.Unlock()
|
|
|
|
job, ok := c.tasks[srv]
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
job.kill()
|
|
delete(c.tasks, srv)
|
|
}
|
|
|
|
// StopServicePrefix - stop job by srv regex prefix.
|
|
// if regex = "risk.scan", stop risk.scan.total, risk.scan.user at the same time
|
|
func (c *CronSchduler) StopServicePrefix(regex string) {
|
|
c.Lock()
|
|
defer c.Unlock()
|
|
|
|
// regex match
|
|
for srv, job := range c.tasks {
|
|
if !strings.HasPrefix(srv, regex) {
|
|
continue
|
|
}
|
|
|
|
job.kill()
|
|
delete(c.tasks, srv)
|
|
}
|
|
}
|
|
|
|
func validateSpec(spec string) bool {
|
|
_, err := Parse(spec)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
func getNextDue(spec string) (time.Time, error) {
|
|
sc, err := Parse(spec)
|
|
if err != nil {
|
|
return time.Now(), err
|
|
}
|
|
|
|
// avoid time.sub
|
|
time.Sleep(10 * time.Millisecond)
|
|
due := sc.Next(time.Now())
|
|
return due, err
|
|
}
|
|
|
|
func getNextDueSafe(spec string, last time.Time) (time.Time, error) {
|
|
var (
|
|
due time.Time
|
|
err error
|
|
)
|
|
|
|
for {
|
|
due, err = getNextDue(spec)
|
|
if err != nil {
|
|
return due, err
|
|
}
|
|
|
|
if last.Equal(due) {
|
|
// avoid time.sub lost some accuracy, repeat do job.
|
|
time.Sleep(100 * time.Millisecond)
|
|
continue
|
|
}
|
|
|
|
break
|
|
}
|
|
|
|
return due, err
|
|
}
|
|
|
|
func (c *CronSchduler) Start() {
|
|
// only once call
|
|
c.once.Do(func() {
|
|
|
|
for _, job := range c.tasks {
|
|
c.wg.Add(1)
|
|
job.runLoop(c.wg)
|
|
}
|
|
|
|
})
|
|
}
|
|
|
|
// Wait - if all jobs is exited, return.
|
|
func (c *CronSchduler) Wait() {
|
|
c.wg.Wait()
|
|
}
|
|
|
|
// WaitStop - when stop cronlib controller, return.
|
|
func (c *CronSchduler) WaitStop() {
|
|
select {
|
|
case <-c.ctx.Done():
|
|
}
|
|
}
|
|
|
|
func (c *CronSchduler) GetServiceCron(srv string) (*JobModel, error) {
|
|
c.RLock()
|
|
defer c.RUnlock()
|
|
|
|
oldModel, ok := c.tasks[srv]
|
|
if !ok {
|
|
return nil, ErrNotFoundJob
|
|
}
|
|
|
|
return oldModel, nil
|
|
}
|
|
|
|
// NewJobModel - defualt block sync callfunc
|
|
func NewJobModel(spec string, f func(), options ...JobOption) (*JobModel, error) {
|
|
var err error
|
|
job := &JobModel{
|
|
running: true,
|
|
async: false,
|
|
do: f,
|
|
spec: spec,
|
|
notifyChan: make(chan int, 1), // avoid block
|
|
}
|
|
|
|
for _, opt := range options {
|
|
if opt != nil {
|
|
if err := opt(job); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
}
|
|
|
|
err = job.validate()
|
|
if err != nil {
|
|
return job, err
|
|
}
|
|
|
|
return job, nil
|
|
}
|
|
|
|
// JobOption customises a JobModel while NewJobModel is building it. A non-nil
// error aborts construction.
type JobOption func(*JobModel) error
|
|
|
|
func AsyncMode() JobOption {
|
|
return func(o *JobModel) error {
|
|
o.async = true
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func TryCatchMode() JobOption {
|
|
return func(o *JobModel) error {
|
|
o.tryCatch = true
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// JobModel describes one scheduled job: what to run, when to run it, and the
// state used to control its worker.
type JobModel struct {
	// srv is the service name the job is registered under.
	srv string

	// do is the callback executed on every activation.
	do func()

	// async, when true, runs do in a separate goroutine per activation.
	async bool

	// tryCatch, when true, recovers from panics raised by do.
	tryCatch bool

	// spec is the crontab expression.
	spec string

	// Control plumbing: ctx/cancel stop the worker, notifyChan wakes it.
	ctx        context.Context
	cancel     context.CancelFunc
	notifyChan chan int

	// running is the worker loop condition; kill sets it to false.
	running bool

	// exited is set once the worker has returned.
	exited bool

	sync.RWMutex
}
|
|
|
|
func (j *JobModel) SetTryCatch(b bool) {
|
|
j.tryCatch = b
|
|
}
|
|
|
|
func (j *JobModel) SetAsyncMode(b bool) {
|
|
j.async = b
|
|
}
|
|
|
|
func (j *JobModel) validate() error {
|
|
if j.do == nil {
|
|
return ErrJobDOFuncNil
|
|
}
|
|
|
|
if _, err := getNextDue(j.spec); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (j *JobModel) runLoop(wg *sync.WaitGroup) {
|
|
go j.run(wg)
|
|
}
|
|
|
|
func (j *JobModel) run(wg *sync.WaitGroup) {
|
|
var (
|
|
// stdout do time cost
|
|
doTimeCostFunc = func() {
|
|
startTS := time.Now()
|
|
defualtLogger("info",
|
|
fmt.Sprintf("scheduler service: %s begin run",
|
|
j.srv,
|
|
),
|
|
)
|
|
|
|
if j.tryCatch {
|
|
tryCatch(j)
|
|
} else {
|
|
j.do()
|
|
}
|
|
|
|
defualtLogger("info",
|
|
fmt.Sprintf("scheduler service: %s has been finished, time cost: %s, spec: %s",
|
|
j.srv,
|
|
time.Since(startTS).String(),
|
|
j.spec,
|
|
),
|
|
)
|
|
}
|
|
|
|
timer *time.Timer
|
|
lastNextTime time.Time
|
|
due time.Time
|
|
interval time.Duration
|
|
|
|
err error
|
|
)
|
|
|
|
// parse crontab spec
|
|
due, err = getNextDue(j.spec)
|
|
interval = due.Sub(time.Now())
|
|
if err != nil {
|
|
panic(err.Error())
|
|
}
|
|
|
|
lastNextTime = due
|
|
defualtLogger("info",
|
|
fmt.Sprintf("scheduler service: %s next time is %s, sub: %s",
|
|
j.srv,
|
|
due.String(),
|
|
interval.String(),
|
|
),
|
|
)
|
|
|
|
// int timer
|
|
timer = time.NewTimer(interval)
|
|
|
|
// release join counter
|
|
defer func() {
|
|
timer.Stop()
|
|
wg.Done()
|
|
j.exited = true
|
|
}()
|
|
|
|
for j.running {
|
|
select {
|
|
case <-timer.C:
|
|
if time.Now().Before(due) {
|
|
timer.Reset(
|
|
due.Sub(time.Now()) + 50*time.Millisecond,
|
|
)
|
|
continue
|
|
}
|
|
|
|
due, _ := getNextDueSafe(j.spec, lastNextTime)
|
|
lastNextTime = due
|
|
interval := due.Sub(time.Now())
|
|
timer.Reset(interval)
|
|
|
|
if j.async {
|
|
go doTimeCostFunc() // goroutine for per job
|
|
} else {
|
|
doTimeCostFunc()
|
|
}
|
|
|
|
defualtLogger("info",
|
|
fmt.Sprintf("scheduler service: %s next time is %s, sub: %s",
|
|
j.srv,
|
|
due.String(),
|
|
interval.String(),
|
|
),
|
|
)
|
|
|
|
case <-j.notifyChan:
|
|
// parse crontab spec again !
|
|
continue
|
|
|
|
case <-j.ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (j *JobModel) kill() {
|
|
j.running = false
|
|
j.cancel()
|
|
}
|
|
|
|
func (j *JobModel) workerExited() bool {
|
|
return j.exited
|
|
}
|
|
|
|
func (j *JobModel) notifySig() {
|
|
select {
|
|
case j.notifyChan <- 1:
|
|
default:
|
|
// avoid block
|
|
return
|
|
}
|
|
}
|
|
|
|
func tryCatch(job *JobModel) {
|
|
defer func() {
|
|
if e := recover(); e != nil {
|
|
panicCaller(
|
|
job.srv,
|
|
fmt.Sprintf("%v", e),
|
|
)
|
|
|
|
defualtLogger(
|
|
"error",
|
|
fmt.Sprintf("srv: %s, trycatch panicing %v", job.srv, e),
|
|
)
|
|
}
|
|
}()
|
|
|
|
job.do()
|
|
}
|