mirror of
https://github.com/onepanelio/onepanel.git
synced 2025-09-26 17:51:13 +08:00
1284 lines
40 KiB
Go
1284 lines
40 KiB
Go
package v1
|
|
|
|
import (
|
|
"database/sql"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/onepanelio/core/pkg/util/env"
|
|
"github.com/onepanelio/core/pkg/util/extensions"
|
|
"github.com/onepanelio/core/pkg/util/ptr"
|
|
"github.com/onepanelio/core/pkg/util/request"
|
|
pagination "github.com/onepanelio/core/pkg/util/request/pagination"
|
|
yaml3 "gopkg.in/yaml.v3"
|
|
|
|
sq "github.com/Masterminds/squirrel"
|
|
"github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
|
|
argojson "github.com/argoproj/pkg/json"
|
|
"github.com/ghodss/yaml"
|
|
"github.com/onepanelio/core/pkg/util"
|
|
"github.com/onepanelio/core/pkg/util/label"
|
|
log "github.com/sirupsen/logrus"
|
|
"google.golang.org/grpc/codes"
|
|
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
)
|
|
|
|
// WorkflowTemplateFilter represents the available ways we can filter WorkflowTemplates
|
|
type WorkflowTemplateFilter struct {
|
|
Labels []*Label
|
|
}
|
|
|
|
// GetLabels returns the labels in the filter
|
|
func (wt *WorkflowTemplateFilter) GetLabels() []*Label {
|
|
return wt.Labels
|
|
}
|
|
|
|
// replaceSysNodePoolOptions replaces a select.nodepool parameter with the nodePool options in the systemConfig
|
|
// and returns the new parameters with the change.
|
|
func (c *Client) replaceSysNodePoolOptions(parameters []Parameter) (result []Parameter, err error) {
|
|
nodePoolOptions, err := c.systemConfig.NodePoolOptionsAsParameters()
|
|
if err != nil {
|
|
return result, err
|
|
}
|
|
|
|
for i := range parameters {
|
|
param := parameters[i]
|
|
if param.Type == "select.nodepool" {
|
|
param.Options = nodePoolOptions
|
|
|
|
if param.Value != nil && *param.Value == "default" {
|
|
param.Value = ptr.String(param.Options[0].Value)
|
|
}
|
|
}
|
|
|
|
result = append(result, param)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// parameterOptionToNodes returns a mapping Node where the content's are the options name/value
|
|
func parameterOptionToNodes(option *ParameterOption) *yaml3.Node {
|
|
result := &yaml3.Node{
|
|
Kind: yaml3.MappingNode,
|
|
}
|
|
|
|
result.Content = append(result.Content, &yaml3.Node{
|
|
Kind: yaml3.ScalarNode,
|
|
Value: "name",
|
|
})
|
|
|
|
result.Content = append(result.Content, &yaml3.Node{
|
|
Kind: yaml3.ScalarNode,
|
|
Value: option.Name,
|
|
})
|
|
|
|
result.Content = append(result.Content, &yaml3.Node{
|
|
Kind: yaml3.ScalarNode,
|
|
Value: "value",
|
|
})
|
|
|
|
result.Content = append(result.Content, &yaml3.Node{
|
|
Kind: yaml3.ScalarNode,
|
|
Value: option.Value,
|
|
})
|
|
|
|
return result
|
|
}
|
|
|
|
// parameterOptionsToNodes returns an array of nodes representing the options
|
|
func parameterOptionsToNodes(options []*ParameterOption) []*yaml3.Node {
|
|
result := make([]*yaml3.Node, 0)
|
|
|
|
for _, option := range options {
|
|
result = append(result, parameterOptionToNodes(option))
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
func applyWorkflowTemplateFilter(sb sq.SelectBuilder, request *request.Request) (sq.SelectBuilder, error) {
|
|
if !request.HasFilter() {
|
|
return sb, nil
|
|
}
|
|
|
|
filter, ok := request.Filter.(WorkflowTemplateFilter)
|
|
if !ok {
|
|
return sb, nil
|
|
}
|
|
|
|
sb, err := ApplyLabelSelectQuery("wt.labels", sb, &filter)
|
|
if err != nil {
|
|
return sb, err
|
|
}
|
|
|
|
return sb, nil
|
|
}
|
|
|
|
// createWorkflowTemplateVersionDB inserts a record into workflow_template_versions using the current time accurate to nanoseconds
|
|
// the data is returned in the resulting WorkflowTemplateVersion struct.
|
|
func createWorkflowTemplateVersionDB(runner sq.BaseRunner, workflowTemplateVersion *WorkflowTemplateVersion, parameters []Parameter) (err error) {
|
|
if workflowTemplateVersion == nil {
|
|
return fmt.Errorf("workflowTemplateVersion is nil")
|
|
}
|
|
if workflowTemplateVersion.WorkflowTemplate == nil {
|
|
return fmt.Errorf("workflowTemplateVersion.WorkflowTemplate is nil")
|
|
}
|
|
if workflowTemplateVersion.WorkflowTemplate.ID < 1 {
|
|
return fmt.Errorf("workflowTemplateVersion.WorkflowTemplate.ID must be > 0. %v given", workflowTemplateVersion.WorkflowTemplate.ID)
|
|
}
|
|
|
|
workflowTemplateVersion.Version = time.Now().UnixNano()
|
|
|
|
pj, err := json.Marshal(parameters)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
if pj == nil {
|
|
pj = []byte("[]")
|
|
}
|
|
|
|
err = sb.Insert("workflow_template_versions").
|
|
SetMap(sq.Eq{
|
|
"workflow_template_id": workflowTemplateVersion.WorkflowTemplate.ID,
|
|
"version": workflowTemplateVersion.Version,
|
|
"is_latest": true,
|
|
"manifest": workflowTemplateVersion.Manifest,
|
|
"parameters": pj,
|
|
"labels": workflowTemplateVersion.Labels,
|
|
"description": workflowTemplateVersion.Description,
|
|
}).
|
|
Suffix("RETURNING id").
|
|
RunWith(runner).
|
|
QueryRow().
|
|
Scan(&workflowTemplateVersion.ID)
|
|
|
|
return
|
|
}
|
|
|
|
// updateWorkflowTemplateVersionDB will update a WorkflowTemplateVersion row in the database.
|
|
func updateWorkflowTemplateVersionDB(runner sq.BaseRunner, wtv *WorkflowTemplateVersion) (err error) {
|
|
pj, err := json.Marshal(wtv.Parameters)
|
|
if err != nil {
|
|
return
|
|
}
|
|
_, err = sb.Update("workflow_template_versions").
|
|
SetMap(sq.Eq{
|
|
"manifest": wtv.Manifest,
|
|
"is_latest": wtv.IsLatest,
|
|
"parameters": string(pj),
|
|
}).
|
|
Where(sq.Eq{
|
|
"id": wtv.ID,
|
|
}).RunWith(runner).Exec()
|
|
return
|
|
}
|
|
|
|
// createLatestWorkflowTemplateVersionDB creates a new workflow template version and marks all previous versions as not latest.
|
|
func createLatestWorkflowTemplateVersionDB(runner sq.BaseRunner, workflowTemplateVersion *WorkflowTemplateVersion) (err error) {
|
|
if workflowTemplateVersion == nil {
|
|
return fmt.Errorf("workflowTemplateVersion is nil")
|
|
}
|
|
if workflowTemplateVersion.WorkflowTemplate == nil {
|
|
return fmt.Errorf("workflowTemplateVersion.WorkflowTemplate is nil")
|
|
}
|
|
if workflowTemplateVersion.WorkflowTemplate.ID < 1 {
|
|
return fmt.Errorf("workflowTemplateVersion.WorkflowTemplate.ID must be > 0. %v given", workflowTemplateVersion.WorkflowTemplate.ID)
|
|
}
|
|
|
|
_, err = sb.Update("workflow_template_versions").
|
|
Set("is_latest", false).
|
|
Where(sq.Eq{
|
|
"workflow_template_id": workflowTemplateVersion.WorkflowTemplate.ID,
|
|
}).
|
|
RunWith(runner).
|
|
Exec()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
params, err := ParseParametersFromManifest([]byte(workflowTemplateVersion.Manifest))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return createWorkflowTemplateVersionDB(runner, workflowTemplateVersion, params)
|
|
}
|
|
|
|
// createWorkflowTemplate creates a WorkflowTemplate and all of the DB/Argo/K8s related resources
|
|
// The returned WorkflowTemplate has the ArgoWorkflowTemplate set to the newly created one.
|
|
func (c *Client) createWorkflowTemplate(namespace string, workflowTemplate *WorkflowTemplate) (*WorkflowTemplate, *WorkflowTemplateVersion, error) {
|
|
if err := workflowTemplate.GenerateUID(workflowTemplate.Name); err != nil {
|
|
return nil, nil, util.NewUserError(codes.InvalidArgument, "Template name must be 30 characters or less")
|
|
}
|
|
|
|
tx, err := c.DB.Begin()
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
err = sb.Insert("workflow_templates").
|
|
SetMap(sq.Eq{
|
|
"uid": workflowTemplate.UID,
|
|
"name": workflowTemplate.Name,
|
|
"namespace": namespace,
|
|
"is_system": workflowTemplate.IsSystem,
|
|
"labels": workflowTemplate.Labels,
|
|
}).
|
|
Suffix("RETURNING id").
|
|
RunWith(tx).
|
|
QueryRow().
|
|
Scan(&workflowTemplate.ID)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
params, err := ParseParametersFromManifest([]byte(workflowTemplate.Manifest))
|
|
if err != nil {
|
|
return nil, nil, util.NewUserError(codes.InvalidArgument, err.Error())
|
|
}
|
|
|
|
workflowTemplateVersion := &WorkflowTemplateVersion{
|
|
WorkflowTemplate: workflowTemplate,
|
|
Manifest: workflowTemplate.Manifest,
|
|
Labels: workflowTemplate.Labels,
|
|
Description: workflowTemplate.Description,
|
|
}
|
|
err = createWorkflowTemplateVersionDB(tx, workflowTemplateVersion, params)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
workflowTemplate.WorkflowTemplateVersionID = workflowTemplateVersion.ID
|
|
|
|
argoWft, err := createArgoWorkflowTemplate(workflowTemplate, workflowTemplateVersion.Version)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
argoWft.Labels[label.WorkflowTemplateVersionUid] = strconv.FormatInt(workflowTemplateVersion.Version, 10)
|
|
|
|
if workflowTemplate.Resource != nil && workflowTemplate.ResourceUID != nil {
|
|
if *workflowTemplate.Resource == TypeWorkspaceTemplate {
|
|
argoWft.Labels[label.WorkspaceTemplateVersionUid] = *workflowTemplate.ResourceUID
|
|
}
|
|
}
|
|
|
|
argoWft, err = c.ArgoprojV1alpha1().WorkflowTemplates(namespace).Create(argoWft)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
if err = tx.Commit(); err != nil {
|
|
if errDelete := c.ArgoprojV1alpha1().WorkflowTemplates(namespace).Delete(argoWft.Name, &v1.DeleteOptions{}); errDelete != nil {
|
|
err = fmt.Errorf("%w; %s", err, errDelete)
|
|
}
|
|
return nil, nil, err
|
|
}
|
|
|
|
workflowTemplate.ArgoWorkflowTemplate = argoWft
|
|
workflowTemplate.Version = workflowTemplateVersion.Version
|
|
|
|
return workflowTemplate, workflowTemplateVersion, nil
|
|
}
|
|
|
|
// baseWorkflowTemplatesSelectBuilder returns a SelectBuilder selecting a WorkflowTemplate from the database for a given namespace
|
|
// no columns are selected.
|
|
func (c *Client) baseWorkflowTemplatesSelectBuilder(namespace string) sq.SelectBuilder {
|
|
sb := sb.Select().
|
|
From("workflow_templates wt").
|
|
Where(sq.Eq{
|
|
"wt.namespace": namespace,
|
|
})
|
|
|
|
return sb
|
|
}
|
|
|
|
// workflowTemplatesSelectBuilder returns a SelectBuilder selecting a WorkflowTemplate from the database for a given namespace
|
|
// with all of the columns of a WorkflowTemplate
|
|
func (c *Client) workflowTemplatesSelectBuilder(namespace string) sq.SelectBuilder {
|
|
sb := c.baseWorkflowTemplatesSelectBuilder(namespace).
|
|
Columns(getWorkflowTemplateColumns("wt")...)
|
|
|
|
return sb
|
|
}
|
|
|
|
// countWorkflowTemplateSelectBuilder returns a select builder selecting the total count of WorkflowTemplates
|
|
// given the namespace
|
|
func (c *Client) countWorkflowTemplateSelectBuilder(namespace string) sq.SelectBuilder {
|
|
sb := c.baseWorkflowTemplatesSelectBuilder(namespace).
|
|
Columns("COUNT(*)")
|
|
|
|
return sb
|
|
}
|
|
|
|
// workflowTemplatesVersionSelectBuilder selects data from workflow template versions joined to a workflow template
|
|
// the versions/template are filtered by the workflow template's namespace.
|
|
func (c *Client) workflowTemplatesVersionSelectBuilder(namespace string) sq.SelectBuilder {
|
|
sb := sb.Select(getWorkflowTemplateVersionColumns("wtv")...).
|
|
From("workflow_template_versions wtv").
|
|
Join("workflow_templates wt ON wt.id = wtv.workflow_template_id").
|
|
Where(sq.Eq{
|
|
"wt.namespace": namespace,
|
|
})
|
|
|
|
return sb
|
|
}
|
|
|
|
// workflowTemplateVersionSelectBuilderAll
|
|
func (c *Client) workflowTemplateVersionSelectBuilderAll() sq.SelectBuilder {
|
|
sb := sb.Select(getWorkflowTemplateVersionColumns("wtv")...).
|
|
From("workflow_template_versions wtv")
|
|
return sb
|
|
}
|
|
|
|
// GetWorkflowTemplateDB returns a WorkflowTemplate from the database that is not archived, should one exist.
|
|
func (c *Client) getWorkflowTemplateDB(namespace, name string) (workflowTemplate *WorkflowTemplate, err error) {
|
|
workflowTemplate = &WorkflowTemplate{}
|
|
|
|
sb := c.workflowTemplatesSelectBuilder(namespace).
|
|
Where(sq.Eq{
|
|
"wt.name": name,
|
|
"wt.is_archived": false,
|
|
})
|
|
|
|
err = c.DB.Getx(workflowTemplate, sb)
|
|
|
|
return
|
|
}
|
|
|
|
// getWorkflowTemplateVersionDB will return a WorkflowTemplateVersion given the arguments.
|
|
// version can be a number as a string, or the string "latest" to get the latest.
|
|
func (c *Client) getWorkflowTemplateVersionDB(namespace, name, version string) (workflowTemplateVersion *WorkflowTemplateVersion, err error) {
|
|
workflowTemplateVersion = &WorkflowTemplateVersion{}
|
|
|
|
whereMap := sq.Eq{
|
|
"wt.name": name,
|
|
"wt.is_archived": false,
|
|
}
|
|
|
|
if version == "latest" {
|
|
whereMap["wtv.is_latest"] = "true"
|
|
} else {
|
|
whereMap["wtv.version"] = version
|
|
}
|
|
|
|
sb := c.workflowTemplatesVersionSelectBuilder(namespace).
|
|
Where(whereMap)
|
|
|
|
err = c.DB.Getx(workflowTemplateVersion, sb)
|
|
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
workflowTemplateVersion.Parameters = make([]Parameter, 0)
|
|
if err := json.Unmarshal(workflowTemplateVersion.ParametersBytes, &workflowTemplateVersion.Parameters); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// GetLatestWorkflowTemplateVersionDB returns the latest WorkflowTemplateVersion
|
|
func (c *Client) getLatestWorkflowTemplateVersionDB(namespace, name string) (workflowTemplateVersion *WorkflowTemplateVersion, err error) {
|
|
return c.getWorkflowTemplateVersionDB(namespace, name, "latest")
|
|
}
|
|
|
|
func (c *Client) getWorkflowTemplateById(id uint64) (workflowTemplate *WorkflowTemplate, err error) {
|
|
workflowTemplate = &WorkflowTemplate{}
|
|
|
|
query, args, err := sb.Select(getWorkflowTemplateColumns("wt", "")...).
|
|
From("workflow_templates wt").
|
|
Where(sq.Eq{"id": id}).
|
|
ToSql()
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
err = c.DB.Get(workflowTemplate, query, args...)
|
|
|
|
return
|
|
}
|
|
|
|
func (c *Client) getWorkflowTemplateRaw(namespace, uid string) (workflowTemplate *WorkflowTemplate, err error) {
|
|
workflowTemplate = &WorkflowTemplate{}
|
|
|
|
query := sb.Select(getWorkflowTemplateColumns("wt", "")...).
|
|
From("workflow_templates wt").
|
|
Where(sq.Eq{
|
|
"namespace": namespace,
|
|
"uid": uid,
|
|
})
|
|
|
|
err = c.DB.Getx(workflowTemplate, query)
|
|
|
|
return
|
|
}
|
|
|
|
// getWorkflowTemplate gets the workflowtemplate given the input data.
|
|
// it also loads the argo workflow and labels data.
|
|
// If version is <= 0, the latest workflow template is fetched.
|
|
// If not found, (nil, nil) is returned
|
|
func (c *Client) getWorkflowTemplate(namespace, uid string, version int64) (workflowTemplate *WorkflowTemplate, err error) {
|
|
workflowTemplate = &WorkflowTemplate{
|
|
WorkflowExecutionStatisticReport: &WorkflowExecutionStatisticReport{},
|
|
}
|
|
|
|
// A new workflow template version is created upon a change, so we use it's created_at
|
|
// as a modified_at for the workflow template.
|
|
sb := c.workflowTemplatesSelectBuilder(namespace).
|
|
Columns("wtv.manifest", "wtv.version", "wtv.id workflow_template_version_id", "wtv.created_at modified_at").
|
|
Join("workflow_template_versions wtv ON wt.id = wtv.workflow_template_id").
|
|
Where(sq.Eq{
|
|
"wt.uid": uid,
|
|
"wt.is_archived": false,
|
|
})
|
|
|
|
if version <= 0 {
|
|
sb = sb.Where(sq.Eq{"wtv.is_latest": true})
|
|
} else {
|
|
sb = sb.Where(sq.Eq{"wtv.version": version})
|
|
}
|
|
|
|
if err = c.Getx(workflowTemplate, sb); err != nil {
|
|
if err == sql.ErrNoRows {
|
|
return nil, nil
|
|
}
|
|
|
|
return nil, err
|
|
}
|
|
|
|
versionAsString := "latest"
|
|
if version != 0 {
|
|
versionAsString = fmt.Sprintf("%v", version)
|
|
}
|
|
|
|
argoWft, err := c.getArgoWorkflowTemplate(namespace, uid, versionAsString)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
workflowTemplate.ArgoWorkflowTemplate = argoWft
|
|
|
|
templateVersion, err := strconv.ParseInt(argoWft.Labels[label.Version], 10, 64)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
workflowTemplate.Version = templateVersion
|
|
|
|
wtv, err := c.getWorkflowTemplateVersionDB(namespace, workflowTemplate.Name, versionAsString)
|
|
if err != nil {
|
|
return workflowTemplate, err
|
|
}
|
|
|
|
workflowTemplate.Parameters, err = c.replaceSysNodePoolOptions(wtv.Parameters)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return workflowTemplate, nil
|
|
}
|
|
|
|
// listWorkflowTemplateVersions grabs WorkflowTemplateVersions and returns them as WorkflowTemplates.
|
|
func (c *Client) listWorkflowTemplateVersions(namespace, uid string) (workflowTemplateVersions []*WorkflowTemplate, err error) {
|
|
dbVersions, err := c.selectWorkflowTemplateVersionsDB(namespace, uid)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for _, version := range dbVersions {
|
|
if version.ParametersBytes != nil {
|
|
_, err := version.LoadParametersFromBytes()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
updatedParameters, err := c.replaceSysNodePoolOptions(version.Parameters)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
version.Parameters = updatedParameters
|
|
}
|
|
|
|
newItem := WorkflowTemplate{
|
|
ID: version.WorkflowTemplate.ID,
|
|
CreatedAt: version.CreatedAt.UTC(),
|
|
UID: version.UID,
|
|
Name: version.WorkflowTemplate.Name,
|
|
Manifest: version.Manifest,
|
|
Version: version.Version,
|
|
IsLatest: version.IsLatest,
|
|
IsArchived: version.WorkflowTemplate.IsArchived,
|
|
Labels: version.Labels,
|
|
Parameters: version.Parameters,
|
|
Description: version.Description,
|
|
}
|
|
|
|
workflowTemplateVersions = append(workflowTemplateVersions, &newItem)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (c *Client) selectWorkflowTemplatesQuery(namespace string, request *request.Request) (sb sq.SelectBuilder, err error) {
|
|
sb = c.workflowTemplatesSelectBuilder(namespace).
|
|
Column("COUNT(wtv.*) versions, MAX(wtv.id) workflow_template_version_id").
|
|
Join("workflow_template_versions wtv ON wtv.workflow_template_id = wt.id").
|
|
GroupBy("wt.id", "wt.created_at", "wt.uid", "wt.name", "wt.is_archived").
|
|
OrderBy("wt.created_at DESC")
|
|
|
|
sb, err = applyWorkflowTemplateFilter(sb, request)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
sb = *request.ApplyPaginationToSelect(&sb)
|
|
|
|
return
|
|
}
|
|
|
|
// selectWorkflowTemplatesDB loads non-archived and non-system workflow templates from the database for the input namespace
|
|
// it also selects the total number of versions and latest version id
|
|
func (c *Client) selectWorkflowTemplatesDB(namespace string, request *request.Request) (workflowTemplates []*WorkflowTemplate, err error) {
|
|
workflowTemplates = make([]*WorkflowTemplate, 0)
|
|
|
|
sb, err := c.selectWorkflowTemplatesQuery(namespace, request)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
sb = sb.Where(sq.Eq{
|
|
"wt.is_archived": false,
|
|
"wt.is_system": false,
|
|
})
|
|
|
|
err = c.DB.Selectx(&workflowTemplates, sb)
|
|
|
|
return
|
|
}
|
|
|
|
// selectAllWorkflowTemplatesDB loads all workflow templates from the database for the input namespace
|
|
// it also selects the total number of versions and latest version id
|
|
func (c *Client) selectAllWorkflowTemplatesDB(namespace string, request *request.Request) (workflowTemplates []*WorkflowTemplate, err error) {
|
|
workflowTemplates = make([]*WorkflowTemplate, 0)
|
|
|
|
sb, err := c.selectWorkflowTemplatesQuery(namespace, request)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
err = c.DB.Selectx(&workflowTemplates, sb)
|
|
|
|
return
|
|
}
|
|
|
|
// CountWorkflowTemplates counts the total number of workflow templates for the given namespace
|
|
// archived, and system templates are ignored.
|
|
func (c *Client) CountWorkflowTemplates(namespace string, request *request.Request) (count int, err error) {
|
|
sb := sb.Select("COUNT(*)").
|
|
From("workflow_templates wt").
|
|
Where(sq.Eq{
|
|
"wt.namespace": namespace,
|
|
"wt.is_archived": false,
|
|
"wt.is_system": false,
|
|
})
|
|
|
|
sb, err = applyWorkflowTemplateFilter(sb, request)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
err = sb.RunWith(c.DB).
|
|
QueryRow().
|
|
Scan(&count)
|
|
|
|
return
|
|
}
|
|
|
|
func (c *Client) validateWorkflowTemplate(namespace string, workflowTemplate *WorkflowTemplate) (err error) {
|
|
// validate workflow template
|
|
finalBytes, err := workflowTemplate.WrapSpec()
|
|
if err != nil {
|
|
return
|
|
}
|
|
err = c.ValidateWorkflowExecution(namespace, finalBytes)
|
|
if err != nil {
|
|
log.WithFields(log.Fields{
|
|
"Namespace": namespace,
|
|
"WorkflowTemplate": workflowTemplate,
|
|
"Error": err.Error(),
|
|
}).Error("Workflow could not be validated.")
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (c *Client) CreateWorkflowTemplate(namespace string, workflowTemplate *WorkflowTemplate) (*WorkflowTemplate, error) {
|
|
// validate workflow template
|
|
if err := c.validateWorkflowTemplate(namespace, workflowTemplate); err != nil {
|
|
return nil, util.NewUserError(codes.InvalidArgument, err.Error())
|
|
}
|
|
|
|
newWorkflowTemplate, _, err := c.createWorkflowTemplate(namespace, workflowTemplate)
|
|
if err != nil {
|
|
log.WithFields(log.Fields{
|
|
"Namespace": namespace,
|
|
"WorkflowTemplate": workflowTemplate,
|
|
"Error": err.Error(),
|
|
}).Error("Could not create workflow template.")
|
|
return nil, util.NewUserErrorWrap(err, "Workflow template")
|
|
}
|
|
|
|
return newWorkflowTemplate, nil
|
|
}
|
|
|
|
// CreateWorkflowTemplateVersion creates a new workflow template version including argo resources.
|
|
// It marks any older workflow template versions as not latest
|
|
//
|
|
// Pre-condition: a Workflow Template version already exists
|
|
// Post-condition: the input workflow template will have it's fields updated so it matches the new version data.
|
|
func (c *Client) CreateWorkflowTemplateVersion(namespace string, workflowTemplate *WorkflowTemplate) (*WorkflowTemplate, error) {
|
|
if workflowTemplate.UID == "" {
|
|
return nil, fmt.Errorf("uid required for CreateWorkflowTemplateVersion")
|
|
}
|
|
|
|
// validate workflow template
|
|
if err := c.validateWorkflowTemplate(namespace, workflowTemplate); err != nil {
|
|
return nil, util.NewUserError(codes.InvalidArgument, err.Error())
|
|
}
|
|
|
|
tx, err := c.DB.Begin()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
wftSb := c.workflowTemplatesSelectBuilder(namespace).
|
|
Where(sq.Eq{
|
|
"wt.uid": workflowTemplate.UID,
|
|
"wt.is_archived": false,
|
|
})
|
|
workflowTemplateDB := &WorkflowTemplate{}
|
|
if err = c.DB.Getx(workflowTemplateDB, wftSb); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
workflowTemplateVersion := &WorkflowTemplateVersion{
|
|
WorkflowTemplate: workflowTemplateDB,
|
|
Manifest: workflowTemplate.Manifest,
|
|
Labels: workflowTemplate.Labels,
|
|
Description: workflowTemplate.Description,
|
|
}
|
|
|
|
err = createLatestWorkflowTemplateVersionDB(tx, workflowTemplateVersion)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
workflowTemplate.WorkflowTemplateVersionID = workflowTemplateVersion.ID
|
|
|
|
updatedTemplate, err := createArgoWorkflowTemplate(workflowTemplate, workflowTemplateVersion.Version)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
updatedTemplate.TypeMeta = v1.TypeMeta{}
|
|
updatedTemplate.ObjectMeta.ResourceVersion = ""
|
|
updatedTemplate.ObjectMeta.SetSelfLink("")
|
|
updatedTemplate.Labels[label.WorkflowTemplateVersionUid] = strconv.FormatInt(workflowTemplateVersion.Version, 10)
|
|
|
|
parametersMap, err := workflowTemplate.GetParametersKeyString()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if updatedTemplate.Annotations == nil {
|
|
updatedTemplate.Annotations = make(map[string]string)
|
|
}
|
|
for key, value := range parametersMap {
|
|
updatedTemplate.Annotations[key] = value
|
|
}
|
|
|
|
latest, err := c.getArgoWorkflowTemplate(namespace, workflowTemplate.UID, "latest")
|
|
if err != nil {
|
|
log.WithFields(log.Fields{
|
|
"Namespace": namespace,
|
|
"WorkflowTemplate": workflowTemplate,
|
|
"Error": err.Error(),
|
|
}).Error("Could not get latest argo workflow template")
|
|
|
|
return nil, err
|
|
}
|
|
delete(latest.Labels, label.VersionLatest)
|
|
|
|
if _, err := c.ArgoprojV1alpha1().WorkflowTemplates(namespace).Create(updatedTemplate); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
latest, err = c.ArgoprojV1alpha1().WorkflowTemplates(namespace).Update(latest)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Make sure the associated workflow template has the latest labels
|
|
_, err = sb.Update("workflow_templates").
|
|
Set("labels", workflowTemplate.Labels).
|
|
Where(sq.Eq{
|
|
"id": workflowTemplateDB.ID,
|
|
}).
|
|
RunWith(tx).
|
|
Exec()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := tx.Commit(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
workflowTemplate.ID = workflowTemplateDB.ID
|
|
workflowTemplate.Version = workflowTemplateVersion.Version
|
|
|
|
return workflowTemplate, nil
|
|
}
|
|
|
|
// UpdateWorkflowTemplateVersion will update a given WorkflowTemplateVersion in the database.
|
|
// The intent is to change specific database values for a WorkflowTemplateVersion.
|
|
// - wtv.ID has to be set and greater than 0
|
|
func (c *Client) UpdateWorkflowTemplateVersion(wtv *WorkflowTemplateVersion) error {
|
|
if wtv.ID <= 0 {
|
|
return fmt.Errorf("id required for UpdateWorkflowTemplateVersionDB")
|
|
}
|
|
return updateWorkflowTemplateVersionDB(c.DB, wtv)
|
|
}
|
|
|
|
// GetWorkflowTemplateRaw returns the WorkflowTemplate without any version information
|
|
func (c *Client) GetWorkflowTemplateRaw(namespace, uid string) (workflowTemplate *WorkflowTemplate, err error) {
|
|
workflowTemplate, err = c.getWorkflowTemplateRaw(namespace, uid)
|
|
if err != nil && strings.Contains(err.Error(), "sql: no rows in result set") {
|
|
return nil, nil
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// GetWorkflowTemplate returns a WorkflowTemplate with data loaded from various sources
|
|
// If version is 0, it returns the latest version data.
|
|
//
|
|
// Data loaded includes
|
|
// * Database Information
|
|
// * ArgoWorkflowTemplate
|
|
// * Labels
|
|
func (c *Client) GetWorkflowTemplate(namespace, uid string, version int64) (workflowTemplate *WorkflowTemplate, err error) {
|
|
workflowTemplate, err = c.getWorkflowTemplate(namespace, uid, version)
|
|
if err != nil {
|
|
log.WithFields(log.Fields{
|
|
"Namespace": namespace,
|
|
"WorkflowTemplate": workflowTemplate,
|
|
"Error": err.Error(),
|
|
}).Error("Get Workflow Template failed.")
|
|
return nil, util.NewUserError(codes.Unknown, "Unknown error.")
|
|
}
|
|
if workflowTemplate == nil {
|
|
return nil, util.NewUserError(codes.NotFound, "Workflow template not found.")
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// GetLatestWorkflowTemplate returns a workflow template with the latest version data.
|
|
func (c *Client) GetLatestWorkflowTemplate(namespace, uid string) (workflowTemplate *WorkflowTemplate, err error) {
|
|
return c.GetWorkflowTemplate(namespace, uid, 0)
|
|
}
|
|
|
|
// CountWorkflowTemplatesByName returns the number of WorkflowTemplates given the arguments.
|
|
// If archived is nil, it is not considered.
|
|
func (c *Client) CountWorkflowTemplatesByName(namespace, name string, archived *bool) (count uint64, err error) {
|
|
sb := c.countWorkflowTemplateSelectBuilder(namespace).
|
|
Where(sq.Eq{"wt.name": name})
|
|
|
|
if archived != nil {
|
|
sb = sb.Where(sq.Eq{"wt.is_archived": *archived})
|
|
}
|
|
|
|
err = sb.RunWith(c.DB).
|
|
QueryRow().
|
|
Scan(&count)
|
|
|
|
return
|
|
}
|
|
|
|
// CountWorkflowTemplateVersions returns the number of versions a non-archived WorkflowTemplate has.
|
|
func (c *Client) CountWorkflowTemplateVersions(namespace, uid string) (count uint64, err error) {
|
|
err = sb.Select("COUNT(*)").
|
|
From("workflow_templates wt").
|
|
Join("workflow_template_versions wtv ON wtv.workflow_template_id = wt.id").
|
|
Where(sq.Eq{
|
|
"wt.namespace": namespace,
|
|
"wt.uid": uid,
|
|
"wt.is_archived": false,
|
|
}).
|
|
RunWith(c.DB).
|
|
QueryRow().
|
|
Scan(&count)
|
|
|
|
return
|
|
}
|
|
|
|
// ListWorkflowTemplateVersions returns all the WorkflowTemplates for a given namespace and uid.
|
|
func (c *Client) ListWorkflowTemplateVersions(namespace, uid string) (workflowTemplateVersions []*WorkflowTemplate, err error) {
|
|
workflowTemplateVersions, err = c.listWorkflowTemplateVersions(namespace, uid)
|
|
if err != nil {
|
|
log.WithFields(log.Fields{
|
|
"Namespace": namespace,
|
|
"UID": uid,
|
|
"Error": err.Error(),
|
|
}).Error("Workflow template versions not found.")
|
|
return nil, util.NewUserError(codes.NotFound, "Workflow template versions not found.")
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// ListWorkflowTemplateVersionsModels returns all the WorkflowTemplateVersions from the database.
|
|
// This function is a work-around for ListWorkflowTemplateVersions. Once that function is refactored,
|
|
// this function should no longer be necessary and removed.
|
|
// See: https://github.com/onepanelio/core/issues/436
|
|
func (c *Client) ListWorkflowTemplateVersionsModels(namespace, uid string) (workflowTemplateVersions []*WorkflowTemplateVersion, err error) {
|
|
workflowTemplateVersions, err = c.selectWorkflowTemplateVersionsDB(namespace, uid)
|
|
if err != nil {
|
|
log.WithFields(log.Fields{
|
|
"Namespace": namespace,
|
|
"UID": uid,
|
|
"Error": err.Error(),
|
|
}).Error("Workflow template versions not found.")
|
|
return nil, util.NewUserError(codes.NotFound, "Workflow template versions not found.")
|
|
}
|
|
return
|
|
}
|
|
|
|
// ListWorkflowTemplateVersionsAll returns all WorkflowTemplateVersions with no filtering.
|
|
func (c *Client) ListWorkflowTemplateVersionsAll(paginator *pagination.PaginationRequest) (workflowTemplateVersions []*WorkflowTemplateVersion, err error) {
|
|
workflowTemplateVersions, err = c.selectAllWorkflowTemplateVersionsDB(paginator)
|
|
if err != nil {
|
|
log.WithFields(log.Fields{
|
|
"Error": err.Error(),
|
|
}).Error("Workflow template versions not found.")
|
|
return nil, util.NewUserError(codes.NotFound, "Workflow template versions not found.")
|
|
}
|
|
return
|
|
}
|
|
|
|
// ListAllWorkflowTemplates lists all of the workflow templates, including archived and system specific
|
|
func (c *Client) ListAllWorkflowTemplates(namespace string, request *request.Request) (workflowTemplateVersions []*WorkflowTemplate, err error) {
|
|
workflowTemplateVersions, err = c.selectAllWorkflowTemplatesDB(namespace, request)
|
|
if err != nil {
|
|
log.WithFields(log.Fields{
|
|
"Namespace": namespace,
|
|
"Error": err.Error(),
|
|
}).Error("Workflow templates not found.")
|
|
return nil, util.NewUserError(codes.NotFound, "Workflow templates not found.")
|
|
}
|
|
|
|
err = c.appendExtraWorkflowTemplateData(namespace, workflowTemplateVersions)
|
|
|
|
return
|
|
}
|
|
|
|
// ListWorkflowTemplates returns all WorkflowTemplates where the results
|
|
// are filtered by is_archived and is_System is false.
|
|
func (c *Client) ListWorkflowTemplates(namespace string, request *request.Request) (workflowTemplateVersions []*WorkflowTemplate, err error) {
|
|
workflowTemplateVersions, err = c.selectWorkflowTemplatesDB(namespace, request)
|
|
if err != nil {
|
|
log.WithFields(log.Fields{
|
|
"Namespace": namespace,
|
|
"Error": err.Error(),
|
|
}).Error("Workflow templates not found.")
|
|
return nil, util.NewUserError(codes.NotFound, "Workflow templates not found.")
|
|
}
|
|
|
|
err = c.appendExtraWorkflowTemplateData(namespace, workflowTemplateVersions)
|
|
|
|
return
|
|
}
|
|
|
|
// appendExtraWorkflowTemplateData adds extra information to workflow templates
|
|
// * execution statistics (including cron)
|
|
func (c *Client) appendExtraWorkflowTemplateData(namespace string, workflowTemplateVersions []*WorkflowTemplate) (err error) {
|
|
err = c.GetWorkflowExecutionStatisticsForTemplates(workflowTemplateVersions...)
|
|
if err != nil {
|
|
log.WithFields(log.Fields{
|
|
"Namespace": namespace,
|
|
"Error": err.Error(),
|
|
}).Error("Unable to get Workflow Execution Statistic for Templates.")
|
|
return util.NewUserError(codes.NotFound, "Unable to get Workflow Execution Statistic for Templates.")
|
|
}
|
|
|
|
err = c.GetCronWorkflowStatisticsForTemplates(workflowTemplateVersions...)
|
|
if err != nil {
|
|
log.WithFields(log.Fields{
|
|
"Namespace": namespace,
|
|
"Error": err.Error(),
|
|
}).Error("Unable to get Cron Workflow Statistic for Templates.")
|
|
return util.NewUserError(codes.NotFound, "Unable to get Cron Workflow Statistic for Templates.")
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (c *Client) getLatestWorkflowTemplate(namespace, uid string) (*WorkflowTemplate, error) {
|
|
return c.getWorkflowTemplate(namespace, uid, 0) //version=0 means latest
|
|
}
|
|
|
|
// ArchiveWorkflowTemplate will mark the workflow template identified by the (namespace, uid) as archived
|
|
// and will archive or delete resources where appropriate.
|
|
func (c *Client) ArchiveWorkflowTemplate(namespace, uid string) (archived bool, err error) {
|
|
workflowTemplate, err := c.getLatestWorkflowTemplate(namespace, uid)
|
|
if err != nil {
|
|
log.WithFields(log.Fields{
|
|
"Namespace": namespace,
|
|
"UID": uid,
|
|
"Error": err.Error(),
|
|
}).Error("Get Workflow Template failed.")
|
|
return false, util.NewUserError(codes.Unknown, "Unable to archive workflow template.")
|
|
}
|
|
if workflowTemplate == nil {
|
|
return false, util.NewUserError(codes.NotFound, "Workflow template not found.")
|
|
}
|
|
|
|
wftVersions, err := c.listWorkflowTemplateVersions(namespace, uid)
|
|
if err != nil {
|
|
log.WithFields(log.Fields{
|
|
"Namespace": namespace,
|
|
"UID": uid,
|
|
"Error": err.Error(),
|
|
}).Error("Get Workflow Template Versions failed.")
|
|
return false, util.NewUserError(codes.Unknown, "Unable to archive workflow template.")
|
|
}
|
|
|
|
//cron workflows
|
|
cronWorkflows := []*CronWorkflow{}
|
|
cwfSB := c.cronWorkflowSelectBuilder(namespace, uid).
|
|
OrderBy("cw.created_at DESC")
|
|
|
|
if err := c.DB.Selectx(&cronWorkflows, cwfSB); err != nil {
|
|
log.WithFields(log.Fields{
|
|
"Namespace": namespace,
|
|
"UID": uid,
|
|
"Error": err.Error(),
|
|
}).Error("Get Cron Workflows failed.")
|
|
return false, util.NewUserError(codes.Unknown, "Unable to archive workflow template.")
|
|
}
|
|
for _, cwf := range cronWorkflows {
|
|
err = c.ArchiveCronWorkflow(namespace, cwf.Name)
|
|
if err != nil {
|
|
log.WithFields(log.Fields{
|
|
"Namespace": namespace,
|
|
"UID": uid,
|
|
"Error": err.Error(),
|
|
}).Error("Archive Cron Workflow failed.")
|
|
return false, util.NewUserError(codes.Unknown, "Unable to archive workflow template.")
|
|
}
|
|
}
|
|
|
|
//workflow executions
|
|
paginator := pagination.NewRequest(0, 100)
|
|
|
|
for _, wftVer := range wftVersions {
|
|
wfTempVer := strconv.FormatInt(wftVer.Version, 10)
|
|
workflowTemplateName := uid + "-v" + wfTempVer
|
|
|
|
for {
|
|
req := &request.Request{Pagination: &paginator}
|
|
wfs, err := c.ListWorkflowExecutions(namespace, uid, wfTempVer, true, req)
|
|
if err != nil {
|
|
log.WithFields(log.Fields{
|
|
"Namespace": namespace,
|
|
"UID": uid,
|
|
"Error": err.Error(),
|
|
}).Error("Get Workflow Executions failed.")
|
|
return false, util.NewUserError(codes.Unknown, "Unable to archive workflow template.")
|
|
}
|
|
if len(wfs) == 0 {
|
|
break
|
|
}
|
|
for _, wf := range wfs {
|
|
err = c.ArchiveWorkflowExecution(namespace, wf.UID)
|
|
if err != nil {
|
|
log.WithFields(log.Fields{
|
|
"Namespace": namespace,
|
|
"UID": uid,
|
|
"Error": err.Error(),
|
|
}).Error("Archive Workflow Execution Failed.")
|
|
return false, util.NewUserError(codes.Unknown, "Unable to archive workflow template.")
|
|
}
|
|
}
|
|
}
|
|
|
|
err = c.ArgoprojV1alpha1().WorkflowTemplates(namespace).Delete(workflowTemplateName, nil)
|
|
if err != nil {
|
|
if strings.Contains(err.Error(), "not found") {
|
|
return true, nil
|
|
}
|
|
return false, err
|
|
}
|
|
}
|
|
|
|
_, err = sb.Update("workflow_templates").
|
|
Set("is_archived", true).
|
|
Where(sq.Eq{
|
|
"uid": uid,
|
|
"namespace": namespace,
|
|
}).RunWith(c.DB).
|
|
Exec()
|
|
|
|
if err != nil {
|
|
log.WithFields(log.Fields{
|
|
"Namespace": namespace,
|
|
"UID": uid,
|
|
"Error": err.Error(),
|
|
}).Error("Archive Workflow Template DB failed.")
|
|
return false, util.NewUserError(codes.Unknown, "Unable to archive workflow template.")
|
|
}
|
|
|
|
return true, nil
|
|
}
|
|
|
|
// createArgoWorkflowTemplate creates an argo workflow template from the workflowTemplate struct
|
|
// the argo template stores the version information.
|
|
func createArgoWorkflowTemplate(workflowTemplate *WorkflowTemplate, version int64) (*v1alpha1.WorkflowTemplate, error) {
|
|
var argoWft *v1alpha1.WorkflowTemplate
|
|
var jsonOpts []argojson.JSONOpt
|
|
jsonOpts = append(jsonOpts, argojson.DisallowUnknownFields)
|
|
|
|
finalBytes, err := workflowTemplate.WrapSpec()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
err = yaml.Unmarshal(finalBytes, &argoWft)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := workflowTemplate.GenerateUID(workflowTemplate.Name); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
argoWft.Name = fmt.Sprintf("%v-v%v", workflowTemplate.UID, version)
|
|
|
|
labels := map[string]string{
|
|
label.WorkflowTemplate: workflowTemplate.UID,
|
|
label.WorkflowTemplateUid: workflowTemplate.UID,
|
|
label.Version: fmt.Sprintf("%v", version),
|
|
label.VersionLatest: "true",
|
|
}
|
|
|
|
label.MergeLabelsPrefix(labels, workflowTemplate.Labels, label.TagPrefix)
|
|
argoWft.Labels = labels
|
|
|
|
return argoWft, nil
|
|
}
|
|
|
|
// getArgoWorkflowTemplate will load the argo workflow template.
|
|
// version "latest" will get the latest version, otherwise a number (as a string) will be used.
|
|
func (c *Client) getArgoWorkflowTemplate(namespace, workflowTemplateUID, version string) (*v1alpha1.WorkflowTemplate, error) {
|
|
labelSelect := fmt.Sprintf("%v=%v", label.WorkflowTemplateUid, workflowTemplateUID)
|
|
if version == "latest" {
|
|
labelSelect += "," + label.VersionLatest + "=true"
|
|
} else {
|
|
labelSelect += fmt.Sprintf(",%v=%v", label.Version, version)
|
|
}
|
|
|
|
workflowTemplates, err := c.ArgoprojV1alpha1().WorkflowTemplates(namespace).List(v1.ListOptions{
|
|
LabelSelector: labelSelect,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
templates := workflowTemplates.Items
|
|
if templates.Len() == 0 {
|
|
return nil, errors.New("not found")
|
|
}
|
|
|
|
if templates.Len() > 1 {
|
|
return nil, errors.New("not unique result")
|
|
}
|
|
|
|
return &templates[0], nil
|
|
}
|
|
|
|
func (c *Client) listArgoWorkflowTemplates(namespace, workflowTemplateUid string) (*[]v1alpha1.WorkflowTemplate, error) {
|
|
labelSelect := fmt.Sprintf("%v=%v", label.WorkflowTemplateUid, workflowTemplateUid)
|
|
workflowTemplates, err := c.ArgoprojV1alpha1().WorkflowTemplates(namespace).List(v1.ListOptions{
|
|
LabelSelector: labelSelect,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
templates := []v1alpha1.WorkflowTemplate(workflowTemplates.Items)
|
|
|
|
return &templates, nil
|
|
}
|
|
|
|
// listDBWorkflowTemplateVersions gets all of the workflow template versions for a specified workflow template uid
|
|
// archived ones are ignored. Returned in created_at desc order.
|
|
func (c *Client) selectWorkflowTemplateVersionsDB(namespace, workflowTemplateUID string) (versions []*WorkflowTemplateVersion, err error) {
|
|
versions = make([]*WorkflowTemplateVersion, 0)
|
|
|
|
sb := c.workflowTemplatesVersionSelectBuilder(namespace).
|
|
Columns(getWorkflowTemplateColumns("wt", "workflow_template")...).
|
|
Where(sq.Eq{
|
|
"wt.uid": workflowTemplateUID,
|
|
"wt.is_archived": false,
|
|
}).
|
|
OrderBy("wtv.created_at DESC")
|
|
|
|
err = c.DB.Selectx(&versions, sb)
|
|
|
|
return
|
|
}
|
|
|
|
// selectAllWorkflowTemplateVersionsDB returns all WorkflowTemplateVersions from the database without filter.
|
|
func (c *Client) selectAllWorkflowTemplateVersionsDB(paginator *pagination.PaginationRequest) (versions []*WorkflowTemplateVersion, err error) {
|
|
versions = make([]*WorkflowTemplateVersion, 0)
|
|
|
|
sb := c.workflowTemplateVersionSelectBuilderAll()
|
|
sb = *paginator.ApplyToSelect(&sb)
|
|
err = c.DB.Selectx(&versions, sb)
|
|
|
|
return
|
|
}
|
|
|
|
// prefix is the label prefix.
|
|
// e.g. prefix/my-label-key: my-label-value
|
|
// if version is 0, latest is used.
|
|
func (c *Client) GetWorkflowTemplateLabels(namespace, name, prefix string, version int64) (labels map[string]string, err error) {
|
|
versionAsString := "latest"
|
|
if version != 0 {
|
|
versionAsString = fmt.Sprintf("%v", version)
|
|
}
|
|
|
|
wf, err := c.getArgoWorkflowTemplate(namespace, name, versionAsString)
|
|
if err != nil {
|
|
log.WithFields(log.Fields{
|
|
"Namespace": namespace,
|
|
"Name": name,
|
|
"Error": err.Error(),
|
|
}).Error("Workflow Template not found.")
|
|
return nil, util.NewUserError(codes.NotFound, "Workflow Template not found.")
|
|
}
|
|
|
|
labels = label.FilterByPrefix(prefix, wf.Labels)
|
|
labels = label.RemovePrefix(prefix, labels)
|
|
|
|
return
|
|
}
|
|
|
|
// GenerateWorkflowTemplateManifest replaces any special parameters with runtime values
|
|
func (c *Client) GenerateWorkflowTemplateManifest(manifest string) (string, error) {
|
|
root := &yaml3.Node{}
|
|
if err := yaml3.Unmarshal([]byte(manifest), root); err != nil {
|
|
return "", err
|
|
}
|
|
|
|
parametersIndex := extensions.CreateYamlIndex("arguments", "parameters")
|
|
|
|
if extensions.HasNode(root, parametersIndex) {
|
|
resultNode, err := extensions.GetNode(root, parametersIndex)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
nodePoolOptions, err := c.systemConfig.NodePoolOptionsAsParameters()
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
legalNodePoolValues := []string{env.Get("ONEPANEL_TEMPLATE_NODEPOOL_DEFAULT_VALUE", "default")}
|
|
for _, opt := range nodePoolOptions {
|
|
legalNodePoolValues = append(legalNodePoolValues, opt.Value)
|
|
}
|
|
|
|
if len(nodePoolOptions) == 0 {
|
|
return "", fmt.Errorf("no node pool options")
|
|
}
|
|
|
|
for _, child := range resultNode.Content {
|
|
hasKey, err := extensions.HasKeyValue(child, "type", "select.nodepool")
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
if hasKey {
|
|
isValid, err := extensions.HasKeyValue(child, "value", legalNodePoolValues...)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
if !isValid {
|
|
return "", util.NewUserError(codes.InvalidArgument, fmt.Sprintf("select.nodepool has an invalid value. Valid values are: %v", strings.Join(legalNodePoolValues, ", ")))
|
|
}
|
|
|
|
if err := extensions.SetKeyValue(child, "value", nodePoolOptions[0].Value); err != nil {
|
|
return "", err
|
|
}
|
|
|
|
optionsIndex := extensions.CreateYamlIndex("options")
|
|
|
|
if extensions.HasNode(child, optionsIndex) {
|
|
return "", util.NewUserError(codes.InvalidArgument, "select.nodepool must not have any options")
|
|
}
|
|
|
|
child.Content = append(child.Content, &yaml3.Node{
|
|
Kind: yaml3.ScalarNode,
|
|
Value: "options",
|
|
}, &yaml3.Node{
|
|
Kind: yaml3.SequenceNode,
|
|
})
|
|
|
|
optionsNode, err := extensions.GetNode(child, optionsIndex)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
optionsNode.Kind = yaml3.SequenceNode
|
|
optionsNode.Content = parameterOptionsToNodes(nodePoolOptions)
|
|
}
|
|
}
|
|
}
|
|
|
|
finalManifest, err := yaml3.Marshal(root)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
return string(finalManifest), err
|
|
}
|
|
|
|
// ListWorkflowTemplatesField loads all of the distinct field values for workflow templates
|
|
func (c *Client) ListWorkflowTemplatesField(namespace, field string, isSystem bool) (value []string, err error) {
|
|
if field != "name" {
|
|
return nil, fmt.Errorf("unsupported field '%v'", field)
|
|
}
|
|
|
|
columnName := fmt.Sprintf("wt.%v", field)
|
|
|
|
sb := sb.Select(columnName).
|
|
Distinct().
|
|
From("workflow_templates wt").
|
|
Where(sq.Eq{
|
|
"wt.namespace": namespace,
|
|
"wt.is_system": isSystem,
|
|
}).OrderBy(columnName)
|
|
|
|
err = c.DB.Selectx(&value, sb)
|
|
|
|
return
|
|
}
|