Merge branch 'dev' into feat/provider-specific-config

This commit is contained in:
Rush Tehrani
2020-07-09 14:18:49 -07:00
committed by GitHub
63 changed files with 1935 additions and 895 deletions

View File

@@ -2,6 +2,7 @@ package v1
import (
"bufio"
"database/sql"
"encoding/json"
"errors"
"fmt"
@@ -85,6 +86,18 @@ func UnmarshalWorkflows(wfBytes []byte, strict bool) (wfs []wfv1.Workflow, err e
return
}
// getWorkflowsFromWorkflowTemplate parses the WorkflowTemplate manifest and returns the argo workflows from it
func getWorkflowsFromWorkflowTemplate(wt *WorkflowTemplate) (wfs []wfv1.Workflow, err error) {
manifest, err := wt.GetWorkflowManifestBytes()
if err != nil {
return nil, err
}
wfs, err = UnmarshalWorkflows(manifest, true)
return
}
// appendArtifactRepositoryConfigIfMissing appends default artifact repository config to artifacts that have a key.
// Artifacts that contain anything other than key are skipped.
func injectArtifactRepositoryConfig(artifact *wfv1.Artifact, namespaceConfig *NamespaceConfig) {
@@ -265,11 +278,19 @@ func (c *Client) injectAutomatedFields(namespace string, wf *wfv1.Workflow, opts
return
}
// ArchiveWorkflowExecution marks a WorkflowExecution as archived in database
// and deletes the argo workflow.
//
// If the database record does not exist, we still try to delete the argo workflow record.
// No errors are returned if the records do not exist.
func (c *Client) ArchiveWorkflowExecution(namespace, uid string) error {
_, err := sb.Update("workflow_executions").Set("is_archived", true).Where(sq.Eq{
"uid": uid,
"namespace": namespace,
}).RunWith(c.DB).Exec()
_, err := sb.Update("workflow_executions").
Set("is_archived", true).
Where(sq.Eq{
"uid": uid,
"namespace": namespace,
}).RunWith(c.DB).
Exec()
if err != nil {
return err
}
@@ -285,11 +306,10 @@ func (c *Client) ArchiveWorkflowExecution(namespace, uid string) error {
return nil
}
/*
Name is == to UID, no user friendly name.
Workflow execution name == uid, example: name = my-friendly-wf-name-8skjz, uid = my-friendly-wf-name-8skjz
*/
func (c *Client) createWorkflow(namespace string, workflowTemplateId uint64, workflowTemplateVersionId uint64, wf *wfv1.Workflow, opts *WorkflowExecutionOptions) (newDbId uint64, createdWorkflow *wfv1.Workflow, err error) {
// createWorkflow creates the workflow in the database and argo.
// Name is == to UID, no user friendly name.
// Workflow execution name == uid, example: name = my-friendly-wf-name-8skjz, uid = my-friendly-wf-name-8skjz
func (c *Client) createWorkflow(namespace string, workflowTemplateID uint64, workflowTemplateVersionID uint64, wf *wfv1.Workflow, opts *WorkflowExecutionOptions) (createdWorkflow *WorkflowExecution, err error) {
if opts == nil {
opts = &WorkflowExecutionOptions{}
}
@@ -330,34 +350,41 @@ func (c *Client) createWorkflow(namespace string, workflowTemplateId uint64, wor
wf.ObjectMeta.Labels = opts.Labels
}
err = injectWorkflowExecutionStatusCaller(wf, wfv1.NodeRunning)
if err != nil {
return 0, nil, err
if err = injectWorkflowExecutionStatusCaller(wf, wfv1.NodeRunning); err != nil {
return nil, err
}
err = injectExitHandlerWorkflowExecutionStatistic(wf, &workflowTemplateId)
if err != nil {
return 0, nil, err
if err = injectExitHandlerWorkflowExecutionStatistic(wf, &workflowTemplateID); err != nil {
return nil, err
}
if err = c.injectAutomatedFields(namespace, wf, opts); err != nil {
return 0, nil, err
return nil, err
}
createdWorkflow, err = c.ArgoprojV1alpha1().Workflows(namespace).Create(wf)
createdArgoWorkflow, err := c.ArgoprojV1alpha1().Workflows(namespace).Create(wf)
if err != nil {
return 0, nil, err
return nil, err
}
uid, err := uid2.GenerateUID(createdWorkflow.Name, 63)
if err != nil {
return 0, nil, err
createdWorkflow = &WorkflowExecution{
Name: createdArgoWorkflow.Name,
CreatedAt: createdArgoWorkflow.CreationTimestamp.UTC(),
ArgoWorkflow: createdArgoWorkflow,
WorkflowTemplate: &WorkflowTemplate{
WorkflowTemplateVersionID: workflowTemplateVersionID,
},
Parameters: opts.Parameters,
}
if err = createdWorkflow.GenerateUID(createdArgoWorkflow.Name); err != nil {
return nil, err
}
//Create an entry for workflow_executions statistic
//CURL code will hit the API endpoint that will update the db row
newDbId, err = c.insertPreWorkflowExecutionStatistic(namespace, createdWorkflow.Name, workflowTemplateVersionId, createdWorkflow.CreationTimestamp.UTC(), uid, opts.Parameters)
if err != nil {
return 0, nil, err
if err := c.createWorkflowExecutionDB(namespace, createdWorkflow); err != nil {
return nil, err
}
return
@@ -376,7 +403,9 @@ func (c *Client) ValidateWorkflowExecution(namespace string, manifest []byte) (e
wftmplGetter := templateresolution.WrapWorkflowTemplateInterface(c.ArgoprojV1alpha1().WorkflowTemplates(namespace))
for _, wf := range workflows {
c.injectAutomatedFields(namespace, &wf, &WorkflowExecutionOptions{})
if err = c.injectAutomatedFields(namespace, &wf, &WorkflowExecutionOptions{}); err != nil {
return err
}
_, err = validate.ValidateWorkflow(wftmplGetter, &wf, validate.ValidateOpts{})
if err != nil {
return
@@ -398,16 +427,22 @@ func (c *Client) ValidateWorkflowExecution(namespace string, manifest []byte) (e
}
// CreateWorkflowExecution creates an argo workflow execution and related resources.
// Note that the workflow template is loaded from the database/k8s, so workflow.WorkflowTemplate.Manifest is not used.
// Required:
// * workflow.Parameters
// * workflow.Labels (optional)
// If workflow.Name is set, it is used instead of a generated name.
// If there is a parameter named "workflow-execution-name" in workflow.Parameters, it is set as the name.
func (c *Client) CreateWorkflowExecution(namespace string, workflow *WorkflowExecution, workflowTemplate *WorkflowTemplate) (*WorkflowExecution, error) {
opts := &WorkflowExecutionOptions{
Labels: make(map[string]string),
Parameters: workflow.Parameters,
}
if workflow.Name != "" {
opts.Name = workflow.Name
}
if workflowExecutionName := workflow.GetParameterValue("workflow-execution-name"); workflowExecutionName != nil {
opts.Name = *workflowExecutionName
}
nameUID, err := uid2.GenerateUID(workflowTemplate.Name, 63)
if err != nil {
return nil, err
@@ -418,19 +453,16 @@ func (c *Client) CreateWorkflowExecution(namespace string, workflow *WorkflowExe
opts.Labels[workflowTemplateVersionLabelKey] = fmt.Sprint(workflowTemplate.Version)
label.MergeLabelsPrefix(opts.Labels, workflow.Labels, label.TagPrefix)
// @todo we need to enforce the below requirement in API.
//UX will prevent multiple workflows
manifest, err := workflowTemplate.GetWorkflowManifestBytes()
workflows, err := getWorkflowsFromWorkflowTemplate(workflowTemplate)
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"WorkflowTemplate": workflowTemplate,
"Error": err.Error(),
}).Error("Error with getting WorkflowManifest from workflow template")
return nil, err
}
workflows, err := UnmarshalWorkflows(manifest, true)
if len(workflows) != 1 {
return nil, fmt.Errorf("workflow Template contained more than 1 workflow execution")
}
createdWorkflow, err := c.createWorkflow(namespace, workflowTemplate.ID, workflowTemplate.WorkflowTemplateVersionID, &workflows[0], opts)
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
@@ -440,35 +472,14 @@ func (c *Client) CreateWorkflowExecution(namespace string, workflow *WorkflowExe
return nil, err
}
id, createdWorkflow, err := c.createWorkflow(namespace, workflowTemplate.ID, workflowTemplate.WorkflowTemplateVersionID, &workflows[0], opts)
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"Workflow": workflow,
"Error": err.Error(),
}).Error("Error parsing workflow.")
if _, err := c.InsertLabels(TypeWorkflowExecution, createdWorkflow.ID, workflow.Labels); err != nil {
return nil, err
}
if _, err := c.InsertLabels(TypeWorkflowExecution, id, workflow.Labels); err != nil {
return nil, err
}
if createdWorkflow == nil {
err = errors.New("unable to create workflow")
log.WithFields(log.Fields{
"Namespace": namespace,
"WorkflowTemplate": workflowTemplate,
"Error": err.Error(),
}).Error("Error parsing workflow.")
return nil, err
}
workflow.ID = id
workflow.ID = createdWorkflow.ID
workflow.Name = createdWorkflow.Name
workflow.CreatedAt = createdWorkflow.CreationTimestamp.UTC()
workflow.UID = createdWorkflow.Name
workflow.CreatedAt = createdWorkflow.CreatedAt.UTC()
workflow.UID = createdWorkflow.UID
workflow.WorkflowTemplate = workflowTemplate
return workflow, nil
@@ -494,44 +505,41 @@ func (c *Client) CloneWorkflowExecution(namespace, uid string) (*WorkflowExecuti
return c.CreateWorkflowExecution(namespace, workflowExecution, workflowTemplate)
}
func (c *Client) insertPreWorkflowExecutionStatistic(namespace, name string, workflowTemplateVersionId uint64, createdAt time.Time, uid string, parameters []Parameter) (newId uint64, err error) {
tx, err := c.DB.Begin()
// createWorkflowExecutionDB inserts a workflow execution into the database.
// Required fields
// * name
// * createdAt // we sync the argo created at with the db
// * parameters, if any
// * WorkflowTemplate.WorkflowTemplateVersionID
//
// After success, the passed in WorkflowExecution will have it's ID set to the new db record.
func (c *Client) createWorkflowExecutionDB(namespace string, workflowExecution *WorkflowExecution) (err error) {
parametersJSON, err := json.Marshal(workflowExecution.Parameters)
if err != nil {
return 0, err
}
defer tx.Rollback()
parametersJSON, err := json.Marshal(parameters)
if err != nil {
return 0, err
return err
}
insertMap := sq.Eq{
"uid": uid,
"workflow_template_version_id": workflowTemplateVersionId,
"name": name,
"namespace": namespace,
"created_at": createdAt.UTC(),
"phase": wfv1.NodePending,
"parameters": string(parametersJSON),
"is_archived": false,
if err := workflowExecution.GenerateUID(workflowExecution.Name); err != nil {
return err
}
err = sb.Insert("workflow_executions").
SetMap(insertMap).
SetMap(sq.Eq{
"UID": workflowExecution.UID,
"workflow_template_version_id": workflowExecution.WorkflowTemplate.WorkflowTemplateVersionID,
"name": workflowExecution.Name,
"namespace": namespace,
"created_at": workflowExecution.CreatedAt.UTC(),
"phase": wfv1.NodePending,
"parameters": string(parametersJSON),
"is_archived": false,
}).
Suffix("RETURNING id").
RunWith(tx).
RunWith(c.DB).
QueryRow().
Scan(&newId)
Scan(&workflowExecution.ID)
if err != nil {
return 0, err
}
err = tx.Commit()
if err != nil {
return 0, err
}
return newId, err
return
}
func (c *Client) FinishWorkflowExecutionStatisticViaExitHandler(namespace, name string, workflowTemplateID int64, phase wfv1.NodePhase, startedAt time.Time) (err error) {
@@ -562,32 +570,20 @@ func (c *Client) FinishWorkflowExecutionStatisticViaExitHandler(namespace, name
}
func (c *Client) CronStartWorkflowExecutionStatisticInsert(namespace, uid string, workflowTemplateID int64) (err error) {
query, args, err := c.workflowTemplatesSelectBuilder(namespace).
queryWt := c.workflowTemplatesSelectBuilder(namespace).
Where(sq.Eq{
"wt.id": workflowTemplateID,
}).
ToSql()
if err != nil {
return err
}
})
workflowTemplate := &WorkflowTemplate{}
if err := c.DB.Get(workflowTemplate, query, args...); err != nil {
if err := c.DB.Getx(workflowTemplate, queryWt); err != nil {
return err
}
query, args, err = c.cronWorkflowSelectBuilder(namespace, workflowTemplate.UID).ToSql()
if err != nil {
return err
}
queryCw := c.cronWorkflowSelectBuilder(namespace, workflowTemplate.UID)
cronWorkflow := &CronWorkflow{}
if err := c.DB.Get(cronWorkflow, query, args...); err != nil {
return err
}
cronLabels, err := c.GetDbLabels(TypeCronWorkflow, cronWorkflow.ID)
if err != nil {
if err := c.DB.Getx(cronWorkflow, queryCw); err != nil {
return err
}
@@ -602,49 +598,43 @@ func (c *Client) CronStartWorkflowExecutionStatisticInsert(namespace, uid string
return err
}
insertMap := sq.Eq{
"uid": uid,
"workflow_template_version_id": cronWorkflow.WorkflowTemplateVersionID,
"name": uid,
"namespace": namespace,
"phase": wfv1.NodeRunning,
"started_at": time.Now().UTC(),
"cron_workflow_id": cronWorkflow.ID,
"parameters": string(parametersJSON),
}
workflowExecutionId := uint64(0)
workflowExecutionID := uint64(0)
err = sb.Insert("workflow_executions").
SetMap(insertMap).
SetMap(sq.Eq{
"uid": uid,
"workflow_template_version_id": cronWorkflow.WorkflowTemplateVersionID,
"name": uid,
"namespace": namespace,
"phase": wfv1.NodeRunning,
"started_at": time.Now().UTC(),
"cron_workflow_id": cronWorkflow.ID,
"parameters": string(parametersJSON),
}).
Suffix("RETURNING id").
RunWith(tx).
QueryRow().
Scan(&workflowExecutionId)
Scan(&workflowExecutionID)
if err != nil {
return err
}
if len(cronLabels) > 0 {
labelsMapped := LabelsToMapping(cronLabels...)
_, err = c.InsertLabelsBuilder(TypeWorkflowExecution, workflowExecutionId, labelsMapped).
RunWith(tx).
Exec()
if err != nil {
return err
}
cronLabels, err := c.GetDBLabelsMapped(TypeCronWorkflow, cronWorkflow.ID)
if err != nil {
return err
}
labelsMapped := cronLabels[cronWorkflow.ID]
if _, err := c.InsertLabelsRunner(tx, TypeWorkflowExecution, workflowExecutionID, labelsMapped); err != nil {
return err
}
err = tx.Commit()
if err != nil {
return err
}
return err
}
func (c *Client) GetWorkflowExecution(namespace, uid string) (workflow *WorkflowExecution, err error) {
workflow = &WorkflowExecution{}
query, args, err := sb.Select(getWorkflowExecutionColumns("we", "")...).
query := sb.Select(getWorkflowExecutionColumns("we")...).
Columns(getWorkflowTemplateColumns("wt", "workflow_template")...).
Columns(`wtv.manifest "workflow_template.manifest"`).
From("workflow_executions we").
@@ -654,12 +644,13 @@ func (c *Client) GetWorkflowExecution(namespace, uid string) (workflow *Workflow
"wt.namespace": namespace,
"we.name": uid,
"we.is_archived": false,
}).
ToSql()
if err != nil {
return nil, err
}
if err := c.DB.Get(workflow, query, args...); err != nil {
})
if err := c.DB.Getx(workflow, query); err != nil {
if err == sql.ErrNoRows {
return nil, nil
}
return nil, err
}
@@ -677,7 +668,7 @@ func (c *Client) GetWorkflowExecution(namespace, uid string) (workflow *Workflow
version, err := strconv.ParseInt(
wf.ObjectMeta.Labels[workflowTemplateVersionLabelKey],
10,
32,
64,
)
if err != nil {
log.WithFields(log.Fields{
@@ -713,6 +704,7 @@ func (c *Client) GetWorkflowExecution(namespace, uid string) (workflow *Workflow
return
}
// ListWorkflowExecutions gets a list of WorkflowExecutions ordered by most recently created first.
func (c *Client) ListWorkflowExecutions(namespace, workflowTemplateUID, workflowTemplateVersion string, paginator *pagination.PaginationRequest) (workflows []*WorkflowExecution, err error) {
sb := workflowExecutionsSelectBuilder(namespace, workflowTemplateUID, workflowTemplateVersion).
OrderBy("we.created_at DESC")
@@ -725,10 +717,11 @@ func (c *Client) ListWorkflowExecutions(namespace, workflowTemplateUID, workflow
return
}
// CountWorkflowExecutions returns the number of workflow executions
func (c *Client) CountWorkflowExecutions(namespace, workflowTemplateUID, workflowTemplateVersion string) (count int, err error) {
err = workflowExecutionsSelectBuilderNoColumns(namespace, workflowTemplateUID, workflowTemplateVersion).
Columns("COUNT(*)").
RunWith(c.DB.DB).
RunWith(c.DB).
QueryRow().
Scan(&count)
@@ -1069,8 +1062,9 @@ func (c *Client) SuspendWorkflowExecution(namespace, uid string) (err error) {
return
}
// TerminateWorkflowExecution marks a workflows execution as terminated in DB and terminates the argo resource.
func (c *Client) TerminateWorkflowExecution(namespace, uid string) (err error) {
query, args, err := sb.Update("workflow_executions").
_, err = sb.Update("workflow_executions").
Set("phase", "Terminated").
Set("started_at", time.Time.UTC(time.Now())).
Set("finished_at", time.Time.UTC(time.Now())).
@@ -1078,16 +1072,12 @@ func (c *Client) TerminateWorkflowExecution(namespace, uid string) (err error) {
"uid": uid,
"namespace": namespace,
}).
ToSql()
RunWith(c.DB).
Exec()
if err != nil {
return err
}
if _, err := c.DB.Exec(query, args...); err != nil {
return err
}
err = argoutil.TerminateWorkflow(c.ArgoprojV1alpha1().Workflows(namespace), uid)
return
@@ -1605,7 +1595,7 @@ func workflowExecutionsSelectBuilder(namespace, workflowTemplateUID, workflowTem
}
func (c *Client) getWorkflowExecutionAndTemplate(namespace string, uid string) (workflow *WorkflowExecution, err error) {
query, args, err := sb.Select(getWorkflowExecutionColumns("we", "")...).
sb := sb.Select(getWorkflowExecutionColumns("we", "")...).
Columns(getWorkflowTemplateColumns("wt", "workflow_template")...).
Columns(`wtv.manifest "workflow_template.manifest"`, `wtv.version "workflow_template.version"`).
From("workflow_executions we").
@@ -1615,16 +1605,10 @@ func (c *Client) getWorkflowExecutionAndTemplate(namespace string, uid string) (
"wt.namespace": namespace,
"we.name": uid,
"we.is_archived": false,
}).
ToSql()
if err != nil {
return nil, err
}
// TODO DB call
})
workflow = &WorkflowExecution{}
if err = c.DB.Get(workflow, query, args...); err != nil {
if err = c.DB.Getx(workflow, sb); err != nil {
return nil, err
}