Prototyping removing workflow versions and updating workflow templates to always create a k8 resources.

This commit is contained in:
Andrey Melnikov
2020-04-07 15:50:41 -07:00
parent 2f35f877f2
commit a520f3c6c3
8 changed files with 217 additions and 45 deletions

View File

@@ -0,0 +1,5 @@
-- +goose Up
ALTER TABLE workflow_template_versions DROP COLUMN manifest;
-- +goose Down
ALTER TABLE workflow_template_versions ADD COLUMN manifest TEXT NOT NULL;

View File

@@ -0,0 +1,17 @@
-- +goose Up
DROP TABLE workflow_template_versions;
-- +goose Down
CREATE TABLE workflow_template_versions
(
id serial PRIMARY KEY,
workflow_template_id integer NOT NULL REFERENCES workflow_templates ON DELETE CASCADE,
version integer NOT NULL,
is_latest boolean NOT NULL,
manifest text NOT NULL,
-- auditing info
created_at timestamp NOT NULL DEFAULT (NOW() at time zone 'utc'),
modified_at timestamp
);

1
go.mod
View File

@@ -32,6 +32,7 @@ require (
github.com/tmc/grpc-websocket-proxy v0.0.0-20200122045848-3419fae592fc
google.golang.org/genproto v0.0.0-20200317114155-1f3552e48f24
google.golang.org/grpc v1.28.0
gopkg.in/yaml.v2 v2.2.8
k8s.io/api v0.16.4
k8s.io/apimachinery v0.16.7-beta.0
k8s.io/client-go v0.16.4

View File

@@ -1,6 +1,7 @@
package v1
import (
"gopkg.in/yaml.v2"
"strings"
"time"
@@ -182,3 +183,22 @@ func FilePathToExtension(path string) string {
return path[dotIndex+1:]
}
func WrapSpecInK8s(data []byte) ([]byte, error) {
mapping := make(map[interface{}]interface{})
if err := yaml.Unmarshal(data, mapping); err != nil {
return nil, err
}
contentMap := map[interface{}]interface{}{
"metadata": make(map[interface{}]interface{}),
"spec": mapping,
}
finalBytes, err := yaml.Marshal(contentMap)
if err != nil {
return nil, nil
}
return finalBytes, nil
}

20
pkg/util/number/number.go Normal file
View File

@@ -0,0 +1,20 @@
package number
import (
"fmt"
"strconv"
)
// Takes the input value, increments it, and formats it as a string.
func IncrementStringInt(value string) (string, error) {
numericValue, err := strconv.Atoi(value)
if err != nil {
return value, err
}
numericValue++
stringValue := fmt.Sprintf("%v", numericValue)
return stringValue, nil
}

View File

@@ -916,7 +916,7 @@ func (c *Client) GetWorkflowExecutionLabels(namespace, name, prefix string) (lab
// prefix is the label prefix.
// e.g. prefix/my-label-key: my-label-value
func (c *Client) GetWorkflowTemplateLabels(namespace, name, prefix string) (labels map[string]string, err error) {
wf, err := c.ArgoprojV1alpha1().WorkflowTemplates(namespace).Get(name, metav1.GetOptions{})
wf, err := c.getArgoWorkflowTemplate(namespace, name, "latest")
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
@@ -999,7 +999,7 @@ func (c *Client) SetWorkflowExecutionLabels(namespace, name, prefix string, keyV
// we delete all labels with that prefix and set the new ones
// e.g. prefix/my-label-key: my-label-value
func (c *Client) SetWorkflowTemplateLabels(namespace, name, prefix string, keyValues map[string]string, deleteOld bool) (workflowLabels map[string]string, err error) {
wf, err := c.ArgoprojV1alpha1().WorkflowTemplates(namespace).Get(name, metav1.GetOptions{})
wf, err := c.getArgoWorkflowTemplate(namespace, name, "latest")
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,

View File

@@ -2,7 +2,13 @@ package v1
import (
"database/sql"
"errors"
"fmt"
"github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
argojson "github.com/argoproj/pkg/json"
"github.com/ghodss/yaml"
"github.com/google/uuid"
"github.com/onepanelio/core/pkg/util/number"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"time"
@@ -14,11 +20,11 @@ import (
var sb = sq.StatementBuilder.PlaceholderFormat(sq.Dollar)
// Deprecated
func (c *Client) insertWorkflowTemplateVersion(workflowTemplate *WorkflowTemplate, runner sq.BaseRunner) (err error) {
err = sb.Insert("workflow_template_versions").
SetMap(sq.Eq{
"workflow_template_id": workflowTemplate.ID,
"manifest": workflowTemplate.Manifest,
"version": int32(time.Now().Unix()),
"is_latest": workflowTemplate.IsLatest,
}).
@@ -54,24 +60,12 @@ func (c *Client) createWorkflowTemplate(namespace string, workflowTemplate *Work
return nil, err
}
argoWft := &v1alpha1.WorkflowTemplate{
ObjectMeta: v1.ObjectMeta{
Name: uid,
},
}
argoWft, err := createArgoWorkflowTemplate(workflowTemplate, "1")
argoWft, err = c.ArgoprojV1alpha1().WorkflowTemplates(namespace).Create(argoWft)
if err != nil {
return nil, err
}
if err = c.insertWorkflowTemplateVersion(workflowTemplate, tx); err != nil {
if err := c.ArgoprojV1alpha1().WorkflowTemplates(namespace).Delete(argoWft.Name, &v1.DeleteOptions{}); err != nil {
log.Printf("Unable to delete argo workflow template")
}
return nil, err
}
if err = tx.Commit(); err != nil {
if err := c.ArgoprojV1alpha1().WorkflowTemplates(namespace).Delete(argoWft.Name, &v1.DeleteOptions{}); err != nil {
log.Printf("Unable to delete argo workflow template")
@@ -145,9 +139,8 @@ func (c *Client) updateWorkflowTemplateVersion(workflowTemplate *WorkflowTemplat
}
func (c *Client) workflowTemplatesSelectBuilder(namespace string) sq.SelectBuilder {
sb := sb.Select("wt.id", "wt.created_at", "wt.uid", "wt.name", "wt.is_archived", "wtv.version", "wtv.is_latest").
From("workflow_template_versions wtv").
Join("workflow_templates wt ON wt.id = wtv.workflow_template_id").
sb := sb.Select("wt.id", "wt.created_at", "wt.uid", "wt.name", "wt.is_archived").
From("workflow_templates wt").
Where(sq.Eq{
"wt.namespace": namespace,
})
@@ -155,16 +148,12 @@ func (c *Client) workflowTemplatesSelectBuilder(namespace string) sq.SelectBuild
return sb
}
// todo get version here?
// todo what is the version supposed to provide here? What information?
func (c *Client) getWorkflowTemplate(namespace, uid string, version int32) (workflowTemplate *WorkflowTemplate, err error) {
workflowTemplate = &WorkflowTemplate{}
sb := c.workflowTemplatesSelectBuilder(namespace).Where(sq.Eq{"wt.uid": uid}).
Columns("wtv.manifest").
OrderBy("wtv.version desc").
Limit(1)
if version != 0 {
sb = sb.Where(sq.Eq{"wtv.version": version})
}
sb := c.workflowTemplatesSelectBuilder(namespace).Where(sq.Eq{"wt.uid": uid})
query, args, err := sb.ToSql()
if err != nil {
return
@@ -175,6 +164,26 @@ func (c *Client) getWorkflowTemplate(namespace, uid string, version int32) (work
workflowTemplate = nil
}
if workflowTemplate == nil {
return workflowTemplate, nil
}
versionAsString := "latest"
if version != 0 {
versionAsString = fmt.Sprintf("%v", version)
}
argoWft, err := c.getArgoWorkflowTemplate(namespace, uid, versionAsString)
if err != nil {
return nil, err
}
manifest, err := yaml.Marshal(argoWft)
if err != nil {
return nil, err
}
workflowTemplate.Manifest = string(manifest)
return
}
@@ -205,7 +214,6 @@ func (c *Client) listWorkflowTemplateVersions(namespace, uid string) (workflowTe
workflowTemplateVersions = []*WorkflowTemplate{}
query, args, err := c.workflowTemplatesSelectBuilder(namespace).Where(sq.Eq{"wt.uid": uid}).
Columns("wtv.manifest").
OrderBy("wtv.version desc").ToSql()
if err != nil {
return
@@ -256,7 +264,12 @@ func (c *Client) archiveWorkflowTemplate(namespace, uid string) (bool, error) {
func (c *Client) CreateWorkflowTemplate(namespace string, workflowTemplate *WorkflowTemplate) (*WorkflowTemplate, error) {
// validate workflow template
if err := c.ValidateWorkflowExecution(namespace, workflowTemplate.GetManifestBytes()); err != nil {
finalBytes, err := WrapSpecInK8s(workflowTemplate.GetManifestBytes())
if err != nil {
return nil, util.NewUserError(codes.InvalidArgument, err.Error())
}
if err := c.ValidateWorkflowExecution(namespace, finalBytes); err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"WorkflowTemplate": workflowTemplate,
@@ -265,7 +278,7 @@ func (c *Client) CreateWorkflowTemplate(namespace string, workflowTemplate *Work
return nil, util.NewUserError(codes.InvalidArgument, err.Error())
}
workflowTemplate, err := c.createWorkflowTemplate(namespace, workflowTemplate)
workflowTemplate, err = c.createWorkflowTemplate(namespace, workflowTemplate)
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
@@ -280,7 +293,12 @@ func (c *Client) CreateWorkflowTemplate(namespace string, workflowTemplate *Work
func (c *Client) CreateWorkflowTemplateVersion(namespace string, workflowTemplate *WorkflowTemplate) (*WorkflowTemplate, error) {
// validate workflow template
if err := c.ValidateWorkflowExecution(namespace, workflowTemplate.GetManifestBytes()); err != nil {
finalBytes, err := WrapSpecInK8s(workflowTemplate.GetManifestBytes())
if err != nil {
return nil, util.NewUserError(codes.InvalidArgument, err.Error())
}
if err := c.ValidateWorkflowExecution(namespace, finalBytes); err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"WorkflowTemplate": workflowTemplate,
@@ -289,28 +307,53 @@ func (c *Client) CreateWorkflowTemplateVersion(namespace string, workflowTemplat
return nil, util.NewUserError(codes.InvalidArgument, err.Error())
}
if err := c.removeIsLatestFromWorkflowTemplateVersions(workflowTemplate); err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"WorkflowTemplate": workflowTemplate,
"Error": err.Error(),
}).Error("Could not remove IsLatest from workflow template versions.")
return nil, util.NewUserError(codes.Unknown, "Unable to Create Workflow Template Version.")
}
workflowTemplate, err := c.createWorkflowTemplateVersion(namespace, workflowTemplate)
latest, err := c.getArgoWorkflowTemplate(namespace, workflowTemplate.UID, "latest")
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"WorkflowTemplate": workflowTemplate,
"Error": err.Error(),
}).Error("Could not create workflow template version.")
return nil, util.NewUserErrorWrap(err, "Workflow template")
}).Error("Could not get latest argo workflow template")
return nil, err
}
if workflowTemplate == nil {
return nil, util.NewUserError(codes.NotFound, "Workflow template not found.")
incrementedVersion, err := number.IncrementStringInt(latest.Labels["onepanel.io/version"])
delete(latest.Labels, "onepanel.io/version_latest")
if _, err := c.ArgoprojV1alpha1().WorkflowTemplates(namespace).Update(latest); err != nil {
return nil, err
}
updatedTemplate, err := createArgoWorkflowTemplate(workflowTemplate, incrementedVersion)
if err != nil {
return nil, err
}
updatedTemplate.TypeMeta = v1.TypeMeta{}
updatedTemplate.ObjectMeta.ResourceVersion = ""
updatedTemplate.ObjectMeta.SetSelfLink("")
// todo - all the error messages
if _, err := c.ArgoprojV1alpha1().WorkflowTemplates(namespace).Create(updatedTemplate); err != nil {
return nil, err
}
//
//workflowTemplate, err := c.createWorkflowTemplateVersion(namespace, workflowTemplate)
//if err != nil {
// log.WithFields(log.Fields{
// "Namespace": namespace,
// "WorkflowTemplate": workflowTemplate,
// "Error": err.Error(),
// }).Error("Could not create workflow template version.")
// return nil, util.NewUserErrorWrap(err, "Workflow template")
//}
//if workflowTemplate == nil {
// return nil, util.NewUserError(codes.NotFound, "Workflow template not found.")
//}
return workflowTemplate, nil
}
@@ -352,6 +395,7 @@ func (c *Client) UpdateWorkflowTemplateVersion(namespace string, workflowTemplat
return workflowTemplate, nil
}
// If version is 0, it returns the latest.
func (c *Client) GetWorkflowTemplate(namespace, uid string, version int32) (workflowTemplate *WorkflowTemplate, err error) {
workflowTemplate, err = c.getWorkflowTemplate(namespace, uid, version)
if err != nil {
@@ -441,3 +485,57 @@ func (c *Client) ArchiveWorkflowTemplate(namespace, uid string) (archived bool,
return
}
func createArgoWorkflowTemplate(workflowTemplate *WorkflowTemplate, version string) (*v1alpha1.WorkflowTemplate, error) {
var argoWft *v1alpha1.WorkflowTemplate
var jsonOpts []argojson.JSONOpt
jsonOpts = append(jsonOpts, argojson.DisallowUnknownFields)
finalBytes, err := WrapSpecInK8s(workflowTemplate.GetManifestBytes())
if err != nil {
return nil, err
}
err = yaml.Unmarshal(finalBytes, &argoWft)
if err != nil {
return nil, err
}
newUuid, err := uuid.NewRandom()
if err != nil {
return nil, err
}
argoWft.Name = newUuid.String()
argoWft.Labels = map[string]string{
"onepanel.io/workflow_template": workflowTemplate.Name,
"onepanel.io/workflow_template_uid": workflowTemplate.UID,
"onepanel.io/version": version,
"onepanel.io/version_latest": "true",
}
return argoWft, nil
}
// version "latest" will get the latest version.
func (c *Client) getArgoWorkflowTemplate(namespace, workflowTemplateUid, version string) (*v1alpha1.WorkflowTemplate, error) {
labelSelect := fmt.Sprintf("onepanel.io/workflow_template_uid=%v", workflowTemplateUid)
if version == "latest" {
labelSelect += ",onepanel.io/version_latest=true"
} else {
labelSelect += fmt.Sprintf(",onepanel.io/version=%v", version)
}
workflowTemplates, err := c.ArgoprojV1alpha1().WorkflowTemplates(namespace).List(v1.ListOptions{
LabelSelector: labelSelect,
})
if err != nil {
return nil, err
}
templates := workflowTemplates.Items
if templates.Len() > 1 {
return nil, errors.New("not unique result")
}
return &templates[0], nil
}

View File

@@ -5,6 +5,7 @@ import (
"errors"
"github.com/onepanelio/core/pkg/util"
"google.golang.org/grpc/codes"
"gopkg.in/yaml.v2"
"math"
"sort"
"strings"
@@ -59,12 +60,22 @@ func apiWorkflowExecution(wf *v1.WorkflowExecution) (workflow *api.WorkflowExecu
}
func apiWorkflowTemplate(wft *v1.WorkflowTemplate) *api.WorkflowTemplate {
mapping := make(map[interface{}]interface{})
if err := yaml.Unmarshal([]byte(wft.Manifest), mapping); err != nil {
return nil
}
finalManifest, err := yaml.Marshal(mapping["spec"])
if err != nil {
return nil
}
return &api.WorkflowTemplate{
Uid: wft.UID,
CreatedAt: wft.CreatedAt.UTC().Format(time.RFC3339),
Name: wft.Name,
Version: wft.Version,
Manifest: wft.Manifest,
Manifest: string(finalManifest),
IsLatest: wft.IsLatest,
IsArchived: wft.IsArchived,
}