move reusable components to pkg

This commit is contained in:
rushtehrani
2020-02-18 21:51:04 -08:00
parent 42f0ff173f
commit 2e0ae10d56
19 changed files with 2070 additions and 112 deletions

View File

@@ -646,7 +646,7 @@ func (m *GetSecretRequest) GetName() string {
type Secret struct {
Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
Data map[string]string `protobuf:"bytes,2,rep,name=data,proto3" json:"data,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
Data map[string][]byte `protobuf:"bytes,2,rep,name=data,proto3" json:"data,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@@ -684,7 +684,7 @@ func (m *Secret) GetName() string {
return ""
}
func (m *Secret) GetData() map[string]string {
func (m *Secret) GetData() map[string][]byte {
if m != nil {
return m.Data
}
@@ -707,12 +707,13 @@ func init() {
proto.RegisterType((*CreateSecretRequest)(nil), "api.CreateSecretRequest")
proto.RegisterType((*GetSecretRequest)(nil), "api.GetSecretRequest")
proto.RegisterType((*Secret)(nil), "api.Secret")
proto.RegisterMapType((map[string]string)(nil), "api.Secret.DataEntry")
proto.RegisterMapType((map[string][]byte)(nil), "api.Secret.DataEntry")
}
func init() { proto.RegisterFile("secret.proto", fileDescriptor_6acf428160d7a216) }
var fileDescriptor_6acf428160d7a216 = []byte{
<<<<<<< HEAD
// 713 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x55, 0xcd, 0x4e, 0xdb, 0x4a,
0x14, 0x96, 0x13, 0x08, 0xe4, 0x90, 0xab, 0xcb, 0x1d, 0x72, 0xc1, 0x98, 0x5c, 0x14, 0x06, 0x5d,
@@ -759,6 +760,52 @@ var fileDescriptor_6acf428160d7a216 = []byte{
0xbf, 0x19, 0x50, 0xcd, 0x7a, 0x73, 0x48, 0x5d, 0x32, 0xdf, 0xf1, 0xe0, 0x59, 0x2b, 0x77, 0xfc,
0x31, 0x2a, 0xaf, 0xf9, 0x50, 0x79, 0xa7, 0x25, 0x79, 0x9d, 0xb7, 0x7e, 0x06, 0x00, 0x00, 0xff,
0xff, 0x9d, 0x7b, 0x21, 0x46, 0xdb, 0x08, 0x00, 0x00,
=======
// 678 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x55, 0x5f, 0x4f, 0xd4, 0x40,
0x10, 0x4f, 0x0f, 0x38, 0xb8, 0xe1, 0x8c, 0xb8, 0x9c, 0x50, 0xca, 0x69, 0x8e, 0x25, 0x26, 0x80,
0x49, 0xcb, 0x1d, 0x0f, 0x20, 0x2f, 0xc6, 0x08, 0xe1, 0x01, 0x9f, 0x4a, 0x34, 0x26, 0xc6, 0xc4,
0xe5, 0x6e, 0x24, 0x0d, 0xd0, 0xd6, 0xee, 0x96, 0x78, 0x51, 0x12, 0x63, 0xe2, 0x27, 0xc0, 0x6f,
0xe6, 0x57, 0xf0, 0x73, 0x18, 0xd3, 0xdd, 0x6d, 0xaf, 0xc7, 0x15, 0x28, 0xfe, 0x79, 0xdb, 0xd9,
0xdf, 0xee, 0xfc, 0x66, 0x7e, 0x33, 0x3b, 0x0b, 0x75, 0x8e, 0xdd, 0x08, 0x85, 0x1d, 0x46, 0x81,
0x08, 0xc8, 0x18, 0x0b, 0x3d, 0x6b, 0xf1, 0x28, 0x08, 0x8e, 0x4e, 0xd0, 0x91, 0x5b, 0x87, 0xf1,
0x7b, 0x07, 0x4f, 0x43, 0xd1, 0x57, 0x27, 0xac, 0xa6, 0x06, 0x59, 0xe8, 0x39, 0xcc, 0xf7, 0x03,
0xc1, 0x84, 0x17, 0xf8, 0x5c, 0xa1, 0xf4, 0x2d, 0x98, 0xcf, 0x7a, 0xbd, 0x03, 0xe9, 0x72, 0x1f,
0xfb, 0xaf, 0xd8, 0x49, 0x8c, 0x2e, 0x7e, 0x88, 0x91, 0x0b, 0xd2, 0x84, 0x9a, 0xcf, 0x4e, 0x91,
0x87, 0xac, 0x8b, 0xa6, 0xd1, 0x32, 0x56, 0x6a, 0xee, 0x60, 0x83, 0x2c, 0x43, 0x55, 0x45, 0x62,
0x56, 0x5a, 0xc6, 0xca, 0x74, 0x67, 0xda, 0x66, 0xa1, 0x67, 0x2b, 0x4f, 0xae, 0x86, 0xe8, 0x26,
0x2c, 0x14, 0xb8, 0xe7, 0x61, 0xe0, 0x73, 0x24, 0x16, 0x4c, 0x79, 0x3e, 0xc7, 0x48, 0x60, 0x4f,
0xba, 0x9f, 0x72, 0x33, 0x9b, 0xee, 0xc1, 0xac, 0xba, 0xb5, 0xfb, 0xd1, 0xe3, 0x82, 0x97, 0x0b,
0x89, 0xc0, 0x78, 0x62, 0xc8, 0x80, 0x6a, 0xae, 0x5c, 0x53, 0x1b, 0x1a, 0xc3, 0x8e, 0x34, 0xf9,
0x1c, 0x54, 0x51, 0xee, 0x68, 0x6a, 0x6d, 0xd1, 0x77, 0xb0, 0xf8, 0x32, 0xec, 0x31, 0x81, 0xff,
0x4d, 0x93, 0x2d, 0x68, 0x16, 0x33, 0xe8, 0xc8, 0x4c, 0x98, 0x8c, 0x25, 0x9e, 0xaa, 0x92, 0x9a,
0x89, 0x28, 0x3b, 0x78, 0x82, 0xe9, 0xcd, 0x3f, 0x17, 0xe5, 0x0d, 0xcc, 0xe5, 0x1d, 0xed, 0x63,
0xff, 0x1f, 0xe6, 0xb7, 0x01, 0xf3, 0x23, 0xce, 0x07, 0xa9, 0xf5, 0x24, 0x94, 0xa5, 0xa6, 0x4d,
0xba, 0x0e, 0x8d, 0xe1, 0xd4, 0x6e, 0xbc, 0xd1, 0x01, 0xf2, 0xc2, 0xe3, 0x42, 0x9d, 0x2f, 0xd7,
0x20, 0xd4, 0x85, 0xd9, 0xa1, 0x3b, 0x9a, 0xa4, 0x01, 0x13, 0xdd, 0x20, 0xf6, 0x85, 0xbc, 0x30,
0xe1, 0x2a, 0x83, 0x3c, 0x82, 0x49, 0x95, 0x11, 0x37, 0x2b, 0xad, 0xb1, 0xcb, 0xd9, 0xa6, 0x18,
0x7d, 0x0d, 0xb3, 0xcf, 0x23, 0x64, 0xb7, 0x2b, 0x4a, 0x29, 0x21, 0x77, 0x60, 0x66, 0x0f, 0xc5,
0xdf, 0xd6, 0xfa, 0x8b, 0x01, 0x55, 0xe5, 0x23, 0x83, 0x8d, 0x01, 0x4c, 0x56, 0x61, 0xbc, 0xc7,
0x04, 0xd3, 0x29, 0xde, 0xcf, 0xc5, 0x61, 0xef, 0x30, 0xc1, 0x76, 0x7d, 0x11, 0xf5, 0x5d, 0x79,
0xc4, 0xda, 0x84, 0x5a, 0xb6, 0x45, 0x66, 0x60, 0xec, 0x18, 0xfb, 0xda, 0x55, 0xb2, 0x4c, 0x54,
0x3c, 0x4b, 0x1a, 0x59, 0xb2, 0xd7, 0x5d, 0x65, 0x6c, 0x57, 0xb6, 0x8c, 0xce, 0xaf, 0x49, 0xb8,
0xa3, 0x7c, 0x1e, 0x60, 0x74, 0xe6, 0x75, 0x91, 0x70, 0xa8, 0xe7, 0x45, 0x23, 0xa6, 0xe4, 0x2d,
0xd0, 0xd1, 0x9a, 0xb3, 0xd5, 0xfc, 0xb2, 0xd3, 0xe1, 0x66, 0xef, 0x26, 0xc3, 0x8d, 0xb6, 0xbf,
0xfe, 0xf8, 0x79, 0x51, 0x79, 0x4c, 0x97, 0x92, 0xc1, 0xc6, 0x9d, 0xb3, 0xf6, 0x21, 0x0a, 0xd6,
0x76, 0x3e, 0x65, 0x62, 0x9c, 0x3b, 0xba, 0x42, 0xdb, 0x5a, 0x4f, 0xf2, 0x19, 0xea, 0xf9, 0x51,
0xa0, 0x49, 0x0b, 0xc6, 0x8c, 0xb5, 0x50, 0x80, 0xa8, 0x5e, 0xa1, 0x9b, 0x92, 0xb7, 0x4d, 0x9c,
0x1b, 0x79, 0xd5, 0xde, 0xb9, 0xa3, 0x06, 0x0b, 0x61, 0x50, 0xcb, 0xaa, 0x49, 0x94, 0xce, 0x97,
0xab, 0x6b, 0xe5, 0xdb, 0x80, 0xae, 0x4b, 0xa6, 0x35, 0xb2, 0x52, 0x96, 0x89, 0x1c, 0xc3, 0x74,
0xae, 0xbd, 0xc9, 0xbc, 0xf4, 0x36, 0xfa, 0x48, 0x2c, 0x73, 0x14, 0xd0, 0xd9, 0xad, 0x4a, 0xce,
0x65, 0x72, 0xb3, 0xaa, 0x24, 0x86, 0x7a, 0xfe, 0xc5, 0x6a, 0x35, 0x0b, 0xe6, 0x93, 0x56, 0xb3,
0xe8, 0x79, 0xa7, 0x39, 0xae, 0x95, 0xcf, 0xf1, 0x9b, 0x01, 0x77, 0x2f, 0x8d, 0x17, 0xb2, 0x38,
0x42, 0x30, 0x98, 0x68, 0x56, 0xb3, 0x18, 0x1c, 0x2e, 0xe7, 0x5a, 0x99, 0x72, 0xea, 0x8f, 0x57,
0xc5, 0x71, 0x61, 0xc0, 0xbd, 0x91, 0xaf, 0x8d, 0x3c, 0x90, 0x64, 0x57, 0xfd, 0xa8, 0xd6, 0xc3,
0xab, 0x60, 0x1d, 0xcd, 0x53, 0x19, 0xcd, 0x13, 0x7a, 0xdb, 0x68, 0xb2, 0x16, 0xff, 0x6e, 0x40,
0xa3, 0xe8, 0x73, 0x21, 0x2d, 0xc9, 0x7c, 0xcd, 0xcf, 0x66, 0x2d, 0x5d, 0x73, 0x62, 0x58, 0xac,
0xce, 0x6d, 0xc3, 0x3b, 0xac, 0xca, 0xc7, 0xbb, 0xf1, 0x3b, 0x00, 0x00, 0xff, 0xff, 0x4f, 0xc8,
0xba, 0xde, 0xbc, 0x08, 0x00, 0x00,
>>>>>>> move reusable components to pkg
}
// Reference imports to suppress errors if they are not otherwise used.

View File

@@ -127,6 +127,6 @@ message GetSecretRequest {
message Secret {
string name = 1;
map<string, string> data = 2;
map<string, bytes> data = 2;
}

View File

@@ -316,7 +316,8 @@
"data": {
"type": "object",
"additionalProperties": {
"type": "string"
"type": "string",
"format": "byte"
}
}
}

39
main.go
View File

@@ -14,10 +14,9 @@ import (
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"github.com/jmoiron/sqlx"
"github.com/onepanelio/core/api"
"github.com/onepanelio/core/kube"
"github.com/onepanelio/core/manager"
"github.com/onepanelio/core/repository"
v1 "github.com/onepanelio/core/pkg"
"github.com/onepanelio/core/server"
"github.com/pressly/goose"
log "github.com/sirupsen/logrus"
@@ -34,20 +33,18 @@ var (
func main() {
flag.Parse()
db := repository.NewDB(os.Getenv("DB_DRIVER_NAME"), os.Getenv("DB_DATASOURCE_NAME"))
if err := goose.Run("up", db.Base(), "db"); err != nil {
db := sqlx.MustConnect(os.Getenv("DB_DRIVER_NAME"), os.Getenv("DB_DATASOURCE_NAME"))
if err := goose.Run("up", db.DB, "db"); err != nil {
log.Fatalf("goose up: %v", err)
}
kubeConfig := kube.NewConfig()
kubeConfig := v1.NewConfig()
go startRPCServer(db, kubeConfig)
startHTTPProxy()
}
func startRPCServer(db *repository.DB, kubeConfig *kube.Config) {
resourceManager := manager.NewResourceManager(db, kubeConfig)
func startRPCServer(db *v1.DB, kubeConfig *v1.Config) {
log.Printf("Starting RPC server on port %v", *rpcPort)
lis, err := net.Listen("tcp", *rpcPort)
if err != nil {
@@ -60,11 +57,13 @@ func startRPCServer(db *repository.DB, kubeConfig *kube.Config) {
grpc_recovery.WithRecoveryHandler(recoveryFunc),
}
s := grpc.NewServer(grpc.UnaryInterceptor(
grpc_middleware.ChainUnaryServer(loggingInterceptor,
grpc_recovery.UnaryServerInterceptor(opts...))))
api.RegisterWorkflowServiceServer(s, server.NewWorkflowServer(resourceManager))
api.RegisterSecretServiceServer(s, server.NewSecretServer(kubeConfig))
api.RegisterNamespaceServiceServer(s, server.NewNamespaceServer(kubeConfig))
grpc_middleware.ChainUnaryServer(authInterceptor(kubeConfig, db),
loggingInterceptor,
grpc_recovery.UnaryServerInterceptor(opts...)),
))
api.RegisterWorkflowServiceServer(s, server.NewWorkflowServer())
api.RegisterSecretServiceServer(s, server.NewSecretServer())
api.RegisterNamespaceServiceServer(s, server.NewNamespaceServer())
if err := s.Serve(lis); err != nil {
log.Fatalf("Failed to serve RPC server: %v", err)
@@ -129,3 +128,15 @@ func loggingInterceptor(ctx context.Context, req interface{}, info *grpc.UnarySe
}).Info("handler finished")
return
}
func authInterceptor(kubeConfig *v1.Config, db *v1.DB) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
kubeConfig.BearerToken = ""
client, err := v1.NewClient(kubeConfig, db)
if err != nil {
return
}
return handler(context.WithValue(ctx, "kubeClient", client), req)
}
}

26
pkg/authorization.go Normal file
View File

@@ -0,0 +1,26 @@
package v1
import (
authorizationv1 "k8s.io/api/authorization/v1"
)
func (c *Client) IsAuthorized(namespace, verb, group, resource, name string) (allowed bool, err error) {
review, err := c.AuthorizationV1().SelfSubjectAccessReviews().Create(&authorizationv1.SelfSubjectAccessReview{
Spec: authorizationv1.SelfSubjectAccessReviewSpec{
ResourceAttributes: &authorizationv1.ResourceAttributes{
Namespace: namespace,
Verb: verb,
Group: group,
Resource: resource,
Name: name,
},
},
})
if err != nil {
allowed = false
return
}
allowed = review.Status.Allowed
return
}

124
pkg/client.go Normal file
View File

@@ -0,0 +1,124 @@
package v1
import (
"encoding/base64"
"strconv"
argoprojv1alpha1 "github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
"github.com/jmoiron/sqlx"
"github.com/onepanelio/core/s3"
"github.com/onepanelio/core/util/logging"
log "github.com/sirupsen/logrus"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
const (
artifactRepositoryEndpointKey = "artifactRepositoryEndpoint"
artifactRepositoryBucketKey = "artifactRepositoryBucket"
artifactRepositoryRegionKey = "artifactRepositoryRegion"
artifactRepositoryInSecureKey = "artifactRepositoryInsecure"
artifactRepositoryAccessKeyValueKey = "artifactRepositoryAccessKey"
artifactRepositorySecretKeyValueKey = "artifactRepositorySecretKey"
)
type Config = rest.Config
type DB = sqlx.DB
type Client struct {
kubernetes.Interface
argoprojV1alpha1 argoprojv1alpha1.ArgoprojV1alpha1Interface
*DB
}
func (c *Client) ArgoprojV1alpha1() argoprojv1alpha1.ArgoprojV1alpha1Interface {
return c.argoprojV1alpha1
}
func NewConfig() (config *Config) {
config, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
clientcmd.NewDefaultClientConfigLoadingRules(), &clientcmd.ConfigOverrides{}).ClientConfig()
if err != nil {
panic(err)
}
return
}
func NewClient(config *Config, db *sqlx.DB) (client *Client, err error) {
config.BearerTokenFile = ""
config.Username = ""
config.Password = ""
config.CertData = nil
config.CertFile = ""
kubeConfig, err := kubernetes.NewForConfig(config)
if err != nil {
return
}
argoConfig, err := argoprojv1alpha1.NewForConfig(config)
if err != nil {
return
}
return &Client{Interface: kubeConfig, argoprojV1alpha1: argoConfig, DB: db}, nil
}
func (c *Client) getNamespaceConfig(namespace string) (config map[string]string, err error) {
configMap, err := c.GetConfigMap(namespace, "onepanel")
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"Error": err.Error(),
}).Error("getNamespaceConfig failed getting config map.")
return
}
config = configMap.Data
secret, err := c.GetSecret(namespace, "onepanel")
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"Error": err.Error(),
}).Error("getNamespaceConfig failed getting secret.")
return
}
accessKey, _ := base64.StdEncoding.DecodeString(string(secret.Data[artifactRepositoryAccessKeyValueKey]))
config[artifactRepositoryAccessKeyValueKey] = string(accessKey)
secretKey, _ := base64.StdEncoding.DecodeString(string(secret.Data[artifactRepositorySecretKeyValueKey]))
config[artifactRepositorySecretKeyValueKey] = string(secretKey)
return
}
func (c *Client) getS3Client(namespace string, config map[string]string) (s3Client *s3.Client, err error) {
insecure, err := strconv.ParseBool(config[artifactRepositoryInSecureKey])
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"ConfigMap": config,
"Error": err.Error(),
}).Error("getS3Client failed when parsing bool.")
return
}
s3Client, err = s3.NewClient(s3.Config{
Endpoint: config[artifactRepositoryEndpointKey],
Region: config[artifactRepositoryRegionKey],
AccessKey: config[artifactRepositoryAccessKeyValueKey],
SecretKey: config[artifactRepositorySecretKeyValueKey],
InSecure: insecure,
})
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"ConfigMap": config,
"Error": err.Error(),
}).Error("getS3Client failed when initializing a new S3 client.")
return
}
return
}

7
pkg/client_test.go Normal file
View File

@@ -0,0 +1,7 @@
package v1
import "k8s.io/client-go/kubernetes/fake"
func NewTestClient() (client *Client) {
return &Client{Interface: fake.NewSimpleClientset()}
}

33
pkg/config_map.go Normal file
View File

@@ -0,0 +1,33 @@
package v1
import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func (c *Client) CreateConfigMap(namespace string, configMap *ConfigMap) (err error) {
_, err = c.CoreV1().ConfigMaps(namespace).Create(&corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: configMap.Name,
},
Data: configMap.Data,
})
if err != nil {
return
}
return
}
func (c *Client) GetConfigMap(namespace, name string) (configMap *ConfigMap, err error) {
cm, err := c.CoreV1().ConfigMaps(namespace).Get(name, metav1.GetOptions{})
if err != nil {
return
}
configMap = &ConfigMap{
Name: name,
Data: cm.Data,
}
return
}

31
pkg/config_map_test.go Normal file
View File

@@ -0,0 +1,31 @@
package v1
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestCreateConfigMap(t *testing.T) {
c := NewTestClient()
err := c.CreateConfigMap("namespace", &ConfigMap{
Name: "name",
})
assert.Nil(t, err)
}
func TestGetConfigMap(t *testing.T) {
c := NewTestClient()
err := c.CreateConfigMap("namespace", &ConfigMap{
Name: "name",
})
assert.Nil(t, err)
s, err := c.GetConfigMap("namespace", "name")
assert.Nil(t, err)
assert.NotNil(t, s)
assert.Equal(t, s.Name, "name")
}

27
pkg/namespace.go Normal file
View File

@@ -0,0 +1,27 @@
package v1
import (
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
var onepanelEnabledLabelKey = "onepanel.io/enabled"
func (c *Client) ListNamespaces() (namespaces []*Namespace, err error) {
namespaceList, err := c.CoreV1().Namespaces().List(metav1.ListOptions{
LabelSelector: fmt.Sprintf("%s=%s", onepanelEnabledLabelKey, "true"),
})
if err != nil {
return
}
for _, ns := range namespaceList.Items {
namespaces = append(namespaces, &Namespace{
Name: ns.Name,
Labels: ns.Labels,
})
}
return
}

33
pkg/namespace_test.go Normal file
View File

@@ -0,0 +1,33 @@
package v1
import (
"strconv"
"testing"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func testCreateNamespace(c *Client) {
for i := 0; i < 5; i++ {
c.CoreV1().Namespaces().Create(&corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: "namespace-" + strconv.Itoa(i),
Labels: map[string]string{
"onepanel.io/enabled": "true",
},
},
})
}
}
func TestListNamespace(t *testing.T) {
c := NewTestClient()
testCreateNamespace(c)
n, err := c.ListNamespaces()
assert.Nil(t, err)
assert.NotEmpty(t, n)
assert.Equal(t, len(n), 5)
}

347
pkg/secret.go Normal file
View File

@@ -0,0 +1,347 @@
package v1
import (
"encoding/base64"
"encoding/json"
goerrors "errors"
"github.com/onepanelio/core/util"
"github.com/onepanelio/core/util/logging"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)
func (c *Client) CreateSecret(namespace string, secret *Secret) (err error) {
_, err = c.CoreV1().Secrets(namespace).Create(&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: secret.Name,
},
Data: secret.Data,
})
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"Secret": secret,
"Error": err.Error(),
}).Error("Error creating secret.")
return util.NewUserError(codes.Unknown, "Secret was not created.")
}
return
}
func (c *Client) SecretExists(namespace string, name string) (exists bool, err error) {
foundSecret, err := c.CoreV1().Secrets(namespace).Get(name, metav1.GetOptions{})
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"Name": name,
"Error": err.Error(),
}).Error("Secret Exists error.")
var statusError *errors.StatusError
if goerrors.As(err, &statusError) {
if statusError.ErrStatus.Reason == "NotFound" {
return false, util.NewUserError(codes.NotFound, "Secret Not Found.")
}
return false, util.NewUserError(codes.Unknown, "Error when checking existence of secret.")
}
return false, util.NewUserError(codes.Unknown, "Error when checking existence of secret.")
}
if foundSecret.Name == "" {
return false, nil
}
return true, nil
}
func (c *Client) GetSecret(namespace, name string) (secret *Secret, err error) {
s, err := c.CoreV1().Secrets(namespace).Get(name, metav1.GetOptions{})
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"Name": name,
"Error": err.Error(),
}).Error("Secret not found error.")
var statusError *errors.StatusError
if goerrors.As(err, &statusError) {
if statusError.ErrStatus.Reason == "NotFound" {
return nil, util.NewUserError(codes.NotFound, "Secret Not Found.")
}
return nil, util.NewUserError(codes.Unknown, "Error when getting secret.")
}
return nil, util.NewUserError(codes.Unknown, "Error when getting secret.")
}
if s == nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"Name": name,
"Error": "Secret is nil.",
}).Error("Error getting secret.")
return nil, util.NewUserError(codes.Unknown, "Error when getting secret.")
}
secret = &Secret{
Name: s.Name,
Data: s.Data,
}
return
}
func (c *Client) ListSecrets(namespace string) (secrets []*Secret, err error) {
secretsList, err := c.CoreV1().Secrets(namespace).List(metav1.ListOptions{})
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"Error": err.Error(),
}).Error("No secrets were found.")
return nil, util.NewUserError(codes.NotFound, "No secrets were found.")
}
for _, s := range secretsList.Items {
secret := Secret{
Name: s.Name,
Data: s.Data,
}
secrets = append(secrets, &secret)
}
return
}
func (c *Client) DeleteSecret(namespace string, name string) (deleted bool, err error) {
err = c.CoreV1().Secrets(namespace).Delete(name, &metav1.DeleteOptions{})
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"Name": name,
"Error": err.Error(),
}).Error("Unable to delete a secret.")
return false, util.NewUserError(codes.Unknown, "Secret unable to be deleted.")
}
return true, nil
}
func (c *Client) DeleteSecretKey(namespace string, secret *Secret) (deleted bool, err error) {
if len(secret.Data) == 0 {
return false, util.NewUserError(codes.InvalidArgument, "Data cannot be empty")
}
//Currently, support for 1 key only
key := ""
for dataKey := range secret.Data {
key = dataKey
break
}
//Check if the secret has the key to delete
secretFound, err := c.GetSecret(namespace, secret.Name)
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"Secret": secret,
"Error": err.Error(),
}).Error("Error with getting a secret.")
return false, util.NewUserError(codes.NotFound, "Secret not found.")
}
secretDataKeyExists := false
for secretDataKey := range secretFound.Data {
if secretDataKey == key {
secretDataKeyExists = true
break
}
}
if secretDataKeyExists {
// patchStringPath specifies a patch operation for a secret key.
type patchStringPath struct {
Op string `json:"op"`
Path string `json:"path"`
}
payload := []patchStringPath{{
Op: "remove",
Path: "/data/" + key,
}}
payloadBytes, _ := json.Marshal(payload)
_, err = c.CoreV1().Secrets(namespace).Patch(secret.Name, types.JSONPatchType, payloadBytes)
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"Secret": secret,
"Error": err.Error(),
}).Error("Unable to a key from a secret.")
return false, util.NewUserError(codes.Unknown, "Unable to delete key from Secret.")
}
return true, nil
}
return false, util.NewUserError(codes.NotFound, "Key not found in Secret.")
}
func (c *Client) AddSecretKeyValue(namespace string, secret *Secret) (inserted bool, err error) {
if len(secret.Data) == 0 {
return false, util.NewUserError(codes.InvalidArgument, "Data cannot be empty")
}
//Currently, support for 1 key only
var (
key string
value []byte
)
for dataKey, dataValue := range secret.Data {
key = dataKey
value = dataValue
break
}
secretFound, err := c.GetSecret(namespace, secret.Name)
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"Secret": secret,
"Error": err.Error(),
}).Error("Unable to find the secret.")
return false, util.NewUserError(codes.NotFound, "Secret not found.")
}
if secretFound == nil {
return false, util.NewUserError(codes.NotFound, "Secret not found.")
}
//Check if the secret has the key already
if len(secretFound.Data) > 0 {
secretDataKeyExists := false
for secretDataKey := range secretFound.Data {
if secretDataKey == key {
secretDataKeyExists = true
break
}
}
if secretDataKeyExists {
errorMsg := "Key: " + key + " already exists in secret."
return false, util.NewUserError(codes.AlreadyExists, errorMsg)
}
}
// patchStringPathAddNode specifies an add operation for a node
type patchStringPathAddNode struct {
Op string `json:"op"`
Path string `json:"path"`
Value map[string]string `json:"value"`
}
var payload []byte
// "/data" may not exist due to 0 items. Create it with our new key and value
if len(secretFound.Data) == 0 {
valMap := make(map[string]string)
valueEnc := base64.StdEncoding.EncodeToString([]byte(value))
valMap[key] = valueEnc
payloadAddNode := []patchStringPathAddNode{{
Op: "add",
Path: "/data",
Value: valMap,
}}
payload, err = json.Marshal(payloadAddNode)
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"Secret": secret,
"Error": err.Error(),
}).Error("Error building JSON.")
return false, util.NewUserError(codes.InvalidArgument, "Error building JSON.")
}
} else {
// patchStringPathAddKeyValue specifies an add operation, a key and value
type patchStringPathAddKeyValue struct {
Op string `json:"op"`
Path string `json:"path"`
Value string `json:"value"`
}
valueEnc := base64.StdEncoding.EncodeToString([]byte(value))
payloadAddData := []patchStringPathAddKeyValue{{
Op: "add",
Path: "/data/" + key,
Value: valueEnc,
}}
payload, err = json.Marshal(payloadAddData)
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"Secret": secret,
"Error": err.Error(),
}).Error("Error building JSON.")
return false, util.NewUserError(codes.InvalidArgument, "Error building JSON.")
}
}
_, err = c.CoreV1().Secrets(namespace).Patch(secret.Name, types.JSONPatchType, payload)
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"Secret": secret,
"Error": err.Error(),
}).Error("Error adding key and value to Secret.")
return false, util.NewUserError(codes.Unknown, "Error adding key and value to Secret.")
}
return true, nil
}
func (c *Client) UpdateSecretKeyValue(namespace string, secret *Secret) (updated bool, err error) {
if len(secret.Data) == 0 {
return false, util.NewUserError(codes.InvalidArgument, "data cannot be empty.")
}
//Currently, support for 1 key only
var (
key string
value []byte
)
for dataKey, dataValue := range secret.Data {
key = dataKey
value = dataValue
break
}
//Check if the secret has the key to update
secretFound, err := c.GetSecret(namespace, secret.Name)
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"Secret": secret,
"Error": err.Error(),
}).Error("Unable to find secret.")
return false, util.NewUserError(codes.NotFound, "Unable to find secret.")
}
secretDataKeyExists := false
for secretDataKey := range secretFound.Data {
if secretDataKey == key {
secretDataKeyExists = true
break
}
}
if !secretDataKeyExists {
errorMsg := "Key: " + key + " not found in secret."
return false, util.NewUserError(codes.NotFound, errorMsg)
}
// patchStringPath specifies a patch operation for a secret key.
type patchStringPathWithValue struct {
Op string `json:"op"`
Path string `json:"path"`
Value string `json:"value"`
}
valueEnc := base64.StdEncoding.EncodeToString([]byte(value))
payload := []patchStringPathWithValue{{
Op: "replace",
Path: "/data/" + key,
Value: valueEnc,
}}
payloadBytes, _ := json.Marshal(payload)
_, err = c.CoreV1().Secrets(namespace).Patch(secret.Name, types.JSONPatchType, payloadBytes)
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"Secret": secret,
"Error": err.Error(),
}).Error("Unable to update secret key value.")
return false, util.NewUserError(codes.Unknown, "Unable to update secret key value.")
}
return true, nil
}

31
pkg/secret_test.go Normal file
View File

@@ -0,0 +1,31 @@
package v1
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestCreateSecret(t *testing.T) {
c := NewTestClient()
err := c.CreateSecret("namespace", &Secret{
Name: "name",
})
assert.Nil(t, err)
}
func TestGetSecret(t *testing.T) {
c := NewTestClient()
err := c.CreateSecret("namespace", &Secret{
Name: "name",
})
assert.Nil(t, err)
s, err := c.GetSecret("namespace", "name")
assert.Nil(t, err)
assert.NotNil(t, s)
assert.Equal(t, s.Name, "name")
}

105
pkg/types.go Normal file
View File

@@ -0,0 +1,105 @@
package v1
import (
"time"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/google/uuid"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
type Namespace struct {
Name string
Labels map[string]string
}
type Secret struct {
Name string
Data map[string][]byte
}
type ConfigMap struct {
Name string
Data map[string]string
}
type LogEntry struct {
Timestamp time.Time
Content string
}
type Metric struct {
Name string
Value float64
Format string `json:"omitempty"`
}
type WorkflowTemplate struct {
ID uint64
CreatedAt time.Time `db:"created_at"`
UID string
Name string
Manifest string
Version int32
IsLatest bool `db:"is_latest"`
IsArchived bool `db:"is_archived"`
}
func (wt *WorkflowTemplate) GetManifestBytes() []byte {
return []byte(wt.Manifest)
}
func (wt *WorkflowTemplate) GenerateUID() (string, error) {
uid, err := uuid.NewRandom()
if err != nil {
return "", err
}
wt.UID = uid.String()
return wt.UID, nil
}
const (
WorfklowPending WorkflowPhase = "Pending"
WorfklowRunning WorkflowPhase = "Running"
WorfklowSucceeded WorkflowPhase = "Succeeded"
WorfklowSkipped WorkflowPhase = "Skipped"
WorfklowFailed WorkflowPhase = "Failed"
WorfklowError WorkflowPhase = "Error"
)
type WorkflowPhase string
type Workflow struct {
ID uint64
CreatedAt time.Time `db:"created_at"`
UID string
Name string
GenerateName string
Parameters []WorkflowParameter
Manifest string
Phase WorkflowPhase
StartedAt time.Time
FinishedAt time.Time
WorkflowTemplate *WorkflowTemplate
}
type WorkflowParameter struct {
Name string
Value *string
}
type ListOptions = metav1.ListOptions
type PodGCStrategy = wfv1.PodGCStrategy
type WorkflowOptions struct {
Name string
GenerateName string
Entrypoint string
Parameters []WorkflowParameter
ServiceAccount string
Labels *map[string]string
ListOptions *ListOptions
PodGCStrategy *PodGCStrategy
}

692
pkg/workflow.go Normal file
View File

@@ -0,0 +1,692 @@
package v1
import (
"bufio"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"regexp"
"sort"
"strconv"
"strings"
"time"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/workflow/common"
"github.com/argoproj/argo/workflow/templateresolution"
argoutil "github.com/argoproj/argo/workflow/util"
"github.com/argoproj/argo/workflow/validate"
argojson "github.com/argoproj/pkg/json"
"github.com/onepanelio/core/s3"
"github.com/onepanelio/core/util"
"github.com/onepanelio/core/util/env"
"github.com/onepanelio/core/util/logging"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
_ "k8s.io/client-go/plugin/pkg/client/auth"
)
var (
readEndOffset = env.GetEnv("ARTIFACT_RERPOSITORY_OBJECT_RANGE", "-102400")
workflowTemplateUIDLabelKey = "onepanel.io/workflow-template-uid"
workflowTemplateVersionLabelKey = "onepanel.io/workflow-template-version"
)
func typeWorkflow(wf *wfv1.Workflow) (workflow *Workflow) {
manifest, err := json.Marshal(wf)
if err != nil {
return
}
workflow = &Workflow{
UID: string(wf.UID),
CreatedAt: wf.CreationTimestamp.UTC(),
Name: wf.Name,
Manifest: string(manifest),
}
return
}
func unmarshalWorkflows(wfBytes []byte, strict bool) (wfs []wfv1.Workflow, err error) {
var wf wfv1.Workflow
var jsonOpts []argojson.JSONOpt
if strict {
jsonOpts = append(jsonOpts, argojson.DisallowUnknownFields)
}
err = argojson.Unmarshal(wfBytes, &wf, jsonOpts...)
if err == nil {
return []wfv1.Workflow{wf}, nil
}
wfs, err = common.SplitWorkflowYAMLFile(wfBytes, strict)
if err == nil {
return
}
return
}
func (c *Client) injectAutomatedFields(namespace string, wf *wfv1.Workflow, opts *WorkflowOptions) (err error) {
if opts.PodGCStrategy == nil {
if wf.Spec.PodGC == nil {
//TODO - Load this data from onepanel config-map or secret
podGCStrategy := env.GetEnv("ARGO_POD_GC_STRATEGY", "OnPodCompletion")
strategy := PodGCStrategy(podGCStrategy)
wf.Spec.PodGC = &wfv1.PodGC{
Strategy: strategy,
}
}
} else {
wf.Spec.PodGC = &wfv1.PodGC{
Strategy: *opts.PodGCStrategy,
}
}
addSecretValsToTemplate := true
secret, err := c.GetSecret(namespace, "onepanel-default-env")
if err != nil {
var statusError *k8serrors.StatusError
if errors.As(err, &statusError) {
if statusError.ErrStatus.Reason == "NotFound" {
addSecretValsToTemplate = false
} else {
return err
}
} else {
return err
}
}
// Create dev/shm volume
wf.Spec.Volumes = append(wf.Spec.Volumes, corev1.Volume{
Name: "sys-dshm",
VolumeSource: corev1.VolumeSource{
EmptyDir: &corev1.EmptyDirVolumeSource{
Medium: corev1.StorageMediumMemory,
},
},
})
for i, template := range wf.Spec.Templates {
// Do not inject Istio sidecars in workflows
if template.Metadata.Annotations == nil {
wf.Spec.Templates[i].Metadata.Annotations = make(map[string]string)
}
wf.Spec.Templates[i].Metadata.Annotations["sidecar.istio.io/inject"] = "false"
if template.Container == nil {
continue
}
// Mount dev/shm
wf.Spec.Templates[i].Container.VolumeMounts = append(template.Container.VolumeMounts, corev1.VolumeMount{
Name: "sys-dshm",
MountPath: "/dev/shm",
})
// Always add output artifacts for metrics but make them optional
wf.Spec.Templates[i].Outputs.Artifacts = append(template.Outputs.Artifacts, wfv1.Artifact{
Name: "sys-metrics",
Path: "/tmp/sys-metrics.json",
Optional: true,
Archive: &wfv1.ArchiveStrategy{
None: &wfv1.NoneStrategy{},
},
})
if !addSecretValsToTemplate {
continue
}
//Generate ENV vars from secret, if there is a container present in the workflow
//Get template ENV vars, avoid over-writing them with secret values
for key, value := range secret.Data {
//Flag to prevent over-writing user's envs
addSecretAsEnv := true
for _, templateEnv := range template.Container.Env {
if templateEnv.Name == key {
addSecretAsEnv = false
break
}
}
if addSecretAsEnv {
template.Container.Env = append(template.Container.Env, corev1.EnvVar{
Name: key,
Value: string(value),
})
}
}
}
return
}
func (c *Client) create(namespace string, wf *wfv1.Workflow, opts *WorkflowOptions) (createdWorkflow *wfv1.Workflow, err error) {
if opts == nil {
opts = &WorkflowOptions{}
}
if opts.Name != "" {
wf.ObjectMeta.Name = opts.Name
}
if opts.GenerateName != "" {
wf.ObjectMeta.GenerateName = opts.GenerateName
}
if opts.Entrypoint != "" {
wf.Spec.Entrypoint = opts.Entrypoint
}
if opts.ServiceAccount != "" {
wf.Spec.ServiceAccountName = opts.ServiceAccount
}
if len(opts.Parameters) > 0 {
newParams := make([]wfv1.Parameter, 0)
passedParams := make(map[string]bool)
for _, param := range opts.Parameters {
newParams = append(newParams, wfv1.Parameter{
Name: param.Name,
Value: param.Value,
})
passedParams[param.Name] = true
}
for _, param := range wf.Spec.Arguments.Parameters {
if _, ok := passedParams[param.Name]; ok {
// this parameter was overridden via command line
continue
}
newParams = append(newParams, param)
}
wf.Spec.Arguments.Parameters = newParams
}
if opts.Labels != nil {
wf.ObjectMeta.Labels = *opts.Labels
}
if err = c.injectAutomatedFields(namespace, wf, opts); err != nil {
return nil, err
}
createdWorkflow, err = c.ArgoprojV1alpha1().Workflows(namespace).Create(wf)
if err != nil {
return nil, err
}
return
}
func (c *Client) ValidateWorkflow(namespace string, manifest []byte) (err error) {
workflows, err := unmarshalWorkflows(manifest, true)
if err != nil {
return
}
wftmplGetter := templateresolution.WrapWorkflowTemplateInterface(c.ArgoprojV1alpha1().WorkflowTemplates(namespace))
for _, wf := range workflows {
c.injectAutomatedFields(namespace, &wf, &WorkflowOptions{})
err = validate.ValidateWorkflow(wftmplGetter, &wf, validate.ValidateOpts{})
if err != nil {
return
}
}
return
}
func (c *Client) CreateWorkflow(namespace string, workflow *Workflow) (*Workflow, error) {
workflowTemplate, err := c.GetWorkflowTemplate(namespace, workflow.WorkflowTemplate.UID, workflow.WorkflowTemplate.Version)
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"Workflow": workflow,
"Error": err.Error(),
}).Error("Error with getting workflow template.")
return nil, util.NewUserError(codes.NotFound, "Error with getting workflow template.")
}
// TODO: Need to pull system parameters from k8s config/secret here, example: HOST
opts := &WorkflowOptions{}
re, _ := regexp.Compile(`[^a-zA-Z0-9-]{1,}`)
opts.GenerateName = strings.ToLower(re.ReplaceAllString(workflowTemplate.Name, `-`)) + "-"
for _, param := range workflow.Parameters {
opts.Parameters = append(opts.Parameters, WorkflowParameter{
Name: param.Name,
Value: param.Value,
})
}
if opts.Labels == nil {
opts.Labels = &map[string]string{}
}
(*opts.Labels)[workflowTemplateUIDLabelKey] = workflowTemplate.UID
(*opts.Labels)[workflowTemplateVersionLabelKey] = fmt.Sprint(workflowTemplate.Version)
workflows, err := unmarshalWorkflows([]byte(workflow.Manifest), true)
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"Workflow": workflow,
"Error": err.Error(),
}).Error("Error parsing workflow.")
return nil, err
}
var createdWorkflows []*wfv1.Workflow
for _, wf := range workflows {
createdWorkflow, err := c.create(namespace, &wf, opts)
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"Workflow": workflow,
"Error": err.Error(),
}).Error("Error parsing workflow.")
return nil, err
}
createdWorkflows = append(createdWorkflows, createdWorkflow)
}
workflow.Name = createdWorkflows[0].Name
workflow.CreatedAt = createdWorkflows[0].CreationTimestamp.UTC()
workflow.UID = string(createdWorkflows[0].ObjectMeta.UID)
workflow.WorkflowTemplate = workflowTemplate
// Manifests could get big, don't return them in this case.
workflow.WorkflowTemplate.Manifest = ""
return workflow, nil
}
func (c *Client) GetWorkflow(namespace, name string) (workflow *Workflow, err error) {
wf, err := c.ArgoprojV1alpha1().Workflows(namespace).Get(name, metav1.GetOptions{})
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"Name": name,
"Error": err.Error(),
}).Error("Workflow not found.")
return nil, util.NewUserError(codes.NotFound, "Workflow not found.")
}
uid := wf.ObjectMeta.Labels[workflowTemplateUIDLabelKey]
version, err := strconv.ParseInt(
wf.ObjectMeta.Labels[workflowTemplateVersionLabelKey],
10,
32,
)
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"Name": name,
"Error": err.Error(),
}).Error("Invalid version number.")
return nil, util.NewUserError(codes.InvalidArgument, "Invalid version number.")
}
workflowTemplate, err := c.GetWorkflowTemplate(namespace, uid, int32(version))
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"Name": name,
"Error": err.Error(),
}).Error("Cannot get Workflow Template.")
return nil, util.NewUserError(codes.NotFound, "Cannot get Workflow Template.")
}
// TODO: Do we need to parse parameters into workflow.Parameters?
manifest, err := json.Marshal(wf)
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"Name": name,
"Error": err.Error(),
}).Error("Invalid status.")
return nil, util.NewUserError(codes.InvalidArgument, "Invalid status.")
}
workflow = &Workflow{
UID: string(wf.UID),
CreatedAt: wf.CreationTimestamp.UTC(),
Name: wf.Name,
Phase: WorkflowPhase(wf.Status.Phase),
StartedAt: wf.Status.StartedAt.UTC(),
FinishedAt: wf.Status.FinishedAt.UTC(),
Manifest: string(manifest),
WorkflowTemplate: workflowTemplate,
}
return
}
func (c *Client) ListWorkflows(namespace, workflowTemplateUID, workflowTemplateVersion string) (workflows []*Workflow, err error) {
opts := &WorkflowOptions{}
if workflowTemplateUID != "" {
labelSelect := fmt.Sprintf("%s=%s", workflowTemplateUIDLabelKey, workflowTemplateUID)
if workflowTemplateVersion != "" {
labelSelect = fmt.Sprintf("%s,%s=%s", labelSelect, workflowTemplateVersionLabelKey, workflowTemplateVersion)
}
opts.ListOptions = &ListOptions{
LabelSelector: labelSelect,
}
}
workflowList, err := c.ArgoprojV1alpha1().Workflows(namespace).List(*opts.ListOptions)
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"WorkflowTemplateUID": workflowTemplateUID,
"WorkflowTemplateVersion": workflowTemplateVersion,
"Error": err.Error(),
}).Error("Workflows not found.")
return nil, util.NewUserError(codes.NotFound, "Workflows not found.")
}
wfs := workflowList.Items
sort.Slice(wfs, func(i, j int) bool {
ith := wfs[i].CreationTimestamp.Time
jth := wfs[j].CreationTimestamp.Time
//Most recent first
return ith.After(jth)
})
for _, wf := range wfs {
workflows = append(workflows, &Workflow{
Name: wf.ObjectMeta.Name,
UID: string(wf.ObjectMeta.UID),
Phase: WorkflowPhase(wf.Status.Phase),
StartedAt: wf.Status.StartedAt.UTC(),
FinishedAt: wf.Status.FinishedAt.UTC(),
CreatedAt: wf.CreationTimestamp.UTC(),
})
}
return
}
func (c *Client) WatchWorkflow(namespace, name string) (<-chan *Workflow, error) {
_, err := c.GetWorkflow(namespace, name)
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"Name": name,
"Error": err.Error(),
}).Error("Workflow template not found.")
return nil, util.NewUserError(codes.NotFound, "Workflow not found.")
}
fieldSelector, _ := fields.ParseSelector(fmt.Sprintf("metadata.name=%s", name))
watcher, err := c.ArgoprojV1alpha1().Workflows(namespace).Watch(metav1.ListOptions{
FieldSelector: fieldSelector.String(),
})
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"Name": name,
"Error": err.Error(),
}).Error("Watch Workflow error.")
return nil, util.NewUserError(codes.Unknown, "Error with watching workflow.")
}
var workflow *wfv1.Workflow
workflowWatcher := make(chan *Workflow)
ticker := time.NewTicker(time.Second)
go func() {
for {
select {
case next := <-watcher.ResultChan():
workflow, _ = next.Object.(*wfv1.Workflow)
case <-ticker.C:
}
if workflow == nil {
continue
}
manifest, err := json.Marshal(workflow)
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"Name": name,
"Workflow": workflow,
"Error": err.Error(),
}).Error("Error with trying to JSON Marshal workflow.Status.")
continue
}
workflowWatcher <- &Workflow{
UID: string(workflow.UID),
Name: workflow.Name,
Manifest: string(manifest),
}
if !workflow.Status.FinishedAt.IsZero() {
break
}
}
close(workflowWatcher)
watcher.Stop()
}()
return workflowWatcher, nil
}
func (c *Client) GetWorkflowLogs(namespace, name, podName, containerName string) (<-chan *LogEntry, error) {
wf, err := c.ArgoprojV1alpha1().Workflows(namespace).Get(name, metav1.GetOptions{})
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"Name": name,
"PodName": podName,
"ContainerName": containerName,
"Error": err.Error(),
}).Error("Workflow not found.")
return nil, util.NewUserError(codes.NotFound, "Workflow not found.")
}
var (
stream io.ReadCloser
s3Client *s3.Client
config map[string]string
endOffset int
)
if wf.Status.Nodes[podName].Completed() {
config, err = c.getNamespaceConfig(namespace)
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"Name": name,
"PodName": podName,
"ContainerName": containerName,
"Error": err.Error(),
}).Error("Can't get configuration.")
return nil, util.NewUserError(codes.PermissionDenied, "Can't get configuration.")
}
s3Client, err = c.getS3Client(namespace, config)
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"Name": name,
"PodName": podName,
"ContainerName": containerName,
"Error": err.Error(),
}).Error("Can't connect to S3 storage.")
return nil, util.NewUserError(codes.PermissionDenied, "Can't connect to S3 storage.")
}
opts := s3.GetObjectOptions{}
endOffset, err = strconv.Atoi(readEndOffset)
if err != nil {
return nil, util.NewUserError(codes.InvalidArgument, "Invaild range.")
}
opts.SetRange(0, int64(endOffset))
stream, err = s3Client.GetObject(config[artifactRepositoryBucketKey], "artifacts/"+namespace+"/"+name+"/"+podName+"/"+containerName+".log", opts)
} else {
stream, err = c.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{
Container: containerName,
Follow: true,
Timestamps: true,
}).Stream()
}
// TODO: Catch exact kubernetes error
//Todo: Can above todo be removed with the logging error?
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"Name": name,
"PodName": podName,
"ContainerName": containerName,
"Error": err.Error(),
}).Error("Error with logs.")
return nil, util.NewUserError(codes.NotFound, "Log not found.")
}
logWatcher := make(chan *LogEntry)
go func() {
scanner := bufio.NewScanner(stream)
for scanner.Scan() {
text := scanner.Text()
parts := strings.Split(text, " ")
timestamp, err := time.Parse(time.RFC3339, parts[0])
if err != nil {
logWatcher <- &LogEntry{Content: text}
} else {
logWatcher <- &LogEntry{
Timestamp: timestamp,
Content: strings.Join(parts[1:], " "),
}
}
}
close(logWatcher)
}()
return logWatcher, err
}
func (c *Client) GetWorkflowMetrics(namespace, name, podName string) (metrics []*Metric, err error) {
_, err = c.GetWorkflow(namespace, name)
if err != nil {
return nil, util.NewUserError(codes.NotFound, "Workflow not found.")
}
var (
stream io.ReadCloser
s3Client *s3.Client
config map[string]string
)
config, err = c.getNamespaceConfig(namespace)
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"Name": name,
"PodName": podName,
"Error": err.Error(),
}).Error("Can't get configuration.")
return nil, util.NewUserError(codes.PermissionDenied, "Can't get configuration.")
}
s3Client, err = c.getS3Client(namespace, config)
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"Name": name,
"PodName": podName,
"Error": err.Error(),
}).Error("Can't connect to S3 storage.")
return nil, util.NewUserError(codes.PermissionDenied, "Can't connect to S3 storage.")
}
opts := s3.GetObjectOptions{}
stream, err = s3Client.GetObject(config[artifactRepositoryBucketKey], "artifacts/"+namespace+"/"+name+"/"+podName+"/sys-metrics.json", opts)
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"Name": name,
"PodName": podName,
"Error": err.Error(),
}).Error("Metrics do not exist.")
return nil, util.NewUserError(codes.NotFound, "Metrics do not exist.")
}
content, err := ioutil.ReadAll(stream)
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"Name": name,
"PodName": podName,
"Error": err.Error(),
}).Error("Unknown.")
return nil, util.NewUserError(codes.Unknown, "Unknown error.")
}
if err = json.Unmarshal(content, &metrics); err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"Name": name,
"PodName": podName,
"Error": err.Error(),
}).Error("Error parsing metrics.")
return nil, util.NewUserError(codes.InvalidArgument, "Error parsing metrics.")
}
return
}
func (c *Client) RetryWorkflow(namespace, name string) (workflow *Workflow, err error) {
wf, err := c.ArgoprojV1alpha1().Workflows(namespace).Get(name, metav1.GetOptions{})
if err != nil {
return
}
wf, err = argoutil.RetryWorkflow(c, c.ArgoprojV1alpha1().Workflows(namespace), wf)
workflow = typeWorkflow(wf)
return
}
func (c *Client) ResubmitWorkflow(namespace, name string) (workflow *Workflow, err error) {
wf, err := c.ArgoprojV1alpha1().Workflows(namespace).Get(name, metav1.GetOptions{})
if err != nil {
return
}
wf, err = argoutil.FormulateResubmitWorkflow(wf, false)
if err != nil {
return
}
wf, err = argoutil.SubmitWorkflow(c.ArgoprojV1alpha1().Workflows(namespace), c, namespace, wf, &argoutil.SubmitOpts{})
if err != nil {
return
}
workflow = typeWorkflow(wf)
return
}
func (c *Client) ResumeWorkflow(namespace, name string) (workflow *Workflow, err error) {
err = argoutil.ResumeWorkflow(c.ArgoprojV1alpha1().Workflows(namespace), name)
if err != nil {
return
}
wf, err := c.ArgoprojV1alpha1().Workflows(namespace).Get(name, metav1.GetOptions{})
workflow = typeWorkflow(wf)
return
}
func (c *Client) SuspendWorkflow(namespace, name string) (err error) {
err = argoutil.SuspendWorkflow(c.ArgoprojV1alpha1().Workflows(namespace), name)
return
}
func (c *Client) TerminateWorkflow(namespace, name string) (err error) {
err = argoutil.TerminateWorkflow(c.ArgoprojV1alpha1().Workflows(namespace), name)
return
}

475
pkg/workflow_template.go Normal file
View File

@@ -0,0 +1,475 @@
package v1
import (
"database/sql"
"time"
sq "github.com/Masterminds/squirrel"
"github.com/onepanelio/core/util"
"github.com/onepanelio/core/util/logging"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
)
var sb = sq.StatementBuilder.PlaceholderFormat(sq.Dollar)
func (c *Client) insertWorkflowTemplateVersion(workflowTemplate *WorkflowTemplate, runner sq.BaseRunner) (err error) {
err = sb.Insert("workflow_template_versions").
SetMap(sq.Eq{
"workflow_template_id": workflowTemplate.ID,
"manifest": workflowTemplate.Manifest,
"version": int32(time.Now().Unix()),
"is_latest": workflowTemplate.IsLatest,
}).
Suffix("RETURNING version").
RunWith(runner).
QueryRow().Scan(&workflowTemplate.Version)
return
}
func (c *Client) createWorkflowTemplate(namespace string, workflowTemplate *WorkflowTemplate) (*WorkflowTemplate, error) {
uid, err := workflowTemplate.GenerateUID()
if err != nil {
return nil, err
}
tx, err := c.DB.Begin()
if err != nil {
return nil, err
}
defer tx.Rollback()
err = sb.Insert("workflow_templates").
SetMap(sq.Eq{
"uid": uid,
"name": workflowTemplate.Name,
"namespace": namespace,
}).
Suffix("RETURNING id").
RunWith(tx).
QueryRow().Scan(&workflowTemplate.ID)
if err != nil {
return nil, err
}
if err = c.insertWorkflowTemplateVersion(workflowTemplate, tx); err != nil {
return nil, err
}
if err = tx.Commit(); err != nil {
return nil, err
}
return workflowTemplate, nil
}
func (c *Client) removeIsLatestFromWorkflowTemplateVersions(workflowTemplate *WorkflowTemplate) error {
query, args, err := sb.Update("workflow_template_versions").
Set("is_latest", true).
Where(sq.Eq{
"workflow_template_id": workflowTemplate.ID,
"is_latest": false,
}).
ToSql()
if err != nil {
return err
}
if _, err := c.DB.Exec(query, args...); err != nil {
return err
}
return nil
}
func (c *Client) createWorkflowTemplateVersion(namespace string, workflowTemplate *WorkflowTemplate) (*WorkflowTemplate, error) {
query, args, err := sb.Select("id, name").
From("workflow_templates").
Where(sq.Eq{
"namespace": namespace,
"uid": workflowTemplate.UID,
}).
Limit(1).ToSql()
if err != nil {
return nil, err
}
if err = c.DB.Get(workflowTemplate, query, args...); err == sql.ErrNoRows {
return nil, nil
}
if err = c.insertWorkflowTemplateVersion(workflowTemplate, c.DB); err != nil {
return nil, err
}
return workflowTemplate, nil
}
func (c *Client) updateWorkflowTemplateVersion(workflowTemplate *WorkflowTemplate) (*WorkflowTemplate, error) {
query, args, err := sb.Update("workflow_template_versions").
Set("manifest", workflowTemplate.Manifest).
Where(sq.Eq{
"workflow_template_id": workflowTemplate.ID,
"version": workflowTemplate.Version,
}).
ToSql()
if err != nil {
return nil, err
}
if _, err := c.DB.Exec(query, args...); err != nil {
return nil, err
}
return workflowTemplate, nil
}
func (c *Client) workflowTemplatesSelectBuilder(namespace string) sq.SelectBuilder {
sb := sb.Select("wt.id", "wt.created_at", "wt.uid", "wt.name", "wt.is_archived", "wtv.version", "wtv.is_latest").
From("workflow_template_versions wtv").
Join("workflow_templates wt ON wt.id = wtv.workflow_template_id").
Where(sq.Eq{
"wt.namespace": namespace,
})
return sb
}
func (c *Client) getWorkflowTemplate(namespace, uid string, version int32) (workflowTemplate *WorkflowTemplate, err error) {
workflowTemplate = &WorkflowTemplate{}
sb := c.workflowTemplatesSelectBuilder(namespace).Where(sq.Eq{"wt.uid": uid}).
Columns("wtv.manifest").
OrderBy("wtv.version desc").
Limit(1)
if version != 0 {
sb = sb.Where(sq.Eq{"wtv.version": version})
}
query, args, err := sb.ToSql()
if err != nil {
return
}
if err = c.DB.Get(workflowTemplate, query, args...); err == sql.ErrNoRows {
err = nil
workflowTemplate = nil
}
return
}
func (c *Client) listWorkflowTemplateVersions(namespace, uid string) (workflowTemplateVersions []*WorkflowTemplate, err error) {
workflowTemplateVersions = []*WorkflowTemplate{}
query, args, err := c.workflowTemplatesSelectBuilder(namespace).Where(sq.Eq{"wt.uid": uid}).
Columns("wtv.manifest").
OrderBy("wtv.version desc").ToSql()
if err != nil {
return
}
err = c.DB.Select(&workflowTemplateVersions, query, args...)
return
}
func (c *Client) listWorkflowTemplates(namespace string) (workflowTemplateVersions []*WorkflowTemplate, err error) {
workflowTemplateVersions = []*WorkflowTemplate{}
query, args, err := c.workflowTemplatesSelectBuilder(namespace).
Options("DISTINCT ON (wt.id) wt.id,").
Where(sq.Eq{
"wt.is_archived": false,
}).
OrderBy("wt.id desc").ToSql()
if err != nil {
return
}
err = c.DB.Select(&workflowTemplateVersions, query, args...)
return
}
func (c *Client) archiveWorkflowTemplate(namespace, uid string) (bool, error) {
query, args, err := sb.Update("workflow_templates").
Set("is_archived", true).
Where(sq.Eq{
"uid": uid,
"namespace": namespace,
}).
ToSql()
if err != nil {
return false, err
}
if _, err := c.DB.Exec(query, args...); err != nil {
return false, err
}
return true, nil
}
func (c *Client) CreateWorkflowTemplate(namespace string, workflowTemplate *WorkflowTemplate) (*WorkflowTemplate, error) {
allowed, err := c.IsAuthorized(namespace, "create", "argoproj.io", "workflow", "")
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"WorkflowTemplate": workflowTemplate,
"Error": err.Error(),
}).Error("IsAuthorized failed.")
return nil, util.NewUserError(codes.Unknown, "Could not create workflow template.")
}
if !allowed {
return nil, util.NewUserError(codes.PermissionDenied, "Permission denied.")
}
// validate workflow template
if err := c.ValidateWorkflow(namespace, workflowTemplate.GetManifestBytes()); err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"WorkflowTemplate": workflowTemplate,
"Error": err.Error(),
}).Error("Workflow could not be validated.")
return nil, util.NewUserError(codes.InvalidArgument, err.Error())
}
workflowTemplate, err = c.createWorkflowTemplate(namespace, workflowTemplate)
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"WorkflowTemplate": workflowTemplate,
"Error": err.Error(),
}).Error("Could not create workflow template.")
return nil, util.NewUserErrorWrap(err, "Workflow template")
}
return workflowTemplate, nil
}
func (c *Client) CreateWorkflowTemplateVersion(namespace string, workflowTemplate *WorkflowTemplate) (*WorkflowTemplate, error) {
allowed, err := c.IsAuthorized(namespace, "create", "argoproj.io", "workflow", "")
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"WorkflowTemplate": workflowTemplate,
"Error": err.Error(),
}).Error("IsAuthorized failed.")
return nil, util.NewUserError(codes.Unknown, "Could not create template version.")
}
if !allowed {
return nil, util.NewUserError(codes.PermissionDenied, "Permission denied.")
}
// validate workflow template
if err := c.ValidateWorkflow(namespace, workflowTemplate.GetManifestBytes()); err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"WorkflowTemplate": workflowTemplate,
"Error": err.Error(),
}).Error("Workflow could not be validated.")
return nil, util.NewUserError(codes.InvalidArgument, err.Error())
}
if err := c.removeIsLatestFromWorkflowTemplateVersions(workflowTemplate); err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"WorkflowTemplate": workflowTemplate,
"Error": err.Error(),
}).Error("Could not remove IsLatest from workflow template versions.")
return nil, util.NewUserError(codes.Unknown, "Unable to Create Workflow Template Version.")
}
workflowTemplate, err = c.createWorkflowTemplateVersion(namespace, workflowTemplate)
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"WorkflowTemplate": workflowTemplate,
"Error": err.Error(),
}).Error("Could not create workflow template version.")
return nil, util.NewUserErrorWrap(err, "Workflow template")
}
if workflowTemplate == nil {
return nil, util.NewUserError(codes.NotFound, "Workflow template not found.")
}
return workflowTemplate, nil
}
func (c *Client) UpdateWorkflowTemplateVersion(namespace string, workflowTemplate *WorkflowTemplate) (*WorkflowTemplate, error) {
allowed, err := c.IsAuthorized(namespace, "update", "argoproj.io", "workflow", "")
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"WorkflowTemplate": workflowTemplate,
"Error": err.Error(),
}).Error("IsAuthorized failed.")
return nil, util.NewUserError(codes.Unknown, "Could not update workflow template version.")
}
if !allowed {
return nil, util.NewUserError(codes.PermissionDenied, "Permission denied.")
}
// validate workflow template
if err := c.ValidateWorkflow(namespace, workflowTemplate.GetManifestBytes()); err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"WorkflowTemplate": workflowTemplate,
"Error": err.Error(),
}).Error("Workflow could not be validated.")
return nil, util.NewUserError(codes.InvalidArgument, err.Error())
}
originalWorkflowTemplate, err := c.getWorkflowTemplate(namespace, workflowTemplate.UID, workflowTemplate.Version)
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"WorkflowTemplate": workflowTemplate,
"Error": err.Error(),
}).Error("Could not get workflow template.")
return nil, util.NewUserError(codes.Unknown, "Could not update workflow template version.")
}
workflowTemplate.ID = originalWorkflowTemplate.ID
workflowTemplate, err = c.updateWorkflowTemplateVersion(workflowTemplate)
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"WorkflowTemplate": workflowTemplate,
"Error": err.Error(),
}).Error("Could not update workflow template version.")
return nil, util.NewUserErrorWrap(err, "Workflow template")
}
if workflowTemplate == nil {
return nil, util.NewUserError(codes.NotFound, "Workflow template not found.")
}
return workflowTemplate, nil
}
func (c *Client) GetWorkflowTemplate(namespace, uid string, version int32) (workflowTemplate *WorkflowTemplate, err error) {
allowed, err := c.IsAuthorized(namespace, "get", "argoproj.io", "workflow", "")
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"WorkflowTemplate": workflowTemplate,
"Error": err.Error(),
}).Error("IsAuthorized failed.")
return nil, util.NewUserError(codes.Unknown, "Unknown error.")
}
if !allowed {
return nil, util.NewUserError(codes.PermissionDenied, "Permission denied.")
}
workflowTemplate, err = c.getWorkflowTemplate(namespace, uid, version)
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"WorkflowTemplate": workflowTemplate,
"Error": err.Error(),
}).Error("Get Workflow Template failed.")
return nil, util.NewUserError(codes.Unknown, "Unknown error.")
}
if workflowTemplate == nil {
return nil, util.NewUserError(codes.NotFound, "Workflow template not found.")
}
return
}
func (c *Client) ListWorkflowTemplateVersions(namespace, uid string) (workflowTemplateVersions []*WorkflowTemplate, err error) {
allowed, err := c.IsAuthorized(namespace, "list", "argoproj.io", "workflow", "")
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"UID": uid,
"Error": err.Error(),
}).Error("IsAuthorized failed.")
return nil, util.NewUserError(codes.Unknown, "Unknown error.")
}
if !allowed {
return nil, util.NewUserError(codes.PermissionDenied, "Permission denied.")
}
workflowTemplateVersions, err = c.listWorkflowTemplateVersions(namespace, uid)
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"UID": uid,
"Error": err.Error(),
}).Error("Workflow template versions not found.")
return nil, util.NewUserError(codes.NotFound, "Workflow template versions not found.")
}
return
}
func (c *Client) ListWorkflowTemplates(namespace string) (workflowTemplateVersions []*WorkflowTemplate, err error) {
allowed, err := c.IsAuthorized(namespace, "list", "argoproj.io", "workflow", "")
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"Error": err.Error(),
}).Error("IsAuthorized failed.")
return nil, util.NewUserError(codes.Unknown, "Unable to list workflow templates.")
}
if !allowed {
return nil, util.NewUserError(codes.PermissionDenied, "Permission denied.")
}
workflowTemplateVersions, err = c.listWorkflowTemplates(namespace)
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"Error": err.Error(),
}).Error("Workflow templates not found.")
return nil, util.NewUserError(codes.NotFound, "Workflow templates not found.")
}
return
}
func (c *Client) ArchiveWorkflowTemplate(namespace, uid string) (archived bool, err error) {
allowed, err := c.IsAuthorized(namespace, "delete", "argoproj.io", "workflow", "")
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"UID": uid,
"Error": err.Error(),
}).Error("IsAuthorized failed.")
return false, util.NewUserError(codes.Unknown, "Unable to archive workflow template.")
}
if !allowed {
return false, util.NewUserError(codes.PermissionDenied, "Permission denied.")
}
workflowTemplate, err := c.getWorkflowTemplate(namespace, uid, 0)
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"UID": uid,
"Error": err.Error(),
}).Error("Get Workflow Template failed.")
return false, util.NewUserError(codes.Unknown, "Unable to archive workflow template.")
}
if workflowTemplate == nil {
return false, util.NewUserError(codes.NotFound, "Workflow template not found.")
}
archived, err = c.archiveWorkflowTemplate(namespace, uid)
if !archived || err != nil {
if err != nil {
logging.Logger.Log.WithFields(log.Fields{
"Namespace": namespace,
"UID": uid,
"Error": err.Error(),
}).Error("Archive Workflow Template failed.")
}
return false, util.NewUserError(codes.Unknown, "Unable to archive workflow template.")
}
return
}

View File

@@ -7,16 +7,12 @@ import (
"github.com/golang/protobuf/ptypes/empty"
"github.com/onepanelio/core/api"
v1 "github.com/onepanelio/core/pkg"
"github.com/onepanelio/core/util"
"google.golang.org/grpc/codes"
)
type NamespaceServer struct {
kubeConfig *v1.Config
}
type NamespaceServer struct{}
func NewNamespaceServer(kubeConfig *v1.Config) *NamespaceServer {
return &NamespaceServer{kubeConfig: kubeConfig}
func NewNamespaceServer() *NamespaceServer {
return &NamespaceServer{}
}
func apiNamespace(ns *v1.Namespace) (namespace *api.Namespace) {
@@ -28,11 +24,7 @@ func apiNamespace(ns *v1.Namespace) (namespace *api.Namespace) {
}
func (s *NamespaceServer) ListNamespaces(ctx context.Context, empty *empty.Empty) (*api.ListNamespacesResponse, error) {
client, err := v1.NewClient(s.kubeConfig, "")
if err != nil {
return nil, util.NewUserError(codes.PermissionDenied, "Permission denied.")
}
client := ctx.Value("kubeClient").(*v1.Client)
namespaces, err := client.ListNamespaces()
if errors.As(err, &userError) {
return nil, userError.GRPCError()

View File

@@ -7,16 +7,12 @@ import (
"github.com/golang/protobuf/ptypes/empty"
"github.com/onepanelio/core/api"
v1 "github.com/onepanelio/core/pkg"
"github.com/onepanelio/core/util"
"google.golang.org/grpc/codes"
)
type SecretServer struct {
kubeConfig *v1.Config
}
type SecretServer struct{}
func NewSecretServer(kubeConfig *v1.Config) *NamespaceServer {
return &NamespaceServer{kubeConfig: kubeConfig}
func NewSecretServer() *SecretServer {
return &SecretServer{}
}
func apiSecret(s *v1.Secret) *api.Secret {
@@ -27,11 +23,7 @@ func apiSecret(s *v1.Secret) *api.Secret {
}
func (s *SecretServer) CreateSecret(ctx context.Context, req *api.CreateSecretRequest) (*empty.Empty, error) {
client, err := v1.NewClient(s.kubeConfig, "")
if err != nil {
return nil, util.NewUserError(codes.PermissionDenied, "Permission denied.")
}
client := ctx.Value("kubeClient").(*v1.Client)
err := client.CreateSecret(req.Namespace, &v1.Secret{
Name: req.Secret.Name,
Data: req.Secret.Data,
@@ -43,11 +35,7 @@ func (s *SecretServer) CreateSecret(ctx context.Context, req *api.CreateSecretRe
}
func (s *SecretServer) SecretExists(ctx context.Context, req *api.SecretExistsRequest) (secretExists *api.SecretExistsResponse, err error) {
client, err := v1.NewClient(s.kubeConfig, "")
if err != nil {
return nil, util.NewUserError(codes.PermissionDenied, "Permission denied.")
}
client := ctx.Value("kubeClient").(*v1.Client)
secretExistsBool, err := client.SecretExists(req.Namespace, req.Name)
if errors.As(err, &userError) {
return &api.SecretExistsResponse{
@@ -60,11 +48,7 @@ func (s *SecretServer) SecretExists(ctx context.Context, req *api.SecretExistsRe
}
func (s *SecretServer) GetSecret(ctx context.Context, req *api.GetSecretRequest) (*api.Secret, error) {
client, err := v1.NewClient(s.kubeConfig, "")
if err != nil {
return nil, util.NewUserError(codes.PermissionDenied, "Permission denied.")
}
client := ctx.Value("kubeClient").(*v1.Client)
secret, err := client.GetSecret(req.Namespace, req.Name)
if errors.As(err, &userError) {
return nil, userError.GRPCError()
@@ -73,11 +57,7 @@ func (s *SecretServer) GetSecret(ctx context.Context, req *api.GetSecretRequest)
}
func (s *SecretServer) ListSecrets(ctx context.Context, req *api.ListSecretsRequest) (*api.ListSecretsResponse, error) {
client, err := v1.NewClient(s.kubeConfig, "")
if err != nil {
return nil, util.NewUserError(codes.PermissionDenied, "Permission denied.")
}
client := ctx.Value("kubeClient").(*v1.Client)
secrets, err := client.ListSecrets(req.Namespace)
if errors.As(err, &userError) {
return nil, userError.GRPCError()
@@ -95,11 +75,7 @@ func (s *SecretServer) ListSecrets(ctx context.Context, req *api.ListSecretsRequ
}
func (s *SecretServer) DeleteSecret(ctx context.Context, req *api.DeleteSecretRequest) (deleted *api.DeleteSecretResponse, err error) {
client, err := v1.NewClient(s.kubeConfig, "")
if err != nil {
return nil, util.NewUserError(codes.PermissionDenied, "Permission denied.")
}
client := ctx.Value("kubeClient").(*v1.Client)
isDeleted, err := client.DeleteSecret(req.Namespace, req.Name)
if errors.As(err, &userError) {
return &api.DeleteSecretResponse{
@@ -112,11 +88,7 @@ func (s *SecretServer) DeleteSecret(ctx context.Context, req *api.DeleteSecretRe
}
func (s *SecretServer) DeleteSecretKey(ctx context.Context, req *api.DeleteSecretKeyRequest) (deleted *api.DeleteSecretKeyResponse, err error) {
client, err := v1.NewClient(s.kubeConfig, "")
if err != nil {
return nil, util.NewUserError(codes.PermissionDenied, "Permission denied.")
}
client := ctx.Value("kubeClient").(*v1.Client)
secret := v1.Secret{
Name: req.Secret.Name,
Data: map[string]string{
@@ -137,11 +109,7 @@ func (s *SecretServer) DeleteSecretKey(ctx context.Context, req *api.DeleteSecre
}
func (s *SecretServer) AddSecretKeyValue(ctx context.Context, req *api.AddSecretKeyValueRequest) (updated *api.AddSecretKeyValueResponse, err error) {
client, err := v1.NewClient(s.kubeConfig, "")
if err != nil {
return nil, util.NewUserError(codes.PermissionDenied, "Permission denied.")
}
client := ctx.Value("kubeClient").(*v1.Client)
secret := &v1.Secret{
Name: req.Secret.Name,
Data: req.Secret.Data,
@@ -160,11 +128,7 @@ func (s *SecretServer) AddSecretKeyValue(ctx context.Context, req *api.AddSecret
}
func (s *SecretServer) UpdateSecretKeyValue(ctx context.Context, req *api.UpdateSecretKeyValueRequest) (updated *api.UpdateSecretKeyValueResponse, err error) {
client, err := v1.NewClient(s.kubeConfig, "")
if err != nil {
return nil, util.NewUserError(codes.PermissionDenied, "Permission denied.")
}
client := ctx.Value("kubeClient").(*v1.Client)
secret := v1.Secret{
Name: req.Secret.Name,
Data: req.Secret.Data,

View File

@@ -8,20 +8,17 @@ import (
"github.com/golang/protobuf/ptypes/empty"
"github.com/onepanelio/core/api"
"github.com/onepanelio/core/manager"
"github.com/onepanelio/core/model"
v1 "github.com/onepanelio/core/pkg"
"github.com/onepanelio/core/util/ptr"
)
type WorkflowServer struct {
resourceManager *manager.ResourceManager
type WorkflowServer struct{}
func NewWorkflowServer() *WorkflowServer {
return &WorkflowServer{}
}
func NewWorkflowServer(resourceManager *manager.ResourceManager) *WorkflowServer {
return &WorkflowServer{resourceManager: resourceManager}
}
func apiWorkflow(wf *model.Workflow) (workflow *api.Workflow) {
func apiWorkflow(wf *v1.Workflow) (workflow *api.Workflow) {
workflow = &api.Workflow{
CreatedAt: wf.CreatedAt.Format(time.RFC3339),
Name: wf.Name,
@@ -47,7 +44,7 @@ func apiWorkflow(wf *model.Workflow) (workflow *api.Workflow) {
return
}
func apiWorkflowTemplate(wft *model.WorkflowTemplate) *api.WorkflowTemplate {
func apiWorkflowTemplate(wft *v1.WorkflowTemplate) *api.WorkflowTemplate {
return &api.WorkflowTemplate{
Uid: wft.UID,
CreatedAt: wft.CreatedAt.UTC().Format(time.RFC3339),
@@ -60,20 +57,21 @@ func apiWorkflowTemplate(wft *model.WorkflowTemplate) *api.WorkflowTemplate {
}
func (s *WorkflowServer) CreateWorkflow(ctx context.Context, req *api.CreateWorkflowRequest) (*api.Workflow, error) {
workflow := &model.Workflow{
WorkflowTemplate: &model.WorkflowTemplate{
workflow := &v1.Workflow{
WorkflowTemplate: &v1.WorkflowTemplate{
UID: req.Workflow.WorkflowTemplate.Uid,
Version: req.Workflow.WorkflowTemplate.Version,
},
}
for _, param := range req.Workflow.Parameters {
workflow.Parameters = append(workflow.Parameters, model.Parameter{
workflow.Parameters = append(workflow.Parameters, v1.WorkflowParameter{
Name: param.Name,
Value: ptr.String(param.Value),
})
}
wf, err := s.resourceManager.CreateWorkflow(req.Namespace, workflow)
client := ctx.Value("kubeClient").(*v1.Client)
wf, err := client.CreateWorkflow(req.Namespace, workflow)
if err != nil {
if errors.As(err, &userError) {
return nil, userError.GRPCError()
@@ -84,7 +82,8 @@ func (s *WorkflowServer) CreateWorkflow(ctx context.Context, req *api.CreateWork
}
func (s *WorkflowServer) GetWorkflow(ctx context.Context, req *api.GetWorkflowRequest) (*api.Workflow, error) {
wf, err := s.resourceManager.GetWorkflow(req.Namespace, req.Name)
client := ctx.Value("kubeClient").(*v1.Client)
wf, err := client.GetWorkflow(req.Namespace, req.Name)
if errors.As(err, &userError) {
return nil, userError.GRPCError()
}
@@ -93,12 +92,13 @@ func (s *WorkflowServer) GetWorkflow(ctx context.Context, req *api.GetWorkflowRe
}
func (s *WorkflowServer) WatchWorkflow(req *api.WatchWorkflowRequest, stream api.WorkflowService_WatchWorkflowServer) error {
watcher, err := s.resourceManager.WatchWorkflow(req.Namespace, req.Name)
client := stream.Context().Value("kubeClient").(*v1.Client)
watcher, err := client.WatchWorkflow(req.Namespace, req.Name)
if errors.As(err, &userError) {
return userError.GRPCError()
}
wf := &model.Workflow{}
wf := &v1.Workflow{}
ticker := time.NewTicker(time.Second)
for {
select {
@@ -118,12 +118,13 @@ func (s *WorkflowServer) WatchWorkflow(req *api.WatchWorkflowRequest, stream api
}
func (s *WorkflowServer) GetWorkflowLogs(req *api.GetWorkflowLogsRequest, stream api.WorkflowService_GetWorkflowLogsServer) error {
watcher, err := s.resourceManager.GetWorkflowLogs(req.Namespace, req.Name, req.PodName, req.ContainerName)
client := stream.Context().Value("kubeClient").(*v1.Client)
watcher, err := client.GetWorkflowLogs(req.Namespace, req.Name, req.PodName, req.ContainerName)
if errors.As(err, &userError) {
return userError.GRPCError()
}
le := &model.LogEntry{}
le := &v1.LogEntry{}
ticker := time.NewTicker(time.Second)
for {
select {
@@ -146,7 +147,8 @@ func (s *WorkflowServer) GetWorkflowLogs(req *api.GetWorkflowLogsRequest, stream
}
func (s *WorkflowServer) GetWorkflowMetrics(ctx context.Context, req *api.GetWorkflowMetricsRequest) (*api.GetWorkflowMetricsResponse, error) {
metrics, err := s.resourceManager.GetWorkflowMetrics(req.Namespace, req.Name, req.PodName)
client := ctx.Value("kubeClient").(*v1.Client)
metrics, err := client.GetWorkflowMetrics(req.Namespace, req.Name, req.PodName)
if errors.As(err, &userError) {
return nil, userError.GRPCError()
}
@@ -164,6 +166,7 @@ func (s *WorkflowServer) GetWorkflowMetrics(ctx context.Context, req *api.GetWor
}
func (s *WorkflowServer) ListWorkflows(ctx context.Context, req *api.ListWorkflowsRequest) (*api.ListWorkflowsResponse, error) {
client := ctx.Value("kubeClient").(*v1.Client)
if req.PageSize <= 0 {
req.PageSize = 15
}
@@ -172,7 +175,7 @@ func (s *WorkflowServer) ListWorkflows(ctx context.Context, req *api.ListWorkflo
req.Page = 1
}
workflows, err := s.resourceManager.ListWorkflows(req.Namespace, req.WorkflowTemplateUid, req.WorkflowTemplateVersion)
workflows, err := client.ListWorkflows(req.Namespace, req.WorkflowTemplateUid, req.WorkflowTemplateVersion)
if errors.As(err, &userError) {
return nil, userError.GRPCError()
}
@@ -203,7 +206,8 @@ func (s *WorkflowServer) ListWorkflows(ctx context.Context, req *api.ListWorkflo
}
func (s *WorkflowServer) ResubmitWorkflow(ctx context.Context, req *api.ResubmitWorkflowRequest) (*api.Workflow, error) {
wf, err := s.resourceManager.ResubmitWorkflow(req.Namespace, req.Name)
client := ctx.Value("kubeClient").(*v1.Client)
wf, err := client.ResubmitWorkflow(req.Namespace, req.Name)
if errors.As(err, &userError) {
return nil, userError.GRPCError()
}
@@ -212,7 +216,8 @@ func (s *WorkflowServer) ResubmitWorkflow(ctx context.Context, req *api.Resubmit
}
func (s *WorkflowServer) TerminateWorkflow(ctx context.Context, req *api.TerminateWorkflowRequest) (*empty.Empty, error) {
err := s.resourceManager.TerminateWorkflow(req.Namespace, req.Name)
client := ctx.Value("kubeClient").(*v1.Client)
err := client.TerminateWorkflow(req.Namespace, req.Name)
if errors.As(err, &userError) {
return nil, userError.GRPCError()
}
@@ -221,11 +226,12 @@ func (s *WorkflowServer) TerminateWorkflow(ctx context.Context, req *api.Termina
}
func (s *WorkflowServer) CreateWorkflowTemplate(ctx context.Context, req *api.CreateWorkflowTemplateRequest) (*api.WorkflowTemplate, error) {
workflowTemplate := &model.WorkflowTemplate{
workflowTemplate := &v1.WorkflowTemplate{
Name: req.WorkflowTemplate.Name,
Manifest: req.WorkflowTemplate.Manifest,
}
workflowTemplate, err := s.resourceManager.CreateWorkflowTemplate(req.Namespace, workflowTemplate)
client := ctx.Value("kubeClient").(*v1.Client)
workflowTemplate, err := client.CreateWorkflowTemplate(req.Namespace, workflowTemplate)
if errors.As(err, &userError) {
return nil, userError.GRPCError()
}
@@ -236,12 +242,14 @@ func (s *WorkflowServer) CreateWorkflowTemplate(ctx context.Context, req *api.Cr
}
func (s *WorkflowServer) CreateWorkflowTemplateVersion(ctx context.Context, req *api.CreateWorkflowTemplateRequest) (*api.WorkflowTemplate, error) {
workflowTemplate := &model.WorkflowTemplate{
workflowTemplate := &v1.WorkflowTemplate{
UID: req.WorkflowTemplate.Uid,
Name: req.WorkflowTemplate.Name,
Manifest: req.WorkflowTemplate.Manifest,
}
workflowTemplate, err := s.resourceManager.CreateWorkflowTemplateVersion(req.Namespace, workflowTemplate)
client := ctx.Value("kubeClient").(*v1.Client)
workflowTemplate, err := client.CreateWorkflowTemplateVersion(req.Namespace, workflowTemplate)
if errors.As(err, &userError) {
return nil, userError.GRPCError()
}
@@ -253,14 +261,14 @@ func (s *WorkflowServer) CreateWorkflowTemplateVersion(ctx context.Context, req
}
func (s *WorkflowServer) UpdateWorkflowTemplateVersion(ctx context.Context, req *api.UpdateWorkflowTemplateVersionRequest) (*api.WorkflowTemplate, error) {
workflowTemplate := &model.WorkflowTemplate{
workflowTemplate := &v1.WorkflowTemplate{
UID: req.WorkflowTemplate.Uid,
Name: req.WorkflowTemplate.Name,
Manifest: req.WorkflowTemplate.Manifest,
Version: req.WorkflowTemplate.Version,
}
workflowTemplate, err := s.resourceManager.UpdateWorkflowTemplateVersion(req.Namespace, workflowTemplate)
client := ctx.Value("kubeClient").(*v1.Client)
workflowTemplate, err := client.UpdateWorkflowTemplateVersion(req.Namespace, workflowTemplate)
if errors.As(err, &userError) {
return nil, userError.GRPCError()
}
@@ -272,7 +280,8 @@ func (s *WorkflowServer) UpdateWorkflowTemplateVersion(ctx context.Context, req
}
func (s *WorkflowServer) GetWorkflowTemplate(ctx context.Context, req *api.GetWorkflowTemplateRequest) (*api.WorkflowTemplate, error) {
workflowTemplate, err := s.resourceManager.GetWorkflowTemplate(req.Namespace, req.Uid, req.Version)
client := ctx.Value("kubeClient").(*v1.Client)
workflowTemplate, err := client.GetWorkflowTemplate(req.Namespace, req.Uid, req.Version)
if errors.As(err, &userError) {
return nil, userError.GRPCError()
}
@@ -281,7 +290,8 @@ func (s *WorkflowServer) GetWorkflowTemplate(ctx context.Context, req *api.GetWo
}
func (s *WorkflowServer) ListWorkflowTemplateVersions(ctx context.Context, req *api.ListWorkflowTemplateVersionsRequest) (*api.ListWorkflowTemplateVersionsResponse, error) {
workflowTemplateVersions, err := s.resourceManager.ListWorkflowTemplateVersions(req.Namespace, req.Uid)
client := ctx.Value("kubeClient").(*v1.Client)
workflowTemplateVersions, err := client.ListWorkflowTemplateVersions(req.Namespace, req.Uid)
if errors.As(err, &userError) {
return nil, userError.GRPCError()
}
@@ -298,7 +308,8 @@ func (s *WorkflowServer) ListWorkflowTemplateVersions(ctx context.Context, req *
}
func (s *WorkflowServer) ListWorkflowTemplates(ctx context.Context, req *api.ListWorkflowTemplatesRequest) (*api.ListWorkflowTemplatesResponse, error) {
workflowTemplates, err := s.resourceManager.ListWorkflowTemplates(req.Namespace)
client := ctx.Value("kubeClient").(*v1.Client)
workflowTemplates, err := client.ListWorkflowTemplates(req.Namespace)
if errors.As(err, &userError) {
return nil, userError.GRPCError()
}
@@ -315,7 +326,8 @@ func (s *WorkflowServer) ListWorkflowTemplates(ctx context.Context, req *api.Lis
}
func (s *WorkflowServer) ArchiveWorkflowTemplate(ctx context.Context, req *api.ArchiveWorkflowTemplateRequest) (*api.ArchiveWorkflowTemplateResponse, error) {
archived, err := s.resourceManager.ArchiveWorkflowTemplate(req.Namespace, req.Uid)
client := ctx.Value("kubeClient").(*v1.Client)
archived, err := client.ArchiveWorkflowTemplate(req.Namespace, req.Uid)
if errors.As(err, &userError) {
return nil, userError.GRPCError()
}