make argo.Client a parameter

This commit is contained in:
rushtehrani
2019-12-10 19:32:58 -08:00
parent b421c63073
commit b83fe89f6e
6 changed files with 32 additions and 37 deletions

View File

@@ -2,28 +2,31 @@ package argo
import ( import (
wfclientset "github.com/argoproj/argo/pkg/client/clientset/versioned" wfclientset "github.com/argoproj/argo/pkg/client/clientset/versioned"
"github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/clientcmd"
) )
type Client struct { type Client struct {
v1alpha1.WorkflowInterface wfclientset.Clientset
} }
func NewClient(namespace string, configPath ...string) (client *Client, err error) { func NewClient(configPath ...string) (client *Client) {
var config *rest.Config var (
err error
config *rest.Config
)
if len(configPath) == 0 { if len(configPath) == 0 {
config, err = rest.InClusterConfig() config, err = rest.InClusterConfig()
} else { } else {
config, err = clientcmd.BuildConfigFromFlags("", configPath[0]) config, err = clientcmd.BuildConfigFromFlags("", configPath[0])
} }
if err != nil { if err != nil {
return panic(err)
} }
wfclient := wfclientset.NewForConfigOrDie(config).ArgoprojV1alpha1().Workflows(namespace) wfclient := wfclientset.NewForConfigOrDie(config)
client = &Client{WorkflowInterface: wfclient} client = &Client{Clientset: *wfclient}
return return
} }

View File

@@ -13,6 +13,7 @@ type Parameter = wfv1.Parameter
type Options struct { type Options struct {
Name string Name string
Namespace string
GeneratedName string GeneratedName string
Entrypoint string Entrypoint string
Parameters []Parameter Parameters []Parameter
@@ -72,7 +73,7 @@ func (c *Client) create(wf *Workflow, opts *Options) (createdWorkflow *Workflow,
wf.Spec.Arguments.Parameters = newParams wf.Spec.Arguments.Parameters = newParams
} }
createdWorkflow, err = c.WorkflowInterface.Create(wf) createdWorkflow, err = c.Clientset.ArgoprojV1alpha1().Workflows(opts.Namespace).Create(wf)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -130,6 +130,7 @@ spec:
var ( var (
namespace = flag.String("namespace", "default", "namespace of workflows") namespace = flag.String("namespace", "default", "namespace of workflows")
options = &Options{ options = &Options{
Namespace: *namespace,
Parameters: []Parameter{ Parameters: []Parameter{
{ {
Name: "name", Name: "name",
@@ -158,11 +159,7 @@ func TestUnmarshalWorkflows(t *testing.T) {
} }
func TestCreateOrResumeInstance(t *testing.T) { func TestCreateOrResumeInstance(t *testing.T) {
c, err := NewClient(*namespace, os.Getenv("KUBECONFIG")) c := NewClient(os.Getenv("KUBECONFIG"))
if err != nil {
t.Error(err)
return
}
options.Parameters = append(options.Parameters, Parameter{ options.Parameters = append(options.Parameters, Parameter{
Name: "action", Name: "action",
@@ -179,11 +176,7 @@ func TestCreateOrResumeInstance(t *testing.T) {
} }
func TestPauseInstance(t *testing.T) { func TestPauseInstance(t *testing.T) {
c, err := NewClient(*namespace, os.Getenv("KUBECONFIG")) c := NewClient(os.Getenv("KUBECONFIG"))
if err != nil {
t.Error(err)
return
}
options.Parameters = append(options.Parameters, Parameter{ options.Parameters = append(options.Parameters, Parameter{
Name: "action", Name: "action",
@@ -200,11 +193,7 @@ func TestPauseInstance(t *testing.T) {
} }
func TestChangeInstanceMachineType(t *testing.T) { func TestChangeInstanceMachineType(t *testing.T) {
c, err := NewClient(*namespace, os.Getenv("KUBECONFIG")) c := NewClient(os.Getenv("KUBECONFIG"))
if err != nil {
t.Error(err)
return
}
options.Parameters = append(options.Parameters, Parameter{ options.Parameters = append(options.Parameters, Parameter{
Name: "action", Name: "action",

17
main.go
View File

@@ -10,6 +10,7 @@ import (
"github.com/fsnotify/fsnotify" "github.com/fsnotify/fsnotify"
"github.com/grpc-ecosystem/grpc-gateway/runtime" "github.com/grpc-ecosystem/grpc-gateway/runtime"
"github.com/onepanelio/core/api" "github.com/onepanelio/core/api"
"github.com/onepanelio/core/argo"
"github.com/onepanelio/core/manager" "github.com/onepanelio/core/manager"
"github.com/onepanelio/core/repository" "github.com/onepanelio/core/repository"
"github.com/onepanelio/core/server" "github.com/onepanelio/core/server"
@@ -30,14 +31,14 @@ func main() {
initConfig() initConfig()
db := repository.NewDB(viper.GetString("db.driverName"), viper.GetString("DB_DATASOURCE")) db := repository.NewDB(viper.GetString("db.driverName"), viper.GetString("DB_DATASOURCE"))
log.Print("Connected to database") if err := goose.Run("status", db.BaseConnection(), "db"); err != nil {
if err := goose.Run("up", db.BaseConnection(), "db"); err != nil {
log.Fatalf("goose up: %v", err) log.Fatalf("goose up: %v", err)
} }
go startRPCServer(db) argoClient := argo.NewClient(viper.GetString("KUBECONFIG"))
startHTTPServer()
go startRPCServer(db, argoClient)
startHTTPProxy()
} }
func initConfig() { func initConfig() {
@@ -58,8 +59,8 @@ func initConfig() {
}) })
} }
func startRPCServer(db *repository.DB) { func startRPCServer(db *repository.DB, argoClient *argo.Client) {
resourceManager := manager.NewResourceManager(db) resourceManager := manager.NewResourceManager(db, argoClient)
log.Print("Starting RPC server") log.Print("Starting RPC server")
lis, err := net.Listen("tcp", *rpcPort) lis, err := net.Listen("tcp", *rpcPort)
@@ -76,7 +77,7 @@ func startRPCServer(db *repository.DB) {
log.Print("RPC server started") log.Print("RPC server started")
} }
func startHTTPServer() { func startHTTPProxy() {
endpoint := "localhost" + *rpcPort endpoint := "localhost" + *rpcPort
ctx := context.Background() ctx := context.Background()
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)

View File

@@ -6,12 +6,13 @@ import (
) )
type ResourceManager struct { type ResourceManager struct {
workflowClient *argo.Client argClient *argo.Client
workflowRepository *repository.WorkflowRepository workflowRepository *repository.WorkflowRepository
} }
func NewResourceManager(db *repository.DB) *ResourceManager { func NewResourceManager(db *repository.DB, argoClient *argo.Client) *ResourceManager {
return &ResourceManager{ return &ResourceManager{
argClient: argoClient,
workflowRepository: repository.NewWorkflowRepository(db), workflowRepository: repository.NewWorkflowRepository(db),
} }
} }

View File

@@ -3,16 +3,16 @@ package manager
import ( import (
"github.com/onepanelio/core/argo" "github.com/onepanelio/core/argo"
"github.com/onepanelio/core/model" "github.com/onepanelio/core/model"
"github.com/spf13/viper"
) )
func (r *ResourceManager) CreateWorkflow(namespace string, workflow *model.Workflow) (createdWorkflow *model.Workflow, err error) { func (r *ResourceManager) CreateWorkflow(namespace string, workflow *model.Workflow) (createdWorkflow *model.Workflow, err error) {
r.workflowClient, err = argo.NewClient(namespace, viper.GetString("KUBECONFIG"))
if err != nil { if err != nil {
return return
} }
opts := &argo.Options{} opts := &argo.Options{
Namespace: namespace,
}
for _, param := range workflow.Parameters { for _, param := range workflow.Parameters {
opts.Parameters = append(opts.Parameters, argo.Parameter{ opts.Parameters = append(opts.Parameters, argo.Parameter{
Name: param.Name, Name: param.Name,
@@ -20,7 +20,7 @@ func (r *ResourceManager) CreateWorkflow(namespace string, workflow *model.Workf
}) })
} }
createdWorkflows, err := r.workflowClient.Create(workflow.WorkflowTemplate.GetManifest(), opts) createdWorkflows, err := r.argClient.Create(workflow.WorkflowTemplate.GetManifest(), opts)
if err != nil { if err != nil {
return return
} }