code cleanup + listworkflows (wip)

This commit is contained in:
rushtehrani
2019-12-15 18:47:58 -08:00
parent 9562999aab
commit e933edeff5
5 changed files with 69 additions and 20 deletions

View File

@@ -1,10 +1,16 @@
package argo
import (
"fmt"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/workflow/common"
"github.com/argoproj/pkg/json"
"github.com/spf13/viper"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/watch"
_ "k8s.io/client-go/plugin/pkg/client/auth"
)
@@ -85,7 +91,7 @@ func (c *Client) create(wf *Workflow, opts *Options) (createdWorkflow *Workflow,
return
}
func (c *Client) Create(manifest []byte, opts *Options) (createdWorkflows []*Workflow, err error) {
func (c *Client) CreateWorkflow(manifest []byte, opts *Options) (createdWorkflows []*Workflow, err error) {
workflows, err := unmarshalWorkflows(manifest, true)
if err != nil {
return nil, err
@@ -102,8 +108,35 @@ func (c *Client) Create(manifest []byte, opts *Options) (createdWorkflows []*Wor
return
}
func (c *Client) Get(name string, opts *Options) (workflow *Workflow, err error) {
func (c *Client) GetWorkflow(name string, opts *Options) (workflow *Workflow, err error) {
workflow, err = c.Clientset.ArgoprojV1alpha1().Workflows(opts.Namespace).Get(name, v1.GetOptions{})
return
}
func (c *Client) ListWorkflows(workflowTemplateUID string, opts *Options) (workflows []*Workflow, err error) {
workflowList, err := c.Clientset.ArgoprojV1alpha1().Workflows(opts.Namespace).List(v1.ListOptions{
LabelSelector: fmt.Sprintf("%s/workflow-template-uid=%s", viper.GetString("k8s.labelKeyPrefix"), workflowTemplateUID),
})
if err != nil {
return
}
for _, item := range workflowList.Items {
workflows = append(workflows, &item)
}
return
}
func (c *Client) WatchWorkflow(name string, opts *Options) (watch watch.Interface, err error) {
fieldSelector, err := fields.ParseSelector(fmt.Sprintf("metadata.name=%s", name))
if err != nil {
return
}
watch, err = c.Clientset.ArgoprojV1alpha1().Workflows(opts.Namespace).Watch(metav1.ListOptions{
FieldSelector: fieldSelector.String(),
})
return
}

View File

@@ -166,7 +166,7 @@ func TestCreateOrResumeInstance(t *testing.T) {
Value: ptr.String("create"),
})
wf, err := c.Create(TestInstanceWorkflowManifest, options)
wf, err := c.CreateWorkflow(TestInstanceWorkflowManifest, options)
if err != nil {
t.Error(err)
return
@@ -183,7 +183,7 @@ func TestPauseInstance(t *testing.T) {
Value: ptr.String("delete"),
})
wf, err := c.Create(TestInstanceWorkflowManifest, options)
wf, err := c.CreateWorkflow(TestInstanceWorkflowManifest, options)
if err != nil {
t.Error(err)
return
@@ -203,7 +203,7 @@ func TestChangeInstanceMachineType(t *testing.T) {
Value: ptr.String("cpu-1-4"),
})
wf, err := c.Create(TestInstanceWorkflowManifest, options)
wf, err := c.CreateWorkflow(TestInstanceWorkflowManifest, options)
if err != nil {
t.Error(err)
return

View File

@@ -33,7 +33,7 @@ func (r *ResourceManager) CreateWorkflow(namespace string, workflow *model.Workf
}
(*opts.Labels)[viper.GetString("k8s.labelKeyPrefix")+"workflow-template-uid"] = workflowTemplate.UID
(*opts.Labels)[viper.GetString("k8s.labelKeyPrefix")+"workflow-template-version"] = fmt.Sprint(workflowTemplate.Version)
createdWorkflows, err := r.argClient.Create(workflowTemplate.GetManifestBytes(), opts)
createdWorkflows, err := r.argClient.CreateWorkflow(workflowTemplate.GetManifestBytes(), opts)
if err != nil {
return nil, err
}
@@ -48,7 +48,7 @@ func (r *ResourceManager) CreateWorkflow(namespace string, workflow *model.Workf
}
func (r *ResourceManager) GetWorkflow(namespace, name string) (workflow *model.Workflow, err error) {
wf, err := r.argClient.Get(name, &argo.Options{Namespace: namespace})
wf, err := r.argClient.GetWorkflow(name, &argo.Options{Namespace: namespace})
if err != nil {
return nil, util.NewUserError(codes.NotFound, "Workflow not found.")
}
@@ -82,6 +82,22 @@ func (r *ResourceManager) GetWorkflow(namespace, name string) (workflow *model.W
return
}
func (r *ResourceManager) ListWorkflows(namespace, workflowTemplateUID string) (workflows []*model.Workflow, err error) {
wfs, err := r.argClient.ListWorkflows(workflowTemplateUID, &argo.Options{Namespace: namespace})
if err != nil {
return nil, util.NewUserError(codes.NotFound, "Workflows not found.")
}
for _, wf := range wfs {
workflows = append(workflows, &model.Workflow{
Name: wf.ObjectMeta.Name,
UID: string(wf.ObjectMeta.UID),
})
}
return
}
func (r *ResourceManager) CreateWorkflowTemplate(namespace string, workflowTemplate *model.WorkflowTemplate) (*model.WorkflowTemplate, error) {
workflowTemplate, err := r.workflowRepository.CreateWorkflowTemplate(namespace, workflowTemplate)
if err != nil {

View File

@@ -61,7 +61,7 @@ func (r *WorkflowRepository) CreateWorkflowTemplate(namespace string, workflowTe
}
func (r *WorkflowRepository) workflowTemplatesSelectBuilder(namespace, uid string) sq.SelectBuilder {
sb := r.sb.Select("wt.uid", "wt.name", "wtv.version", "wtv.manifest").
sb := r.sb.Select("wt.uid", "wt.name", "wtv.version").
From("workflow_template_versions wtv").
Join("workflow_templates wt ON wt.id = wtv.workflow_template_id").
Where(sq.Eq{
@@ -76,7 +76,7 @@ func (r *WorkflowRepository) workflowTemplatesSelectBuilder(namespace, uid strin
func (r *WorkflowRepository) GetWorkflowTemplate(namespace, uid string, version int32) (workflowTemplate *model.WorkflowTemplate, err error) {
workflowTemplate = &model.WorkflowTemplate{}
sb := r.workflowTemplatesSelectBuilder(namespace, uid).Limit(1)
sb := r.workflowTemplatesSelectBuilder(namespace, uid).Columns("wtv.manifest").Limit(1)
if version != 0 {
sb = sb.Where(sq.Eq{"wtv.version": version})
}

View File

@@ -40,6 +40,15 @@ func apiWorkflow(wf *model.Workflow) (workflow *api.Workflow) {
return
}
func apiWorkflowTemplate(wft *model.WorkflowTemplate) *api.WorkflowTemplate {
return &api.WorkflowTemplate{
Uid: wft.UID,
Name: wft.Name,
Version: wft.Version,
Manifest: wft.Manifest,
}
}
func (s *WorkflowServer) CreateWorkflow(ctx context.Context, req *api.CreateWorkflowRequest) (*api.Workflow, error) {
workflow := &model.Workflow{
WorkflowTemplate: &model.WorkflowTemplate{
@@ -96,11 +105,7 @@ func (s *WorkflowServer) GetWorkflowTemplate(ctx context.Context, req *api.GetWo
return nil, userError.GRPCError()
}
return &api.WorkflowTemplate{
Uid: workflowTemplate.UID,
Version: workflowTemplate.Version,
Manifest: workflowTemplate.Manifest,
}, nil
return apiWorkflowTemplate(workflowTemplate), nil
}
func (s *WorkflowServer) ListWorkflowTemplateVersions(ctx context.Context, req *api.ListWorkflowTemplateVersionsRequest) (*api.ListWorkflowTemplateVersionsResponse, error) {
@@ -111,12 +116,7 @@ func (s *WorkflowServer) ListWorkflowTemplateVersions(ctx context.Context, req *
workflowTemplates := []*api.WorkflowTemplate{}
for _, wtv := range workflowTemplateVersions {
workflowTemplates = append(workflowTemplates, &api.WorkflowTemplate{
Uid: wtv.UID,
Name: wtv.Name,
Version: wtv.Version,
Manifest: wtv.Manifest,
})
workflowTemplates = append(workflowTemplates, apiWorkflowTemplate(wtv))
}
return &api.ListWorkflowTemplateVersionsResponse{