combine argo and kube clients

This commit is contained in:
rushtehrani
2020-01-08 11:01:02 -08:00
parent 32e8b51251
commit cec8b34bdf
8 changed files with 44 additions and 76 deletions

View File

@@ -1,29 +0,0 @@
package argo
import (
wfclientset "github.com/argoproj/argo/pkg/client/clientset/versioned"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
type Client struct {
*wfclientset.Clientset
}
func NewClient(configPath ...string) *Client {
var (
err error
config *rest.Config
)
if len(configPath) == 0 {
config, err = rest.InClusterConfig()
} else {
config, err = clientcmd.BuildConfigFromFlags("", configPath[0])
}
if err != nil {
panic(err)
}
return &Client{Clientset: wfclientset.NewForConfigOrDie(config)}
}

View File

@@ -1,13 +1,18 @@
package kube
import (
argoprojv1alpha1 "github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
type ListOptions = v1.ListOptions
type Client struct {
*kubernetes.Clientset
argoprojV1alpha1 *argoprojv1alpha1.ArgoprojV1alpha1Client
}
func NewClient(configPath ...string) (client *Client) {
@@ -25,5 +30,5 @@ func NewClient(configPath ...string) (client *Client) {
panic(err)
}
return &Client{Clientset: kubernetes.NewForConfigOrDie(config)}
return &Client{Clientset: kubernetes.NewForConfigOrDie(config), argoprojV1alpha1: argoprojv1alpha1.NewForConfigOrDie(config)}
}

View File

@@ -1,4 +1,4 @@
package argo
package kube
import (
"fmt"
@@ -15,16 +15,14 @@ import (
type Workflow = wfv1.Workflow
type Parameter = wfv1.Parameter
type WorkflowParameter = wfv1.Parameter
type ListOptions = v1.ListOptions
type Options struct {
type WorkflowOptions struct {
Name string
Namespace string
GeneratedName string
Entrypoint string
Parameters []Parameter
Parameters []WorkflowParameter
ServiceAccount string
Labels *map[string]string
ListOptions *ListOptions
@@ -48,9 +46,9 @@ func unmarshalWorkflows(wfBytes []byte, strict bool) (wfs []Workflow, err error)
return
}
func (c *Client) create(wf *Workflow, opts *Options) (createdWorkflow *Workflow, err error) {
func (c *Client) create(wf *Workflow, opts *WorkflowOptions) (createdWorkflow *Workflow, err error) {
if opts == nil {
opts = &Options{}
opts = &WorkflowOptions{}
}
if opts.Name != "" {
wf.ObjectMeta.Name = opts.Name
@@ -85,7 +83,7 @@ func (c *Client) create(wf *Workflow, opts *Options) (createdWorkflow *Workflow,
wf.ObjectMeta.Labels = *opts.Labels
}
createdWorkflow, err = c.Clientset.ArgoprojV1alpha1().Workflows(opts.Namespace).Create(wf)
createdWorkflow, err = c.argoprojV1alpha1.Workflows(opts.Namespace).Create(wf)
if err != nil {
return nil, err
}
@@ -99,7 +97,7 @@ func (c *Client) ValidateWorkflow(manifest []byte) (err error) {
return
}
func (c *Client) CreateWorkflow(manifest []byte, opts *Options) (createdWorkflows []*Workflow, err error) {
func (c *Client) CreateWorkflow(manifest []byte, opts *WorkflowOptions) (createdWorkflows []*Workflow, err error) {
workflows, err := unmarshalWorkflows(manifest, true)
if err != nil {
return nil, err
@@ -116,17 +114,17 @@ func (c *Client) CreateWorkflow(manifest []byte, opts *Options) (createdWorkflow
return
}
func (c *Client) GetWorkflow(name string, opts *Options) (workflow *Workflow, err error) {
workflow, err = c.Clientset.ArgoprojV1alpha1().Workflows(opts.Namespace).Get(name, v1.GetOptions{})
func (c *Client) GetWorkflow(name string, opts *WorkflowOptions) (workflow *Workflow, err error) {
workflow, err = c.argoprojV1alpha1.Workflows(opts.Namespace).Get(name, v1.GetOptions{})
return
}
func (c *Client) ListWorkflows(opts *Options) (workflows []*Workflow, err error) {
func (c *Client) ListWorkflows(opts *WorkflowOptions) (workflows []*Workflow, err error) {
if opts.ListOptions == nil {
opts.ListOptions = &ListOptions{}
}
workflowList, err := c.Clientset.ArgoprojV1alpha1().Workflows(opts.Namespace).List(*opts.ListOptions)
workflowList, err := c.argoprojV1alpha1.Workflows(opts.Namespace).List(*opts.ListOptions)
if err != nil {
return
}
@@ -138,12 +136,12 @@ func (c *Client) ListWorkflows(opts *Options) (workflows []*Workflow, err error)
return
}
func (c *Client) WatchWorkflow(name string, opts *Options) (watcher watch.Interface, err error) {
func (c *Client) WatchWorkflow(name string, opts *WorkflowOptions) (watcher watch.Interface, err error) {
fieldSelector, err := fields.ParseSelector(fmt.Sprintf("metadata.name=%s", name))
if err != nil {
return
}
watcher, err = c.Clientset.ArgoprojV1alpha1().Workflows(opts.Namespace).Watch(metav1.ListOptions{
watcher, err = c.argoprojV1alpha1.Workflows(opts.Namespace).Watch(metav1.ListOptions{
FieldSelector: fieldSelector.String(),
})

View File

@@ -1,4 +1,4 @@
package argo
package kube
import (
"flag"
@@ -129,9 +129,9 @@ spec:
var (
namespace = flag.String("namespace", "default", "namespace of workflows")
options = &Options{
options = &WorkflowOptions{
Namespace: *namespace,
Parameters: []Parameter{
Parameters: []WorkflowParameter{
{
Name: "name",
Value: ptr.String("vscode"),
@@ -161,7 +161,7 @@ func TestUnmarshalWorkflows(t *testing.T) {
func TestCreateOrResumeInstance(t *testing.T) {
c := NewClient(os.Getenv("KUBECONFIG"))
options.Parameters = append(options.Parameters, Parameter{
options.Parameters = append(options.Parameters, WorkflowParameter{
Name: "action",
Value: ptr.String("create"),
})
@@ -178,7 +178,7 @@ func TestCreateOrResumeInstance(t *testing.T) {
func TestPauseInstance(t *testing.T) {
c := NewClient(os.Getenv("KUBECONFIG"))
options.Parameters = append(options.Parameters, Parameter{
options.Parameters = append(options.Parameters, WorkflowParameter{
Name: "action",
Value: ptr.String("delete"),
})
@@ -195,10 +195,10 @@ func TestPauseInstance(t *testing.T) {
func TestChangeInstanceMachineType(t *testing.T) {
c := NewClient(os.Getenv("KUBECONFIG"))
options.Parameters = append(options.Parameters, Parameter{
options.Parameters = append(options.Parameters, WorkflowParameter{
Name: "action",
Value: ptr.String("apply"),
}, Parameter{
}, WorkflowParameter{
Name: "machine-type",
Value: ptr.String("cpu-1-4"),
})

View File

@@ -11,7 +11,6 @@ import (
"github.com/gorilla/handlers"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"github.com/onepanelio/core/api"
"github.com/onepanelio/core/argo"
"github.com/onepanelio/core/kube"
"github.com/onepanelio/core/manager"
"github.com/onepanelio/core/repository"
@@ -38,11 +37,9 @@ func main() {
log.Fatalf("goose up: %v", err)
}
argoClient := argo.NewClient(viper.GetString("KUBECONFIG"))
kubeClient := kube.NewClient(viper.GetString("KUBECONFIG"))
go startRPCServer(db, argoClient, kubeClient)
go startRPCServer(db, kubeClient)
startHTTPProxy()
}
@@ -64,8 +61,8 @@ func initConfig() {
})
}
func startRPCServer(db *repository.DB, argoClient *argo.Client, kubeClient *kube.Client) {
resourceManager := manager.NewResourceManager(db, argoClient, kubeClient)
func startRPCServer(db *repository.DB, kubeClient *kube.Client) {
resourceManager := manager.NewResourceManager(db, kubeClient)
log.Printf("Starting RPC server on port %v", *rpcPort)
lis, err := net.Listen("tcp", *rpcPort)

View File

@@ -1,20 +1,17 @@
package manager
import (
"github.com/onepanelio/core/argo"
"github.com/onepanelio/core/kube"
"github.com/onepanelio/core/repository"
)
type ResourceManager struct {
argClient *argo.Client
kubeClient *kube.Client
workflowRepository *repository.WorkflowRepository
}
func NewResourceManager(db *repository.DB, argoClient *argo.Client, kubeClient *kube.Client) *ResourceManager {
func NewResourceManager(db *repository.DB, kubeClient *kube.Client) *ResourceManager {
return &ResourceManager{
argClient: argoClient,
kubeClient: kubeClient,
workflowRepository: repository.NewWorkflowRepository(db),
}

View File

@@ -6,7 +6,7 @@ import (
"strconv"
"time"
"github.com/onepanelio/core/argo"
"github.com/onepanelio/core/kube"
"github.com/onepanelio/core/model"
"github.com/onepanelio/core/util"
"github.com/spf13/viper"
@@ -20,11 +20,11 @@ func (r *ResourceManager) CreateWorkflow(namespace string, workflow *model.Workf
}
// TODO: Need to pull system parameters from k8s config/secret here, example: HOST
opts := &argo.Options{
opts := &kube.WorkflowOptions{
Namespace: namespace,
}
for _, param := range workflow.Parameters {
opts.Parameters = append(opts.Parameters, argo.Parameter{
opts.Parameters = append(opts.Parameters, kube.WorkflowParameter{
Name: param.Name,
Value: param.Value,
})
@@ -34,7 +34,7 @@ func (r *ResourceManager) CreateWorkflow(namespace string, workflow *model.Workf
}
(*opts.Labels)[viper.GetString("k8s.labelKeyPrefix")+"workflow-template-uid"] = workflowTemplate.UID
(*opts.Labels)[viper.GetString("k8s.labelKeyPrefix")+"workflow-template-version"] = fmt.Sprint(workflowTemplate.Version)
createdWorkflows, err := r.argClient.CreateWorkflow(workflowTemplate.GetManifestBytes(), opts)
createdWorkflows, err := r.kubeClient.CreateWorkflow(workflowTemplate.GetManifestBytes(), opts)
if err != nil {
return nil, err
}
@@ -49,7 +49,7 @@ func (r *ResourceManager) CreateWorkflow(namespace string, workflow *model.Workf
}
func (r *ResourceManager) GetWorkflow(namespace, name string) (workflow *model.Workflow, err error) {
wf, err := r.argClient.GetWorkflow(name, &argo.Options{Namespace: namespace})
wf, err := r.kubeClient.GetWorkflow(name, &kube.WorkflowOptions{Namespace: namespace})
if err != nil {
return nil, util.NewUserError(codes.NotFound, "Workflow not found.")
}
@@ -89,19 +89,19 @@ func (r *ResourceManager) WatchWorkflow(namespace, name string) (<-chan *model.W
return nil, util.NewUserError(codes.NotFound, "Workflow template not found.")
}
watcher, err := r.argClient.WatchWorkflow(name, &argo.Options{Namespace: namespace})
watcher, err := r.kubeClient.WatchWorkflow(name, &kube.WorkflowOptions{Namespace: namespace})
if err != nil {
return nil, util.NewUserError(codes.Unknown, "Unknown error.")
}
var workflow *argo.Workflow
var workflow *kube.Workflow
workflowWatcher := make(chan *model.Workflow)
ticker := time.NewTicker(time.Second)
go func() {
for {
select {
case next := <-watcher.ResultChan():
workflow, _ = next.Object.(*argo.Workflow)
workflow, _ = next.Object.(*kube.Workflow)
case <-ticker.C:
}
@@ -131,15 +131,15 @@ func (r *ResourceManager) WatchWorkflow(namespace, name string) (<-chan *model.W
}
func (r *ResourceManager) ListWorkflows(namespace, workflowTemplateUID string) (workflows []*model.Workflow, err error) {
opts := &argo.Options{
opts := &kube.WorkflowOptions{
Namespace: namespace,
}
if workflowTemplateUID != "" {
opts.ListOptions = &argo.ListOptions{
opts.ListOptions = &kube.ListOptions{
LabelSelector: fmt.Sprintf("%sworkflow-template-uid=%s", viper.GetString("k8s.labelKeyPrefix"), workflowTemplateUID),
}
}
wfs, err := r.argClient.ListWorkflows(opts)
wfs, err := r.kubeClient.ListWorkflows(opts)
if err != nil {
return nil, util.NewUserError(codes.NotFound, "Workflows not found.")
}
@@ -156,7 +156,7 @@ func (r *ResourceManager) ListWorkflows(namespace, workflowTemplateUID string) (
func (r *ResourceManager) CreateWorkflowTemplate(namespace string, workflowTemplate *model.WorkflowTemplate) (*model.WorkflowTemplate, error) {
// validate workflow template
if err := r.argClient.ValidateWorkflow(workflowTemplate.GetManifestBytes()); err != nil {
if err := r.kubeClient.ValidateWorkflow(workflowTemplate.GetManifestBytes()); err != nil {
return nil, util.NewUserError(codes.InvalidArgument, err.Error())
}
@@ -170,7 +170,7 @@ func (r *ResourceManager) CreateWorkflowTemplate(namespace string, workflowTempl
func (r *ResourceManager) CreateWorkflowTemplateVersion(namespace string, workflowTemplate *model.WorkflowTemplate) (*model.WorkflowTemplate, error) {
// validate workflow template
if err := r.argClient.ValidateWorkflow(workflowTemplate.GetManifestBytes()); err != nil {
if err := r.kubeClient.ValidateWorkflow(workflowTemplate.GetManifestBytes()); err != nil {
return nil, util.NewUserError(codes.InvalidArgument, err.Error())
}