update: cron workflows to use new labels

This commit is contained in:
Andrey Melnikov
2020-08-09 16:15:05 -07:00
parent 55eeb90e2a
commit 8586639a6c
7 changed files with 28 additions and 261 deletions

View File

@@ -6,10 +6,12 @@ ALTER TABLE workflow_executions ADD COLUMN labels JSONB DEFAULT '{}'::JSONB;
ALTER TABLE workspaces ADD COLUMN labels JSONB DEFAULT '{}'::JSONB; ALTER TABLE workspaces ADD COLUMN labels JSONB DEFAULT '{}'::JSONB;
ALTER TABLE workspace_templates ADD COLUMN labels JSONB DEFAULT '{}'::JSONB; ALTER TABLE workspace_templates ADD COLUMN labels JSONB DEFAULT '{}'::JSONB;
ALTER TABLE workspace_template_versions ADD COLUMN labels JSONB DEFAULT '{}'::JSONB; ALTER TABLE workspace_template_versions ADD COLUMN labels JSONB DEFAULT '{}'::JSONB;
ALTER TABLE cron_workflows ADD COLUMN labels JSONB DEFAULT '{}'::JSONB;
-- +goose Down -- +goose Down
-- SQL in this section is executed when the migration is rolled back. -- SQL in this section is executed when the migration is rolled back.
ALTER TABLE cron_workflows DROP COLUMN labels;
ALTER TABLE workspace_template_versions DROP COLUMN labels; ALTER TABLE workspace_template_versions DROP COLUMN labels;
ALTER TABLE workspace_templates DROP COLUMN labels; ALTER TABLE workspace_templates DROP COLUMN labels;
ALTER TABLE workspaces DROP COLUMN labels; ALTER TABLE workspaces DROP COLUMN labels;

View File

@@ -105,45 +105,17 @@ func (c *Client) UpdateCronWorkflow(namespace string, uid string, cronWorkflow *
// Manifests could get big, don't return them in this case. // Manifests could get big, don't return them in this case.
cronWorkflow.WorkflowExecution.WorkflowTemplate.Manifest = "" cronWorkflow.WorkflowExecution.WorkflowTemplate.Manifest = ""
tx, err := c.DB.Begin()
if err != nil {
return nil, err
}
defer tx.Rollback()
_, err = sb.Update("cron_workflows"). _, err = sb.Update("cron_workflows").
SetMap(sq.Eq{ SetMap(sq.Eq{
"manifest": cronWorkflow.Manifest, "manifest": cronWorkflow.Manifest,
}).Where(sq.Eq{ "labels": cronWorkflow.Labels,
"id": cronWorkflow.ID, }).Where(sq.Eq{"id": cronWorkflow.ID}).
}). RunWith(c.DB).
RunWith(tx).
Exec() Exec()
if err != nil { if err != nil {
return nil, err return nil, err
} }
// delete all labels then replace
_, err = sb.Delete("labels").
Where(sq.Eq{
"resource": TypeCronWorkflow,
"resource_id": cronWorkflow.ID,
}).
RunWith(tx).
Exec()
if err != nil {
return nil, err
}
_, err = c.InsertLabelsRunner(tx, TypeCronWorkflow, cronWorkflow.ID, cronWorkflow.Labels)
if err != nil {
return nil, err
}
if err := tx.Commit(); err != nil {
return nil, err
}
return cronWorkflow, nil return cronWorkflow, nil
} }
@@ -159,7 +131,7 @@ func (c *Client) CreateCronWorkflow(namespace string, cronWorkflow *CronWorkflow
return nil, util.NewUserError(codes.NotFound, "Error with getting workflow template.") return nil, util.NewUserError(codes.NotFound, "Error with getting workflow template.")
} }
//// TODO: Need to pull system parameters from k8s config/secret here, example: HOST // TODO: Need to pull system parameters from k8s config/secret here, example: HOST
opts := &WorkflowExecutionOptions{ opts := &WorkflowExecutionOptions{
Labels: make(map[string]string), Labels: make(map[string]string),
} }
@@ -239,12 +211,6 @@ func (c *Client) CreateCronWorkflow(namespace string, cronWorkflow *CronWorkflow
// Manifests could get big, don't return them in this case. // Manifests could get big, don't return them in this case.
cronWorkflow.WorkflowExecution.WorkflowTemplate.Manifest = "" cronWorkflow.WorkflowExecution.WorkflowTemplate.Manifest = ""
tx, err := c.DB.Begin()
if err != nil {
return nil, err
}
defer tx.Rollback()
err = sb.Insert("cron_workflows"). err = sb.Insert("cron_workflows").
SetMap(sq.Eq{ SetMap(sq.Eq{
"uid": cronWorkflow.UID, "uid": cronWorkflow.UID,
@@ -253,38 +219,25 @@ func (c *Client) CreateCronWorkflow(namespace string, cronWorkflow *CronWorkflow
"manifest": cronWorkflow.Manifest, "manifest": cronWorkflow.Manifest,
"namespace": namespace, "namespace": namespace,
"is_archived": false, "is_archived": false,
"labels": cronWorkflow.Labels,
}). }).
Suffix("RETURNING id"). Suffix("RETURNING id").
RunWith(tx). RunWith(c.DB).
QueryRow(). QueryRow().
Scan(&cronWorkflow.ID) Scan(&cronWorkflow.ID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if len(cronWorkflow.Labels) > 0 {
_, err = c.InsertLabelsBuilder(TypeCronWorkflow, cronWorkflow.ID, cronWorkflow.Labels).
RunWith(tx).
Exec()
if err != nil {
return nil, err
}
}
if err := tx.Commit(); err != nil {
return nil, err
}
return cronWorkflow, nil return cronWorkflow, nil
} }
// GetCronWorkflow gets information about a cron workflow uniquely identified by a namespace/uid
func (c *Client) GetCronWorkflow(namespace, uid string) (cronWorkflow *CronWorkflow, err error) { func (c *Client) GetCronWorkflow(namespace, uid string) (cronWorkflow *CronWorkflow, err error) {
cronWorkflow = &CronWorkflow{} cronWorkflow = &CronWorkflow{}
err = c.cronWorkflowSelectBuilderNoColumns(namespace, uid). sb := c.cronWorkflowSelectBuilder(namespace, uid)
RunWith(c.DB). err = c.Getx(cronWorkflow, sb)
QueryRow().
Scan(cronWorkflow)
return return
} }
@@ -355,6 +308,7 @@ func (c *Client) DeleteCronWorkflowLabel(namespace, name string, keysToDelete ..
return wf.Labels, nil return wf.Labels, nil
} }
// ListCronWorkflows selects all of the cron workflows for the given namespace and workflow template uid
func (c *Client) ListCronWorkflows(namespace, workflowTemplateUID string, pagination *pagination.PaginationRequest) (cronWorkflows []*CronWorkflow, err error) { func (c *Client) ListCronWorkflows(namespace, workflowTemplateUID string, pagination *pagination.PaginationRequest) (cronWorkflows []*CronWorkflow, err error) {
sb := c.cronWorkflowSelectBuilder(namespace, workflowTemplateUID). sb := c.cronWorkflowSelectBuilder(namespace, workflowTemplateUID).
OrderBy("cw.created_at DESC") OrderBy("cw.created_at DESC")
@@ -363,18 +317,6 @@ func (c *Client) ListCronWorkflows(namespace, workflowTemplateUID string, pagina
if err := c.DB.Selectx(&cronWorkflows, sb); err != nil { if err := c.DB.Selectx(&cronWorkflows, sb); err != nil {
return nil, err return nil, err
} }
labelsMap, err := c.GetDBLabelsMapped(TypeCronWorkflow, CronWorkflowsToIDs(cronWorkflows)...)
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"Error": err.Error(),
}).Error("Unable to get Cron Workflow Labels")
return nil, err
}
for _, resource := range cronWorkflows {
resource.Labels = labelsMap[resource.ID]
}
return return
} }

View File

@@ -3,6 +3,7 @@ package v1
import ( import (
"encoding/json" "encoding/json"
"github.com/onepanelio/core/pkg/util/mapping" "github.com/onepanelio/core/pkg/util/mapping"
"github.com/onepanelio/core/pkg/util/types"
"github.com/onepanelio/core/util/sql" "github.com/onepanelio/core/util/sql"
"gopkg.in/yaml.v2" "gopkg.in/yaml.v2"
"time" "time"
@@ -17,7 +18,7 @@ type CronWorkflow struct {
Name string Name string
GenerateName string GenerateName string
WorkflowExecution *WorkflowExecution WorkflowExecution *WorkflowExecution
Labels map[string]string Labels types.JSONLabels
Version int64 Version int64
WorkflowTemplateVersionID uint64 `db:"workflow_template_version_id"` WorkflowTemplateVersionID uint64 `db:"workflow_template_version_id"`
Manifest string Manifest string
@@ -85,7 +86,7 @@ func (cw *CronWorkflow) AddToManifestSpec(key, manifest string) error {
// getCronWorkflowColumns returns all of the columns for cronWorkflow modified by alias, destination. // getCronWorkflowColumns returns all of the columns for cronWorkflow modified by alias, destination.
// see formatColumnSelect // see formatColumnSelect
func getCronWorkflowColumns(aliasAndDestination ...string) []string { func getCronWorkflowColumns(aliasAndDestination ...string) []string {
columns := []string{"id", "created_at", "uid", "name", "workflow_template_version_id", "manifest", "namespace"} columns := []string{"id", "created_at", "uid", "name", "workflow_template_version_id", "manifest", "namespace", "labels"}
return sql.FormatColumnSelect(columns, aliasAndDestination...) return sql.FormatColumnSelect(columns, aliasAndDestination...)
} }

View File

@@ -1,7 +1,6 @@
package v1 package v1
import ( import (
"database/sql"
"fmt" "fmt"
sq "github.com/Masterminds/squirrel" sq "github.com/Masterminds/squirrel"
"github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
@@ -51,52 +50,6 @@ func (c *Client) ListLabels(resource string, uid string) (labels []*Label, err e
} }
func (c *Client) AddLabels(namespace, resource, uid string, keyValues map[string]string) error { func (c *Client) AddLabels(namespace, resource, uid string, keyValues map[string]string) error {
tx, err := c.DB.Begin()
if err != nil {
return err
}
defer tx.Rollback()
tableName := TypeToTableName(resource)
if tableName == "" {
return fmt.Errorf("unknown resources '%v'", resource)
}
resourceId := uint64(0)
err = sb.Select("id").
From(tableName).
Where(sq.Eq{
"uid": uid,
}).
RunWith(tx).
QueryRow().
Scan(&resourceId)
if err != nil {
return err
}
_, err = sb.Delete("labels").
Where(sq.Eq{
"key": mapping.PluckKeysStr(keyValues),
"resource": resource,
"resource_id": resourceId,
}).RunWith(tx).
Exec()
if err != nil {
return err
}
_, err = c.InsertLabelsBuilder(resource, resourceId, keyValues).
RunWith(tx).
Exec()
if err != nil {
return err
}
if err := tx.Commit(); err != nil {
return err
}
source, meta, err := c.GetK8sLabelResource(namespace, resource, uid) source, meta, err := c.GetK8sLabelResource(namespace, resource, uid)
if err != nil { if err != nil {
return err return err
@@ -256,88 +209,6 @@ func (c *Client) DeleteResourceLabels(runner sq.BaseRunner, resource string) err
return err return err
} }
func (c *Client) InsertLabelsBuilder(resource string, resourceID uint64, keyValues map[string]string) sq.InsertBuilder {
sb := sb.Insert("labels").
Columns("resource", "resource_id", "key", "value")
for key, value := range keyValues {
sb = sb.Values(resource, resourceID, key, value)
}
return sb
}
// InsertLabelsRunner inserts the labels for the resource into the db using the provided runner.
// If no labels are provided, does nothing and returns nil, nil.
func (c *Client) InsertLabelsRunner(runner sq.BaseRunner, resource string, resourceID uint64, keyValues map[string]string) (sql.Result, error) {
if len(keyValues) == 0 {
return nil, nil
}
return c.InsertLabelsBuilder(resource, resourceID, keyValues).
RunWith(runner).
Exec()
}
// InsertLabels inserts the labels for the resource into the db using the client's DB.
// If no labels are provided, does nothing and returns nil, nil.
func (c *Client) InsertLabels(resource string, resourceID uint64, keyValues map[string]string) (sql.Result, error) {
return c.InsertLabelsRunner(c.DB, resource, resourceID, keyValues)
}
func (c *Client) GetDbLabels(resource string, ids ...uint64) (labels []*Label, err error) {
if len(ids) == 0 {
return make([]*Label, 0), nil
}
tx, err := c.DB.Begin()
if err != nil {
return nil, err
}
defer tx.Rollback()
query, args, err := sb.Select("id", "created_at", "key", "value", "resource", "resource_id").
From("labels").
Where(sq.Eq{
"resource_id": ids,
"resource": resource,
}).
OrderBy("key").
ToSql()
if err != nil {
return nil, err
}
err = c.DB.Select(&labels, query, args...)
if err != nil {
return nil, err
}
return
}
// GetDBLabelsMapped returns a map where the key is the id of the resource
// and the value is the labels as a map[string]string
func (c *Client) GetDBLabelsMapped(resource string, ids ...uint64) (result map[uint64]map[string]string, err error) {
dbLabels, err := c.GetDbLabels(resource, ids...)
if err != nil {
return
}
result = make(map[uint64]map[string]string)
for _, dbLabel := range dbLabels {
_, ok := result[dbLabel.ResourceID]
if !ok {
result[dbLabel.ResourceID] = make(map[string]string)
}
result[dbLabel.ResourceID][dbLabel.Key] = dbLabel.Value
}
return
}
func (c *Client) GetK8sLabelResource(namespace, resource, uid string) (source interface{}, result *v1.ObjectMeta, err error) { func (c *Client) GetK8sLabelResource(namespace, resource, uid string) (source interface{}, result *v1.ObjectMeta, err error) {
switch resource { switch resource {
case TypeWorkflowTemplateVersion: case TypeWorkflowTemplateVersion:

View File

@@ -565,29 +565,18 @@ func (c *Client) createWorkflowExecutionDB(namespace string, workflowExecution *
} }
func (c *Client) FinishWorkflowExecutionStatisticViaExitHandler(namespace, name string, workflowTemplateID int64, phase wfv1.NodePhase, startedAt time.Time) (err error) { func (c *Client) FinishWorkflowExecutionStatisticViaExitHandler(namespace, name string, workflowTemplateID int64, phase wfv1.NodePhase, startedAt time.Time) (err error) {
tx, err := c.DB.Begin()
if err != nil {
return err
}
defer tx.Rollback()
updateMap := sq.Eq{
"started_at": startedAt.UTC(),
"name": name,
"namespace": namespace,
"finished_at": time.Now().UTC(),
"phase": phase,
}
_, err = sb.Update("workflow_executions"). _, err = sb.Update("workflow_executions").
SetMap(updateMap).Where(sq.Eq{"name": name}).RunWith(tx).Exec() SetMap(sq.Eq{
if err != nil { "started_at": startedAt.UTC(),
return err "name": name,
} "namespace": namespace,
err = tx.Commit() "finished_at": time.Now().UTC(),
if err != nil { "phase": phase,
return err }).
} Where(sq.Eq{"name": name}).
RunWith(c.DB).
Exec()
return err return err
} }
@@ -609,12 +598,6 @@ func (c *Client) CronStartWorkflowExecutionStatisticInsert(namespace, uid string
return err return err
} }
tx, err := c.DB.Begin()
if err != nil {
return err
}
defer tx.Rollback()
parametersJSON, err := cronWorkflow.GetParametersFromWorkflowSpecJSON() parametersJSON, err := cronWorkflow.GetParametersFromWorkflowSpecJSON()
if err != nil { if err != nil {
return err return err
@@ -631,26 +614,16 @@ func (c *Client) CronStartWorkflowExecutionStatisticInsert(namespace, uid string
"started_at": time.Now().UTC(), "started_at": time.Now().UTC(),
"cron_workflow_id": cronWorkflow.ID, "cron_workflow_id": cronWorkflow.ID,
"parameters": string(parametersJSON), "parameters": string(parametersJSON),
"labels": cronWorkflow.Labels,
}). }).
Suffix("RETURNING id"). Suffix("RETURNING id").
RunWith(tx). RunWith(c.DB).
QueryRow(). QueryRow().
Scan(&workflowExecutionID) Scan(&workflowExecutionID)
if err != nil { if err != nil {
return err return err
} }
cronLabels, err := c.GetDBLabelsMapped(TypeCronWorkflow, cronWorkflow.ID)
if err != nil {
return err
}
labelsMapped := cronLabels[cronWorkflow.ID]
if _, err := c.InsertLabelsRunner(tx, TypeWorkflowExecution, workflowExecutionID, labelsMapped); err != nil {
return err
}
err = tx.Commit()
return err return err
} }

View File

@@ -778,7 +778,6 @@ func (c *Client) ListWorkflowTemplates(namespace string, paginator *pagination.P
// appendExtraWorkflowTemplateData adds extra information to workflow templates // appendExtraWorkflowTemplateData adds extra information to workflow templates
// * execution statistics (including cron) // * execution statistics (including cron)
// * labels
func (c *Client) appendExtraWorkflowTemplateData(namespace string, workflowTemplateVersions []*WorkflowTemplate) (err error) { func (c *Client) appendExtraWorkflowTemplateData(namespace string, workflowTemplateVersions []*WorkflowTemplate) (err error) {
err = c.GetWorkflowExecutionStatisticsForTemplates(workflowTemplateVersions...) err = c.GetWorkflowExecutionStatisticsForTemplates(workflowTemplateVersions...)
if err != nil { if err != nil {
@@ -798,19 +797,6 @@ func (c *Client) appendExtraWorkflowTemplateData(namespace string, workflowTempl
return util.NewUserError(codes.NotFound, "Unable to get Cron Workflow Statistic for Templates.") return util.NewUserError(codes.NotFound, "Unable to get Cron Workflow Statistic for Templates.")
} }
labelsMap, err := c.GetDBLabelsMapped(TypeWorkflowTemplateVersion, WorkflowTemplatesToVersionIDs(workflowTemplateVersions)...)
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"Error": err.Error(),
}).Error("Unable to get Workflow Template Labels")
return err
}
for _, workflowTemplate := range workflowTemplateVersions {
workflowTemplate.Labels = labelsMap[workflowTemplate.WorkflowTemplateVersionID]
}
return return
} }

View File

@@ -342,14 +342,6 @@ func (c *Client) ListWorkspacesByTemplateID(namespace string, templateID uint64)
return nil, err return nil, err
} }
labelMap, err := c.GetDBLabelsMapped(TypeWorkspace, WorkspacesToIDs(workspaces)...)
if err != nil {
return nil, err
}
for _, workspace := range workspaces {
workspace.Labels = labelMap[workspace.ID]
}
return return
} }