update: cache system config for entire application and not just per request

This commit is contained in:
Andrey Melnikov
2020-06-10 14:19:33 -07:00
parent 79d5babfcb
commit 483c79abb3
5 changed files with 25 additions and 18 deletions

View File

@@ -8,7 +8,7 @@ import (
func getClient() (*v1.Client, error) { func getClient() (*v1.Client, error) {
kubeConfig := v1.NewConfig() kubeConfig := v1.NewConfig()
client, err := v1.NewClient(kubeConfig, nil) client, err := v1.NewClient(kubeConfig, nil, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }

14
main.go
View File

@@ -38,11 +38,11 @@ func main() {
flag.Parse() flag.Parse()
kubeConfig := v1.NewConfig() kubeConfig := v1.NewConfig()
client, err := v1.NewClient(kubeConfig, nil) client, err := v1.NewClient(kubeConfig, nil, nil)
if err != nil { if err != nil {
log.Fatalf("Failed to connect to Kubernetes cluster: %v", err) log.Fatalf("Failed to connect to Kubernetes cluster: %v", err)
} }
config, err := client.GetSystemConfig() sysConfig, err := client.GetSystemConfig()
if err != nil { if err != nil {
log.Fatalf("Failed to get system config: %v", err) log.Fatalf("Failed to get system config: %v", err)
} }
@@ -50,16 +50,16 @@ func main() {
databaseDataSourceName := fmt.Sprintf("host=%v user=%v password=%v dbname=%v sslmode=disable", databaseDataSourceName := fmt.Sprintf("host=%v user=%v password=%v dbname=%v sslmode=disable",
config["databaseHost"], config["databaseUsername"], config["databasePassword"], config["databaseName"]) config["databaseHost"], config["databaseUsername"], config["databasePassword"], config["databaseName"])
db := sqlx.MustConnect(config["databaseDriverName"], databaseDataSourceName) db := sqlx.MustConnect(sysConfig["databaseDriverName"], databaseDataSourceName)
if err := goose.Run("up", db.DB, "db"); err != nil { if err := goose.Run("up", db.DB, "db"); err != nil {
log.Fatalf("Failed to run database migrations: %v", err) log.Fatalf("Failed to run database migrations: %v", err)
} }
go startRPCServer(db, kubeConfig) go startRPCServer(db, kubeConfig, sysConfig)
startHTTPProxy() startHTTPProxy()
} }
func startRPCServer(db *v1.DB, kubeConfig *v1.Config) { func startRPCServer(db *v1.DB, kubeConfig *v1.Config, sysConfig v1.SystemConfig) {
log.Printf("Starting RPC server on port %v", *rpcPort) log.Printf("Starting RPC server on port %v", *rpcPort)
lis, err := net.Listen("tcp", *rpcPort) lis, err := net.Listen("tcp", *rpcPort)
if err != nil { if err != nil {
@@ -86,12 +86,12 @@ func startRPCServer(db *v1.DB, kubeConfig *v1.Config) {
grpc_middleware.ChainUnaryServer( grpc_middleware.ChainUnaryServer(
grpc_logrus.UnaryServerInterceptor(logEntry), grpc_logrus.UnaryServerInterceptor(logEntry),
grpc_recovery.UnaryServerInterceptor(recoveryOpts...), grpc_recovery.UnaryServerInterceptor(recoveryOpts...),
auth.UnaryInterceptor(kubeConfig, db)), auth.UnaryInterceptor(kubeConfig, db, sysConfig)),
), grpc.StreamInterceptor( ), grpc.StreamInterceptor(
grpc_middleware.ChainStreamServer( grpc_middleware.ChainStreamServer(
grpc_logrus.StreamServerInterceptor(logEntry), grpc_logrus.StreamServerInterceptor(logEntry),
grpc_recovery.StreamServerInterceptor(recoveryOpts...), grpc_recovery.StreamServerInterceptor(recoveryOpts...),
auth.StreamingInterceptor(kubeConfig, db)), auth.StreamingInterceptor(kubeConfig, db, sysConfig)),
)) ))
api.RegisterWorkflowTemplateServiceServer(s, server.NewWorkflowTemplateServer()) api.RegisterWorkflowTemplateServiceServer(s, server.NewWorkflowTemplateServer())
api.RegisterCronWorkflowServiceServer(s, server.NewCronWorkflowServer()) api.RegisterCronWorkflowServiceServer(s, server.NewCronWorkflowServer())

View File

@@ -21,7 +21,7 @@ type Client struct {
kubernetes.Interface kubernetes.Interface
argoprojV1alpha1 argoprojv1alpha1.ArgoprojV1alpha1Interface argoprojV1alpha1 argoprojv1alpha1.ArgoprojV1alpha1Interface
*DB *DB
systemConfig map[string]string // cache of SystemConfig systemConfig SystemConfig
} }
func (c *Client) ArgoprojV1alpha1() argoprojv1alpha1.ArgoprojV1alpha1Interface { func (c *Client) ArgoprojV1alpha1() argoprojv1alpha1.ArgoprojV1alpha1Interface {
@@ -38,7 +38,7 @@ func NewConfig() (config *Config) {
return return
} }
func NewClient(config *Config, db *sqlx.DB) (client *Client, err error) { func NewClient(config *Config, db *sqlx.DB, systemConfig SystemConfig) (client *Client, err error) {
if config.BearerToken != "" { if config.BearerToken != "" {
config.BearerTokenFile = "" config.BearerTokenFile = ""
config.Username = "" config.Username = ""
@@ -57,7 +57,12 @@ func NewClient(config *Config, db *sqlx.DB) (client *Client, err error) {
return return
} }
return &Client{Interface: kubeClient, argoprojV1alpha1: argoClient, DB: db}, nil return &Client{
Interface: kubeClient,
argoprojV1alpha1: argoClient,
DB: db,
systemConfig: systemConfig,
}, nil
} }
func (c *Client) GetS3Client(namespace string, config *ArtifactRepositoryS3Config) (s3Client *s3.Client, err error) { func (c *Client) GetS3Client(namespace string, config *ArtifactRepositoryS3Config) (s3Client *s3.Client, err error) {

View File

@@ -9,6 +9,8 @@ import (
"sigs.k8s.io/yaml" "sigs.k8s.io/yaml"
) )
type SystemConfig = map[string]string
func (c *Client) getConfigMap(namespace, name string) (configMap *ConfigMap, err error) { func (c *Client) getConfigMap(namespace, name string) (configMap *ConfigMap, err error) {
cm, err := c.CoreV1().ConfigMaps(namespace).Get(name, metav1.GetOptions{}) cm, err := c.CoreV1().ConfigMaps(namespace).Get(name, metav1.GetOptions{})
if err != nil { if err != nil {

View File

@@ -46,14 +46,14 @@ func getBearerToken(ctx context.Context) (*string, bool) {
return nil, false return nil, false
} }
func getClient(ctx context.Context, kubeConfig *v1.Config, db *v1.DB) (context.Context, error) { func getClient(ctx context.Context, kubeConfig *v1.Config, db *v1.DB, sysConfig v1.SystemConfig) (context.Context, error) {
bearerToken, ok := getBearerToken(ctx) bearerToken, ok := getBearerToken(ctx)
if !ok { if !ok {
return nil, status.Error(codes.Unauthenticated, `Missing or invalid "authorization" header.`) return nil, status.Error(codes.Unauthenticated, `Missing or invalid "authorization" header.`)
} }
kubeConfig.BearerToken = *bearerToken kubeConfig.BearerToken = *bearerToken
client, err := v1.NewClient(kubeConfig, db) client, err := v1.NewClient(kubeConfig, db, sysConfig)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -89,7 +89,7 @@ func IsAuthorized(c *v1.Client, namespace, verb, group, resource, name string) (
// The two main cases are: // The two main cases are:
// 1. Is the token valid? This is used for logging in. // 1. Is the token valid? This is used for logging in.
// 2. Is there a token? There should be a token for everything except logging in. // 2. Is there a token? There should be a token for everything except logging in.
func UnaryInterceptor(kubeConfig *v1.Config, db *v1.DB) grpc.UnaryServerInterceptor { func UnaryInterceptor(kubeConfig *v1.Config, db *v1.DB, sysConfig v1.SystemConfig) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
if info.FullMethod == "/api.AuthService/IsValidToken" { if info.FullMethod == "/api.AuthService/IsValidToken" {
md, ok := metadata.FromIncomingContext(ctx) md, ok := metadata.FromIncomingContext(ctx)
@@ -104,7 +104,7 @@ func UnaryInterceptor(kubeConfig *v1.Config, db *v1.DB) grpc.UnaryServerIntercep
md.Set("authorization", tokenRequest.Token.Token) md.Set("authorization", tokenRequest.Token.Token)
ctx, err = getClient(ctx, kubeConfig, db) ctx, err = getClient(ctx, kubeConfig, db, sysConfig)
if err != nil { if err != nil {
ctx = nil ctx = nil
} }
@@ -141,7 +141,7 @@ func UnaryInterceptor(kubeConfig *v1.Config, db *v1.DB) grpc.UnaryServerIntercep
} }
// This guy checks for the token // This guy checks for the token
ctx, err = getClient(ctx, kubeConfig, db) ctx, err = getClient(ctx, kubeConfig, db, sysConfig)
if err != nil { if err != nil {
return return
} }
@@ -150,9 +150,9 @@ func UnaryInterceptor(kubeConfig *v1.Config, db *v1.DB) grpc.UnaryServerIntercep
} }
} }
func StreamingInterceptor(kubeConfig *v1.Config, db *v1.DB) grpc.StreamServerInterceptor { func StreamingInterceptor(kubeConfig *v1.Config, db *v1.DB, sysConfig v1.SystemConfig) grpc.StreamServerInterceptor {
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) { return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) {
ctx, err := getClient(ss.Context(), kubeConfig, db) ctx, err := getClient(ss.Context(), kubeConfig, db, sysConfig)
if err != nil { if err != nil {
return return
} }