Adding initContainer to the entry point for a given Cron Workflow.

This commit is contained in:
Aleksandr Melnikov
2020-04-21 16:55:22 -07:00
parent 33cc90ea4f
commit 4e524318bc
2 changed files with 87 additions and 1 deletions

View File

@@ -478,6 +478,33 @@ func (c *Client) FinishWorkflowExecutionStatisticViaExitHandler(namespace, name,
return err
}
func (c *Client) CronStartWorkflowExecutionStatisticInsert(namespace, uid string, workflowTemplateID int64) (err error) {
tx, err := c.DB.Begin()
if err != nil {
return err
}
defer tx.Rollback()
insertMap := sq.Eq{
"workflow_template_id": workflowTemplateID,
"uid": uid,
"name": uid,
"namespace": namespace,
"created_at": time.Now().UTC(),
}
_, err = sb.Insert("workflow_executions").
SetMap(insertMap).RunWith(tx).Exec()
if err != nil {
return err
}
err = tx.Commit()
if err != nil {
return err
}
return err
}
func (c *Client) GetWorkflowExecution(namespace, name string) (workflow *WorkflowExecution, err error) {
wf, err := c.ArgoprojV1alpha1().Workflows(namespace).Get(name, metav1.GetOptions{})
if err != nil {
@@ -1294,6 +1321,38 @@ func GetExitHandlerWorkflowStatistics(namespace, uid string, workflowTemplateId
return
}
func GetInitContainerForCronWorkflow(templateCorrespondingToEntryPoint *wfv1.Template, namespace, uid string, workflowTemplateId int64) (err error) {
host := env.GetEnv("ONEPANEL_CORE_SERVICE_HOST", "")
if host == "" {
err = errors.New("ONEPANEL_CORE_SERVICE_HOST is empty.")
return
}
port := env.GetEnv("ONEPANEL_CORE_SERVICE_PORT", "")
if port == "" {
err = errors.New("ONEPANEL_CORE_SERVICE_PORT is empty.")
return
}
curlEndpoint := fmt.Sprintf("http://%s:%s/apis/v1beta1/%s/workflow_executions/%s/cron_start_statistics/%d", host, port, namespace, uid, workflowTemplateId)
token, err := GetBearerToken(namespace)
if err != nil {
return err
}
initContainer := wfv1.UserContainer{
Container: corev1.Container{
Name: "cron-init-container",
Image: "curlimages/curl",
Command: []string{"sh", "-c"},
Args: []string{
"curl '" + curlEndpoint + "' -H \"Content-Type: application/json\" -H 'Connection: keep-alive' -H 'Accept: application/json' " +
"-H 'Authorization: Bearer " + token + "' ",
}}}
templateCorrespondingToEntryPoint.InitContainers = append(templateCorrespondingToEntryPoint.InitContainers, initContainer)
return
}
func InjectExitHandlerWorkflowExecutionStatistic(wf *wfv1.Workflow, namespace, wfExecUid string, workflowTemplateId *uint64) error {
exitHandlerStepName, exitHandlerStepTemplate, exitHandlerStepWhen, err, exitHandlerTemplate := GetExitHandlerWorkflowStatistics(namespace, wfExecUid, workflowTemplateId)
if err != nil {
@@ -1315,3 +1374,20 @@ func InjectExitHandlerWorkflowExecutionStatistic(wf *wfv1.Workflow, namespace, w
}
return nil
}
func InjectInitHandlerWorkflowExecutionStatistic(wf *wfv1.Workflow, namespace, uid string, workflowTemplateId int64) error {
//Find the template that matches this entrypoint name
//Add the init container to that Template
var templateToUse *wfv1.Template
for idx := range wf.Spec.Templates {
if wf.Spec.Templates[idx].Name == wf.Spec.Entrypoint {
templateToUse = &wf.Spec.Templates[idx]
break
}
}
err := GetInitContainerForCronWorkflow(templateToUse, namespace, uid, workflowTemplateId)
if err != nil {
return err
}
return nil
}