mirror of
				https://github.com/lzh-1625/go_process_manager.git
				synced 2025-10-31 11:26:49 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			137 lines
		
	
	
		
			3.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			137 lines
		
	
	
		
			3.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package logic
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/google/uuid"
 | |
| 	"github.com/lzh-1625/go_process_manager/internal/app/eum"
 | |
| 	"github.com/lzh-1625/go_process_manager/internal/app/middle"
 | |
| 	"github.com/lzh-1625/go_process_manager/internal/app/model"
 | |
| 	"github.com/lzh-1625/go_process_manager/log"
 | |
| 	"github.com/robfig/cron/v3"
 | |
| )
 | |
| 
 | |
| type TaskJob struct {
 | |
| 	Cron       *cron.Cron         `json:"-"`
 | |
| 	TaskConfig *model.Task        `json:"task"`
 | |
| 	Running    bool               `json:"running"`
 | |
| 	Cancel     context.CancelFunc `json:"-"`
 | |
| 	StartTime  time.Time          `json:"startTime"`
 | |
| 	EndTime    time.Time          `json:"endTime"`
 | |
| }
 | |
| 
 | |
| func NewTaskJob(data model.Task) (*TaskJob, error) {
 | |
| 	tj := &TaskJob{
 | |
| 		TaskConfig: &data,
 | |
| 		StartTime:  time.Now(),
 | |
| 	}
 | |
| 	if data.Enable && data.CronExpression != "" {
 | |
| 		err := tj.InitCronHandle()
 | |
| 		if err != nil {
 | |
| 			log.Logger.Warnw("定时任务启动失败", "err", err, "task", data.Id)
 | |
| 		}
 | |
| 	}
 | |
| 	return tj, nil
 | |
| }
 | |
| 
 | |
| func (t *TaskJob) Run(ctx context.Context) {
 | |
| 	if ctx.Value(eum.CtxTaskTraceId{}) == nil {
 | |
| 		ctx = context.WithValue(ctx, eum.CtxTaskTraceId{}, uuid.NewString())
 | |
| 	}
 | |
| 	EventLogic.Create(t.TaskConfig.Name, eum.EventTaskStart, "traceId", ctx.Value(eum.CtxTaskTraceId{}).(string))
 | |
| 	defer EventLogic.Create(t.TaskConfig.Name, eum.EventTaskStop, "traceId", ctx.Value(eum.CtxTaskTraceId{}).(string))
 | |
| 	t.Running = true
 | |
| 	middle.TaskWaitCond.Trigger()
 | |
| 	defer func() {
 | |
| 		t.Running = false
 | |
| 		middle.TaskWaitCond.Trigger()
 | |
| 	}()
 | |
| 	var ok bool
 | |
| 	// 判断条件是否满足
 | |
| 	if t.TaskConfig.Condition == eum.TaskCondPass {
 | |
| 		ok = true
 | |
| 	} else {
 | |
| 		proc, err := ProcessCtlLogic.GetProcess(t.TaskConfig.OperationTarget)
 | |
| 		if err != nil {
 | |
| 			return
 | |
| 		}
 | |
| 		ok = conditionHandle[t.TaskConfig.Condition](t.TaskConfig, proc)
 | |
| 	}
 | |
| 	log.Logger.Debugw("任务条件判断", "pass", ok)
 | |
| 	if !ok {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	proc, err := ProcessCtlLogic.GetProcess(t.TaskConfig.OperationTarget)
 | |
| 	if err != nil {
 | |
| 		log.Logger.Debugw("不存在该进程,结束任务")
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	// 执行操作
 | |
| 	log.Logger.Infow("任务开始执行")
 | |
| 	if !OperationHandle[t.TaskConfig.Operation](t.TaskConfig, proc) {
 | |
| 		log.Logger.Warnw("任务执行失败")
 | |
| 		return
 | |
| 	}
 | |
| 	log.Logger.Infow("任务执行成功", "target", t.TaskConfig.OperationTarget)
 | |
| 
 | |
| 	if t.TaskConfig.NextId != nil {
 | |
| 		nextTask, err := TaskLogic.getTaskJob(*t.TaskConfig.NextId)
 | |
| 		if err != nil {
 | |
| 			log.Logger.Errorw("无法获取到下一个节点,结束任务", "nextId", t.TaskConfig.NextId)
 | |
| 			return
 | |
| 		}
 | |
| 		select {
 | |
| 		case <-ctx.Done():
 | |
| 			log.Logger.Infow("任务流被手动结束")
 | |
| 		default:
 | |
| 			log.Logger.Debugw("执行下一个节点", "nextId", *t.TaskConfig.NextId)
 | |
| 			if nextTask.Running {
 | |
| 				log.Logger.Errorw("下一个节点已在运行,结束任务", "nextId", t.TaskConfig.NextId)
 | |
| 				return
 | |
| 			}
 | |
| 			nextTask.Run(ctx)
 | |
| 		}
 | |
| 	} else {
 | |
| 		log.Logger.Infow("任务流结束")
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (t *TaskJob) InitCronHandle() error {
 | |
| 	if _, err := cron.ParseStandard(t.TaskConfig.CronExpression); err != nil { // cron表达式校验
 | |
| 		log.Logger.Errorw("cron解析失败", "cron", t.TaskConfig.CronExpression, "err", err)
 | |
| 		return err
 | |
| 	}
 | |
| 	c := cron.New()
 | |
| 	_, err := c.AddFunc(t.TaskConfig.CronExpression, func() {
 | |
| 		log.Logger.Infow("定时任务启动")
 | |
| 		if t.Running {
 | |
| 			log.Logger.Infow("任务已在运行,跳过当前任务")
 | |
| 			return
 | |
| 		}
 | |
| 		ctx, cancel := context.WithCancel(context.Background())
 | |
| 		defer cancel()
 | |
| 		t.Cancel = cancel
 | |
| 		t.Run(ctx)
 | |
| 		log.Logger.Infow("定时任务结束")
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	c.Start()
 | |
| 	t.Cron = c
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (t *TaskJob) EditStatus(status bool) error {
 | |
| 	if t.Cron != nil && !status {
 | |
| 		t.Cron.Stop()
 | |
| 	} else if err := t.InitCronHandle(); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	t.TaskConfig.Enable = status
 | |
| 	return nil
 | |
| }
 | 
