fix: issues with cron workflow labels not being set/saved.

This commit is contained in:
Andrey Melnikov
2020-04-29 14:01:37 -07:00
parent 92bde140b9
commit b0571f69c9
6 changed files with 171 additions and 59 deletions

View File

@@ -17,6 +17,15 @@ import (
)
func (c *Client) UpdateCronWorkflow(namespace string, name string, cronWorkflow *CronWorkflow) (*CronWorkflow, error) {
err := c.cronWorkflowSelectBuilderNoColumns(namespace, cronWorkflow.WorkflowExecution.WorkflowTemplate.UID).
Columns("cw.id").
RunWith(c.DB).
QueryRow().
Scan(&cronWorkflow.ID)
if err != nil {
return nil, err
}
workflow := cronWorkflow.WorkflowExecution
workflowTemplate, err := c.GetWorkflowTemplate(namespace, workflow.WorkflowTemplate.UID, workflow.WorkflowTemplate.Version)
if err != nil {
@@ -91,6 +100,13 @@ func (c *Client) UpdateCronWorkflow(namespace string, name string, cronWorkflow
if err != nil {
return nil, err
}
tx, err := c.DB.Begin()
if err != nil {
return nil, err
}
defer tx.Rollback()
_, err = sb.Update("cron_workflows").
SetMap(sq.Eq{
"schedule": cronWorkflow.Schedule,
@@ -101,15 +117,39 @@ func (c *Client) UpdateCronWorkflow(namespace string, name string, cronWorkflow
"successful_jobs_history_limit": cronWorkflow.SuccessfulJobsHistoryLimit,
"failed_jobs_history_limit": cronWorkflow.FailedJobsHistoryLimit,
"workflow_spec": workflowSpec,
}).
Suffix("RETURNING id").
RunWith(c.DB.DB).
}).Where(sq.Eq{
"id": cronWorkflow.ID,
}).
RunWith(tx).
Exec()
if err != nil {
return nil, err
}
// delete all labels then replace
_, err = sb.Delete("labels").
Where(sq.Eq{
"resource": TypeCronWorkflow,
"resource_id": cronWorkflow.ID,
}).RunWith(tx).
Exec()
if err != nil {
return nil, err
}
if len(cronWorkflow.Labels) > 0 {
_, err = c.InsertLabelsBuilder(TypeCronWorkflow, cronWorkflow.ID, cronWorkflow.Labels).
RunWith(tx).
Exec()
if err != nil {
return nil, err
}
}
if err := tx.Commit(); err != nil {
return nil, err
}
return cronWorkflow, nil
}
return nil, nil
@@ -191,7 +231,14 @@ func (c *Client) CreateCronWorkflow(namespace string, cronWorkflow *CronWorkflow
if err != nil {
return nil, err
}
_, err = sb.Insert("cron_workflows").
tx, err := c.DB.Begin()
if err != nil {
return nil, err
}
defer tx.Rollback()
err = sb.Insert("cron_workflows").
SetMap(sq.Eq{
"uid": cronWorkflow.UID,
"name": cronWorkflow.Name,
@@ -206,13 +253,27 @@ func (c *Client) CreateCronWorkflow(namespace string, cronWorkflow *CronWorkflow
"workflow_spec": workflowSpec,
}).
Suffix("RETURNING id").
RunWith(c.DB.DB).
Exec()
RunWith(tx).
QueryRow().
Scan(&cronWorkflow.ID)
if err != nil {
return nil, err
}
if len(cronWorkflow.Labels) > 0 {
_, err = c.InsertLabelsBuilder(TypeCronWorkflow, cronWorkflow.ID, cronWorkflow.Labels).
RunWith(tx).
Exec()
if err != nil {
return nil, err
}
}
if err := tx.Commit(); err != nil {
return nil, err
}
return cronWorkflow, nil
}
return nil, nil
@@ -327,6 +388,7 @@ func (c *Client) ListCronWorkflows(namespace, workflowTemplateUID string, pagina
return nil, err
}
// @todo remove this once we get the manifest in the db.
for _, cwf := range cronWorkflows {
parameters, err := cwf.GetParametersFromWorkflowSpec()
if err != nil {
@@ -338,6 +400,19 @@ func (c *Client) ListCronWorkflows(namespace, workflowTemplateUID string, pagina
}
}
labelsMap, err := c.GetDbLabelsMapped(TypeCronWorkflow, CronWorkflowsToIds(cronWorkflows)...)
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"Error": err.Error(),
}).Error("Unable to get Workflow Template Labels")
return nil, err
}
for _, resource := range cronWorkflows {
resource.Labels = labelsMap[resource.ID]
}
return
}