add workflow status webhook

This commit is contained in:
rushtehrani
2020-05-19 17:33:02 -07:00
parent 227d9c0d4e
commit 2df35cb27e
7 changed files with 544 additions and 49 deletions

View File

@@ -236,6 +236,11 @@ func (c *Client) createWorkflow(namespace string, workflowTemplateId uint64, wor
wf.ObjectMeta.Labels = *opts.Labels
}
err = injectWorkflowExecutionStatusCaller(wf, wfv1.NodeRunning)
if err != nil {
return 0, nil, err
}
err = injectExitHandlerWorkflowExecutionStatistic(wf, &workflowTemplateId)
if err != nil {
return 0, nil, err
@@ -501,7 +506,8 @@ func (c *Client) CronStartWorkflowExecutionStatisticInsert(namespace, uid string
"workflow_template_version_id": cronWorkflow.WorkflowTemplateVersionId,
"name": uid,
"namespace": namespace,
"phase": wfv1.NodePending,
"phase": wfv1.NodeRunning,
"started_at": time.Now().UTC(),
"cron_workflow_id": cronWorkflow.ID,
"parameters": string(parametersJson),
}
@@ -1392,6 +1398,44 @@ func injectInitHandlerWorkflowExecutionStatistic(wf *wfv1.Workflow, workflowTemp
return err
}
// Inject template as entrypoint in DAG
wf.Spec.Templates = append(wf.Spec.Templates, *containerTemplate)
for i, t := range wf.Spec.Templates {
if t.Name == wf.Spec.Entrypoint {
// DAG is always required for entrypoint templates
if t.DAG != nil {
for j, task := range t.DAG.Tasks {
if task.Dependencies == nil {
wf.Spec.Templates[i].DAG.Tasks[j].Dependencies = []string{containerTemplate.Name}
wf.Spec.Templates[i].DAG.Tasks = append(t.DAG.Tasks, wfv1.DAGTask{
Name: containerTemplate.Name,
Template: containerTemplate.Name,
})
}
}
}
break
}
}
return nil
}
func injectWorkflowExecutionStatusCaller(wf *wfv1.Workflow, phase wfv1.NodePhase) error {
curlPath := "/apis/v1beta1/{{workflow.namespace}}/workflow_executions/{{workflow.name}}/status"
status := WorkflowExecutionStatus{
Phase: phase,
}
statusBytes, err := json.Marshal(status)
if err != nil {
return err
}
containerTemplate, err := getCURLNodeTemplate("sys-send-status", http.MethodPost, curlPath, string(statusBytes), wfv1.Inputs{})
if err != nil {
return err
}
// Inject template as entrypoint in DAG
wf.Spec.Templates = append(wf.Spec.Templates, *containerTemplate)
for i, t := range wf.Spec.Templates {
if t.Name == wf.Spec.Entrypoint {
@@ -1468,3 +1512,28 @@ func (c *Client) getWorkflowExecutionAndTemplate(namespace string, uid string) (
return
}
// UpdateWorkflowExecutionPhase updates workflow execution phases and times.
// `modified_at` time is always updated when this method is called.
func (c *Client) UpdateWorkflowExecutionStatus(namespace, uid string, status *WorkflowExecutionStatus) (err error) {
fieldMap := sq.Eq{
"phase": status.Phase,
}
switch status.Phase {
case wfv1.NodeRunning:
fieldMap["started_at"] = time.Now().UTC()
break
}
_, err = sb.Update("workflow_executions").
SetMap(fieldMap).
Where(sq.Eq{
"namespace": namespace,
"uid": uid,
}).
RunWith(c.DB).Exec()
if err != nil {
return util.NewUserError(codes.NotFound, "Workflow execution not found.")
}
return
}