Added code to clean up Workflow Executions.

- Removes db and k8s entries.
This commit is contained in:
Aleksandr Melnikov
2020-05-20 14:59:01 -07:00
parent 819f48efde
commit b79cfa019f
2 changed files with 62 additions and 1 deletions

View File

@@ -1463,3 +1463,24 @@ func (c *Client) getWorkflowExecutionAndTemplate(namespace string, uid string) (
return return
} }
func (c *Client) DeleteWorkflowExecution(namespace, uid string) error {
err := c.ArgoprojV1alpha1().Workflows(namespace).Delete(uid, nil)
if err != nil {
if strings.Contains(err.Error(), "not found") {
return nil
}
return err
}
return nil
}
func (c *Client) DeleteWorkflowExecutionDb(workflowUid string) error {
query := `DELETE FROM workflow_executions
WHERE uid = $1`
if _, err := c.DB.Exec(query, workflowUid); err != nil {
return err
}
return nil
}

View File

@@ -626,7 +626,8 @@ func (c *Client) ArchiveWorkflowTemplate(namespace, uid string) (archived bool,
} }
//clean up workflow templates //clean up workflow templates
workflowTemplateName := uid + "-v" + strconv.FormatInt(workflowTemplate.Version, 10) wfTempVer := strconv.FormatInt(workflowTemplate.Version, 10)
workflowTemplateName := uid + "-v" + wfTempVer
err = c.DeleteWorkflowTemplate(namespace, workflowTemplateName) err = c.DeleteWorkflowTemplate(namespace, workflowTemplateName)
if err != nil { if err != nil {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
@@ -671,6 +672,45 @@ func (c *Client) ArchiveWorkflowTemplate(namespace, uid string) (archived bool,
return false, util.NewUserError(codes.Unknown, "Unable to archive workflow template.") return false, util.NewUserError(codes.Unknown, "Unable to archive workflow template.")
} }
} }
//workflow executions
paginator := pagination.NewRequest(0, 100)
for {
wfs, err := c.ListWorkflowExecutions(namespace, uid, wfTempVer, &paginator)
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"UID": uid,
"Error": err.Error(),
}).Error("Get Workflow Executions failed.")
return false, util.NewUserError(codes.Unknown, "Unable to archive workflow template.")
}
if len(wfs) == 0 {
break
}
for _, wf := range wfs {
err = c.DeleteWorkflowExecution(namespace, wf.UID)
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"UID": uid,
"Error": err.Error(),
}).Error("Delete Workflow Execution k8s failed.")
return false, util.NewUserError(codes.Unknown, "Unable to archive workflow template.")
}
err = c.DeleteWorkflowExecutionDb(wf.UID)
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"UID": uid,
"Error": err.Error(),
}).Error("Delete Workflow Execution DB failed.")
return false, util.NewUserError(codes.Unknown, "Unable to archive workflow template.")
}
}
}
archived, err = c.archiveWorkflowTemplate(namespace, uid) //db only
if !archived || err != nil { if !archived || err != nil {
if err != nil { if err != nil {
log.WithFields(log.Fields{ log.WithFields(log.Fields{