Files
onepanel/pkg/workflow_template.go

1252 lines
39 KiB
Go

package v1
import (
"database/sql"
"encoding/json"
"errors"
"fmt"
"github.com/onepanelio/core/pkg/util/env"
"github.com/onepanelio/core/pkg/util/extensions"
"github.com/onepanelio/core/pkg/util/ptr"
"github.com/onepanelio/core/pkg/util/request"
pagination "github.com/onepanelio/core/pkg/util/request/pagination"
yaml3 "gopkg.in/yaml.v3"
"strconv"
"strings"
"time"
sq "github.com/Masterminds/squirrel"
"github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
argojson "github.com/argoproj/pkg/json"
"github.com/ghodss/yaml"
"github.com/onepanelio/core/pkg/util"
"github.com/onepanelio/core/pkg/util/label"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// WorkflowTemplateFilter represents the available ways we can filter WorkflowTemplates
type WorkflowTemplateFilter struct {
Labels []*Label
}
// GetLabels returns the labels in the filter
func (wt *WorkflowTemplateFilter) GetLabels() []*Label {
return wt.Labels
}
// replaceSysNodePoolOptions replaces a select.nodepool parameter with the nodePool options in the systemConfig
// and returns the new parameters with the change.
func (c *Client) replaceSysNodePoolOptions(parameters []Parameter) (result []Parameter, err error) {
nodePoolOptions, err := c.systemConfig.NodePoolOptionsAsParameters()
if err != nil {
return result, err
}
for i := range parameters {
param := parameters[i]
if param.Type == "select.nodepool" {
param.Options = nodePoolOptions
if param.Value != nil && *param.Value == "default" {
param.Value = ptr.String(param.Options[0].Value)
}
}
result = append(result, param)
}
return
}
// parameterOptionToNodes returns a mapping Node where the content's are the options name/value
func parameterOptionToNodes(option *ParameterOption) *yaml3.Node {
result := &yaml3.Node{
Kind: yaml3.MappingNode,
}
result.Content = append(result.Content, &yaml3.Node{
Kind: yaml3.ScalarNode,
Value: "name",
})
result.Content = append(result.Content, &yaml3.Node{
Kind: yaml3.ScalarNode,
Value: option.Name,
})
result.Content = append(result.Content, &yaml3.Node{
Kind: yaml3.ScalarNode,
Value: "value",
})
result.Content = append(result.Content, &yaml3.Node{
Kind: yaml3.ScalarNode,
Value: option.Value,
})
return result
}
// parameterOptionsToNodes returns an array of nodes representing the options
func parameterOptionsToNodes(options []*ParameterOption) []*yaml3.Node {
result := make([]*yaml3.Node, 0)
for _, option := range options {
result = append(result, parameterOptionToNodes(option))
}
return result
}
func applyWorkflowTemplateFilter(sb sq.SelectBuilder, request *request.Request) (sq.SelectBuilder, error) {
if !request.HasFilter() {
return sb, nil
}
filter, ok := request.Filter.(WorkflowTemplateFilter)
if !ok {
return sb, nil
}
sb, err := ApplyLabelSelectQuery("wt.labels", sb, &filter)
if err != nil {
return sb, err
}
return sb, nil
}
// createWorkflowTemplateVersionDB inserts a record into workflow_template_versions using the current time accurate to nanoseconds
// the data is returned in the resulting WorkflowTemplateVersion struct.
func createWorkflowTemplateVersionDB(runner sq.BaseRunner, workflowTemplateVersion *WorkflowTemplateVersion, parameters []Parameter) (err error) {
if workflowTemplateVersion == nil {
return fmt.Errorf("workflowTemplateVersion is nil")
}
if workflowTemplateVersion.WorkflowTemplate == nil {
return fmt.Errorf("workflowTemplateVersion.WorkflowTemplate is nil")
}
if workflowTemplateVersion.WorkflowTemplate.ID < 1 {
return fmt.Errorf("workflowTemplateVersion.WorkflowTemplate.ID must be > 0. %v given", workflowTemplateVersion.WorkflowTemplate.ID)
}
workflowTemplateVersion.Version = time.Now().UnixNano()
pj, err := json.Marshal(parameters)
if err != nil {
return
}
if pj == nil {
pj = []byte("[]")
}
err = sb.Insert("workflow_template_versions").
SetMap(sq.Eq{
"workflow_template_id": workflowTemplateVersion.WorkflowTemplate.ID,
"version": workflowTemplateVersion.Version,
"is_latest": true,
"manifest": workflowTemplateVersion.Manifest,
"parameters": pj,
"labels": workflowTemplateVersion.Labels,
}).
Suffix("RETURNING id").
RunWith(runner).
QueryRow().
Scan(&workflowTemplateVersion.ID)
return
}
// updateWorkflowTemplateVersionDB will update a WorkflowTemplateVersion row in the database.
func updateWorkflowTemplateVersionDB(runner sq.BaseRunner, wtv *WorkflowTemplateVersion) (err error) {
pj, err := json.Marshal(wtv.Parameters)
if err != nil {
return
}
_, err = sb.Update("workflow_template_versions").
SetMap(sq.Eq{
"manifest": wtv.Manifest,
"is_latest": wtv.IsLatest,
"parameters": string(pj),
}).
Where(sq.Eq{
"id": wtv.ID,
}).RunWith(runner).Exec()
return
}
// createLatestWorkflowTemplateVersionDB creates a new workflow template version and marks all previous versions as not latest.
func createLatestWorkflowTemplateVersionDB(runner sq.BaseRunner, workflowTemplateVersion *WorkflowTemplateVersion) (err error) {
if workflowTemplateVersion == nil {
return fmt.Errorf("workflowTemplateVersion is nil")
}
if workflowTemplateVersion.WorkflowTemplate == nil {
return fmt.Errorf("workflowTemplateVersion.WorkflowTemplate is nil")
}
if workflowTemplateVersion.WorkflowTemplate.ID < 1 {
return fmt.Errorf("workflowTemplateVersion.WorkflowTemplate.ID must be > 0. %v given", workflowTemplateVersion.WorkflowTemplate.ID)
}
_, err = sb.Update("workflow_template_versions").
Set("is_latest", false).
Where(sq.Eq{
"workflow_template_id": workflowTemplateVersion.WorkflowTemplate.ID,
}).
RunWith(runner).
Exec()
if err != nil {
return err
}
params, err := ParseParametersFromManifest([]byte(workflowTemplateVersion.Manifest))
if err != nil {
return err
}
return createWorkflowTemplateVersionDB(runner, workflowTemplateVersion, params)
}
// createWorkflowTemplate creates a WorkflowTemplate and all of the DB/Argo/K8s related resources
// The returned WorkflowTemplate has the ArgoWorkflowTemplate set to the newly created one.
func (c *Client) createWorkflowTemplate(namespace string, workflowTemplate *WorkflowTemplate) (*WorkflowTemplate, *WorkflowTemplateVersion, error) {
if err := workflowTemplate.GenerateUID(workflowTemplate.Name); err != nil {
return nil, nil, util.NewUserError(codes.InvalidArgument, "Template name must be 30 characters or less")
}
tx, err := c.DB.Begin()
if err != nil {
return nil, nil, err
}
defer tx.Rollback()
err = sb.Insert("workflow_templates").
SetMap(sq.Eq{
"uid": workflowTemplate.UID,
"name": workflowTemplate.Name,
"namespace": namespace,
"is_system": workflowTemplate.IsSystem,
"labels": workflowTemplate.Labels,
}).
Suffix("RETURNING id").
RunWith(tx).
QueryRow().
Scan(&workflowTemplate.ID)
if err != nil {
return nil, nil, err
}
params, err := ParseParametersFromManifest([]byte(workflowTemplate.Manifest))
if err != nil {
return nil, nil, util.NewUserError(codes.InvalidArgument, err.Error())
}
workflowTemplateVersion := &WorkflowTemplateVersion{
WorkflowTemplate: workflowTemplate,
Manifest: workflowTemplate.Manifest,
Labels: workflowTemplate.Labels,
}
err = createWorkflowTemplateVersionDB(tx, workflowTemplateVersion, params)
if err != nil {
return nil, nil, err
}
workflowTemplate.WorkflowTemplateVersionID = workflowTemplateVersion.ID
argoWft, err := createArgoWorkflowTemplate(workflowTemplate, workflowTemplateVersion.Version)
if err != nil {
return nil, nil, err
}
argoWft.Labels[label.WorkflowTemplateVersionUid] = strconv.FormatInt(workflowTemplateVersion.Version, 10)
if workflowTemplate.Resource != nil && workflowTemplate.ResourceUID != nil {
if *workflowTemplate.Resource == TypeWorkspaceTemplate {
argoWft.Labels[label.WorkspaceTemplateVersionUid] = *workflowTemplate.ResourceUID
}
}
argoWft, err = c.ArgoprojV1alpha1().WorkflowTemplates(namespace).Create(argoWft)
if err != nil {
return nil, nil, err
}
if err = tx.Commit(); err != nil {
if errDelete := c.ArgoprojV1alpha1().WorkflowTemplates(namespace).Delete(argoWft.Name, &v1.DeleteOptions{}); errDelete != nil {
err = fmt.Errorf("%w; %s", err, errDelete)
}
return nil, nil, err
}
workflowTemplate.ArgoWorkflowTemplate = argoWft
workflowTemplate.Version = workflowTemplateVersion.Version
return workflowTemplate, workflowTemplateVersion, nil
}
// baseWorkflowTemplatesSelectBuilder returns a SelectBuilder selecting a WorkflowTemplate from the database for a given namespace
// no columns are selected.
func (c *Client) baseWorkflowTemplatesSelectBuilder(namespace string) sq.SelectBuilder {
sb := sb.Select().
From("workflow_templates wt").
Where(sq.Eq{
"wt.namespace": namespace,
})
return sb
}
// workflowTemplatesSelectBuilder returns a SelectBuilder selecting a WorkflowTemplate from the database for a given namespace
// with all of the columns of a WorkflowTemplate
func (c *Client) workflowTemplatesSelectBuilder(namespace string) sq.SelectBuilder {
sb := c.baseWorkflowTemplatesSelectBuilder(namespace).
Columns(getWorkflowTemplateColumns("wt")...)
return sb
}
// countWorkflowTemplateSelectBuilder returns a select builder selecting the total count of WorkflowTemplates
// given the namespace
func (c *Client) countWorkflowTemplateSelectBuilder(namespace string) sq.SelectBuilder {
sb := c.baseWorkflowTemplatesSelectBuilder(namespace).
Columns("COUNT(*)")
return sb
}
// workflowTemplatesVersionSelectBuilder selects data from workflow template versions joined to a workflow template
// the versions/template are filtered by the workflow template's namespace.
func (c *Client) workflowTemplatesVersionSelectBuilder(namespace string) sq.SelectBuilder {
sb := sb.Select(getWorkflowTemplateVersionColumns("wtv")...).
From("workflow_template_versions wtv").
Join("workflow_templates wt ON wt.id = wtv.workflow_template_id").
Where(sq.Eq{
"wt.namespace": namespace,
})
return sb
}
// workflowTemplateVersionSelectBuilderAll
func (c *Client) workflowTemplateVersionSelectBuilderAll() sq.SelectBuilder {
sb := sb.Select(getWorkflowTemplateVersionColumns("wtv")...).
From("workflow_template_versions wtv")
return sb
}
// GetWorkflowTemplateDB returns a WorkflowTemplate from the database that is not archived, should one exist.
func (c *Client) getWorkflowTemplateDB(namespace, name string) (workflowTemplate *WorkflowTemplate, err error) {
workflowTemplate = &WorkflowTemplate{}
sb := c.workflowTemplatesSelectBuilder(namespace).
Where(sq.Eq{
"wt.name": name,
"wt.is_archived": false,
})
err = c.DB.Getx(workflowTemplate, sb)
return
}
// getWorkflowTemplateVersionDB will return a WorkflowTemplateVersion given the arguments.
// version can be a number as a string, or the string "latest" to get the latest.
func (c *Client) getWorkflowTemplateVersionDB(namespace, name, version string) (workflowTemplateVersion *WorkflowTemplateVersion, err error) {
workflowTemplateVersion = &WorkflowTemplateVersion{}
whereMap := sq.Eq{
"wt.name": name,
"wt.is_archived": false,
}
if version == "latest" {
whereMap["wtv.is_latest"] = "true"
} else {
whereMap["wtv.version"] = version
}
sb := c.workflowTemplatesVersionSelectBuilder(namespace).
Where(whereMap)
err = c.DB.Getx(workflowTemplateVersion, sb)
if err != nil {
return
}
workflowTemplateVersion.Parameters = make([]Parameter, 0)
if err := json.Unmarshal(workflowTemplateVersion.ParametersBytes, &workflowTemplateVersion.Parameters); err != nil {
return nil, err
}
return
}
// GetLatestWorkflowTemplateVersionDB returns the latest WorkflowTemplateVersion
func (c *Client) getLatestWorkflowTemplateVersionDB(namespace, name string) (workflowTemplateVersion *WorkflowTemplateVersion, err error) {
return c.getWorkflowTemplateVersionDB(namespace, name, "latest")
}
func (c *Client) getWorkflowTemplateById(id uint64) (workflowTemplate *WorkflowTemplate, err error) {
workflowTemplate = &WorkflowTemplate{}
query, args, err := sb.Select(getWorkflowTemplateColumns("wt", "")...).
From("workflow_templates wt").
Where(sq.Eq{"id": id}).
ToSql()
if err != nil {
return nil, err
}
err = c.DB.Get(workflowTemplate, query, args...)
return
}
// getWorkflowTemplate gets the workflowtemplate given the input data.
// it also loads the argo workflow and labels data.
// If version is <= 0, the latest workflow template is fetched.
// If not found, (nil, nil) is returned
func (c *Client) getWorkflowTemplate(namespace, uid string, version int64) (workflowTemplate *WorkflowTemplate, err error) {
workflowTemplate = &WorkflowTemplate{
WorkflowExecutionStatisticReport: &WorkflowExecutionStatisticReport{},
}
// A new workflow template version is created upon a change, so we use it's created_at
// as a modified_at for the workflow template.
sb := c.workflowTemplatesSelectBuilder(namespace).
Columns("wtv.manifest", "wtv.version", "wtv.id workflow_template_version_id", "wtv.created_at modified_at").
Join("workflow_template_versions wtv ON wt.id = wtv.workflow_template_id").
Where(sq.Eq{
"wt.uid": uid,
"wt.is_archived": false,
})
if version <= 0 {
sb = sb.Where(sq.Eq{"wtv.is_latest": true})
} else {
sb = sb.Where(sq.Eq{"wtv.version": version})
}
if err = c.Getx(workflowTemplate, sb); err != nil {
if err == sql.ErrNoRows {
return nil, nil
}
return nil, err
}
versionAsString := "latest"
if version != 0 {
versionAsString = fmt.Sprintf("%v", version)
}
argoWft, err := c.getArgoWorkflowTemplate(namespace, uid, versionAsString)
if err != nil {
return nil, err
}
workflowTemplate.ArgoWorkflowTemplate = argoWft
templateVersion, err := strconv.ParseInt(argoWft.Labels[label.Version], 10, 64)
if err != nil {
return nil, err
}
workflowTemplate.Version = templateVersion
wtv, err := c.getWorkflowTemplateVersionDB(namespace, workflowTemplate.Name, versionAsString)
if err != nil {
return workflowTemplate, err
}
workflowTemplate.Parameters, err = c.replaceSysNodePoolOptions(wtv.Parameters)
if err != nil {
return nil, err
}
return workflowTemplate, nil
}
// listWorkflowTemplateVersions grabs WorkflowTemplateVersions and returns them as WorkflowTemplates.
func (c *Client) listWorkflowTemplateVersions(namespace, uid string) (workflowTemplateVersions []*WorkflowTemplate, err error) {
dbVersions, err := c.selectWorkflowTemplateVersionsDB(namespace, uid)
if err != nil {
return nil, err
}
for _, version := range dbVersions {
if version.ParametersBytes != nil {
_, err := version.LoadParametersFromBytes()
if err != nil {
return nil, err
}
updatedParameters, err := c.replaceSysNodePoolOptions(version.Parameters)
if err != nil {
return nil, err
}
version.Parameters = updatedParameters
}
newItem := WorkflowTemplate{
ID: version.WorkflowTemplate.ID,
CreatedAt: version.CreatedAt.UTC(),
UID: version.UID,
Name: version.WorkflowTemplate.Name,
Manifest: version.Manifest,
Version: version.Version,
IsLatest: version.IsLatest,
IsArchived: version.WorkflowTemplate.IsArchived,
Labels: version.Labels,
Parameters: version.Parameters,
}
workflowTemplateVersions = append(workflowTemplateVersions, &newItem)
}
return
}
func (c *Client) selectWorkflowTemplatesQuery(namespace string, request *request.Request) (sb sq.SelectBuilder, err error) {
sb = c.workflowTemplatesSelectBuilder(namespace).
Column("COUNT(wtv.*) versions, MAX(wtv.id) workflow_template_version_id").
Join("workflow_template_versions wtv ON wtv.workflow_template_id = wt.id").
GroupBy("wt.id", "wt.created_at", "wt.uid", "wt.name", "wt.is_archived").
OrderBy("wt.created_at DESC")
sb, err = applyWorkflowTemplateFilter(sb, request)
if err != nil {
return
}
sb = *request.ApplyPaginationToSelect(&sb)
return
}
// selectWorkflowTemplatesDB loads non-archived and non-system workflow templates from the database for the input namespace
// it also selects the total number of versions and latest version id
func (c *Client) selectWorkflowTemplatesDB(namespace string, request *request.Request) (workflowTemplates []*WorkflowTemplate, err error) {
workflowTemplates = make([]*WorkflowTemplate, 0)
sb, err := c.selectWorkflowTemplatesQuery(namespace, request)
if err != nil {
return nil, err
}
sb = sb.Where(sq.Eq{
"wt.is_archived": false,
"wt.is_system": false,
})
err = c.DB.Selectx(&workflowTemplates, sb)
return
}
// selectAllWorkflowTemplatesDB loads all workflow templates from the database for the input namespace
// it also selects the total number of versions and latest version id
func (c *Client) selectAllWorkflowTemplatesDB(namespace string, request *request.Request) (workflowTemplates []*WorkflowTemplate, err error) {
workflowTemplates = make([]*WorkflowTemplate, 0)
sb, err := c.selectWorkflowTemplatesQuery(namespace, request)
if err != nil {
return nil, err
}
err = c.DB.Selectx(&workflowTemplates, sb)
return
}
// CountWorkflowTemplates counts the total number of workflow templates for the given namespace
// archived, and system templates are ignored.
func (c *Client) CountWorkflowTemplates(namespace string, request *request.Request) (count int, err error) {
sb := sb.Select("COUNT(*)").
From("workflow_templates wt").
Where(sq.Eq{
"wt.namespace": namespace,
"wt.is_archived": false,
"wt.is_system": false,
})
sb, err = applyWorkflowTemplateFilter(sb, request)
if err != nil {
return 0, err
}
err = sb.RunWith(c.DB).
QueryRow().
Scan(&count)
return
}
func (c *Client) validateWorkflowTemplate(namespace string, workflowTemplate *WorkflowTemplate) (err error) {
// validate workflow template
finalBytes, err := workflowTemplate.WrapSpec()
if err != nil {
return
}
err = c.ValidateWorkflowExecution(namespace, finalBytes)
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"WorkflowTemplate": workflowTemplate,
"Error": err.Error(),
}).Error("Workflow could not be validated.")
}
return
}
func (c *Client) CreateWorkflowTemplate(namespace string, workflowTemplate *WorkflowTemplate) (*WorkflowTemplate, error) {
// validate workflow template
if err := c.validateWorkflowTemplate(namespace, workflowTemplate); err != nil {
return nil, util.NewUserError(codes.InvalidArgument, err.Error())
}
newWorkflowTemplate, _, err := c.createWorkflowTemplate(namespace, workflowTemplate)
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"WorkflowTemplate": workflowTemplate,
"Error": err.Error(),
}).Error("Could not create workflow template.")
return nil, util.NewUserErrorWrap(err, "Workflow template")
}
return newWorkflowTemplate, nil
}
// CreateWorkflowTemplateVersion creates a new workflow template version including argo resources.
// It marks any older workflow template versions as not latest
//
// Pre-condition: a Workflow Template version already exists
// Post-condition: the input workflow template will have it's fields updated so it matches the new version data.
func (c *Client) CreateWorkflowTemplateVersion(namespace string, workflowTemplate *WorkflowTemplate) (*WorkflowTemplate, error) {
if workflowTemplate.UID == "" {
return nil, fmt.Errorf("uid required for CreateWorkflowTemplateVersion")
}
// validate workflow template
if err := c.validateWorkflowTemplate(namespace, workflowTemplate); err != nil {
return nil, util.NewUserError(codes.InvalidArgument, err.Error())
}
tx, err := c.DB.Begin()
if err != nil {
return nil, err
}
defer tx.Rollback()
wftSb := c.workflowTemplatesSelectBuilder(namespace).
Where(sq.Eq{
"wt.uid": workflowTemplate.UID,
"wt.is_archived": false,
})
workflowTemplateDB := &WorkflowTemplate{}
if err = c.DB.Getx(workflowTemplateDB, wftSb); err != nil {
return nil, err
}
workflowTemplateVersion := &WorkflowTemplateVersion{
WorkflowTemplate: workflowTemplateDB,
Manifest: workflowTemplate.Manifest,
Labels: workflowTemplate.Labels,
}
err = createLatestWorkflowTemplateVersionDB(tx, workflowTemplateVersion)
if err != nil {
return nil, err
}
workflowTemplate.WorkflowTemplateVersionID = workflowTemplateVersion.ID
updatedTemplate, err := createArgoWorkflowTemplate(workflowTemplate, workflowTemplateVersion.Version)
if err != nil {
return nil, err
}
updatedTemplate.TypeMeta = v1.TypeMeta{}
updatedTemplate.ObjectMeta.ResourceVersion = ""
updatedTemplate.ObjectMeta.SetSelfLink("")
updatedTemplate.Labels[label.WorkflowTemplateVersionUid] = strconv.FormatInt(workflowTemplateVersion.Version, 10)
parametersMap, err := workflowTemplate.GetParametersKeyString()
if err != nil {
return nil, err
}
if updatedTemplate.Annotations == nil {
updatedTemplate.Annotations = make(map[string]string)
}
for key, value := range parametersMap {
updatedTemplate.Annotations[key] = value
}
latest, err := c.getArgoWorkflowTemplate(namespace, workflowTemplate.UID, "latest")
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"WorkflowTemplate": workflowTemplate,
"Error": err.Error(),
}).Error("Could not get latest argo workflow template")
return nil, err
}
delete(latest.Labels, label.VersionLatest)
if _, err := c.ArgoprojV1alpha1().WorkflowTemplates(namespace).Create(updatedTemplate); err != nil {
return nil, err
}
latest, err = c.ArgoprojV1alpha1().WorkflowTemplates(namespace).Update(latest)
if err != nil {
return nil, err
}
// Make sure the associated workflow template has the latest labels
_, err = sb.Update("workflow_templates").
Set("labels", workflowTemplate.Labels).
Where(sq.Eq{
"id": workflowTemplateDB.ID,
}).
RunWith(tx).
Exec()
if err != nil {
return nil, err
}
if err := tx.Commit(); err != nil {
return nil, err
}
workflowTemplate.ID = workflowTemplateDB.ID
workflowTemplate.Version = workflowTemplateVersion.Version
return workflowTemplate, nil
}
// UpdateWorkflowTemplateVersion will update a given WorkflowTemplateVersion in the database.
// The intent is to change specific database values for a WorkflowTemplateVersion.
// - wtv.ID has to be set and greater than 0
func (c *Client) UpdateWorkflowTemplateVersion(wtv *WorkflowTemplateVersion) error {
if wtv.ID <= 0 {
return fmt.Errorf("id required for UpdateWorkflowTemplateVersionDB")
}
return updateWorkflowTemplateVersionDB(c.DB, wtv)
}
// GetWorkflowTemplate returns a WorkflowTemplate with data loaded from various sources
// If version is 0, it returns the latest version data.
//
// Data loaded includes
// * Database Information
// * ArgoWorkflowTemplate
// * Labels
func (c *Client) GetWorkflowTemplate(namespace, uid string, version int64) (workflowTemplate *WorkflowTemplate, err error) {
workflowTemplate, err = c.getWorkflowTemplate(namespace, uid, version)
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"WorkflowTemplate": workflowTemplate,
"Error": err.Error(),
}).Error("Get Workflow Template failed.")
return nil, util.NewUserError(codes.Unknown, "Unknown error.")
}
if workflowTemplate == nil {
return nil, util.NewUserError(codes.NotFound, "Workflow template not found.")
}
return
}
// GetLatestWorkflowTemplate returns a workflow template with the latest version data.
func (c *Client) GetLatestWorkflowTemplate(namespace, uid string) (workflowTemplate *WorkflowTemplate, err error) {
return c.GetWorkflowTemplate(namespace, uid, 0)
}
// CountWorkflowTemplatesByName returns the number of WorkflowTemplates given the arguments.
// If archived is nil, it is not considered.
func (c *Client) CountWorkflowTemplatesByName(namespace, name string, archived *bool) (count uint64, err error) {
sb := c.countWorkflowTemplateSelectBuilder(namespace).
Where(sq.Eq{"wt.name": name})
if archived != nil {
sb = sb.Where(sq.Eq{"wt.is_archived": *archived})
}
err = sb.RunWith(c.DB).
QueryRow().
Scan(&count)
return
}
// CountWorkflowTemplateVersions returns the number of versions a non-archived WorkflowTemplate has.
func (c *Client) CountWorkflowTemplateVersions(namespace, uid string) (count uint64, err error) {
err = sb.Select("COUNT(*)").
From("workflow_templates wt").
Join("workflow_template_versions wtv ON wtv.workflow_template_id = wt.id").
Where(sq.Eq{
"wt.namespace": namespace,
"wt.uid": uid,
"wt.is_archived": false,
}).
RunWith(c.DB).
QueryRow().
Scan(&count)
return
}
// ListWorkflowTemplateVersions returns all the WorkflowTemplates for a given namespace and uid.
func (c *Client) ListWorkflowTemplateVersions(namespace, uid string) (workflowTemplateVersions []*WorkflowTemplate, err error) {
workflowTemplateVersions, err = c.listWorkflowTemplateVersions(namespace, uid)
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"UID": uid,
"Error": err.Error(),
}).Error("Workflow template versions not found.")
return nil, util.NewUserError(codes.NotFound, "Workflow template versions not found.")
}
return
}
// ListWorkflowTemplateVersionsModels returns all the WorkflowTemplateVersions from the database.
// This function is a work-around for ListWorkflowTemplateVersions. Once that function is refactored,
// this function should no longer be necessary and removed.
// See: https://github.com/onepanelio/core/issues/436
func (c *Client) ListWorkflowTemplateVersionsModels(namespace, uid string) (workflowTemplateVersions []*WorkflowTemplateVersion, err error) {
workflowTemplateVersions, err = c.selectWorkflowTemplateVersionsDB(namespace, uid)
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"UID": uid,
"Error": err.Error(),
}).Error("Workflow template versions not found.")
return nil, util.NewUserError(codes.NotFound, "Workflow template versions not found.")
}
return
}
// ListWorkflowTemplateVersionsAll returns all WorkflowTemplateVersions with no filtering.
func (c *Client) ListWorkflowTemplateVersionsAll(paginator *pagination.PaginationRequest) (workflowTemplateVersions []*WorkflowTemplateVersion, err error) {
workflowTemplateVersions, err = c.selectAllWorkflowTemplateVersionsDB(paginator)
if err != nil {
log.WithFields(log.Fields{
"Error": err.Error(),
}).Error("Workflow template versions not found.")
return nil, util.NewUserError(codes.NotFound, "Workflow template versions not found.")
}
return
}
// ListAllWorkflowTemplates lists all of the workflow templates, including archived and system specific
func (c *Client) ListAllWorkflowTemplates(namespace string, request *request.Request) (workflowTemplateVersions []*WorkflowTemplate, err error) {
workflowTemplateVersions, err = c.selectAllWorkflowTemplatesDB(namespace, request)
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"Error": err.Error(),
}).Error("Workflow templates not found.")
return nil, util.NewUserError(codes.NotFound, "Workflow templates not found.")
}
err = c.appendExtraWorkflowTemplateData(namespace, workflowTemplateVersions)
return
}
// ListWorkflowTemplates returns all WorkflowTemplates where the results
// are filtered by is_archived and is_System is false.
func (c *Client) ListWorkflowTemplates(namespace string, request *request.Request) (workflowTemplateVersions []*WorkflowTemplate, err error) {
workflowTemplateVersions, err = c.selectWorkflowTemplatesDB(namespace, request)
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"Error": err.Error(),
}).Error("Workflow templates not found.")
return nil, util.NewUserError(codes.NotFound, "Workflow templates not found.")
}
err = c.appendExtraWorkflowTemplateData(namespace, workflowTemplateVersions)
return
}
// appendExtraWorkflowTemplateData adds extra information to workflow templates
// * execution statistics (including cron)
func (c *Client) appendExtraWorkflowTemplateData(namespace string, workflowTemplateVersions []*WorkflowTemplate) (err error) {
err = c.GetWorkflowExecutionStatisticsForTemplates(workflowTemplateVersions...)
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"Error": err.Error(),
}).Error("Unable to get Workflow Execution Statistic for Templates.")
return util.NewUserError(codes.NotFound, "Unable to get Workflow Execution Statistic for Templates.")
}
err = c.GetCronWorkflowStatisticsForTemplates(workflowTemplateVersions...)
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"Error": err.Error(),
}).Error("Unable to get Cron Workflow Statistic for Templates.")
return util.NewUserError(codes.NotFound, "Unable to get Cron Workflow Statistic for Templates.")
}
return
}
func (c *Client) getLatestWorkflowTemplate(namespace, uid string) (*WorkflowTemplate, error) {
return c.getWorkflowTemplate(namespace, uid, 0) //version=0 means latest
}
func (c *Client) ArchiveWorkflowTemplate(namespace, uid string) (archived bool, err error) {
workflowTemplate, err := c.getLatestWorkflowTemplate(namespace, uid)
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"UID": uid,
"Error": err.Error(),
}).Error("Get Workflow Template failed.")
return false, util.NewUserError(codes.Unknown, "Unable to archive workflow template.")
}
if workflowTemplate == nil {
return false, util.NewUserError(codes.NotFound, "Workflow template not found.")
}
wftVersions, err := c.listWorkflowTemplateVersions(namespace, uid)
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"UID": uid,
"Error": err.Error(),
}).Error("Get Workflow Template Versions failed.")
return false, util.NewUserError(codes.Unknown, "Unable to archive workflow template.")
}
//cron workflows
cronWorkflows := []*CronWorkflow{}
cwfSB := c.cronWorkflowSelectBuilder(namespace, uid).
OrderBy("cw.created_at DESC")
if err := c.DB.Selectx(&cronWorkflows, cwfSB); err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"UID": uid,
"Error": err.Error(),
}).Error("Get Cron Workflows failed.")
return false, util.NewUserError(codes.Unknown, "Unable to archive workflow template.")
}
for _, cwf := range cronWorkflows {
err = c.ArchiveCronWorkflow(namespace, cwf.Name)
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"UID": uid,
"Error": err.Error(),
}).Error("Archive Cron Workflow failed.")
return false, util.NewUserError(codes.Unknown, "Unable to archive workflow template.")
}
}
//workflow executions
paginator := pagination.NewRequest(0, 100)
for _, wftVer := range wftVersions {
wfTempVer := strconv.FormatInt(wftVer.Version, 10)
workflowTemplateName := uid + "-v" + wfTempVer
for {
req := &request.Request{Pagination: &paginator}
wfs, err := c.ListWorkflowExecutions(namespace, uid, wfTempVer, true, req)
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"UID": uid,
"Error": err.Error(),
}).Error("Get Workflow Executions failed.")
return false, util.NewUserError(codes.Unknown, "Unable to archive workflow template.")
}
if len(wfs) == 0 {
break
}
for _, wf := range wfs {
err = c.ArchiveWorkflowExecution(namespace, wf.UID)
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"UID": uid,
"Error": err.Error(),
}).Error("Archive Workflow Execution Failed.")
return false, util.NewUserError(codes.Unknown, "Unable to archive workflow template.")
}
}
}
err = c.ArgoprojV1alpha1().WorkflowTemplates(namespace).Delete(workflowTemplateName, nil)
if err != nil {
if strings.Contains(err.Error(), "not found") {
return true, nil
}
return false, err
}
}
_, err = sb.Update("workflow_templates").
Set("is_archived", true).
Where(sq.Eq{
"uid": uid,
"namespace": namespace,
}).RunWith(c.DB).
Exec()
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"UID": uid,
"Error": err.Error(),
}).Error("Archive Workflow Template DB failed.")
return false, util.NewUserError(codes.Unknown, "Unable to archive workflow template.")
}
return true, nil
}
// createArgoWorkflowTemplate creates an argo workflow template from the workflowTemplate struct
// the argo template stores the version information.
func createArgoWorkflowTemplate(workflowTemplate *WorkflowTemplate, version int64) (*v1alpha1.WorkflowTemplate, error) {
var argoWft *v1alpha1.WorkflowTemplate
var jsonOpts []argojson.JSONOpt
jsonOpts = append(jsonOpts, argojson.DisallowUnknownFields)
finalBytes, err := workflowTemplate.WrapSpec()
if err != nil {
return nil, err
}
err = yaml.Unmarshal(finalBytes, &argoWft)
if err != nil {
return nil, err
}
if err := workflowTemplate.GenerateUID(workflowTemplate.Name); err != nil {
return nil, err
}
argoWft.Name = fmt.Sprintf("%v-v%v", workflowTemplate.UID, version)
labels := map[string]string{
label.WorkflowTemplate: workflowTemplate.UID,
label.WorkflowTemplateUid: workflowTemplate.UID,
label.Version: fmt.Sprintf("%v", version),
label.VersionLatest: "true",
}
label.MergeLabelsPrefix(labels, workflowTemplate.Labels, label.TagPrefix)
argoWft.Labels = labels
return argoWft, nil
}
// getArgoWorkflowTemplate will load the argo workflow template.
// version "latest" will get the latest version, otherwise a number (as a string) will be used.
func (c *Client) getArgoWorkflowTemplate(namespace, workflowTemplateUID, version string) (*v1alpha1.WorkflowTemplate, error) {
labelSelect := fmt.Sprintf("%v=%v", label.WorkflowTemplateUid, workflowTemplateUID)
if version == "latest" {
labelSelect += "," + label.VersionLatest + "=true"
} else {
labelSelect += fmt.Sprintf(",%v=%v", label.Version, version)
}
workflowTemplates, err := c.ArgoprojV1alpha1().WorkflowTemplates(namespace).List(v1.ListOptions{
LabelSelector: labelSelect,
})
if err != nil {
return nil, err
}
templates := workflowTemplates.Items
if templates.Len() == 0 {
return nil, errors.New("not found")
}
if templates.Len() > 1 {
return nil, errors.New("not unique result")
}
return &templates[0], nil
}
func (c *Client) listArgoWorkflowTemplates(namespace, workflowTemplateUid string) (*[]v1alpha1.WorkflowTemplate, error) {
labelSelect := fmt.Sprintf("%v=%v", label.WorkflowTemplateUid, workflowTemplateUid)
workflowTemplates, err := c.ArgoprojV1alpha1().WorkflowTemplates(namespace).List(v1.ListOptions{
LabelSelector: labelSelect,
})
if err != nil {
return nil, err
}
templates := []v1alpha1.WorkflowTemplate(workflowTemplates.Items)
return &templates, nil
}
// listDBWorkflowTemplateVersions gets all of the workflow template versions for a specified workflow template uid
// archived ones are ignored. Returned in created_at desc order.
func (c *Client) selectWorkflowTemplateVersionsDB(namespace, workflowTemplateUID string) (versions []*WorkflowTemplateVersion, err error) {
versions = make([]*WorkflowTemplateVersion, 0)
sb := c.workflowTemplatesVersionSelectBuilder(namespace).
Columns(getWorkflowTemplateColumns("wt", "workflow_template")...).
Where(sq.Eq{
"wt.uid": workflowTemplateUID,
"wt.is_archived": false,
}).
OrderBy("wtv.created_at DESC")
err = c.DB.Selectx(&versions, sb)
return
}
// selectAllWorkflowTemplateVersionsDB returns all WorkflowTemplateVersions from the database without filter.
func (c *Client) selectAllWorkflowTemplateVersionsDB(paginator *pagination.PaginationRequest) (versions []*WorkflowTemplateVersion, err error) {
versions = make([]*WorkflowTemplateVersion, 0)
sb := c.workflowTemplateVersionSelectBuilderAll()
sb = *paginator.ApplyToSelect(&sb)
err = c.DB.Selectx(&versions, sb)
return
}
// prefix is the label prefix.
// e.g. prefix/my-label-key: my-label-value
// if version is 0, latest is used.
func (c *Client) GetWorkflowTemplateLabels(namespace, name, prefix string, version int64) (labels map[string]string, err error) {
versionAsString := "latest"
if version != 0 {
versionAsString = fmt.Sprintf("%v", version)
}
wf, err := c.getArgoWorkflowTemplate(namespace, name, versionAsString)
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"Name": name,
"Error": err.Error(),
}).Error("Workflow Template not found.")
return nil, util.NewUserError(codes.NotFound, "Workflow Template not found.")
}
labels = label.FilterByPrefix(prefix, wf.Labels)
labels = label.RemovePrefix(prefix, labels)
return
}
// GenerateWorkflowTemplateManifest replaces any special parameters with runtime values
func (c *Client) GenerateWorkflowTemplateManifest(manifest string) (string, error) {
root := &yaml3.Node{}
if err := yaml3.Unmarshal([]byte(manifest), root); err != nil {
return "", err
}
parametersIndex := extensions.CreateYamlIndex("arguments", "parameters")
if extensions.HasNode(root, parametersIndex) {
resultNode, err := extensions.GetNode(root, parametersIndex)
if err != nil {
return "", err
}
nodePoolOptions, err := c.systemConfig.NodePoolOptionsAsParameters()
if err != nil {
return "", err
}
legalNodePoolValues := []string{env.Get("ONEPANEL_TEMPLATE_NODEPOOL_DEFAULT_VALUE", "default")}
for _, opt := range nodePoolOptions {
legalNodePoolValues = append(legalNodePoolValues, opt.Value)
}
if len(nodePoolOptions) == 0 {
return "", fmt.Errorf("no node pool options")
}
for _, child := range resultNode.Content {
hasKey, err := extensions.HasKeyValue(child, "type", "select.nodepool")
if err != nil {
return "", err
}
if hasKey {
isValid, err := extensions.HasKeyValue(child, "value", legalNodePoolValues...)
if err != nil {
return "", err
}
if !isValid {
return "", util.NewUserError(codes.InvalidArgument, fmt.Sprintf("select.nodepool has an invalid value. Valid values are: %v", strings.Join(legalNodePoolValues, ", ")))
}
if err := extensions.SetKeyValue(child, "value", nodePoolOptions[0].Value); err != nil {
return "", err
}
optionsIndex := extensions.CreateYamlIndex("options")
if extensions.HasNode(child, optionsIndex) {
return "", util.NewUserError(codes.InvalidArgument, "select.nodepool must not have any options")
}
child.Content = append(child.Content, &yaml3.Node{
Kind: yaml3.ScalarNode,
Value: "options",
}, &yaml3.Node{
Kind: yaml3.SequenceNode,
})
optionsNode, err := extensions.GetNode(child, optionsIndex)
if err != nil {
return "", err
}
optionsNode.Kind = yaml3.SequenceNode
optionsNode.Content = parameterOptionsToNodes(nodePoolOptions)
}
}
}
finalManifest, err := yaml3.Marshal(root)
if err != nil {
return "", err
}
return string(finalManifest), err
}
// ListWorkflowTemplatesField loads all of the distinct field values for workflow templates
func (c *Client) ListWorkflowTemplatesField(namespace, field string, isSystem bool) (value []string, err error) {
if field != "name" {
return nil, fmt.Errorf("unsupported field '%v'", field)
}
columnName := fmt.Sprintf("wt.%v", field)
sb := sb.Select(columnName).
Distinct().
From("workflow_templates wt").
Where(sq.Eq{
"wt.namespace": namespace,
"wt.is_system": isSystem,
}).OrderBy(columnName)
err = c.DB.Selectx(&value, sb)
return
}