feat: updated crons to have a manifest in db

This commit is contained in:
Andrey Melnikov
2020-04-30 14:52:26 -07:00
parent b0571f69c9
commit 1fe4201704
8 changed files with 487 additions and 551 deletions

View File

@@ -48,21 +48,27 @@ func (c *Client) UpdateCronWorkflow(namespace string, name string, cronWorkflow
})
}
if err := workflowTemplate.UpdateManifestParameters(workflow.Parameters); err != nil {
return nil, err
}
workflowTemplateManifest := workflowTemplate.GetManifestBytes()
if err := cronWorkflow.AddToManifestSpec("workflowSpec", string(workflowTemplateManifest)); err != nil {
return nil, err
}
if opts.Labels == nil {
opts.Labels = &map[string]string{}
}
(*opts.Labels)[workflowTemplateUIDLabelKey] = workflowTemplate.UID
(*opts.Labels)[workflowTemplateVersionLabelKey] = fmt.Sprint(workflowTemplate.Version)
var argoCronWorkflow wfv1.CronWorkflow
argoCronWorkflow.Spec.Schedule = cronWorkflow.Schedule
argoCronWorkflow.Spec.Timezone = cronWorkflow.Timezone
argoCronWorkflow.Spec.Suspend = cronWorkflow.Suspend
argoCronWorkflow.Spec.ConcurrencyPolicy = wfv1.ConcurrencyPolicy(cronWorkflow.ConcurrencyPolicy)
argoCronWorkflow.Spec.StartingDeadlineSeconds = cronWorkflow.StartingDeadlineSeconds
argoCronWorkflow.Spec.SuccessfulJobsHistoryLimit = cronWorkflow.SuccessfulJobsHistoryLimit
argoCronWorkflow.Spec.FailedJobsHistoryLimit = cronWorkflow.FailedJobsHistoryLimit
//UX prevents multiple workflows
var argoCronWorkflowSpec wfv1.CronWorkflowSpec
if err := yaml.Unmarshal([]byte(cronWorkflow.Manifest), &argoCronWorkflowSpec); err != nil {
return nil, err
}
argoCronWorkflow.Spec = argoCronWorkflowSpec
manifestBytes, err := workflowTemplate.GetWorkflowManifestBytes()
if err != nil {
return nil, err
@@ -77,82 +83,68 @@ func (c *Client) UpdateCronWorkflow(namespace string, name string, cronWorkflow
}).Error("Error parsing workflow.")
return nil, err
}
if len(workflows) != 1 {
return nil, fmt.Errorf("more than one workflow in spec")
}
for _, wf := range workflows {
argoCronWorkflow.Spec.WorkflowSpec = wf.Spec
argoCreatedCronWorkflow, err := c.updateCronWorkflow(namespace, name, &workflowTemplate.ID, &wf, &argoCronWorkflow, opts)
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"CronWorkflow": cronWorkflow,
"Error": err.Error(),
}).Error("Error parsing workflow.")
return nil, err
}
cronWorkflow.Name = argoCreatedCronWorkflow.Name
cronWorkflow.CreatedAt = argoCreatedCronWorkflow.CreationTimestamp.UTC()
cronWorkflow.UID = string(argoCreatedCronWorkflow.ObjectMeta.UID)
cronWorkflow.WorkflowExecution.WorkflowTemplate = workflowTemplate
// Manifests could get big, don't return them in this case.
cronWorkflow.WorkflowExecution.WorkflowTemplate.Manifest = ""
wf := workflows[0]
argoCronWorkflow.Spec.WorkflowSpec = wf.Spec
_, err = c.updateCronWorkflow(namespace, name, &workflowTemplate.ID, &wf, &argoCronWorkflow, opts)
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"CronWorkflow": cronWorkflow,
"Error": err.Error(),
}).Error("Error parsing workflow.")
return nil, err
}
cronWorkflow.WorkflowExecution.WorkflowTemplate = workflowTemplate
// Manifests could get big, don't return them in this case.
cronWorkflow.WorkflowExecution.WorkflowTemplate.Manifest = ""
workflowSpec, err := yaml.Marshal(argoCreatedCronWorkflow.Spec.WorkflowSpec)
if err != nil {
return nil, err
}
tx, err := c.DB.Begin()
if err != nil {
return nil, err
}
defer tx.Rollback()
tx, err := c.DB.Begin()
if err != nil {
return nil, err
}
defer tx.Rollback()
_, err = sb.Update("cron_workflows").
SetMap(sq.Eq{
"manifest": cronWorkflow.Manifest,
}).Where(sq.Eq{
"id": cronWorkflow.ID,
}).
RunWith(tx).
Exec()
if err != nil {
return nil, err
}
_, err = sb.Update("cron_workflows").
SetMap(sq.Eq{
"schedule": cronWorkflow.Schedule,
"timezone": cronWorkflow.Timezone,
"suspend": cronWorkflow.Suspend,
"concurrency_policy": cronWorkflow.ConcurrencyPolicy,
"starting_deadline_seconds": cronWorkflow.StartingDeadlineSeconds,
"successful_jobs_history_limit": cronWorkflow.SuccessfulJobsHistoryLimit,
"failed_jobs_history_limit": cronWorkflow.FailedJobsHistoryLimit,
"workflow_spec": workflowSpec,
}).Where(sq.Eq{
"id": cronWorkflow.ID,
}).
// delete all labels then replace
_, err = sb.Delete("labels").
Where(sq.Eq{
"resource": TypeCronWorkflow,
"resource_id": cronWorkflow.ID,
}).RunWith(tx).
Exec()
if err != nil {
return nil, err
}
if len(cronWorkflow.Labels) > 0 {
_, err = c.InsertLabelsBuilder(TypeCronWorkflow, cronWorkflow.ID, cronWorkflow.Labels).
RunWith(tx).
Exec()
if err != nil {
return nil, err
}
// delete all labels then replace
_, err = sb.Delete("labels").
Where(sq.Eq{
"resource": TypeCronWorkflow,
"resource_id": cronWorkflow.ID,
}).RunWith(tx).
Exec()
if err != nil {
return nil, err
}
if len(cronWorkflow.Labels) > 0 {
_, err = c.InsertLabelsBuilder(TypeCronWorkflow, cronWorkflow.ID, cronWorkflow.Labels).
RunWith(tx).
Exec()
if err != nil {
return nil, err
}
}
if err := tx.Commit(); err != nil {
return nil, err
}
return cronWorkflow, nil
}
return nil, nil
if err := tx.Commit(); err != nil {
return nil, err
}
return cronWorkflow, nil
}
func (c *Client) CreateCronWorkflow(namespace string, cronWorkflow *CronWorkflow) (*CronWorkflow, error) {
@@ -167,7 +159,7 @@ func (c *Client) CreateCronWorkflow(namespace string, cronWorkflow *CronWorkflow
return nil, util.NewUserError(codes.NotFound, "Error with getting workflow template.")
}
// TODO: Need to pull system parameters from k8s config/secret here, example: HOST
//// TODO: Need to pull system parameters from k8s config/secret here, example: HOST
opts := &WorkflowExecutionOptions{}
re, _ := regexp.Compile(`[^a-zA-Z0-9-]{1,}`)
opts.GenerateName = strings.ToLower(re.ReplaceAllString(workflowTemplate.Name, `-`)) + "-"
@@ -178,6 +170,16 @@ func (c *Client) CreateCronWorkflow(namespace string, cronWorkflow *CronWorkflow
})
}
if err := workflowTemplate.UpdateManifestParameters(workflow.Parameters); err != nil {
return nil, err
}
workflowTemplateManifest := workflowTemplate.GetManifestBytes()
if err := cronWorkflow.AddToManifestSpec("workflowSpec", string(workflowTemplateManifest)); err != nil {
return nil, err
}
if opts.Labels == nil {
opts.Labels = &map[string]string{}
}
@@ -186,14 +188,12 @@ func (c *Client) CreateCronWorkflow(namespace string, cronWorkflow *CronWorkflow
label.MergeLabelsPrefix(*opts.Labels, workflow.Labels, label.TagPrefix)
var argoCronWorkflow wfv1.CronWorkflow
argoCronWorkflow.Spec.Schedule = cronWorkflow.Schedule
argoCronWorkflow.Spec.Timezone = cronWorkflow.Timezone
argoCronWorkflow.Spec.Suspend = cronWorkflow.Suspend
argoCronWorkflow.Spec.ConcurrencyPolicy = wfv1.ConcurrencyPolicy(cronWorkflow.ConcurrencyPolicy)
argoCronWorkflow.Spec.StartingDeadlineSeconds = cronWorkflow.StartingDeadlineSeconds
argoCronWorkflow.Spec.SuccessfulJobsHistoryLimit = cronWorkflow.SuccessfulJobsHistoryLimit
argoCronWorkflow.Spec.FailedJobsHistoryLimit = cronWorkflow.FailedJobsHistoryLimit
//UX prevents multiple workflows
var argoCronWorkflowSpec wfv1.CronWorkflowSpec
if err := yaml.Unmarshal([]byte(cronWorkflow.Manifest), &argoCronWorkflowSpec); err != nil {
return nil, err
}
argoCronWorkflow.Spec = argoCronWorkflowSpec
manifestBytes, err := workflowTemplate.GetWorkflowManifestBytes()
if err != nil {
return nil, err
@@ -208,101 +208,74 @@ func (c *Client) CreateCronWorkflow(namespace string, cronWorkflow *CronWorkflow
}).Error("Error parsing workflow.")
return nil, err
}
for _, wf := range workflows {
argoCronWorkflow.Spec.WorkflowSpec = wf.Spec
argoCreatedCronWorkflow, err := c.createCronWorkflow(namespace, &workflowTemplate.ID, &wf, &argoCronWorkflow, opts)
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"CronWorkflow": cronWorkflow,
"Error": err.Error(),
}).Error("Error parsing workflow.")
return nil, err
}
cronWorkflow.Name = argoCreatedCronWorkflow.Name
cronWorkflow.CreatedAt = argoCreatedCronWorkflow.CreationTimestamp.UTC()
cronWorkflow.UID = string(argoCreatedCronWorkflow.ObjectMeta.UID)
cronWorkflow.WorkflowExecution.WorkflowTemplate = workflowTemplate
// Manifests could get big, don't return them in this case.
cronWorkflow.WorkflowExecution.WorkflowTemplate.Manifest = ""
workflowSpec, err := yaml.Marshal(argoCreatedCronWorkflow.Spec.WorkflowSpec)
if err != nil {
return nil, err
}
tx, err := c.DB.Begin()
if err != nil {
return nil, err
}
defer tx.Rollback()
err = sb.Insert("cron_workflows").
SetMap(sq.Eq{
"uid": cronWorkflow.UID,
"name": cronWorkflow.Name,
"workflow_template_version_id": workflowTemplate.WorkflowTemplateVersionId,
"schedule": cronWorkflow.Schedule,
"timezone": cronWorkflow.Timezone,
"suspend": cronWorkflow.Suspend,
"concurrency_policy": cronWorkflow.ConcurrencyPolicy,
"starting_deadline_seconds": cronWorkflow.StartingDeadlineSeconds,
"successful_jobs_history_limit": cronWorkflow.SuccessfulJobsHistoryLimit,
"failed_jobs_history_limit": cronWorkflow.FailedJobsHistoryLimit,
"workflow_spec": workflowSpec,
}).
Suffix("RETURNING id").
RunWith(tx).
QueryRow().
Scan(&cronWorkflow.ID)
if err != nil {
return nil, err
}
if len(cronWorkflow.Labels) > 0 {
_, err = c.InsertLabelsBuilder(TypeCronWorkflow, cronWorkflow.ID, cronWorkflow.Labels).
RunWith(tx).
Exec()
if err != nil {
return nil, err
}
}
if err := tx.Commit(); err != nil {
return nil, err
}
return cronWorkflow, nil
if len(workflows) != 1 {
return nil, fmt.Errorf("more than one workflow in spec")
}
return nil, nil
wf := workflows[0]
argoCronWorkflow.Spec.WorkflowSpec = wf.Spec
argoCreatedCronWorkflow, err := c.createCronWorkflow(namespace, &workflowTemplate.ID, &wf, &argoCronWorkflow, opts)
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"CronWorkflow": cronWorkflow,
"Error": err.Error(),
}).Error("Error parsing workflow.")
return nil, err
}
cronWorkflow.Name = argoCreatedCronWorkflow.Name
cronWorkflow.CreatedAt = argoCreatedCronWorkflow.CreationTimestamp.UTC()
cronWorkflow.UID = string(argoCreatedCronWorkflow.ObjectMeta.UID)
cronWorkflow.WorkflowExecution.WorkflowTemplate = workflowTemplate
// Manifests could get big, don't return them in this case.
cronWorkflow.WorkflowExecution.WorkflowTemplate.Manifest = ""
tx, err := c.DB.Begin()
if err != nil {
return nil, err
}
defer tx.Rollback()
err = sb.Insert("cron_workflows").
SetMap(sq.Eq{
"uid": cronWorkflow.UID,
"name": cronWorkflow.Name,
"workflow_template_version_id": workflowTemplate.WorkflowTemplateVersionId,
"manifest": cronWorkflow.Manifest,
}).
Suffix("RETURNING id").
RunWith(tx).
QueryRow().
Scan(&cronWorkflow.ID)
if err != nil {
return nil, err
}
if len(cronWorkflow.Labels) > 0 {
_, err = c.InsertLabelsBuilder(TypeCronWorkflow, cronWorkflow.ID, cronWorkflow.Labels).
RunWith(tx).
Exec()
if err != nil {
return nil, err
}
}
if err := tx.Commit(); err != nil {
return nil, err
}
return cronWorkflow, nil
}
func (c *Client) GetCronWorkflow(namespace, name string) (cronWorkflow *CronWorkflow, err error) {
cwf, err := c.ArgoprojV1alpha1().CronWorkflows(namespace).Get(name, metav1.GetOptions{})
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"Name": name,
"Error": err.Error(),
}).Error("CronWorkflow not found.")
return nil, util.NewUserError(codes.NotFound, "CronWorkflow not found.")
}
cronWorkflow = &CronWorkflow{}
cronWorkflow = &CronWorkflow{
CreatedAt: cwf.CreationTimestamp.UTC(),
UID: string(cwf.UID),
Name: cwf.Name,
Schedule: cwf.Spec.Schedule,
Timezone: cwf.Spec.Timezone,
Suspend: cwf.Spec.Suspend,
ConcurrencyPolicy: string(cwf.Spec.ConcurrencyPolicy),
StartingDeadlineSeconds: cwf.Spec.StartingDeadlineSeconds,
SuccessfulJobsHistoryLimit: cwf.Spec.SuccessfulJobsHistoryLimit,
FailedJobsHistoryLimit: cwf.Spec.FailedJobsHistoryLimit,
WorkflowExecution: nil,
}
err = c.cronWorkflowSelectBuilderNamespaceName(namespace, name).
RunWith(c.DB).
QueryRow().
Scan(cronWorkflow)
return
}
@@ -387,19 +360,6 @@ func (c *Client) ListCronWorkflows(namespace, workflowTemplateUID string, pagina
if err := c.DB.Select(&cronWorkflows, query, args...); err != nil {
return nil, err
}
// @todo remove this once we get the manifest in the db.
for _, cwf := range cronWorkflows {
parameters, err := cwf.GetParametersFromWorkflowSpec()
if err != nil {
continue
}
cwf.WorkflowExecution = &WorkflowExecution{
Parameters: parameters,
}
}
labelsMap, err := c.GetDbLabelsMapped(TypeCronWorkflow, CronWorkflowsToIds(cronWorkflows)...)
if err != nil {
log.WithFields(log.Fields{
@@ -587,10 +547,7 @@ func (c *Client) createCronWorkflow(namespace string, workflowTemplateId *uint64
}
func (c *Client) TerminateCronWorkflow(namespace, name string) (err error) {
query, args, err := sb.Select().
Columns("cw.id", "cw.created_at", "cw.uid", "cw.name", "cw.workflow_template_version_id").
Columns("cw.schedule", "cw.timezone", "cw.suspend", "cw.concurrency_policy", "cw.starting_deadline_seconds").
Columns("cw.successful_jobs_history_limit", "cw.failed_jobs_history_limit", "cw.workflow_spec", "wtv.version").
query, args, err := sb.Select(cronWorkflowColumns("wtv.version")...).
From("cron_workflows cw").
Join("workflow_template_versions wtv ON wtv.id = cw.workflow_template_version_id").
Join("workflow_templates wt ON wt.id = wtv.workflow_template_id").
@@ -645,9 +602,21 @@ func unmarshalCronWorkflows(cwfBytes []byte, strict bool) (cwfs wfv1.CronWorkflo
func (c *Client) cronWorkflowSelectBuilder(namespace string, workflowTemplateUid string) sq.SelectBuilder {
sb := c.cronWorkflowSelectBuilderNoColumns(namespace, workflowTemplateUid).
Columns("cw.id", "cw.created_at", "cw.uid", "cw.name", "cw.workflow_template_version_id").
Columns("cw.schedule", "cw.timezone", "cw.suspend", "cw.concurrency_policy", "cw.starting_deadline_seconds").
Columns("cw.successful_jobs_history_limit", "cw.failed_jobs_history_limit", "cw.workflow_spec", "wtv.version")
Columns(cronWorkflowColumns("wtv.version")...)
return sb
}
func (c *Client) cronWorkflowSelectBuilderNamespaceName(namespace string, name string) sq.SelectBuilder {
sb := sb.Select("cw.id", "cw.created_at", "cw.uid", "cw.name", "cw.workflow_template_version_id").
Columns("cw.manifest", "wtv.version").
From("cron_workflows cw").
Join("workflow_template_versions wtv ON wtv.id = cw.workflow_template_version_id").
Join("workflow_templates wt ON wt.id = wtv.workflow_template_id").
Where(sq.Eq{
"wt.namespace": namespace,
"cw.name": name,
})
return sb
}
@@ -727,3 +696,13 @@ func (c *Client) GetCronWorkflowStatisticsForTemplates(workflowTemplates ...*Wor
return
}
func cronWorkflowColumns(extraColumns ...string) []string {
results := []string{"cw.id", "cw.created_at", "cw.uid", "cw.name", "cw.workflow_template_version_id", "cw.manifest"}
for _, str := range extraColumns {
results = append(results, str)
}
return results
}