mirror of
https://github.com/onepanelio/onepanel.git
synced 2025-10-24 14:03:08 +08:00
Merge pull request #246 from onepanelio/fix/delete.scheduled.cron
Fixed TerminateCronWorkflow
This commit is contained in:
@@ -513,13 +513,45 @@ func (c *Client) createCronWorkflow(namespace string, workflowTemplateId *uint64
|
|||||||
func (c *Client) TerminateCronWorkflow(namespace, uid string) (err error) {
|
func (c *Client) TerminateCronWorkflow(namespace, uid string) (err error) {
|
||||||
err = c.ArgoprojV1alpha1().CronWorkflows(namespace).Delete(uid, nil)
|
err = c.ArgoprojV1alpha1().CronWorkflows(namespace).Delete(uid, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if strings.Contains(err.Error(), "not found") {
|
if !strings.Contains(err.Error(), "not found") {
|
||||||
return nil
|
return err
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
cronWorkflow, err := c.selectCronWorkflowWithWorkflowTemplateVersion(namespace, uid)
|
||||||
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = sb.Delete("cron_workflows").
|
//workflow executions
|
||||||
|
var workflows []*WorkflowExecution
|
||||||
|
query, args, err := sb.Select().
|
||||||
|
Columns(getWorkflowExecutionColumns("we", "")...).
|
||||||
|
From("workflow_executions we").
|
||||||
|
Where(sq.Eq{
|
||||||
|
"cron_workflow_id": cronWorkflow.ID,
|
||||||
|
}).ToSql()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := c.DB.Select(&workflows, query, args...); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, wf := range workflows {
|
||||||
|
err = c.ArchiveWorkflowExecution(namespace, wf.UID)
|
||||||
|
if err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
"Namespace": namespace,
|
||||||
|
"UID": uid,
|
||||||
|
"Error": err.Error(),
|
||||||
|
}).Error("Archive Workflow Execution Failed.")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = sb.Update("cron_workflows").
|
||||||
|
Set("is_archived", true).
|
||||||
Where(sq.Eq{
|
Where(sq.Eq{
|
||||||
"uid": uid,
|
"uid": uid,
|
||||||
"namespace": namespace,
|
"namespace": namespace,
|
||||||
@@ -629,3 +661,27 @@ func (c *Client) GetCronWorkflowStatisticsForTemplates(workflowTemplates ...*Wor
|
|||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Client) selectCronWorkflowWithWorkflowTemplateVersion(namespace, uid string, extraColumns ...string) (*CronWorkflow, error) {
|
||||||
|
query, args, err := sb.Select(getCronWorkflowColumns(extraColumns...)...).
|
||||||
|
From("cron_workflows cw").
|
||||||
|
Join("workflow_template_versions wtv ON wtv.id = cw.workflow_template_version_id").
|
||||||
|
Join("workflow_templates wt ON wt.id = wtv.workflow_template_id").
|
||||||
|
Where(sq.Eq{
|
||||||
|
"wt.namespace": namespace,
|
||||||
|
"cw.name": uid,
|
||||||
|
"cw.is_archived": false,
|
||||||
|
}).
|
||||||
|
ToSql()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
cronWorkflow := &CronWorkflow{}
|
||||||
|
if err = c.DB.Get(cronWorkflow, query, args...); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return cronWorkflow, nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -670,7 +670,7 @@ func (c *Client) ArchiveWorkflowTemplate(namespace, uid string) (archived bool,
|
|||||||
"Namespace": namespace,
|
"Namespace": namespace,
|
||||||
"UID": uid,
|
"UID": uid,
|
||||||
"Error": err.Error(),
|
"Error": err.Error(),
|
||||||
}).Error("Archive Workflow Execution.")
|
}).Error("Archive Workflow Execution Failed.")
|
||||||
return false, util.NewUserError(codes.Unknown, "Unable to archive workflow template.")
|
return false, util.NewUserError(codes.Unknown, "Unable to archive workflow template.")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user