update: fixed issues where workspace template version labels were not being correctly set/got. Also updated generic server endpoints with new logic

This commit is contained in:
Andrey Melnikov
2020-08-09 13:45:24 -07:00
parent 2f5720aefc
commit c81c2d7672
5 changed files with 79 additions and 105 deletions

View File

@@ -7,44 +7,45 @@ import (
"github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/onepanelio/core/pkg/util/label"
"github.com/onepanelio/core/pkg/util/mapping"
"github.com/onepanelio/core/pkg/util/types"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func (c *Client) ListLabels(resource string, uid string) (labels []*Label, err error) {
sb := sb.Select("l.id", "l.created_at", "l.key", "l.value", "l.resource", "l.resource_id").
From("labels l").
Where(sq.Eq{
"resource": resource,
}).
OrderBy("l.created_at")
sb := sb.Select("labels").
From(TypeToTableName(resource))
switch resource {
case TypeWorkflowTemplate:
sb = sb.Join("workflow_templates wt ON wt.id = l.resource_id").
Where(sq.Eq{"wt.uid": uid})
sb = sb.Where(sq.Eq{"uid": uid})
case TypeWorkflowExecution:
sb = sb.Join("workflow_executions we ON we.id = l.resource_id").
Where(sq.Eq{"we.uid": uid})
sb = sb.Where(sq.Eq{"uid": uid})
case TypeCronWorkflow:
sb = sb.Join("cron_workflows cw ON cw.id = l.resource_id").
Where(sq.Eq{"cw.uid": uid})
sb = sb.Where(sq.Eq{"uid": uid})
case TypeWorkspace:
sb = sb.Join("workspaces ws ON ws.id = l.resource_id").
Where(sq.And{
sq.Eq{"ws.uid": uid},
sq.NotEq{"ws.phase": "Terminated"},
sb = sb.Where(sq.And{
sq.Eq{"uid": uid},
sq.NotEq{"phase": "Terminated"},
})
default:
return nil, fmt.Errorf("unsupported label resource %v", resource)
}
query, args, sqlErr := sb.ToSql()
if sqlErr != nil {
err = sqlErr
result := types.JSONLabels{}
err = c.DB.Getx(&result, sb)
if err != nil {
return
}
err = c.DB.Select(&labels, query, args...)
for key, value := range result {
newLabel := &Label{
Key: key,
Value: value,
Resource: resource,
}
labels = append(labels, newLabel)
}
return
}
@@ -138,50 +139,25 @@ func (c *Client) ReplaceLabels(namespace, resource, uid string, keyValues map[st
}
}
resourceID := uint64(0)
err = sb.Select("id").
From(tableName).
_, err = sb.Update(tableName).
SetMap(sq.Eq{
"labels": types.JSONLabels(keyValues),
}).
Where(whereCondition).
RunWith(tx).
QueryRow().
Scan(&resourceID)
if err != nil {
return err
}
return c.ReplaceLabelsUsingKnownID(namespace, resource, resourceID, uid, keyValues)
}
func (c *Client) ReplaceLabelsUsingKnownID(namespace, resource string, resourceID uint64, uid string, keyValues map[string]string) error {
tx, err := c.DB.Begin()
if err != nil {
return err
}
defer tx.Rollback()
_, err = sb.Delete("labels").
Where(sq.Eq{
"resource": resource,
"resource_id": resourceID,
}).
RunWith(tx).
Exec()
if err != nil {
return err
}
if len(keyValues) > 0 {
_, err = c.InsertLabelsBuilder(resource, resourceID, keyValues).
RunWith(tx).
Exec()
if err != nil {
return err
}
}
if err := tx.Commit(); err != nil {
return err
}
return c.ReplaceLabelsUsingKnownID(namespace, resource, uid, keyValues)
}
func (c *Client) ReplaceLabelsUsingKnownID(namespace, resource string, uid string, keyValues map[string]string) error {
source, meta, err := c.GetK8sLabelResource(namespace, resource, uid)
if err != nil {
return err

View File

@@ -12,6 +12,7 @@ import (
"github.com/onepanelio/core/pkg/util/label"
"github.com/onepanelio/core/pkg/util/pagination"
"github.com/onepanelio/core/pkg/util/ptr"
"github.com/onepanelio/core/pkg/util/types"
uid2 "github.com/onepanelio/core/pkg/util/uid"
"golang.org/x/net/context"
"gopkg.in/yaml.v2"
@@ -330,7 +331,7 @@ func (c *Client) ArchiveWorkflowExecution(namespace, uid string) error {
// createWorkflow creates the workflow in the database and argo.
// Name is == to UID, no user friendly name.
// Workflow execution name == uid, example: name = my-friendly-wf-name-8skjz, uid = my-friendly-wf-name-8skjz
func (c *Client) createWorkflow(namespace string, workflowTemplateID uint64, workflowTemplateVersionID uint64, wf *wfv1.Workflow, opts *WorkflowExecutionOptions) (createdWorkflow *WorkflowExecution, err error) {
func (c *Client) createWorkflow(namespace string, workflowTemplateID uint64, workflowTemplateVersionID uint64, wf *wfv1.Workflow, opts *WorkflowExecutionOptions, labels types.JSONLabels) (createdWorkflow *WorkflowExecution, err error) {
if opts == nil {
opts = &WorkflowExecutionOptions{}
}
@@ -396,6 +397,7 @@ func (c *Client) createWorkflow(namespace string, workflowTemplateID uint64, wor
WorkflowTemplateVersionID: workflowTemplateVersionID,
},
Parameters: opts.Parameters,
Labels: labels,
}
if err = createdWorkflow.GenerateUID(createdArgoWorkflow.Name); err != nil {
@@ -483,7 +485,7 @@ func (c *Client) CreateWorkflowExecution(namespace string, workflow *WorkflowExe
return nil, fmt.Errorf("workflow Template contained more than 1 workflow execution")
}
createdWorkflow, err := c.createWorkflow(namespace, workflowTemplate.ID, workflowTemplate.WorkflowTemplateVersionID, &workflows[0], opts)
createdWorkflow, err := c.createWorkflow(namespace, workflowTemplate.ID, workflowTemplate.WorkflowTemplateVersionID, &workflows[0], opts, workflow.Labels)
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
@@ -493,10 +495,6 @@ func (c *Client) CreateWorkflowExecution(namespace string, workflow *WorkflowExe
return nil, err
}
if _, err := c.InsertLabels(TypeWorkflowExecution, createdWorkflow.ID, workflow.Labels); err != nil {
return nil, err
}
workflow.ID = createdWorkflow.ID
workflow.Name = createdWorkflow.Name
workflow.CreatedAt = createdWorkflow.CreatedAt.UTC()

View File

@@ -23,25 +23,26 @@ import (
)
// createWorkspaceTemplateVersionDB creates a workspace template version in the database.
func createWorkspaceTemplateVersionDB(tx *sql.Tx, workspaceTemplateID uint64, version int64, manifest string, isLatest bool) (id uint64, err error) {
func createWorkspaceTemplateVersionDB(tx sq.BaseRunner, template *WorkspaceTemplate) (err error) {
err = sb.Insert("workspace_template_versions").
SetMap(sq.Eq{
"version": version,
"is_latest": isLatest,
"manifest": manifest,
"workspace_template_id": workspaceTemplateID,
"version": template.Version,
"is_latest": template.IsLatest,
"manifest": template.Manifest,
"workspace_template_id": template.ID,
"labels": template.Labels,
}).
Suffix("RETURNING id").
RunWith(tx).
QueryRow().
Scan(&id)
Scan(&template.ID)
return
}
// markWorkspaceTemplateVersionsOutdatedDB updates all of the workspace template versions in db so is_latest is false
// given the workspaceTemplateID
func markWorkspaceTemplateVersionsOutdatedDB(tx *sql.Tx, workspaceTemplateID uint64) (err error) {
func markWorkspaceTemplateVersionsOutdatedDB(tx sq.BaseRunner, workspaceTemplateID uint64) (err error) {
_, err = sb.Update("workspace_template_versions").
SetMap(sq.Eq{"is_latest": false}).
Where(sq.Eq{
@@ -55,15 +56,22 @@ func markWorkspaceTemplateVersionsOutdatedDB(tx *sql.Tx, workspaceTemplateID uin
}
// createLatestWorkspaceTemplateVersionDB creates a new workspace template version and marks all previous versions as not latest.
func createLatestWorkspaceTemplateVersionDB(tx *sql.Tx, workspaceTemplateID uint64, version int64, manifest string) (id uint64, err error) {
err = markWorkspaceTemplateVersionsOutdatedDB(tx, workspaceTemplateID)
func createLatestWorkspaceTemplateVersionDB(tx sq.BaseRunner, template *WorkspaceTemplate) (err error) {
if template == nil {
return fmt.Errorf("workspaceTemplate is nil")
}
if template.ID < 1 {
return fmt.Errorf("workspaceTemplate.ID is not set")
}
err = markWorkspaceTemplateVersionsOutdatedDB(tx, template.ID)
if err != nil {
return
}
id, err = createWorkspaceTemplateVersionDB(tx, workspaceTemplateID, version, manifest, true)
template.IsLatest = true
return
return createWorkspaceTemplateVersionDB(tx, template)
}
func parseWorkspaceSpec(template string) (spec *WorkspaceSpec, err error) {
@@ -723,6 +731,7 @@ func (c *Client) createWorkspaceTemplate(namespace string, workspaceTemplate *Wo
"name": workspaceTemplate.Name,
"namespace": namespace,
"workflow_template_id": workspaceTemplate.WorkflowTemplate.ID,
"labels": workspaceTemplate.Labels,
}).
Suffix("RETURNING id, created_at").
RunWith(tx).
@@ -738,7 +747,7 @@ func (c *Client) createWorkspaceTemplate(namespace string, workspaceTemplate *Wo
return nil, util.NewUserErrorWrap(err, errorMsg) //return the source error
}
workspaceTemplateVersionID, err := createWorkspaceTemplateVersionDB(tx, workspaceTemplate.ID, workspaceTemplate.Version, workspaceTemplate.Manifest, true)
err = createWorkspaceTemplateVersionDB(tx, workspaceTemplate)
if err != nil {
errorMsg := "Error with insert into workspace_templates_versions. "
_, errCleanUp := c.ArchiveWorkflowTemplate(namespace, workspaceTemplate.WorkflowTemplate.UID)
@@ -749,11 +758,6 @@ func (c *Client) createWorkspaceTemplate(namespace string, workspaceTemplate *Wo
return nil, util.NewUserErrorWrap(err, errorMsg) // return the source error
}
_, err = c.InsertLabelsRunner(tx, TypeWorkspaceTemplateVersion, workspaceTemplateVersionID, workspaceTemplate.Labels)
if err != nil {
return nil, err
}
if err = tx.Commit(); err != nil {
_, errArchive := c.ArchiveWorkflowTemplate(namespace, workspaceTemplate.WorkflowTemplate.UID)
if errArchive != nil {
@@ -776,12 +780,16 @@ func (c *Client) workspaceTemplatesSelectBuilder(namespace string) sq.SelectBuil
}
func (c *Client) workspaceTemplateVersionsSelectBuilder(namespace, uid string) sq.SelectBuilder {
sb := c.workspaceTemplatesSelectBuilder(namespace).
Columns("wtv.id \"workspace_template_version_id\"", "wtv.created_at \"created_at\"", "wtv.version", "wtv.manifest", "wft.id \"workflow_template.id\"", "wft.uid \"workflow_template.uid\"", "wftv.version \"workflow_template.version\"", "wftv.manifest \"workflow_template.manifest\"").
sb := sb.Select(getWorkspaceTemplateColumnsWithoutLabels("wt")...).
From("workspace_templates wt").
Columns("wtv.id \"workspace_template_version_id\"", "wtv.created_at \"created_at\"", "wtv.version", "wtv.manifest", "wtv.labels", "wft.id \"workflow_template.id\"", "wft.uid \"workflow_template.uid\"", "wftv.version \"workflow_template.version\"", "wftv.manifest \"workflow_template.manifest\"").
Join("workspace_template_versions wtv ON wtv.workspace_template_id = wt.id").
Join("workflow_templates wft ON wft.id = wt.workflow_template_id").
Join("workflow_template_versions wftv ON wftv.workflow_template_id = wft.id").
Where(sq.Eq{"wt.uid": uid})
Where(sq.Eq{
"wt.uid": uid,
"wt.namespace": namespace,
})
return sb
}
@@ -997,12 +1005,18 @@ func (c *Client) UpdateWorkspaceTemplate(namespace string, workspaceTemplate *Wo
}
defer tx.Rollback()
workspaceTemplateVersionID, err := createLatestWorkspaceTemplateVersionDB(tx, workspaceTemplate.ID, workspaceTemplate.Version, workspaceTemplate.Manifest)
if err != nil {
if err := createLatestWorkspaceTemplateVersionDB(tx, workspaceTemplate); err != nil {
return nil, err
}
_, err = c.InsertLabelsRunner(tx, TypeWorkspaceTemplateVersion, workspaceTemplateVersionID, workspaceTemplate.Labels)
_, err = sb.Update("workspace_templates").
Set("labels", workspaceTemplate.Labels).
Where(sq.Eq{
"uid": workspaceTemplate.UID,
"namespace": workspaceTemplate.Namespace,
}).
RunWith(tx).
Exec()
if err != nil {
return nil, err
}
@@ -1030,7 +1044,6 @@ func (c *Client) ListWorkspaceTemplates(namespace string, paginator *pagination.
}
// ListWorkspaceTemplateVersions returns an array of WorkspaceTemplates with the version information loaded. Latest id is first.
// Labels are also loaded.
func (c *Client) ListWorkspaceTemplateVersions(namespace, uid string) (workspaceTemplates []*WorkspaceTemplate, err error) {
sb := c.workspaceTemplateVersionsSelectBuilder(namespace, uid).
Options("DISTINCT ON (wtv.version) wtv.version,").
@@ -1040,20 +1053,7 @@ func (c *Client) ListWorkspaceTemplateVersions(namespace, uid string) (workspace
}).
OrderBy("wtv.version DESC")
if err = c.DB.Selectx(&workspaceTemplates, sb); err != nil {
return
}
labelsMap, err := c.GetDBLabelsMapped(TypeWorkspaceTemplateVersion, WorkspaceTemplatesToVersionIDs(workspaceTemplates)...)
if err != nil {
return nil, err
}
for _, workspaceTemplate := range workspaceTemplates {
if labels, ok := labelsMap[workspaceTemplate.WorkspaceTemplateVersionID]; ok {
workspaceTemplate.Labels = labels
}
}
err = c.DB.Selectx(&workspaceTemplates, sb)
return
}

View File

@@ -102,3 +102,10 @@ func getWorkspaceTemplateColumns(aliasAndDestination ...string) []string {
columns := []string{"id", "uid", "created_at", "modified_at", "name", "namespace", "is_archived", "workflow_template_id", "labels"}
return sql.FormatColumnSelect(columns, aliasAndDestination...)
}
// getWorkspaceTemplateColumnsWithoutLabels returns all of the columns for workspace template, excluding labels, modified by alias, destination.
// see formatColumnSelect
func getWorkspaceTemplateColumnsWithoutLabels(aliasAndDestination ...string) []string {
columns := []string{"id", "uid", "created_at", "modified_at", "name", "namespace", "is_archived", "workflow_template_id"}
return sql.FormatColumnSelect(columns, aliasAndDestination...)
}

View File

@@ -179,13 +179,6 @@ func (s *WorkflowServer) GetWorkflowExecution(ctx context.Context, req *api.GetW
return nil, err
}
mappedLabels, err := client.GetDBLabelsMapped(v1.TypeWorkflowExecution, wf.ID)
if err != nil {
return nil, err
}
if labels, ok := mappedLabels[wf.ID]; ok {
wf.Labels = labels
}
wf.Namespace = req.Namespace
webRouter, err := client.GetWebRouter()