Adding namespace to cron workflow related code.

This commit is contained in:
Aleksandr Melnikov
2020-05-21 12:47:42 -07:00
parent eed930c7f6
commit 92b9aba0d1
3 changed files with 13 additions and 7 deletions

View File

@@ -255,6 +255,7 @@ func (c *Client) CreateCronWorkflow(namespace string, cronWorkflow *CronWorkflow
"name": cronWorkflow.Name, "name": cronWorkflow.Name,
"workflow_template_version_id": workflowTemplate.WorkflowTemplateVersionId, "workflow_template_version_id": workflowTemplate.WorkflowTemplateVersionId,
"manifest": cronWorkflow.Manifest, "manifest": cronWorkflow.Manifest,
"namespace": namespace,
}). }).
Suffix("RETURNING id"). Suffix("RETURNING id").
RunWith(tx). RunWith(tx).
@@ -554,13 +555,13 @@ func (c *Client) cronWorkflowSelectBuilder(namespace string, workflowTemplateUid
} }
func (c *Client) cronWorkflowSelectBuilderNamespaceName(namespace string, uid string) sq.SelectBuilder { func (c *Client) cronWorkflowSelectBuilderNamespaceName(namespace string, uid string) sq.SelectBuilder {
sb := sb.Select("cw.id", "cw.created_at", "cw.uid", "cw.name", "cw.workflow_template_version_id"). sb := sb.Select("cw.id", "cw.created_at", "cw.uid", "cw.name", "cw.workflow_template_version_id", "cw.namespace").
Columns("cw.manifest", "wtv.version"). Columns("cw.manifest", "wtv.version").
From("cron_workflows cw"). From("cron_workflows cw").
Join("workflow_template_versions wtv ON wtv.id = cw.workflow_template_version_id"). Join("workflow_template_versions wtv ON wtv.id = cw.workflow_template_version_id").
Join("workflow_templates wt ON wt.id = wtv.workflow_template_id"). Join("workflow_templates wt ON wt.id = wtv.workflow_template_id").
Where(sq.Eq{ Where(sq.Eq{
"wt.namespace": namespace, "cw.namespace": namespace,
"cw.name": uid, "cw.name": uid,
}) })
@@ -644,7 +645,7 @@ func (c *Client) GetCronWorkflowStatisticsForTemplates(workflowTemplates ...*Wor
} }
func cronWorkflowColumns(extraColumns ...string) []string { func cronWorkflowColumns(extraColumns ...string) []string {
results := []string{"cw.id", "cw.created_at", "cw.uid", "cw.name", "cw.workflow_template_version_id", "cw.manifest"} results := []string{"cw.id", "cw.created_at", "cw.uid", "cw.name", "cw.workflow_template_version_id", "cw.manifest", "cw.namespace"}
for _, str := range extraColumns { for _, str := range extraColumns {
results = append(results, str) results = append(results, str)
@@ -679,7 +680,8 @@ func (c *Client) SelectCronWorkflowWithExtraColumns(namespace, uid string, extra
func (c *Client) DeleteCronWorkflowDb(namespace, uid string) error { func (c *Client) DeleteCronWorkflowDb(namespace, uid string) error {
query, args, err := sb.Select("id").From("cron_workflows"). query, args, err := sb.Select("id").From("cron_workflows").
Where(sq.Eq{ Where(sq.Eq{
"uid": uid, "uid": uid,
"namespace": namespace,
}).ToSql() }).ToSql()
if err != nil { if err != nil {

View File

@@ -76,6 +76,7 @@ type CronWorkflow struct {
Version int64 Version int64
WorkflowTemplateVersionId uint64 `db:"workflow_template_version_id"` WorkflowTemplateVersionId uint64 `db:"workflow_template_version_id"`
Manifest string Manifest string
Namespace string `db:"namespace"`
} }
func (cw *CronWorkflow) GetParametersFromWorkflowSpec() ([]Parameter, error) { func (cw *CronWorkflow) GetParametersFromWorkflowSpec() ([]Parameter, error) {

View File

@@ -23,9 +23,10 @@ func apiCronWorkflow(cwf *v1.CronWorkflow) (cronWorkflow *api.CronWorkflow) {
} }
cronWorkflow = &api.CronWorkflow{ cronWorkflow = &api.CronWorkflow{
Name: cwf.Name, Name: cwf.Name,
Labels: converter.MappingToKeyValue(cwf.Labels), Labels: converter.MappingToKeyValue(cwf.Labels),
Manifest: cwf.Manifest, Manifest: cwf.Manifest,
Namespace: cwf.Namespace,
} }
if cwf.WorkflowExecution != nil { if cwf.WorkflowExecution != nil {
@@ -79,6 +80,7 @@ func (c *CronWorkflowServer) CreateCronWorkflow(ctx context.Context, req *api.Cr
WorkflowExecution: workflow, WorkflowExecution: workflow,
Manifest: req.CronWorkflow.Manifest, Manifest: req.CronWorkflow.Manifest,
Labels: converter.APIKeyValueToLabel(req.CronWorkflow.Labels), Labels: converter.APIKeyValueToLabel(req.CronWorkflow.Labels),
Namespace: req.Namespace,
} }
cwf, err := client.CreateCronWorkflow(req.Namespace, &cronWorkflow) cwf, err := client.CreateCronWorkflow(req.Namespace, &cronWorkflow)
@@ -125,6 +127,7 @@ func (c *CronWorkflowServer) UpdateCronWorkflow(ctx context.Context, req *api.Up
WorkflowExecution: workflow, WorkflowExecution: workflow,
Manifest: req.CronWorkflow.Manifest, Manifest: req.CronWorkflow.Manifest,
Labels: converter.APIKeyValueToLabel(req.CronWorkflow.Labels), Labels: converter.APIKeyValueToLabel(req.CronWorkflow.Labels),
Namespace: req.Namespace,
} }
cwf, err := client.UpdateCronWorkflow(req.Namespace, req.Uid, &cronWorkflow) cwf, err := client.UpdateCronWorkflow(req.Namespace, req.Uid, &cronWorkflow)