Merge remote-tracking branch 'origin/feat/workspaces' into feat/integrate.workflow.changes

This commit is contained in:
Andrey Melnikov
2020-04-27 09:54:06 -07:00
21 changed files with 1430 additions and 70 deletions

View File

@@ -276,6 +276,17 @@ func (c *Client) ValidateWorkflowExecution(namespace string, manifest []byte) (e
if err != nil {
return
}
// Check that entrypoint and onExit templates are DAGs
for _, t := range wf.Spec.Templates {
if t.Name == wf.Spec.Entrypoint && t.DAG == nil {
return errors.New("\"entrypoint\" template should be a DAG")
}
if wf.Spec.OnExit != "" && t.Name == wf.Spec.OnExit && t.DAG == nil {
return errors.New("\"onExit\" template should be a DAG")
}
}
}
return
@@ -1204,9 +1215,7 @@ func (c *Client) GetWorkflowExecutionStatisticsForTemplates(workflowTemplates ..
Will build a template that makes a CURL request to the onepanel-core API,
with statistics about the workflow that was just executed.
*/
func GetExitHandlerWorkflowStatistics(namespace string, workflowTemplateId *uint64) (workflowStepName, workflowStepTemplate, workflowStepWhen string, err error, wfv1Template wfv1.Template) {
workflowStepName = "workflow-statistics"
workflowStepTemplate = "workflow-statistics-template"
func getExitHandlerWorkflowStatistics(namespace string, workflowTemplateId *uint64) (statsTemplate *wfv1.Template, err error) {
host := env.GetEnv("ONEPANEL_CORE_SERVICE_HOST", "")
if host == "" {
err = errors.New("ONEPANEL_CORE_SERVICE_HOST is empty.")
@@ -1225,18 +1234,18 @@ func GetExitHandlerWorkflowStatistics(namespace string, workflowTemplateId *uint
}
jsonRequestBytes, err := json.Marshal(jsonRequestStruct)
if err != nil {
return "", "", "", err, wfv1.Template{}
return nil, err
}
jsonRequestStr := string(jsonRequestBytes)
curlJSONBody := fmt.Sprintf("--data '%s'", jsonRequestStr)
token, err := GetBearerToken(namespace)
if err != nil {
return "", "", "", err, wfv1.Template{}
return nil, err
}
wfv1Template = wfv1.Template{
Name: workflowStepTemplate,
statsTemplate = &wfv1.Template{
Name: "workflow-statistics",
Container: &corev1.Container{
Image: "curlimages/curl",
Command: []string{"sh", "-c"},
@@ -1283,23 +1292,38 @@ func GetInitContainerForCronWorkflow(templateCorrespondingToEntryPoint *wfv1.Tem
}
func InjectExitHandlerWorkflowExecutionStatistic(wf *wfv1.Workflow, namespace string, workflowTemplateId *uint64) error {
exitHandlerStepName, exitHandlerStepTemplate, exitHandlerStepWhen, err, exitHandlerTemplate := GetExitHandlerWorkflowStatistics(namespace, workflowTemplateId)
statsTemplate, err := getExitHandlerWorkflowStatistics(namespace, workflowTemplateId)
if err != nil {
return err
}
if exitHandlerStepTemplate != "" {
exitHandler := wfv1.Template{
dagTask := wfv1.DAGTask{
Name: statsTemplate.Name,
Template: statsTemplate.Name,
}
wf.Spec.Templates = append(wf.Spec.Templates, *statsTemplate)
if wf.Spec.OnExit != "" {
for _, t := range wf.Spec.Templates {
if t.Name == wf.Spec.OnExit {
lasTaskIndex := len(t.DAG.Tasks) - 1
dagTask.Dependencies = []string{t.DAG.Tasks[lasTaskIndex].Name}
t.DAG.Tasks = append(t.DAG.Tasks, dagTask)
break
}
}
} else {
exitHandlerDAG := wfv1.Template{
Name: "exit-handler",
Steps: []wfv1.ParallelSteps{
{
Steps: []wfv1.WorkflowStep{
{Name: exitHandlerStepName, Template: exitHandlerStepTemplate, When: exitHandlerStepWhen},
},
DAG: &wfv1.DAGTemplate{
Tasks: []wfv1.DAGTask{
dagTask,
},
},
}
wf.Spec.OnExit = "exit-handler"
wf.Spec.Templates = append(wf.Spec.Templates, exitHandler, exitHandlerTemplate)
wf.Spec.Templates = append(wf.Spec.Templates, exitHandlerDAG)
}
return nil
}