Files
go_process_manager/internal/app/logic/task.go
2025-08-31 20:54:25 +08:00

137 lines
3.7 KiB
Go

package logic
import (
"context"
"time"
"github.com/google/uuid"
"github.com/lzh-1625/go_process_manager/internal/app/eum"
"github.com/lzh-1625/go_process_manager/internal/app/middle"
"github.com/lzh-1625/go_process_manager/internal/app/model"
"github.com/lzh-1625/go_process_manager/log"
"github.com/robfig/cron/v3"
)
// TaskJob is the in-memory runtime state of a single task node: its
// configuration plus the scheduler handle and bookkeeping fields used while
// the task is scheduled or running.
type TaskJob struct {
// Cron is the scheduler built and started by InitCronHandle; excluded from JSON.
Cron *cron.Cron `json:"-"`
// TaskConfig points at the task definition this job was created from.
TaskConfig *model.Task `json:"task"`
// Running is set to true on entry to Run and reset to false when Run returns.
Running bool `json:"running"`
// Cancel cancels the context of the most recent cron-triggered run; excluded from JSON.
Cancel context.CancelFunc `json:"-"`
// StartTime is initialised to the job's creation time by NewTaskJob.
StartTime time.Time `json:"startTime"`
// EndTime is not assigned anywhere in this part of the file.
// NOTE(review): presumably maintained by code elsewhere — confirm.
EndTime time.Time `json:"endTime"`
}
// NewTaskJob builds the runtime job for the given task definition.
//
// The job keeps a pointer to its own copy of data and records the creation
// time as StartTime. When the task is enabled and has a non-empty cron
// expression, the scheduler is started as well; a failure to start it is only
// logged, so the returned error is always nil.
func NewTaskJob(data model.Task) (*TaskJob, error) {
	job := new(TaskJob)
	job.TaskConfig = &data
	job.StartTime = time.Now()

	// Nothing to schedule for disabled tasks or tasks without an expression.
	if !data.Enable || data.CronExpression == "" {
		return job, nil
	}

	// Scheduling is best effort: the job is still returned if it fails.
	if initErr := job.InitCronHandle(); initErr != nil {
		log.Logger.Warnw("定时任务启动失败", "err", initErr, "task", data.Id)
	}
	return job, nil
}
// Run executes this task node and, when it succeeds, the node referenced by
// TaskConfig.NextId, synchronously and with the same context.
//
// A trace id is attached to ctx if it does not carry one yet, and start/stop
// events tagged with that id are emitted around the run. While Run is active
// the Running flag is true, and middle.TaskWaitCond is triggered on both
// transitions. The run ends early when the condition is not met, the target
// process cannot be found, the operation fails, the next node cannot be
// resolved or is already running, or ctx has been cancelled before the next
// node is started.
func (t *TaskJob) Run(ctx context.Context) {
	// Guarantee that the whole task chain shares one trace id.
	if ctx.Value(eum.CtxTaskTraceId{}) == nil {
		ctx = context.WithValue(ctx, eum.CtxTaskTraceId{}, uuid.NewString())
	}

	EventLogic.Create(t.TaskConfig.Name, eum.EventTaskStart, "traceId", ctx.Value(eum.CtxTaskTraceId{}).(string))
	// Registered first so that the stop event fires last, after Running is reset.
	defer EventLogic.Create(t.TaskConfig.Name, eum.EventTaskStop, "traceId", ctx.Value(eum.CtxTaskTraceId{}).(string))

	// Flip the running flag and wake up anyone waiting on task state changes.
	setRunning := func(state bool) {
		t.Running = state
		middle.TaskWaitCond.Trigger()
	}
	setRunning(true)
	defer setRunning(false)

	// Check whether the condition is satisfied; TaskCondPass always passes.
	pass := t.TaskConfig.Condition == eum.TaskCondPass
	if !pass {
		condProc, condErr := ProcessCtlLogic.GetProcess(t.TaskConfig.OperationTarget)
		if condErr != nil {
			return
		}
		pass = conditionHandle[t.TaskConfig.Condition](t.TaskConfig, condProc)
	}
	log.Logger.Debugw("任务条件判断", "pass", pass)
	if !pass {
		return
	}

	target, err := ProcessCtlLogic.GetProcess(t.TaskConfig.OperationTarget)
	if err != nil {
		log.Logger.Debugw("不存在该进程,结束任务")
		return
	}

	// Perform the operation on the target process.
	log.Logger.Infow("任务开始执行")
	if succeeded := OperationHandle[t.TaskConfig.Operation](t.TaskConfig, target); !succeeded {
		log.Logger.Warnw("任务执行失败")
		return
	}
	log.Logger.Infow("任务执行成功", "target", t.TaskConfig.OperationTarget)

	// No follow-up node: the task flow is complete.
	if t.TaskConfig.NextId == nil {
		log.Logger.Infow("任务流结束")
		return
	}

	follower, err := TaskLogic.getTaskJob(*t.TaskConfig.NextId)
	if err != nil {
		log.Logger.Errorw("无法获取到下一个节点,结束任务", "nextId", t.TaskConfig.NextId)
		return
	}

	// Non-blocking cancellation check before handing over to the next node.
	select {
	case <-ctx.Done():
		log.Logger.Infow("任务流被手动结束")
		return
	default:
	}

	log.Logger.Debugw("执行下一个节点", "nextId", *t.TaskConfig.NextId)
	if follower.Running {
		log.Logger.Errorw("下一个节点已在运行,结束任务", "nextId", t.TaskConfig.NextId)
		return
	}
	follower.Run(ctx)
}
// InitCronHandle validates the task's cron expression, registers a job that
// runs the task on that schedule, starts the scheduler and stores it in
// t.Cron.
//
// Each scheduled firing is skipped while the task is already running.
// Otherwise a fresh cancellable context is created, its cancel function is
// published through t.Cancel, and Run is invoked with it.
//
// It returns the parse error for an invalid expression, or the error reported
// when the job cannot be added to the scheduler.
func (t *TaskJob) InitCronHandle() error {
	spec := t.TaskConfig.CronExpression

	// Validate the cron expression up front so the failure is logged.
	if _, parseErr := cron.ParseStandard(spec); parseErr != nil {
		log.Logger.Errorw("cron解析失败", "cron", spec, "err", parseErr)
		return parseErr
	}

	// scheduledRun is the callback executed on every cron firing.
	scheduledRun := func() {
		log.Logger.Infow("定时任务启动")
		if t.Running {
			log.Logger.Infow("任务已在运行,跳过当前任务")
			return
		}
		runCtx, stop := context.WithCancel(context.Background())
		defer stop()
		t.Cancel = stop
		t.Run(runCtx)
		log.Logger.Infow("定时任务结束")
	}

	scheduler := cron.New()
	if _, addErr := scheduler.AddFunc(spec, scheduledRun); addErr != nil {
		return addErr
	}
	scheduler.Start()
	t.Cron = scheduler
	return nil
}
// EditStatus enables or disables the task's cron schedule and records the new
// state in TaskConfig.Enable.
//
// Disabling stops the scheduler if one exists and never creates a new one.
// Enabling builds and starts a scheduler through InitCronHandle; a scheduler
// that was previously attached is stopped once its replacement is running, so
// the task is never driven by two schedulers at once.
//
// It returns the error from InitCronHandle when the schedule cannot be set up
// (for example an empty or invalid cron expression); in that case
// TaskConfig.Enable and any existing scheduler are left untouched.
func (t *TaskJob) EditStatus(status bool) error {
	if !status {
		// Disabling must only ever stop scheduling; with no scheduler
		// attached there is nothing to do besides updating the flag.
		if t.Cron != nil {
			t.Cron.Stop()
		}
		t.TaskConfig.Enable = false
		return nil
	}

	// Remember the current scheduler: InitCronHandle overwrites t.Cron on
	// success, and the old instance would otherwise keep firing.
	previous := t.Cron
	if err := t.InitCronHandle(); err != nil {
		return err
	}
	if previous != nil {
		// Stop is safe to call on a scheduler that is already stopped.
		previous.Stop()
	}
	t.TaskConfig.Enable = true
	return nil
}