* updated method names to fit go conventions of uppercase. Db -> DB
* updated some method calls to use updated column select functions to remove extraneous empty string
* updated methods to use new label function that does nothing if no labels are passed in, this simplifies the code.
* updated some methods to use new Selectx function, reducing code in caller.
This commit is contained in:
Andrey Melnikov
2020-06-18 17:07:32 -07:00
committed by rushtehrani
parent 1e3a5d4faf
commit d209423100
15 changed files with 837 additions and 713 deletions

View File

@@ -373,7 +373,7 @@ func (c *Client) ListCronWorkflows(namespace, workflowTemplateUID string, pagina
if err := c.DB.Select(&cronWorkflows, query, args...); err != nil { if err := c.DB.Select(&cronWorkflows, query, args...); err != nil {
return nil, err return nil, err
} }
labelsMap, err := c.GetDbLabelsMapped(TypeCronWorkflow, CronWorkflowsToIds(cronWorkflows)...) labelsMap, err := c.GetDBLabelsMapped(TypeCronWorkflow, CronWorkflowsToIDs(cronWorkflows)...)
if err != nil { if err != nil {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"Namespace": namespace, "Namespace": namespace,
@@ -526,7 +526,7 @@ func (c *Client) TerminateCronWorkflow(namespace, uid string) (err error) {
//workflow executions //workflow executions
var workflows []*WorkflowExecution var workflows []*WorkflowExecution
query, args, err := sb.Select(). query, args, err := sb.Select().
Columns(getWorkflowExecutionColumns("we", "")...). Columns(getWorkflowExecutionColumns("we")...).
From("workflow_executions we"). From("workflow_executions we").
Where(sq.Eq{ Where(sq.Eq{
"cron_workflow_id": cronWorkflow.ID, "cron_workflow_id": cronWorkflow.ID,
@@ -662,8 +662,8 @@ func (c *Client) GetCronWorkflowStatisticsForTemplates(workflowTemplates ...*Wor
return return
} }
func (c *Client) selectCronWorkflowWithWorkflowTemplateVersion(namespace, uid string, extraColumns ...string) (*CronWorkflow, error) { func (c *Client) selectCronWorkflowWithWorkflowTemplateVersion(namespace, uid string) (*CronWorkflow, error) {
query, args, err := sb.Select(getCronWorkflowColumns(extraColumns...)...). query, args, err := sb.Select(getCronWorkflowColumns("cw")...).
From("cron_workflows cw"). From("cron_workflows cw").
Join("workflow_template_versions wtv ON wtv.id = cw.workflow_template_version_id"). Join("workflow_template_versions wtv ON wtv.id = cw.workflow_template_version_id").
Join("workflow_templates wt ON wt.id = wtv.workflow_template_id"). Join("workflow_templates wt ON wt.id = wtv.workflow_template_id").

125
pkg/cron_workflow_types.go Normal file
View File

@@ -0,0 +1,125 @@
package v1
import (
"encoding/json"
"github.com/onepanelio/core/pkg/util/mapping"
"github.com/onepanelio/core/sqlutil"
"gopkg.in/yaml.v2"
"time"
)
type CronWorkflow struct {
ID uint64
CreatedAt time.Time `db:"created_at"`
ModifiedAt *time.Time `db:"modified_at"`
UID string
Name string
GenerateName string
WorkflowExecution *WorkflowExecution
Labels map[string]string
Version int64
WorkflowTemplateVersionId uint64 `db:"workflow_template_version_id"`
Manifest string
Namespace string `db:"namespace"`
}
func (cw *CronWorkflow) GetParametersFromWorkflowSpec() ([]Parameter, error) {
var parameters []Parameter
mappedData := make(map[string]interface{})
if err := yaml.Unmarshal([]byte(cw.Manifest), mappedData); err != nil {
return nil, err
}
workflowSpec, ok := mappedData["workflowSpec"]
if !ok {
return parameters, nil
}
workflowSpecMap := workflowSpec.(map[interface{}]interface{})
arguments, ok := workflowSpecMap["arguments"]
if !ok {
return parameters, nil
}
argumentsMap := arguments.(map[interface{}]interface{})
parametersRaw, ok := argumentsMap["parameters"]
if !ok {
return parameters, nil
}
parametersArray, ok := parametersRaw.([]interface{})
for _, parameter := range parametersArray {
paramMap, ok := parameter.(map[interface{}]interface{})
if !ok {
continue
}
workflowParameter := ParameterFromMap(paramMap)
parameters = append(parameters, *workflowParameter)
}
return parameters, nil
}
func (cw *CronWorkflow) GetParametersFromWorkflowSpecJson() ([]byte, error) {
parameters, err := cw.GetParametersFromWorkflowSpec()
if err != nil {
return nil, err
}
parametersJson, err := json.Marshal(parameters)
if err != nil {
return nil, err
}
return parametersJson, nil
}
func (cw *CronWorkflow) AddToManifestSpec(key, manifest string) error {
currentManifestMapping, err := mapping.NewFromYamlString(cw.Manifest)
if err != nil {
return err
}
additionalManifest, err := mapping.NewFromYamlString(manifest)
if err != nil {
return err
}
currentManifestMapping[key] = additionalManifest
updatedManifest, err := currentManifestMapping.ToYamlBytes()
if err != nil {
return err
}
cw.Manifest = string(updatedManifest)
return nil
}
// getCronWorkflowColumns returns all of the columns for cronWorkflow modified by alias, destination.
// see formatColumnSelect
func getCronWorkflowColumns(aliasAndDestination ...string) []string {
columns := []string{"cw.id", "cw.created_at", "cw.uid", "cw.name", "cw.workflow_template_version_id", "cw.manifest", "cw.namespace"}
return sqlutil.FormatColumnSelect(columns, aliasAndDestination...)
}
// CronWorkflowsToIDs returns an array of ids from the input CronWorkflow with no duplicates.
func CronWorkflowsToIDs(resources []*CronWorkflow) (ids []uint64) {
mappedIds := make(map[uint64]bool)
// This is to make sure we don't have duplicates
for _, resource := range resources {
mappedIds[resource.ID] = true
}
for id := range mappedIds {
ids = append(ids, id)
}
return
}

68
pkg/file_types.go Normal file
View File

@@ -0,0 +1,68 @@
package v1
import (
"strings"
"time"
)
type File struct {
Path string
Name string
Size int64
Extension string
ContentType string
LastModified time.Time
Directory bool
}
// FilePathToParentPath given a path, returns the parent path, assuming a '/' delimiter
// Result does not have a trailing slash.
// -> a/b/c/d would return a/b/c
// -> a/b/c/d/ would return a/b/c
// If path is empty string, it is returned.
// If path is '/' (root) it is returned as is.
// If there is no '/', '/' is returned.
func FilePathToParentPath(path string) string {
separator := "/"
if path == "" || path == separator {
return path
}
if strings.HasSuffix(path, "/") {
path = path[0 : len(path)-1]
}
lastIndexOfForwardSlash := strings.LastIndex(path, separator)
if lastIndexOfForwardSlash <= 0 {
return separator
}
return path[0:lastIndexOfForwardSlash]
}
func FilePathToExtension(path string) string {
dotIndex := strings.LastIndex(path, ".")
if dotIndex == -1 {
return ""
}
if dotIndex == (len(path) - 1) {
return ""
}
return path[dotIndex+1:]
}
func FilePathToName(path string) string {
if strings.HasSuffix(path, "/") {
path = path[:len(path)-1]
}
lastSlashIndex := strings.LastIndex(path, "/")
if lastSlashIndex < 0 {
return path
}
return path[lastSlashIndex+1:]
}

22
pkg/label_types.go Normal file
View File

@@ -0,0 +1,22 @@
package v1
import "time"
type Label struct {
ID uint64
CreatedAt time.Time `db:"created_at"`
Key string
Value string
Resource string
ResourceID uint64 `db:"resource_id"`
}
func LabelsToMapping(labels ...*Label) map[string]string {
result := make(map[string]string)
for _, label := range labels {
result[label.Key] = label.Value
}
return result
}

View File

@@ -272,17 +272,24 @@ func (c *Client) InsertLabelsBuilder(resource string, resourceID uint64, keyValu
return sb return sb
} }
// Inserts the labels for the resource. If no labels are provided, does nothing and returns nil, nil. // InsertLabelsRunning inserts the labels for the resource into the db using the provided runner.
func (c *Client) InsertLabels(resource string, resourceID uint64, keyValues map[string]string) (sql.Result, error) { // If no labels are provided, does nothing and returns nil, nil.
func (c *Client) InsertLabelsRunner(runner sq.BaseRunner, resource string, resourceID uint64, keyValues map[string]string) (sql.Result, error) {
if len(keyValues) == 0 { if len(keyValues) == 0 {
return nil, nil return nil, nil
} }
return c.InsertLabelsBuilder(resource, resourceID, keyValues). return c.InsertLabelsBuilder(resource, resourceID, keyValues).
RunWith(c.DB). RunWith(runner).
Exec() Exec()
} }
// InsertLabels inserts the labels for the resource into the db using the client's DB.
// If no labels are provided, does nothing and returns nil, nil.
func (c *Client) InsertLabels(resource string, resourceID uint64, keyValues map[string]string) (sql.Result, error) {
return c.InsertLabelsRunner(c.DB, resource, resourceID, keyValues)
}
func (c *Client) GetDbLabels(resource string, ids ...uint64) (labels []*Label, err error) { func (c *Client) GetDbLabels(resource string, ids ...uint64) (labels []*Label, err error) {
if len(ids) == 0 { if len(ids) == 0 {
return make([]*Label, 0), nil return make([]*Label, 0), nil
@@ -316,10 +323,9 @@ func (c *Client) GetDbLabels(resource string, ids ...uint64) (labels []*Label, e
return return
} }
// TODO rename Db to be DB per go conventions // GetDBLabelsMapped returns a map where the key is the id of the resource
// Returns a map where the key is the id of the resource
// and the value is the labels as a map[string]string // and the value is the labels as a map[string]string
func (c *Client) GetDbLabelsMapped(resource string, ids ...uint64) (result map[uint64]map[string]string, err error) { func (c *Client) GetDBLabelsMapped(resource string, ids ...uint64) (result map[uint64]map[string]string, err error) {
dbLabels, err := c.GetDbLabels(resource, ids...) dbLabels, err := c.GetDbLabels(resource, ids...)
if err != nil { if err != nil {
return return
@@ -327,11 +333,11 @@ func (c *Client) GetDbLabelsMapped(resource string, ids ...uint64) (result map[u
result = make(map[uint64]map[string]string) result = make(map[uint64]map[string]string)
for _, dbLabel := range dbLabels { for _, dbLabel := range dbLabels {
_, ok := result[dbLabel.ResourceId] _, ok := result[dbLabel.ResourceID]
if !ok { if !ok {
result[dbLabel.ResourceId] = make(map[string]string) result[dbLabel.ResourceID] = make(map[string]string)
} }
result[dbLabel.ResourceId][dbLabel.Key] = dbLabel.Value result[dbLabel.ResourceID][dbLabel.Key] = dbLabel.Value
} }
return return

View File

@@ -1,12 +1,6 @@
package v1 package v1
import ( import (
"encoding/json"
"fmt"
"github.com/onepanelio/core/pkg/util/mapping"
log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v2"
"strings"
"time" "time"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
@@ -70,131 +64,6 @@ type Metric struct {
Format string `json:"omitempty"` Format string `json:"omitempty"`
} }
type CronWorkflow struct {
ID uint64
CreatedAt time.Time `db:"created_at"`
ModifiedAt *time.Time `db:"modified_at"`
UID string
Name string
GenerateName string
WorkflowExecution *WorkflowExecution
Labels map[string]string
Version int64
WorkflowTemplateVersionId uint64 `db:"workflow_template_version_id"`
Manifest string
Namespace string `db:"namespace"`
}
func (cw *CronWorkflow) GetParametersFromWorkflowSpec() ([]Parameter, error) {
var parameters []Parameter
mappedData := make(map[string]interface{})
if err := yaml.Unmarshal([]byte(cw.Manifest), mappedData); err != nil {
return nil, err
}
workflowSpec, ok := mappedData["workflowSpec"]
if !ok {
return parameters, nil
}
workflowSpecMap := workflowSpec.(map[interface{}]interface{})
arguments, ok := workflowSpecMap["arguments"]
if !ok {
return parameters, nil
}
argumentsMap := arguments.(map[interface{}]interface{})
parametersRaw, ok := argumentsMap["parameters"]
if !ok {
return parameters, nil
}
parametersArray, ok := parametersRaw.([]interface{})
for _, parameter := range parametersArray {
paramMap, ok := parameter.(map[interface{}]interface{})
if !ok {
continue
}
workflowParameter := ParameterFromMap(paramMap)
parameters = append(parameters, *workflowParameter)
}
return parameters, nil
}
func (cw *CronWorkflow) GetParametersFromWorkflowSpecJson() ([]byte, error) {
parameters, err := cw.GetParametersFromWorkflowSpec()
if err != nil {
return nil, err
}
parametersJson, err := json.Marshal(parameters)
if err != nil {
return nil, err
}
return parametersJson, nil
}
func (cw *CronWorkflow) AddToManifestSpec(key, manifest string) error {
currentManifestMapping, err := mapping.NewFromYamlString(cw.Manifest)
if err != nil {
return err
}
additionalManifest, err := mapping.NewFromYamlString(manifest)
if err != nil {
return err
}
currentManifestMapping[key] = additionalManifest
updatedManifest, err := currentManifestMapping.ToYamlBytes()
if err != nil {
return err
}
cw.Manifest = string(updatedManifest)
return nil
}
type WorkflowTemplate struct {
ID uint64
CreatedAt time.Time `db:"created_at"`
ModifiedAt *time.Time `db:"modified_at"`
UID string
Namespace string
Name string
Manifest string
Version int64 // The latest version, unix timestamp
Versions int64 `db:"versions"` // How many versions there are of this template total.
IsLatest bool
IsArchived bool `db:"is_archived"`
IsSystem bool `db:"is_system"`
ArgoWorkflowTemplate *wfv1.WorkflowTemplate
Labels map[string]string
WorkflowExecutionStatisticReport *WorkflowExecutionStatisticReport
CronWorkflowsStatisticsReport *CronWorkflowStatisticReport
// todo rename to have ID suffix
WorkflowTemplateVersionId uint64 `db:"workflow_template_version_id"` // Reference to the associated workflow template version.
Resource *string // utility in case we are specifying a workflow template for a specific resource
ResourceUID *string // see Resource field
}
type Label struct {
ID uint64
CreatedAt time.Time `db:"created_at"`
Key string
Value string
Resource string
ResourceId uint64 `db:"resource_id"`
}
type WorkflowExecutionStatisticReport struct { type WorkflowExecutionStatisticReport struct {
WorkflowTemplateId uint64 `db:"workflow_template_id"` WorkflowTemplateId uint64 `db:"workflow_template_id"`
Total int32 Total int32
@@ -210,355 +79,10 @@ type CronWorkflowStatisticReport struct {
Total int32 Total int32
} }
type WorkflowTemplateVersion struct {
ID uint64
UID string
Version int64
IsLatest bool `db:"is_latest"`
Manifest string
CreatedAt time.Time `db:"created_at"`
WorkflowTemplate *WorkflowTemplate `db:"workflow_template"`
Labels map[string]string
}
type WorkflowExecutionStatistic struct {
ID uint64
WorkflowTemplateId uint64
Name string
Namespace string
//Interface to support null values for timestamps, when scanning from db into structs
CreatedAt *time.Time `db:"created_at"`
FinishedAt *time.Time `db:"finished_at"`
FailedAt *time.Time `db:"failed_at"`
}
func (wt *WorkflowTemplate) GetManifestBytes() []byte {
return []byte(wt.Manifest)
}
func (wt *WorkflowTemplate) GetParametersKeyString() (map[string]string, error) {
root := make(map[interface{}]interface{})
if err := yaml.Unmarshal(wt.GetManifestBytes(), root); err != nil {
return nil, err
}
arguments, ok := root["arguments"]
if !ok {
return nil, nil
}
argumentsMap, ok := arguments.(map[interface{}]interface{})
if !ok {
return nil, nil
}
parameters, ok := argumentsMap["parameters"]
if !ok {
return nil, nil
}
parametersAsArray, ok := parameters.([]interface{})
if !ok {
return nil, nil
}
if len(parametersAsArray) == 0 {
delete(root, arguments)
}
result := make(map[string]string)
for index, parameter := range parametersAsArray {
parameterMap, ok := parameter.(map[interface{}]interface{})
if !ok {
continue
}
key := parameterMap["name"]
keyAsString, ok := key.(string)
if !ok {
continue
}
parameterMap["order"] = index
remainingParameters, err := yaml.Marshal(parameterMap)
if err != nil {
continue
}
result[keyAsString] = string(remainingParameters)
}
return result, nil
}
func (wt *WorkflowTemplate) UpdateManifestParameters(params []Parameter) error {
manifestMap, err := mapping.NewFromYamlString(wt.Manifest)
if err != nil {
return err
}
arguments, err := manifestMap.GetChildMap("arguments")
if err != nil {
return err
}
arguments["parameters"] = params
manifestBytes, err := manifestMap.ToYamlBytes()
if err != nil {
return err
}
wt.Manifest = string(manifestBytes)
return nil
}
func (wt *WorkflowTemplate) GetWorkflowManifestBytes() ([]byte, error) {
if wt.ArgoWorkflowTemplate == nil {
return []byte{}, nil
}
wt.ArgoWorkflowTemplate.TypeMeta.Kind = "Workflow"
wt.ArgoWorkflowTemplate.ObjectMeta = metav1.ObjectMeta{
GenerateName: wt.ArgoWorkflowTemplate.ObjectMeta.GenerateName,
Labels: wt.ArgoWorkflowTemplate.ObjectMeta.Labels,
}
return json.Marshal(wt.ArgoWorkflowTemplate)
}
func (wt *WorkflowTemplate) FormatManifest() (string, error) {
manifestMap, err := mapping.NewFromYamlString(wt.Manifest)
if err != nil {
log.WithFields(log.Fields{
"Method": "FormatManifest",
"Step": "NewFromYamlString",
"Error": err.Error(),
}).Error("FormatManifest Workflow Template failed.")
return "", nil
}
manifestMap, err = manifestMap.GetChildMap("spec")
if err != nil {
log.WithFields(log.Fields{
"Method": "FormatManifest",
"Step": "GetChildMap",
"Error": err.Error(),
}).Error("GetChildMap Workflow Template failed.")
return "", nil
}
manifestMap.PruneEmpty()
wt.AddWorkflowTemplateParametersFromAnnotations(manifestMap)
manifestBytes, err := manifestMap.ToYamlBytes()
if err != nil {
log.WithFields(log.Fields{
"Method": "FormatManifest",
"Step": "ToYamlBytes",
"Error": err.Error(),
}).Error("ToYamlBytes Workflow Template failed.")
}
return string(manifestBytes), nil
}
// Take the manifest from the workflow template, which is just the "spec" contents
// and wrap it so we have
// {
// metadata: {},
// spec: spec_data
// }
// the above wrapping is what is returned.
func (wt *WorkflowTemplate) WrapSpec() ([]byte, error) {
data := wt.GetManifestBytes()
mapping := make(map[interface{}]interface{})
if err := yaml.Unmarshal(data, mapping); err != nil {
return nil, err
}
contentMap := map[interface{}]interface{}{
"metadata": make(map[interface{}]interface{}),
"spec": mapping,
}
finalBytes, err := yaml.Marshal(contentMap)
if err != nil {
return nil, nil
}
return finalBytes, nil
}
func (wt *WorkflowTemplate) AddWorkflowTemplateParametersFromAnnotations(spec mapping.Mapping) {
if wt.ArgoWorkflowTemplate == nil {
return
}
annotations := wt.ArgoWorkflowTemplate.Annotations
if spec == nil || len(annotations) == 0 {
return
}
arguments, err := spec.GetChildMap("arguments")
if err != nil {
return
}
arguments["parameters"] = make([]interface{}, 0)
parameters := make([]interface{}, len(annotations))
for _, value := range annotations {
data, err := mapping.NewFromYamlString(value)
if err != nil {
log.WithFields(log.Fields{
"Method": "AddWorkflowTemplateParametersFromAnnotations",
"Step": "NewFromYamlString",
"Error": err.Error(),
}).Error("Error with AddWorkflowTemplateParametersFromAnnotations")
continue
}
order := 0
orderValue, ok := data["order"]
if ok {
order = orderValue.(int)
delete(data, "order")
if order >= 0 && order < len(parameters) {
parameters[order] = data
}
}
}
arguments["parameters"] = parameters
}
type WorkflowExecution struct {
ID uint64
CreatedAt time.Time `db:"created_at"`
UID string
Name string
GenerateName string
Parameters []Parameter
ParametersBytes []byte `db:"parameters"` // to load from database
Manifest string
Phase wfv1.NodePhase
StartedAt *time.Time `db:"started_at"`
FinishedAt *time.Time `db:"finished_at"`
WorkflowTemplate *WorkflowTemplate `db:"workflow_template"`
Labels map[string]string
}
// TODO: reference this in WorkflowExecution
type WorkflowExecutionStatus struct {
Phase wfv1.NodePhase `json:"phase"`
StartedAt *time.Time `db:"started_at" json:"startedAt"`
FinishedAt *time.Time `db:"finished_at" json:"finishedAt"`
}
func (we *WorkflowExecution) LoadParametersFromBytes() ([]Parameter, error) {
loadedParameters := make([]Parameter, 0)
err := json.Unmarshal(we.ParametersBytes, &loadedParameters)
if err != nil {
return we.Parameters, err
}
// It might be nil because the value "null" is stored in db if there are no parameters.
// for consistency, we return an empty array.
if loadedParameters == nil {
loadedParameters = make([]Parameter, 0)
}
we.Parameters = loadedParameters
return we.Parameters, err
}
type ListOptions = metav1.ListOptions type ListOptions = metav1.ListOptions
type PodGCStrategy = wfv1.PodGCStrategy type PodGCStrategy = wfv1.PodGCStrategy
type WorkflowExecutionOptions struct {
Name string
GenerateName string
Entrypoint string
Parameters []Parameter
ServiceAccount string
Labels *map[string]string
ListOptions *ListOptions
PodGCStrategy *PodGCStrategy
}
type File struct {
Path string
Name string
Size int64
Extension string
ContentType string
LastModified time.Time
Directory bool
}
// Given a path, returns the parent path, asssuming a '/' delimitor
// Result does not have a trailing slash.
// -> a/b/c/d would return a/b/c
// -> a/b/c/d/ would return a/b/c
// If path is empty string, it is returned.
// If path is '/' (root) it is returned as is.
// If there is no '/', '/' is returned.
func FilePathToParentPath(path string) string {
separator := "/"
if path == "" || path == separator {
return path
}
if strings.HasSuffix(path, "/") {
path = path[0 : len(path)-1]
}
lastIndexOfForwardSlash := strings.LastIndex(path, separator)
if lastIndexOfForwardSlash <= 0 {
return separator
}
return path[0:lastIndexOfForwardSlash]
}
func FilePathToExtension(path string) string {
dotIndex := strings.LastIndex(path, ".")
if dotIndex == -1 {
return ""
}
if dotIndex == (len(path) - 1) {
return ""
}
return path[dotIndex+1:]
}
func FilePathToName(path string) string {
if strings.HasSuffix(path, "/") {
path = path[:len(path)-1]
}
lastSlashIndex := strings.LastIndex(path, "/")
if lastSlashIndex < 0 {
return path
}
return path[lastSlashIndex+1:]
}
func WorkflowTemplatesToIds(workflowTemplates []*WorkflowTemplate) (ids []uint64) { func WorkflowTemplatesToIds(workflowTemplates []*WorkflowTemplate) (ids []uint64) {
mappedIds := make(map[uint64]bool) mappedIds := make(map[uint64]bool)
@@ -574,7 +98,7 @@ func WorkflowTemplatesToIds(workflowTemplates []*WorkflowTemplate) (ids []uint64
return return
} }
func WorkflowTemplatesToVersionIds(workflowTemplates []*WorkflowTemplate) (ids []uint64) { func WorkflowTemplatesToVersionIDs(workflowTemplates []*WorkflowTemplate) (ids []uint64) {
mappedIds := make(map[uint64]bool) mappedIds := make(map[uint64]bool)
// This is to make sure we don't have duplicates // This is to make sure we don't have duplicates
@@ -588,117 +112,3 @@ func WorkflowTemplatesToVersionIds(workflowTemplates []*WorkflowTemplate) (ids [
return return
} }
func WorkflowTemplateVersionsToIds(resources []*WorkflowTemplateVersion) (ids []uint64) {
mappedIds := make(map[uint64]bool)
// This is to make sure we don't have duplicates
for _, resource := range resources {
mappedIds[resource.ID] = true
}
for id := range mappedIds {
ids = append(ids, id)
}
return
}
func CronWorkflowsToIds(resources []*CronWorkflow) (ids []uint64) {
mappedIds := make(map[uint64]bool)
// This is to make sure we don't have duplicates
for _, resource := range resources {
mappedIds[resource.ID] = true
}
for id := range mappedIds {
ids = append(ids, id)
}
return
}
func WorkspacesToIds(resources []*Workspace) (ids []uint64) {
mappedIds := make(map[uint64]bool)
// This is to make sure we don't have duplicates
for _, resource := range resources {
mappedIds[resource.ID] = true
}
for id := range mappedIds {
ids = append(ids, id)
}
return
}
// Returns a list of column names prefixed with alias, and named to destination. Extra columns are added to the end of the list.
// Setting destination to empty string will not apply any destination.
// Example - with destination
//
// Input: ([id, name], "w", "workflow")
// Output: [w.id "workflow.id", w.name "workflow.name"]
//
// Example - no destination
// Input: ([id, name], "w", "")
// Output: [w.id, w.name]
func formatColumnSelect(columns []string, alias, destination string, extraColumns ...string) []string {
results := make([]string, 0)
for _, str := range columns {
result := alias + "." + str
if destination != "" {
result += fmt.Sprintf(` "%v.%v"`, destination, str)
}
results = append(results, result)
}
results = append(results, extraColumns...)
return results
}
// returns all of the columns for workflowTemplate modified by alias, destination.
// see formatColumnSelect
func getWorkflowTemplateColumns(alias string, destination string, extraColumns ...string) []string {
columns := []string{"id", "created_at", "uid", "name", "namespace", "modified_at", "is_archived"}
return formatColumnSelect(columns, alias, destination, extraColumns...)
}
// returns all of the columns for workflow template versions modified by alias, destination.
// see formatColumnSelect
func getWorkflowTemplateVersionColumns(alias string, destination string, extraColumns ...string) []string {
columns := []string{"id", "created_at", "version", "is_latest", "manifest"}
return formatColumnSelect(columns, alias, destination, extraColumns...)
}
// returns all of the columns for workflowExecution modified by alias, destination.
// see formatColumnSelect
func getWorkflowExecutionColumns(alias string, destination string, extraColumns ...string) []string {
columns := []string{"id", "created_at", "uid", "name", "parameters", "phase", "started_at", "finished_at"}
return formatColumnSelect(columns, alias, destination, extraColumns...)
}
// returns all of the columns for cronWorkflow modified by alias, destination.
// see formatColumnSelect
func getCronWorkflowColumns(extraColumns ...string) []string {
results := []string{"cw.id", "cw.created_at", "cw.uid", "cw.name", "cw.workflow_template_version_id", "cw.manifest", "cw.namespace"}
for _, str := range extraColumns {
results = append(results, str)
}
return results
}
func LabelsToMapping(labels ...*Label) map[string]string {
result := make(map[string]string)
for _, label := range labels {
result[label.Key] = label.Value
}
return result
}

View File

@@ -0,0 +1,79 @@
package v1
import (
"encoding/json"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/onepanelio/core/sqlutil"
"time"
)
type WorkflowExecution struct {
ID uint64
CreatedAt time.Time `db:"created_at"`
UID string
Name string
GenerateName string
Parameters []Parameter
ParametersBytes []byte `db:"parameters"` // to load from database
Manifest string
Phase wfv1.NodePhase
StartedAt *time.Time `db:"started_at"`
FinishedAt *time.Time `db:"finished_at"`
WorkflowTemplate *WorkflowTemplate `db:"workflow_template"`
Labels map[string]string
}
type WorkflowExecutionOptions struct {
Name string
GenerateName string
Entrypoint string
Parameters []Parameter
ServiceAccount string
Labels *map[string]string
ListOptions *ListOptions
PodGCStrategy *PodGCStrategy
}
type WorkflowExecutionStatistic struct {
ID uint64
WorkflowTemplateId uint64
Name string
Namespace string
//Interface to support null values for timestamps, when scanning from db into structs
CreatedAt *time.Time `db:"created_at"`
FinishedAt *time.Time `db:"finished_at"`
FailedAt *time.Time `db:"failed_at"`
}
// TODO: reference this in WorkflowExecution
type WorkflowExecutionStatus struct {
Phase wfv1.NodePhase `json:"phase"`
StartedAt *time.Time `db:"started_at" json:"startedAt"`
FinishedAt *time.Time `db:"finished_at" json:"finishedAt"`
}
func (we *WorkflowExecution) LoadParametersFromBytes() ([]Parameter, error) {
loadedParameters := make([]Parameter, 0)
err := json.Unmarshal(we.ParametersBytes, &loadedParameters)
if err != nil {
return we.Parameters, err
}
// It might be nil because the value "null" is stored in db if there are no parameters.
// for consistency, we return an empty array.
if loadedParameters == nil {
loadedParameters = make([]Parameter, 0)
}
we.Parameters = loadedParameters
return we.Parameters, err
}
// getWorkflowExecutionColumns returns all of the columns for workflowExecution modified by alias, destination.
// see formatColumnSelect
func getWorkflowExecutionColumns(aliasAndDestination ...string) []string {
columns := []string{"id", "created_at", "uid", "name", "parameters", "phase", "started_at", "finished_at"}
return sqlutil.FormatColumnSelect(columns, aliasAndDestination...)
}

View File

@@ -66,14 +66,9 @@ func (c *Client) createWorkflowTemplate(namespace string, workflowTemplate *Work
return nil, nil, err return nil, nil, err
} }
if len(workflowTemplate.Labels) > 0 { _, err = c.InsertLabelsRunner(tx, TypeWorkflowTemplateVersion, workflowTemplateVersion.ID, workflowTemplate.Labels)
_, err = c.InsertLabelsBuilder(TypeWorkflowTemplateVersion, workflowTemplateVersion.ID, workflowTemplate.Labels). if err != nil {
RunWith(tx). return nil, nil, err
Exec()
if err != nil {
return nil, nil, err
}
} }
argoWft, err := createArgoWorkflowTemplate(workflowTemplate, versionUnix) argoWft, err := createArgoWorkflowTemplate(workflowTemplate, versionUnix)
@@ -95,8 +90,8 @@ func (c *Client) createWorkflowTemplate(namespace string, workflowTemplate *Work
} }
if err = tx.Commit(); err != nil { if err = tx.Commit(); err != nil {
if err := c.ArgoprojV1alpha1().WorkflowTemplates(namespace).Delete(argoWft.Name, &v1.DeleteOptions{}); err != nil { if errDelete := c.ArgoprojV1alpha1().WorkflowTemplates(namespace).Delete(argoWft.Name, &v1.DeleteOptions{}); errDelete != nil {
log.Printf("Unable to delete argo workflow template") err = fmt.Errorf("%w; %s", err, errDelete)
} }
return nil, nil, err return nil, nil, err
} }
@@ -107,7 +102,7 @@ func (c *Client) createWorkflowTemplate(namespace string, workflowTemplate *Work
} }
func (c *Client) workflowTemplatesSelectBuilder(namespace string) sq.SelectBuilder { func (c *Client) workflowTemplatesSelectBuilder(namespace string) sq.SelectBuilder {
sb := sb.Select("wt.id", "wt.created_at", "wt.uid", "wt.name", "wt.is_archived"). sb := sb.Select(getWorkflowTemplateColumns("wt", "")...).
From("workflow_templates wt"). From("workflow_templates wt").
Where(sq.Eq{ Where(sq.Eq{
"wt.namespace": namespace, "wt.namespace": namespace,
@@ -117,7 +112,7 @@ func (c *Client) workflowTemplatesSelectBuilder(namespace string) sq.SelectBuild
} }
func (c *Client) workflowTemplatesVersionSelectBuilder(namespace string) sq.SelectBuilder { func (c *Client) workflowTemplatesVersionSelectBuilder(namespace string) sq.SelectBuilder {
sb := sb.Select("wtv.id", "wtv.version", "wtv.is_latest", "wtv.manifest", "wtv.created_at"). sb := sb.Select(getWorkflowTemplateVersionColumns("wtv", "")...).
From("workflow_template_versions wtv"). From("workflow_template_versions wtv").
Join("workflow_templates wt ON wt.id = wtv.workflow_template_id"). Join("workflow_templates wt ON wt.id = wtv.workflow_template_id").
Where(sq.Eq{ Where(sq.Eq{
@@ -239,7 +234,7 @@ func (c *Client) getWorkflowTemplate(namespace, uid string, version int64) (work
workflowTemplate.Version = templateVersion workflowTemplate.Version = templateVersion
labelsMap, err := c.GetDbLabelsMapped(TypeWorkflowTemplateVersion, workflowTemplate.WorkflowTemplateVersionId) labelsMap, err := c.GetDBLabelsMapped(TypeWorkflowTemplateVersion, workflowTemplate.WorkflowTemplateVersionId)
if err != nil { if err != nil {
return workflowTemplate, err return workflowTemplate, err
} }
@@ -249,6 +244,7 @@ func (c *Client) getWorkflowTemplate(namespace, uid string, version int64) (work
return workflowTemplate, nil return workflowTemplate, nil
} }
// TODO version = is latest?
func (c *Client) getWorkflowTemplateByName(namespace, name string, version int64) (workflowTemplate *WorkflowTemplate, err error) { func (c *Client) getWorkflowTemplateByName(namespace, name string, version int64) (workflowTemplate *WorkflowTemplate, err error) {
workflowTemplate = &WorkflowTemplate{} workflowTemplate = &WorkflowTemplate{}
@@ -278,7 +274,7 @@ func (c *Client) listWorkflowTemplateVersions(namespace, uid string) (workflowTe
return nil, err return nil, err
} }
labelsMap, err := c.GetDbLabelsMapped(TypeWorkflowTemplateVersion, WorkflowTemplateVersionsToIds(dbVersions)...) labelsMap, err := c.GetDBLabelsMapped(TypeWorkflowTemplateVersion, WorkflowTemplateVersionsToIDs(dbVersions)...)
if err != nil { if err != nil {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"Namespace": namespace, "Namespace": namespace,
@@ -521,6 +517,7 @@ func (c *Client) GetWorkflowTemplate(namespace, uid string, version int64) (work
return return
} }
// TODO version 0 = latest?
func (c *Client) GetWorkflowTemplateByName(namespace, name string, version int64) (workflowTemplate *WorkflowTemplate, err error) { func (c *Client) GetWorkflowTemplateByName(namespace, name string, version int64) (workflowTemplate *WorkflowTemplate, err error) {
workflowTemplate, err = c.getWorkflowTemplateByName(namespace, name, version) workflowTemplate, err = c.getWorkflowTemplateByName(namespace, name, version)
if err != nil { if err != nil {
@@ -599,7 +596,7 @@ func (c *Client) ListWorkflowTemplates(namespace string, paginator *pagination.P
return nil, util.NewUserError(codes.NotFound, "Unable to get Cron Workflow Statistic for Templates.") return nil, util.NewUserError(codes.NotFound, "Unable to get Cron Workflow Statistic for Templates.")
} }
labelsMap, err := c.GetDbLabelsMapped(TypeWorkflowTemplateVersion, WorkflowTemplatesToVersionIds(workflowTemplateVersions)...) labelsMap, err := c.GetDBLabelsMapped(TypeWorkflowTemplateVersion, WorkflowTemplatesToVersionIDs(workflowTemplateVersions)...)
if err != nil { if err != nil {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"Namespace": namespace, "Namespace": namespace,

View File

@@ -0,0 +1,250 @@
package v1
import (
"encoding/json"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/onepanelio/core/pkg/util/mapping"
"github.com/onepanelio/core/sqlutil"
log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"time"
)
type WorkflowTemplate struct {
ID uint64
CreatedAt time.Time `db:"created_at"`
ModifiedAt *time.Time `db:"modified_at"`
UID string
Namespace string
Name string
Manifest string
Version int64 // The latest version, unix timestamp
Versions int64 `db:"versions"` // How many versions there are of this template total.
IsLatest bool
IsArchived bool `db:"is_archived"`
IsSystem bool `db:"is_system"`
ArgoWorkflowTemplate *wfv1.WorkflowTemplate
Labels map[string]string
WorkflowExecutionStatisticReport *WorkflowExecutionStatisticReport
CronWorkflowsStatisticsReport *CronWorkflowStatisticReport
// todo rename to have ID suffix
WorkflowTemplateVersionId uint64 `db:"workflow_template_version_id"` // Reference to the associated workflow template version.
Resource *string // utility in case we are specifying a workflow template for a specific resource
ResourceUID *string // see Resource field
}
func (wt *WorkflowTemplate) GetManifestBytes() []byte {
return []byte(wt.Manifest)
}
func (wt *WorkflowTemplate) GetParametersKeyString() (map[string]string, error) {
root := make(map[interface{}]interface{})
if err := yaml.Unmarshal(wt.GetManifestBytes(), root); err != nil {
return nil, err
}
arguments, ok := root["arguments"]
if !ok {
return nil, nil
}
argumentsMap, ok := arguments.(map[interface{}]interface{})
if !ok {
return nil, nil
}
parameters, ok := argumentsMap["parameters"]
if !ok {
return nil, nil
}
parametersAsArray, ok := parameters.([]interface{})
if !ok {
return nil, nil
}
if len(parametersAsArray) == 0 {
delete(root, arguments)
}
result := make(map[string]string)
for index, parameter := range parametersAsArray {
parameterMap, ok := parameter.(map[interface{}]interface{})
if !ok {
continue
}
key := parameterMap["name"]
keyAsString, ok := key.(string)
if !ok {
continue
}
parameterMap["order"] = index
remainingParameters, err := yaml.Marshal(parameterMap)
if err != nil {
continue
}
result[keyAsString] = string(remainingParameters)
}
return result, nil
}
func (wt *WorkflowTemplate) UpdateManifestParameters(params []Parameter) error {
manifestMap, err := mapping.NewFromYamlString(wt.Manifest)
if err != nil {
return err
}
arguments, err := manifestMap.GetChildMap("arguments")
if err != nil {
return err
}
arguments["parameters"] = params
manifestBytes, err := manifestMap.ToYamlBytes()
if err != nil {
return err
}
wt.Manifest = string(manifestBytes)
return nil
}
func (wt *WorkflowTemplate) GetWorkflowManifestBytes() ([]byte, error) {
if wt.ArgoWorkflowTemplate == nil {
return []byte{}, nil
}
wt.ArgoWorkflowTemplate.TypeMeta.Kind = "Workflow"
wt.ArgoWorkflowTemplate.ObjectMeta = metav1.ObjectMeta{
GenerateName: wt.ArgoWorkflowTemplate.ObjectMeta.GenerateName,
Labels: wt.ArgoWorkflowTemplate.ObjectMeta.Labels,
}
return json.Marshal(wt.ArgoWorkflowTemplate)
}
func (wt *WorkflowTemplate) FormatManifest() (string, error) {
manifestMap, err := mapping.NewFromYamlString(wt.Manifest)
if err != nil {
log.WithFields(log.Fields{
"Method": "FormatManifest",
"Step": "NewFromYamlString",
"Error": err.Error(),
}).Error("FormatManifest Workflow Template failed.")
return "", nil
}
manifestMap, err = manifestMap.GetChildMap("spec")
if err != nil {
log.WithFields(log.Fields{
"Method": "FormatManifest",
"Step": "GetChildMap",
"Error": err.Error(),
}).Error("GetChildMap Workflow Template failed.")
return "", nil
}
manifestMap.PruneEmpty()
wt.AddWorkflowTemplateParametersFromAnnotations(manifestMap)
manifestBytes, err := manifestMap.ToYamlBytes()
if err != nil {
log.WithFields(log.Fields{
"Method": "FormatManifest",
"Step": "ToYamlBytes",
"Error": err.Error(),
}).Error("ToYamlBytes Workflow Template failed.")
}
return string(manifestBytes), nil
}
// Take the manifest from the workflow template, which is just the "spec" contents
// and wrap it so we have
// {
// metadata: {},
// spec: spec_data
// }
// the above wrapping is what is returned.
func (wt *WorkflowTemplate) WrapSpec() ([]byte, error) {
data := wt.GetManifestBytes()
spec := make(map[interface{}]interface{})
if err := yaml.Unmarshal(data, spec); err != nil {
return nil, err
}
contentMap := map[interface{}]interface{}{
"metadata": make(map[interface{}]interface{}),
"spec": spec,
}
finalBytes, err := yaml.Marshal(contentMap)
if err != nil {
return nil, nil
}
return finalBytes, nil
}
func (wt *WorkflowTemplate) AddWorkflowTemplateParametersFromAnnotations(spec mapping.Mapping) {
if wt.ArgoWorkflowTemplate == nil {
return
}
annotations := wt.ArgoWorkflowTemplate.Annotations
if spec == nil || len(annotations) == 0 {
return
}
arguments, err := spec.GetChildMap("arguments")
if err != nil {
return
}
arguments["parameters"] = make([]interface{}, 0)
parameters := make([]interface{}, len(annotations))
for _, value := range annotations {
data, err := mapping.NewFromYamlString(value)
if err != nil {
log.WithFields(log.Fields{
"Method": "AddWorkflowTemplateParametersFromAnnotations",
"Step": "NewFromYamlString",
"Error": err.Error(),
}).Error("Error with AddWorkflowTemplateParametersFromAnnotations")
continue
}
order := 0
orderValue, ok := data["order"]
if ok {
order = orderValue.(int)
delete(data, "order")
if order >= 0 && order < len(parameters) {
parameters[order] = data
}
}
}
arguments["parameters"] = parameters
}
// getWorkflowTemplateColumns returns all of the columns for workflowTemplate modified by alias, destination.
// see formatColumnSelect
func getWorkflowTemplateColumns(aliasAndDestination ...string) []string {
columns := []string{"id", "created_at", "uid", "name", "namespace", "modified_at", "is_archived"}
return sqlutil.FormatColumnSelect(columns, aliasAndDestination...)
}

View File

@@ -0,0 +1,40 @@
package v1
import (
"github.com/onepanelio/core/sqlutil"
"time"
)
type WorkflowTemplateVersion struct {
ID uint64
UID string
Version int64
IsLatest bool `db:"is_latest"`
Manifest string
CreatedAt time.Time `db:"created_at"`
WorkflowTemplate *WorkflowTemplate `db:"workflow_template"`
Labels map[string]string
}
// WorkflowTemplateVersionsToIDs returns an array of ids from the input WorkflowTemplateVersion with no duplicates.
func WorkflowTemplateVersionsToIDs(resources []*WorkflowTemplateVersion) (ids []uint64) {
mappedIds := make(map[uint64]bool)
// This is to make sure we don't have duplicates
for _, resource := range resources {
mappedIds[resource.ID] = true
}
for id := range mappedIds {
ids = append(ids, id)
}
return
}
// getWorkflowTemplateVersionColumns returns all of the columns for workflow template versions modified by alias, destination.
// see formatColumnSelect
func getWorkflowTemplateVersionColumns(aliasAndDestination ...string) []string {
columns := []string{"id", "created_at", "version", "is_latest", "manifest"}
return sqlutil.FormatColumnSelect(columns, aliasAndDestination...)
}

View File

@@ -16,7 +16,7 @@ import (
) )
func (c *Client) workspacesSelectBuilder(namespace string) sq.SelectBuilder { func (c *Client) workspacesSelectBuilder(namespace string) sq.SelectBuilder {
sb := sb.Select(getWorkspaceColumns("w", "")...). sb := sb.Select(getWorkspaceColumns("w")...).
Columns(getWorkspaceStatusColumns("w", "status")...). Columns(getWorkspaceStatusColumns("w", "status")...).
Columns(getWorkspaceTemplateColumns("wt", "workspace_template")...). Columns(getWorkspaceTemplateColumns("wt", "workspace_template")...).
Columns(getWorkflowTemplateVersionColumns("wftv", "workflow_template_version")...). Columns(getWorkflowTemplateVersionColumns("wftv", "workflow_template_version")...).
@@ -235,7 +235,7 @@ func (c *Client) GetWorkspace(namespace, uid string) (workspace *Workspace, err
return return
} }
labelsMap, err := c.GetDbLabelsMapped(TypeWorkspace, workspace.ID) labelsMap, err := c.GetDBLabelsMapped(TypeWorkspace, workspace.ID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -291,7 +291,7 @@ func (c *Client) UpdateWorkspaceStatus(namespace, uid string, status *WorkspaceS
// ListWorkspacesByTemplateID will return all the workspaces for a given workspace template id. // ListWorkspacesByTemplateID will return all the workspaces for a given workspace template id.
// Sourced from database. // Sourced from database.
func (c *Client) ListWorkspacesByTemplateID(namespace string, templateID uint64) (workspaces []*Workspace, err error) { func (c *Client) ListWorkspacesByTemplateID(namespace string, templateID uint64) (workspaces []*Workspace, err error) {
sb := sb.Select(getWorkspaceColumns("w", "")...). sb := sb.Select(getWorkspaceColumns("w")...).
From("workspaces w"). From("workspaces w").
Where(sq.And{ Where(sq.And{
sq.Eq{ sq.Eq{
@@ -311,7 +311,7 @@ func (c *Client) ListWorkspacesByTemplateID(namespace string, templateID uint64)
return nil, err return nil, err
} }
labelMap, err := c.GetDbLabelsMapped(TypeWorkspace, WorkspacesToIds(workspaces)...) labelMap, err := c.GetDBLabelsMapped(TypeWorkspace, WorkspacesToIDs(workspaces)...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -323,7 +323,7 @@ func (c *Client) ListWorkspacesByTemplateID(namespace string, templateID uint64)
} }
func (c *Client) ListWorkspaces(namespace string, paginator *pagination.PaginationRequest) (workspaces []*Workspace, err error) { func (c *Client) ListWorkspaces(namespace string, paginator *pagination.PaginationRequest) (workspaces []*Workspace, err error) {
sb := sb.Select(getWorkspaceColumns("w", "")...). sb := sb.Select(getWorkspaceColumns("w")...).
Columns(getWorkspaceStatusColumns("w", "status")...). Columns(getWorkspaceStatusColumns("w", "status")...).
Columns(getWorkspaceTemplateColumns("wt", "workspace_template")...). Columns(getWorkspaceTemplateColumns("wt", "workspace_template")...).
From("workspaces w"). From("workspaces w").
@@ -339,16 +339,11 @@ func (c *Client) ListWorkspaces(namespace string, paginator *pagination.Paginati
}) })
sb = *paginator.ApplyToSelect(&sb) sb = *paginator.ApplyToSelect(&sb)
query, args, err := sb.ToSql() if err := c.DB.Selectx(&workspaces, sb); err != nil {
if err != nil {
return nil, err return nil, err
} }
if err := c.DB.Select(&workspaces, query, args...); err != nil { labelMap, err := c.GetDBLabelsMapped(TypeWorkspace, WorkspacesToIDs(workspaces)...)
return nil, err
}
labelMap, err := c.GetDbLabelsMapped(TypeWorkspace, WorkspacesToIds(workspaces)...)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -22,13 +22,165 @@ import (
"strings" "strings"
) )
// createWorkspaceTemplateVersionDB creates a workspace template version in the database.
func createWorkspaceTemplateVersionDB(tx *sql.Tx, workspaceTemplateID uint64, version int64, manifest string, isLatest bool) (id uint64, err error) {
err = sb.Insert("workspace_template_versions").
SetMap(sq.Eq{
"version": version,
"is_latest": isLatest,
"manifest": manifest,
"workspace_template_id": workspaceTemplateID,
}).
Suffix("RETURNING id").
RunWith(tx).
QueryRow().
Scan(&id)
return
}
// markWorkspaceTemplateVersionsOutdatedDB updates all of the workspace template versions in db so is_latest is false
// given the workspaceTemplateID
func markWorkspaceTemplateVersionsOutdatedDB(tx *sql.Tx, workspaceTemplateID uint64) (err error) {
_, err = sb.Update("workspace_template_versions").
SetMap(sq.Eq{"is_latest": false}).
Where(sq.Eq{
"workspace_template_id": workspaceTemplateID,
"is_latest": true,
}).
RunWith(tx).
Exec()
return
}
// createLatestWorkspaceTemplateVersionDB creates a new workspace template version and marks all previous versions as not latest.
func createLatestWorkspaceTemplateVersionDB(tx *sql.Tx, workspaceTemplateID uint64, version int64, manifest string) (id uint64, err error) {
id, err = createWorkspaceTemplateVersionDB(tx, workspaceTemplateID, version, manifest, true)
if err != nil {
return
}
err = markWorkspaceTemplateVersionsOutdatedDB(tx, workspaceTemplateID)
return
}
func parseWorkspaceSpec(template string) (spec *WorkspaceSpec, err error) { func parseWorkspaceSpec(template string) (spec *WorkspaceSpec, err error) {
err = yaml.UnmarshalStrict([]byte(template), &spec) err = yaml.UnmarshalStrict([]byte(template), &spec)
return return
} }
func generateArguments(spec *WorkspaceSpec, config map[string]string, withRuntimeVars bool) (err error) { func generateRuntimeParamters(config SystemConfig) (parameters []Parameter, err error) {
parameters = make([]Parameter, 0)
// Host
parameters = append(parameters, Parameter{
Name: "sys-host",
Value: config.Domain(),
Type: "input.hidden",
})
// Node pool parameter and options
options, err := config.ParsedNodePoolOptions()
if err != nil {
return nil, err
}
if len(options) == 0 {
return nil, fmt.Errorf("no node pool options in config")
}
parameters = append(parameters, Parameter{
Name: "sys-node-pool",
Value: ptr.String(options[0].Value),
Type: "select.select",
Options: options,
DisplayName: ptr.String("Node pool"),
Hint: ptr.String("Name of node pool or group"),
Required: true,
})
return
}
func generateStaticParameters() (parameters []Parameter, err error) {
parameters = make([]Parameter, 0)
// Resource action parameter
parameters = append(parameters, Parameter{
Name: "sys-name",
Type: "input.text",
Value: ptr.String("name"),
DisplayName: ptr.String("Workspace name"),
Hint: ptr.String("Must be between 3-30 characters, contain only alphanumeric or `-` characters"),
Required: true,
})
// TODO: These can be removed when lint validation of workflows work
// Resource action parameter
parameters = append(parameters, Parameter{
Name: "sys-resource-action",
Value: ptr.String("apply"),
Type: "input.hidden",
})
// Workspace action
parameters = append(parameters, Parameter{
Name: "sys-workspace-action",
Value: ptr.String("create"),
Type: "input.hidden",
})
// UID placeholder
parameters = append(parameters, Parameter{
Name: "sys-uid",
Value: ptr.String("uid"),
Type: "input.hidden",
})
return
}
func generateVolumeParameters(spec *WorkspaceSpec) (parameters []Parameter, err error) {
if spec == nil {
return nil, fmt.Errorf("workspaceSpec is nil")
}
parameters = make([]Parameter, 0)
// Map all the volumeClaimTemplates that have storage set
volumeStorageQuantityIsSet := make(map[string]bool)
for _, v := range spec.VolumeClaimTemplates {
if v.Spec.Resources.Requests != nil {
volumeStorageQuantityIsSet[v.ObjectMeta.Name] = true
}
}
// Volume size parameters
volumeClaimsMapped := make(map[string]bool)
for _, c := range spec.Containers {
for _, v := range c.VolumeMounts {
// Skip if already mapped or storage size is set
if volumeClaimsMapped[v.Name] || volumeStorageQuantityIsSet[v.Name] {
continue
}
parameters = append(parameters, Parameter{
Name: fmt.Sprintf("sys-%v-volume-size", v.Name),
Type: "input.number",
Value: ptr.String("20480"),
DisplayName: ptr.String(fmt.Sprintf("Disk size for \"%v\"", v.Name)),
Hint: ptr.String(fmt.Sprintf("Disk size in MB for volume mounted at `%v`", v.MountPath)),
Required: true,
})
volumeClaimsMapped[v.Name] = true
}
}
return
}
func generateArguments(spec *WorkspaceSpec, config SystemConfig, withRuntimeVars bool) (err error) {
systemParameters := make([]Parameter, 0) systemParameters := make([]Parameter, 0)
// Resource action parameter // Resource action parameter
systemParameters = append(systemParameters, Parameter{ systemParameters = append(systemParameters, Parameter{
@@ -65,7 +217,7 @@ func generateArguments(spec *WorkspaceSpec, config map[string]string, withRuntim
// Host // Host
systemParameters = append(systemParameters, Parameter{ systemParameters = append(systemParameters, Parameter{
Name: "sys-host", Name: "sys-host",
Value: ptr.String(config["ONEPANEL_DOMAIN"]), Value: config.Domain(),
Type: "input.hidden", Type: "input.hidden",
}) })
@@ -590,12 +742,6 @@ func (c *Client) createWorkspaceTemplate(namespace string, workspaceTemplate *Wo
} }
workspaceTemplate.UID = uid workspaceTemplate.UID = uid
tx, err := c.DB.Begin()
if err != nil {
return nil, err
}
defer tx.Rollback()
workspaceTemplate.WorkflowTemplate.IsSystem = true workspaceTemplate.WorkflowTemplate.IsSystem = true
workspaceTemplate.WorkflowTemplate.Resource = ptr.String(TypeWorkspaceTemplate) workspaceTemplate.WorkflowTemplate.Resource = ptr.String(TypeWorkspaceTemplate)
workspaceTemplate.WorkflowTemplate.ResourceUID = ptr.String(uid) workspaceTemplate.WorkflowTemplate.ResourceUID = ptr.String(uid)
@@ -606,6 +752,11 @@ func (c *Client) createWorkspaceTemplate(namespace string, workspaceTemplate *Wo
workspaceTemplate.Version = workspaceTemplate.WorkflowTemplate.Version workspaceTemplate.Version = workspaceTemplate.WorkflowTemplate.Version
workspaceTemplate.IsLatest = true workspaceTemplate.IsLatest = true
tx, err := c.DB.Begin()
if err != nil {
return nil, err
}
defer tx.Rollback()
err = sb.Insert("workspace_templates"). err = sb.Insert("workspace_templates").
SetMap(sq.Eq{ SetMap(sq.Eq{
"uid": uid, "uid": uid,
@@ -615,7 +766,8 @@ func (c *Client) createWorkspaceTemplate(namespace string, workspaceTemplate *Wo
}). }).
Suffix("RETURNING id, created_at"). Suffix("RETURNING id, created_at").
RunWith(tx). RunWith(tx).
QueryRow().Scan(&workspaceTemplate.ID, &workspaceTemplate.CreatedAt) QueryRow().
Scan(&workspaceTemplate.ID, &workspaceTemplate.CreatedAt)
if err != nil { if err != nil {
_, errCleanUp := c.ArchiveWorkflowTemplate(namespace, workspaceTemplate.WorkflowTemplate.UID) _, errCleanUp := c.ArchiveWorkflowTemplate(namespace, workspaceTemplate.WorkflowTemplate.UID)
errorMsg := "Error with insert into workspace_templates. " errorMsg := "Error with insert into workspace_templates. "
@@ -626,39 +778,27 @@ func (c *Client) createWorkspaceTemplate(namespace string, workspaceTemplate *Wo
return nil, util.NewUserErrorWrap(err, errorMsg) //return the source error return nil, util.NewUserErrorWrap(err, errorMsg) //return the source error
} }
workspaceTemplateVersionID := uint64(0) workspaceTemplateVersionID, err := createWorkspaceTemplateVersionDB(tx, workspaceTemplate.ID, workspaceTemplate.Version, workspaceTemplate.Manifest, true)
err = sb.Insert("workspace_template_versions").
SetMap(sq.Eq{
"version": workspaceTemplate.Version,
"is_latest": workspaceTemplate.IsLatest,
"manifest": workspaceTemplate.Manifest,
"workspace_template_id": workspaceTemplate.ID,
}).
Suffix("RETURNING id").
RunWith(tx).
QueryRow().
Scan(&workspaceTemplateVersionID)
if err != nil { if err != nil {
_, errCleanUp := c.ArchiveWorkflowTemplate(namespace, workspaceTemplate.WorkflowTemplate.UID)
errorMsg := "Error with insert into workspace_templates_versions. " errorMsg := "Error with insert into workspace_templates_versions. "
_, errCleanUp := c.ArchiveWorkflowTemplate(namespace, workspaceTemplate.WorkflowTemplate.UID)
if errCleanUp != nil { if errCleanUp != nil {
err = fmt.Errorf("%w; %s", err, errCleanUp)
errorMsg += "Error with clean-up: ArchiveWorkflowTemplate. " errorMsg += "Error with clean-up: ArchiveWorkflowTemplate. "
errorMsg += errCleanUp.Error()
} }
return nil, util.NewUserErrorWrap(err, errorMsg) //return the source error return nil, util.NewUserErrorWrap(err, errorMsg) // return the source error
} }
if len(workspaceTemplate.Labels) != 0 { _, err = c.InsertLabelsRunner(tx, TypeWorkspaceTemplateVersion, workspaceTemplateVersionID, workspaceTemplate.Labels)
_, err = c.InsertLabelsBuilder(TypeWorkspaceTemplateVersion, workspaceTemplateVersionID, workspaceTemplate.Labels). if err != nil {
RunWith(tx). return nil, err
Exec()
if err != nil {
return nil, err
}
} }
if err = tx.Commit(); err != nil { if err = tx.Commit(); err != nil {
_, err := c.ArchiveWorkflowTemplate(namespace, workspaceTemplate.WorkflowTemplate.UID) _, errArchive := c.ArchiveWorkflowTemplate(namespace, workspaceTemplate.WorkflowTemplate.UID)
if errArchive != nil {
err = fmt.Errorf("%w; %s", err, errArchive)
}
return nil, err return nil, err
} }
@@ -666,7 +806,7 @@ func (c *Client) createWorkspaceTemplate(namespace string, workspaceTemplate *Wo
} }
func (c *Client) workspaceTemplatesSelectBuilder(namespace string) sq.SelectBuilder { func (c *Client) workspaceTemplatesSelectBuilder(namespace string) sq.SelectBuilder {
sb := sb.Select(getWorkspaceTemplateColumns("wt", "")...). sb := sb.Select(getWorkspaceTemplateColumns("wt")...).
From("workspace_templates wt"). From("workspace_templates wt").
Where(sq.Eq{ Where(sq.Eq{
"wt.namespace": namespace, "wt.namespace": namespace,
@@ -775,8 +915,8 @@ func (c *Client) GenerateWorkspaceTemplateWorkflowTemplate(workspaceTemplate *Wo
// CreateWorkspaceTemplate creates a template for Workspaces // CreateWorkspaceTemplate creates a template for Workspaces
func (c *Client) CreateWorkspaceTemplate(namespace string, workspaceTemplate *WorkspaceTemplate) (*WorkspaceTemplate, error) { func (c *Client) CreateWorkspaceTemplate(namespace string, workspaceTemplate *WorkspaceTemplate) (*WorkspaceTemplate, error) {
valid, err := govalidator.ValidateStruct(workspaceTemplate) _, err := govalidator.ValidateStruct(workspaceTemplate)
if err != nil || !valid { if err != nil {
return nil, util.NewUserError(codes.InvalidArgument, err.Error()) return nil, util.NewUserError(codes.InvalidArgument, err.Error())
} }
@@ -784,6 +924,7 @@ func (c *Client) CreateWorkspaceTemplate(namespace string, workspaceTemplate *Wo
if err != nil { if err != nil {
return nil, err return nil, err
} }
if existingWorkspaceTemplate != nil { if existingWorkspaceTemplate != nil {
message := fmt.Sprintf("Workspace template with the name '%v' already exists", workspaceTemplate.Name) message := fmt.Sprintf("Workspace template with the name '%v' already exists", workspaceTemplate.Name)
if existingWorkspaceTemplate.IsArchived { if existingWorkspaceTemplate.IsArchived {
@@ -864,57 +1005,32 @@ func (c *Client) UpdateWorkspaceTemplate(namespace string, workspaceTemplate *Wo
updatedWorkflowTemplate.ID = existingWorkspaceTemplate.WorkflowTemplate.ID updatedWorkflowTemplate.ID = existingWorkspaceTemplate.WorkflowTemplate.ID
updatedWorkflowTemplate.UID = existingWorkspaceTemplate.WorkflowTemplate.UID updatedWorkflowTemplate.UID = existingWorkspaceTemplate.WorkflowTemplate.UID
tx, err := c.DB.Begin()
if err != nil {
return nil, err
}
defer tx.Rollback()
updatedWorkflowTemplate.Labels = workspaceTemplate.Labels updatedWorkflowTemplate.Labels = workspaceTemplate.Labels
workflowTemplateVersion, err := c.CreateWorkflowTemplateVersion(namespace, updatedWorkflowTemplate) workflowTemplateVersion, err := c.CreateWorkflowTemplateVersion(namespace, updatedWorkflowTemplate)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// TODO - this might not be needed with recent changes made.
workspaceTemplate.Version = workflowTemplateVersion.Version workspaceTemplate.Version = workflowTemplateVersion.Version
workspaceTemplate.IsLatest = true workspaceTemplate.IsLatest = true
_, err = sb.Update("workspace_template_versions"). tx, err := c.DB.Begin()
SetMap(sq.Eq{"is_latest": false}). if err != nil {
Where(sq.Eq{ return nil, err
"workspace_template_id": workspaceTemplate.ID, }
}). defer tx.Rollback()
RunWith(tx).
Exec() workspaceTemplateVersionID, err := createLatestWorkspaceTemplateVersionDB(tx, workspaceTemplate.ID, workspaceTemplate.Version, workspaceTemplate.Manifest)
if err != nil { if err != nil {
return nil, err return nil, err
} }
workspaceTemplateVersionID := uint64(0) _, err = c.InsertLabelsRunner(tx, TypeWorkspaceTemplateVersion, workspaceTemplateVersionID, workspaceTemplate.Labels)
err = sb.Insert("workspace_template_versions").
SetMap(sq.Eq{
"version": workspaceTemplate.Version,
"is_latest": workspaceTemplate.IsLatest,
"manifest": workspaceTemplate.Manifest,
"workspace_template_id": workspaceTemplate.ID,
}).
Suffix("RETURNING id").
RunWith(tx).
QueryRow().
Scan(&workspaceTemplateVersionID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if len(workspaceTemplate.Labels) != 0 {
_, err = c.InsertLabelsBuilder(TypeWorkspaceTemplateVersion, workspaceTemplateVersionID, workspaceTemplate.Labels).
RunWith(tx).
Exec()
if err != nil {
return nil, err
}
}
if err := tx.Commit(); err != nil { if err := tx.Commit(); err != nil {
return nil, err return nil, err
} }
@@ -958,9 +1074,7 @@ func (c *Client) ListWorkspaceTemplateVersions(namespace, uid string) (workspace
return return
} }
ids := WorkspaceTemplatesToVersionIds(workspaceTemplates) labelsMap, err := c.GetDBLabelsMapped(TypeWorkspaceTemplateVersion, WorkspaceTemplatesToVersionIDs(workspaceTemplates)...)
labelsMap, err := c.GetDbLabelsMapped(TypeWorkspaceTemplateVersion, ids...)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -4,6 +4,7 @@ import (
"fmt" "fmt"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/onepanelio/core/pkg/util/ptr" "github.com/onepanelio/core/pkg/util/ptr"
"github.com/onepanelio/core/sqlutil"
"gopkg.in/yaml.v2" "gopkg.in/yaml.v2"
"time" "time"
) )
@@ -175,7 +176,7 @@ func (wt *WorkspaceTemplate) InjectRuntimeVariables(config SystemConfig) error {
return nil return nil
} }
func WorkspaceTemplatesToVersionIds(resources []*WorkspaceTemplate) (ids []uint64) { func WorkspaceTemplatesToVersionIDs(resources []*WorkspaceTemplate) (ids []uint64) {
mappedIds := make(map[uint64]bool) mappedIds := make(map[uint64]bool)
// This is to make sure we don't have duplicates // This is to make sure we don't have duplicates
@@ -192,7 +193,7 @@ func WorkspaceTemplatesToVersionIds(resources []*WorkspaceTemplate) (ids []uint6
// getWorkspaceTemplateColumns returns all of the columns for workspace template modified by alias, destination. // getWorkspaceTemplateColumns returns all of the columns for workspace template modified by alias, destination.
// see formatColumnSelect // see formatColumnSelect
func getWorkspaceTemplateColumns(alias string, destination string, extraColumns ...string) []string { func getWorkspaceTemplateColumns(aliasAndDestination ...string) []string {
columns := []string{"id", "uid", "created_at", "modified_at", "name", "namespace", "is_archived", "workflow_template_id"} columns := []string{"id", "uid", "created_at", "modified_at", "name", "namespace", "is_archived", "workflow_template_id"}
return formatColumnSelect(columns, alias, destination, extraColumns...) return sqlutil.FormatColumnSelect(columns, aliasAndDestination...)
} }

View File

@@ -3,6 +3,7 @@ package v1
import ( import (
"fmt" "fmt"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/onepanelio/core/sqlutil"
networking "istio.io/api/networking/v1alpha3" networking "istio.io/api/networking/v1alpha3"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
"time" "time"
@@ -62,16 +63,32 @@ func (w *Workspace) GetURL(protocol, domain string) string {
return fmt.Sprintf("%v%v--%v.%v", protocol, w.UID, w.Namespace, domain) return fmt.Sprintf("%v%v--%v.%v", protocol, w.UID, w.Namespace, domain)
} }
// returns all of the columns for workspace modified by alias, destination. // getWorkspaceColumns returns all of the columns for workspace modified by alias, destination.
// see formatColumnSelect // see formatColumnSelect
func getWorkspaceColumns(alias string, destination string, extraColumns ...string) []string { func getWorkspaceColumns(aliasAndDestination ...string) []string {
columns := []string{"id", "created_at", "modified_at", "uid", "name", "namespace", "parameters", "workspace_template_id", "workspace_template_version"} columns := []string{"id", "created_at", "modified_at", "uid", "name", "namespace", "parameters", "workspace_template_id", "workspace_template_version"}
return formatColumnSelect(columns, alias, destination, extraColumns...) return sqlutil.FormatColumnSelect(columns, aliasAndDestination...)
} }
// returns all of the columns for WorkspaceStatus modified by alias, destination. // getWorkspaceStatusColumns returns all of the columns for WorkspaceStatus modified by alias, destination.
// see formatColumnSelect // see formatColumnSelect
func getWorkspaceStatusColumns(alias string, destination string, extraColumns ...string) []string { func getWorkspaceStatusColumns(aliasAndDestination ...string) []string {
columns := []string{"phase", "started_at", "paused_at", "terminated_at"} columns := []string{"phase", "started_at", "paused_at", "terminated_at"}
return formatColumnSelect(columns, alias, destination, extraColumns...) return sqlutil.FormatColumnSelect(columns, aliasAndDestination...)
}
// WorkspacesToIDs returns an array of ids from the input Workspaces with no duplicates.
func WorkspacesToIDs(resources []*Workspace) (ids []uint64) {
mappedIds := make(map[uint64]bool)
// This is to make sure we don't have duplicates
for _, resource := range resources {
mappedIds[resource.ID] = true
}
for id := range mappedIds {
ids = append(ids, id)
}
return
} }

View File

@@ -159,7 +159,7 @@ func (s *WorkflowServer) GetWorkflowExecution(ctx context.Context, req *api.GetW
return nil, err return nil, err
} }
mappedLabels, err := client.GetDbLabelsMapped(v1.TypeWorkflowExecution, wf.ID) mappedLabels, err := client.GetDBLabelsMapped(v1.TypeWorkflowExecution, wf.ID)
if err != nil { if err != nil {
return nil, err return nil, err
} }